package webui import ( "bufio" "io" "log/slog" "os" "strings" "sync" "time" "bee/audit/internal/app" "bee/audit/internal/platform" ) // kmsgWatcher reads /dev/kmsg and accumulates hardware error events. // It supports multiple concurrent SAT tasks: a shared event window is open // while any SAT task is running, and flushed when all tasks complete. type kmsgWatcher struct { mu sync.Mutex activeCount int // number of in-flight SAT tasks window *kmsgWindow statusDB *app.ComponentStatusDB } type kmsgWindow struct { targets []string // SAT targets running concurrently startedAt time.Time seen map[kmsgEventKey]bool events []kmsgEvent } type kmsgEventKey struct { id string // BDF or device name category string } type kmsgEvent struct { timestamp time.Time raw string ids []string // BDF addresses or device names extracted category string } func newKmsgWatcher(statusDB *app.ComponentStatusDB) *kmsgWatcher { return &kmsgWatcher{statusDB: statusDB} } // start launches the background kmsg reading goroutine. func (w *kmsgWatcher) start() { goRecoverLoop("kmsg watcher", 5*time.Second, w.run) } func (w *kmsgWatcher) run() { for { f, err := os.Open("/dev/kmsg") if err != nil { slog.Warn("kmsg watcher unavailable", "err", err) time.Sleep(30 * time.Second) continue } // Best-effort seek to end so we only capture events from now forward. _, _ = f.Seek(0, io.SeekEnd) scanner := bufio.NewScanner(f) scanner.Buffer(make([]byte, 64*1024), 64*1024) for scanner.Scan() { line := scanner.Text() evt, ok := parseKmsgLine(line) if !ok { continue } w.mu.Lock() if w.window != nil { w.recordEvent(evt) } w.mu.Unlock() } if err := scanner.Err(); err != nil { slog.Warn("kmsg watcher stopped", "err", err) } _ = f.Close() time.Sleep(2 * time.Second) } } // recordEvent appends evt to the active window, deduplicating by (id, category). // Must be called with w.mu held. 
func (w *kmsgWatcher) recordEvent(evt kmsgEvent) { if len(evt.ids) == 0 { key := kmsgEventKey{id: "", category: evt.category} if !w.window.seen[key] { w.window.seen[key] = true w.window.events = append(w.window.events, evt) } return } for _, id := range evt.ids { key := kmsgEventKey{id: id, category: evt.category} if !w.window.seen[key] { w.window.seen[key] = true w.window.events = append(w.window.events, evt) } } } // NotifyTaskStarted increments the active task counter and opens a shared event window // if this is the first task starting. func (w *kmsgWatcher) NotifyTaskStarted(taskID, target string) { w.mu.Lock() defer w.mu.Unlock() if w.activeCount == 0 { w.window = &kmsgWindow{ startedAt: time.Now(), seen: make(map[kmsgEventKey]bool), } } w.activeCount++ if w.window != nil { w.window.targets = append(w.window.targets, target) } } // NotifyTaskFinished decrements the active task counter. When all tasks finish, // it flushes the accumulated events to the status DB. func (w *kmsgWatcher) NotifyTaskFinished(taskID string) { w.mu.Lock() w.activeCount-- var window *kmsgWindow if w.activeCount <= 0 { w.activeCount = 0 window = w.window w.window = nil } w.mu.Unlock() if window == nil || len(window.events) == 0 { return } goRecoverOnce("kmsg watcher flush", func() { w.flushWindow(window) }) } func (w *kmsgWatcher) flushWindow(window *kmsgWindow) { if w.statusDB == nil { return } source := "watchdog:kmsg" // Collect unique component keys from events. seen := map[string]string{} // componentKey → first raw line for _, evt := range window.events { if len(evt.ids) == 0 { // MCE or un-identified error. 
key := "cpu:all" if evt.category == "memory" { key = "memory:all" } if _, exists := seen[key]; !exists { seen[key] = evt.raw } continue } for _, id := range evt.ids { var key string switch evt.category { case "gpu", "pcie": key = "pcie:" + normalizeBDF(id) case "storage": key = "storage:" + id default: key = "pcie:" + normalizeBDF(id) } if _, exists := seen[key]; !exists { seen[key] = evt.raw } } } for key, detail := range seen { detail = "kernel error during SAT (" + strings.Join(window.targets, ",") + "): " + truncate(detail, 120) w.statusDB.Record(key, source, "Warning", detail) } } // parseKmsgLine parses a single /dev/kmsg line and returns an event if it matches // any pattern in platform.HardwareErrorPatterns. // kmsg format: ",,,-;message text" func parseKmsgLine(raw string) (kmsgEvent, bool) { msg := raw if idx := strings.Index(raw, ";"); idx >= 0 { msg = strings.TrimSpace(raw[idx+1:]) } if msg == "" { return kmsgEvent{}, false } for _, p := range platform.HardwareErrorPatterns { m := p.Re.FindStringSubmatch(msg) if m == nil { continue } evt := kmsgEvent{ timestamp: time.Now(), raw: msg, category: p.Category, } if p.BDFGroup > 0 && p.BDFGroup < len(m) { evt.ids = append(evt.ids, normalizeBDF(m[p.BDFGroup])) } if p.DevGroup > 0 && p.DevGroup < len(m) { evt.ids = append(evt.ids, m[p.DevGroup]) } return evt, true } return kmsgEvent{}, false } // normalizeBDF normalizes a PCIe BDF to the 4-part form "0000:c8:00.0". func normalizeBDF(bdf string) string { bdf = strings.ToLower(strings.TrimSpace(bdf)) if strings.Count(bdf, ":") == 1 { return "0000:" + bdf } return bdf } func truncate(s string, max int) string { if len(s) <= max { return s } return s[:max] + "..." } // isSATTarget returns true for task targets that run hardware acceptance tests. 
func isSATTarget(target string) bool {
	// Exact, case-sensitive match against the known SAT suite names.
	switch target {
	// NVIDIA GPU suites.
	case "nvidia",
		"nvidia-targeted-stress",
		"nvidia-benchmark",
		"nvidia-compute",
		"nvidia-targeted-power",
		"nvidia-pulse",
		"nvidia-interconnect",
		"nvidia-bandwidth",
		"nvidia-stress":
		return true
	// Memory, storage, CPU and generic SAT stress suites.
	case "memory",
		"memory-stress",
		"storage",
		"cpu",
		"sat-stress":
		return true
	// AMD GPU suites and whole-platform stress.
	case "amd",
		"amd-mem",
		"amd-bandwidth",
		"amd-stress",
		"platform-stress":
		return true
	default:
		return false
	}
}