package webui
import (
	"bufio"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"bee/audit/internal/app"
	"bee/audit/internal/platform"
)
// kmsgWatcher reads /dev/kmsg and accumulates hardware error events.
// It supports multiple concurrent SAT tasks: a shared event window is open
// while any SAT task is running, and flushed when all tasks complete.
type kmsgWatcher struct {
	mu sync.Mutex // guards activeCount and window
	activeCount int // number of in-flight SAT tasks
	window *kmsgWindow // shared event window; non-nil only while at least one SAT task runs
	statusDB *app.ComponentStatusDB // destination for flushed warnings; a nil DB disables flushing
}
// kmsgWindow collects kernel error events observed while one or more SAT
// tasks are running. A single window is shared by all concurrent tasks.
type kmsgWindow struct {
	targets []string // SAT targets running concurrently
	startedAt time.Time // time the window was opened (first task started)
	seen map[kmsgEventKey]bool // dedup set: at most one event per (id, category)
	events []kmsgEvent // deduplicated events in arrival order
}
// kmsgEventKey identifies an event for deduplication within a window.
type kmsgEventKey struct {
	id string // BDF or device name; empty when no device could be extracted
	category string // matching pattern's category (e.g. "gpu", "pcie", "storage", "memory")
}
// kmsgEvent is one kernel log line that matched a hardware error pattern.
type kmsgEvent struct {
	timestamp time.Time // wall-clock time the line was parsed (not the kmsg timestamp field)
	raw string // message text with the kmsg "<prio>,<seq>,<usec>,-;" prefix stripped
	ids []string // BDF addresses or device names extracted
	category string // category of the pattern that matched
}
func newKmsgWatcher(statusDB *app.ComponentStatusDB) *kmsgWatcher {
|
|
return &kmsgWatcher{statusDB: statusDB}
|
|
}
|
|
|
|
// start launches the background kmsg reading goroutine.
// goRecoverLoop (defined elsewhere in this package) presumably re-runs
// w.run with a 5s delay after a panic — confirm against its definition.
func (w *kmsgWatcher) start() {
	goRecoverLoop("kmsg watcher", 5*time.Second, w.run)
}
func (w *kmsgWatcher) run() {
|
|
for {
|
|
f, err := os.Open("/dev/kmsg")
|
|
if err != nil {
|
|
slog.Warn("kmsg watcher unavailable", "err", err)
|
|
time.Sleep(30 * time.Second)
|
|
continue
|
|
}
|
|
// Best-effort seek to end so we only capture events from now forward.
|
|
_, _ = f.Seek(0, io.SeekEnd)
|
|
|
|
scanner := bufio.NewScanner(f)
|
|
scanner.Buffer(make([]byte, 64*1024), 64*1024)
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
evt, ok := parseKmsgLine(line)
|
|
if !ok {
|
|
continue
|
|
}
|
|
w.mu.Lock()
|
|
if w.window != nil {
|
|
w.recordEvent(evt)
|
|
}
|
|
w.mu.Unlock()
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
slog.Warn("kmsg watcher stopped", "err", err)
|
|
}
|
|
_ = f.Close()
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
}
|
|
|
|
// recordEvent appends evt to the active window, deduplicating by (id, category).
|
|
// Must be called with w.mu held.
|
|
func (w *kmsgWatcher) recordEvent(evt kmsgEvent) {
|
|
if len(evt.ids) == 0 {
|
|
key := kmsgEventKey{id: "", category: evt.category}
|
|
if !w.window.seen[key] {
|
|
w.window.seen[key] = true
|
|
w.window.events = append(w.window.events, evt)
|
|
}
|
|
return
|
|
}
|
|
for _, id := range evt.ids {
|
|
key := kmsgEventKey{id: id, category: evt.category}
|
|
if !w.window.seen[key] {
|
|
w.window.seen[key] = true
|
|
w.window.events = append(w.window.events, evt)
|
|
}
|
|
}
|
|
}
|
|
|
|
// NotifyTaskStarted increments the active task counter and opens a shared event window
|
|
// if this is the first task starting.
|
|
func (w *kmsgWatcher) NotifyTaskStarted(taskID, target string) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
if w.activeCount == 0 {
|
|
w.window = &kmsgWindow{
|
|
startedAt: time.Now(),
|
|
seen: make(map[kmsgEventKey]bool),
|
|
}
|
|
}
|
|
w.activeCount++
|
|
if w.window != nil {
|
|
w.window.targets = append(w.window.targets, target)
|
|
}
|
|
}
|
|
|
|
// NotifyTaskFinished decrements the active task counter. When all tasks finish,
|
|
// it flushes the accumulated events to the status DB.
|
|
func (w *kmsgWatcher) NotifyTaskFinished(taskID string) {
|
|
w.mu.Lock()
|
|
w.activeCount--
|
|
var window *kmsgWindow
|
|
if w.activeCount <= 0 {
|
|
w.activeCount = 0
|
|
window = w.window
|
|
w.window = nil
|
|
}
|
|
w.mu.Unlock()
|
|
|
|
if window == nil || len(window.events) == 0 {
|
|
return
|
|
}
|
|
goRecoverOnce("kmsg watcher flush", func() { w.flushWindow(window) })
|
|
}
|
|
|
|
func (w *kmsgWatcher) flushWindow(window *kmsgWindow) {
|
|
if w.statusDB == nil {
|
|
return
|
|
}
|
|
source := "watchdog:kmsg"
|
|
// Collect unique component keys from events.
|
|
seen := map[string]string{} // componentKey → first raw line
|
|
for _, evt := range window.events {
|
|
if len(evt.ids) == 0 {
|
|
// MCE or un-identified error.
|
|
key := "cpu:all"
|
|
if evt.category == "memory" {
|
|
key = "memory:all"
|
|
}
|
|
if _, exists := seen[key]; !exists {
|
|
seen[key] = evt.raw
|
|
}
|
|
continue
|
|
}
|
|
for _, id := range evt.ids {
|
|
var key string
|
|
switch evt.category {
|
|
case "gpu", "pcie":
|
|
key = "pcie:" + normalizeBDF(id)
|
|
case "storage":
|
|
key = "storage:" + id
|
|
default:
|
|
key = "pcie:" + normalizeBDF(id)
|
|
}
|
|
if _, exists := seen[key]; !exists {
|
|
seen[key] = evt.raw
|
|
}
|
|
}
|
|
}
|
|
for key, detail := range seen {
|
|
detail = "kernel error during SAT (" + strings.Join(window.targets, ",") + "): " + truncate(detail, 120)
|
|
w.statusDB.Record(key, source, "Warning", detail)
|
|
}
|
|
}
|
|
|
|
// parseKmsgLine parses a single /dev/kmsg line and returns an event if it matches
|
|
// any pattern in platform.HardwareErrorPatterns.
|
|
// kmsg format: "<priority>,<sequence>,<timestamp_usec>,-;message text"
|
|
func parseKmsgLine(raw string) (kmsgEvent, bool) {
|
|
msg := raw
|
|
if idx := strings.Index(raw, ";"); idx >= 0 {
|
|
msg = strings.TrimSpace(raw[idx+1:])
|
|
}
|
|
if msg == "" {
|
|
return kmsgEvent{}, false
|
|
}
|
|
|
|
for _, p := range platform.HardwareErrorPatterns {
|
|
m := p.Re.FindStringSubmatch(msg)
|
|
if m == nil {
|
|
continue
|
|
}
|
|
evt := kmsgEvent{
|
|
timestamp: time.Now(),
|
|
raw: msg,
|
|
category: p.Category,
|
|
}
|
|
if p.BDFGroup > 0 && p.BDFGroup < len(m) {
|
|
evt.ids = append(evt.ids, normalizeBDF(m[p.BDFGroup]))
|
|
}
|
|
if p.DevGroup > 0 && p.DevGroup < len(m) {
|
|
evt.ids = append(evt.ids, m[p.DevGroup])
|
|
}
|
|
return evt, true
|
|
}
|
|
return kmsgEvent{}, false
|
|
}
|
|
|
|
// normalizeBDF normalizes a PCIe BDF to the 4-part form "0000:c8:00.0".
func normalizeBDF(bdf string) string {
	b := strings.ToLower(strings.TrimSpace(bdf))
	// A domain-less BDF like "c8:00.0" has exactly one colon; prepend the
	// default PCI domain. Anything else is returned as-is.
	if strings.Count(b, ":") != 1 {
		return b
	}
	return "0000:" + b
}
// truncate shortens s to at most max bytes plus an "..." marker.
// The cut position backs off to a UTF-8 rune boundary so the result never
// contains a torn multi-byte sequence (kmsg text is usually ASCII, but
// driver messages may carry arbitrary bytes).
func truncate(s string, max int) string {
	if len(s) <= max {
		return s
	}
	cut := max
	// Walk back past UTF-8 continuation bytes to the start of a rune.
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut] + "..."
}
// isSATTarget returns true for task targets that run hardware acceptance tests.
func isSATTarget(target string) bool {
	satTargets := []string{
		"nvidia", "nvidia-benchmark", "nvidia-stress",
		"memory", "memory-stress", "storage", "cpu", "sat-stress",
		"amd", "amd-mem", "amd-bandwidth", "amd-stress",
		"platform-stress",
	}
	for _, t := range satTargets {
		if target == t {
			return true
		}
	}
	return false
}