Files
bee/audit/internal/webui/kmsg_watcher.go
Mikhail Chusavitin fd722692a4 feat(watchdog): hardware error monitor + unified component status store
- Add platform/error_patterns.go: pluggable table of kernel log patterns
  (NVIDIA/GPU, PCIe AER, storage I/O, MCE, EDAC) — extend by adding one struct
- Add app/component_status_db.go: persistent JSON store (component-status.json)
  keyed by "pcie:BDF", "storage:dev", "cpu:all", "memory:all"; OK never
  downgrades Warning or Critical
- Add webui/kmsg_watcher.go: goroutine reads /dev/kmsg during SAT tasks,
  writes Warning to DB for matched hardware errors
- Fix task status: overall_status=FAILED in summary.txt now marks task failed
- Audit routine overlays component DB statuses into bee-audit.json on every read

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-02 19:20:59 +03:00

231 lines
5.5 KiB
Go

package webui
import (
"bufio"
"io"
"log/slog"
"os"
"strings"
"sync"
"time"
"bee/audit/internal/app"
"bee/audit/internal/platform"
)
// kmsgWatcher reads /dev/kmsg and accumulates hardware error events.
// During an active SAT task window it records matching lines; on task finish
// it writes Warning status records to the component status DB.
type kmsgWatcher struct {
mu sync.Mutex
activeWindow *kmsgWindow
statusDB *app.ComponentStatusDB
}
type kmsgWindow struct {
taskID string
target string
startedAt time.Time
seen map[kmsgEventKey]bool
events []kmsgEvent
}
type kmsgEventKey struct {
id string // BDF or device name
category string
}
type kmsgEvent struct {
timestamp time.Time
raw string
ids []string // BDF addresses or device names extracted
category string
}
func newKmsgWatcher(statusDB *app.ComponentStatusDB) *kmsgWatcher {
return &kmsgWatcher{statusDB: statusDB}
}
// start launches the background kmsg reading goroutine.
func (w *kmsgWatcher) start() {
go w.run()
}
func (w *kmsgWatcher) run() {
f, err := os.Open("/dev/kmsg")
if err != nil {
slog.Warn("kmsg watcher unavailable", "err", err)
return
}
defer f.Close()
// Best-effort seek to end so we only capture events from now forward.
_, _ = f.Seek(0, io.SeekEnd)
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 64*1024), 64*1024)
for scanner.Scan() {
line := scanner.Text()
evt, ok := parseKmsgLine(line)
if !ok {
continue
}
w.mu.Lock()
if w.activeWindow != nil {
w.recordEvent(evt)
}
w.mu.Unlock()
}
if err := scanner.Err(); err != nil {
slog.Warn("kmsg watcher stopped", "err", err)
}
}
// recordEvent appends evt to the active window, deduplicating by (id, category).
// Must be called with w.mu held.
func (w *kmsgWatcher) recordEvent(evt kmsgEvent) {
if len(evt.ids) == 0 {
// Events without a device ID (e.g. MCE) — deduplicate by category.
key := kmsgEventKey{id: "", category: evt.category}
if !w.activeWindow.seen[key] {
w.activeWindow.seen[key] = true
w.activeWindow.events = append(w.activeWindow.events, evt)
}
return
}
for _, id := range evt.ids {
key := kmsgEventKey{id: id, category: evt.category}
if !w.activeWindow.seen[key] {
w.activeWindow.seen[key] = true
w.activeWindow.events = append(w.activeWindow.events, evt)
}
}
}
// NotifyTaskStarted opens a new event window for the given SAT task.
func (w *kmsgWatcher) NotifyTaskStarted(taskID, target string) {
w.mu.Lock()
defer w.mu.Unlock()
w.activeWindow = &kmsgWindow{
taskID: taskID,
target: target,
startedAt: time.Now(),
seen: make(map[kmsgEventKey]bool),
}
}
// NotifyTaskFinished closes the event window and asynchronously writes status records.
func (w *kmsgWatcher) NotifyTaskFinished(taskID string) {
w.mu.Lock()
window := w.activeWindow
if window != nil && window.taskID == taskID {
w.activeWindow = nil
}
w.mu.Unlock()
if window == nil || len(window.events) == 0 {
return
}
go w.flushWindow(window)
}
func (w *kmsgWatcher) flushWindow(window *kmsgWindow) {
if w.statusDB == nil {
return
}
source := "watchdog:kmsg"
// Collect unique component keys from events.
seen := map[string]string{} // componentKey → first raw line
for _, evt := range window.events {
if len(evt.ids) == 0 {
// MCE or un-identified error.
key := "cpu:all"
if evt.category == "memory" {
key = "memory:all"
}
if _, exists := seen[key]; !exists {
seen[key] = evt.raw
}
continue
}
for _, id := range evt.ids {
var key string
switch evt.category {
case "gpu", "pcie":
key = "pcie:" + normalizeBDF(id)
case "storage":
key = "storage:" + id
default:
key = "pcie:" + normalizeBDF(id)
}
if _, exists := seen[key]; !exists {
seen[key] = evt.raw
}
}
}
for key, detail := range seen {
detail = "kernel error during " + window.target + " SAT: " + truncate(detail, 120)
w.statusDB.Record(key, source, "Warning", detail)
}
}
// parseKmsgLine parses a single /dev/kmsg line and returns an event if it matches
// any pattern in platform.HardwareErrorPatterns.
// kmsg format: "<priority>,<sequence>,<timestamp_usec>,-;message text"
func parseKmsgLine(raw string) (kmsgEvent, bool) {
msg := raw
if idx := strings.Index(raw, ";"); idx >= 0 {
msg = strings.TrimSpace(raw[idx+1:])
}
if msg == "" {
return kmsgEvent{}, false
}
for _, p := range platform.HardwareErrorPatterns {
m := p.Re.FindStringSubmatch(msg)
if m == nil {
continue
}
evt := kmsgEvent{
timestamp: time.Now(),
raw: msg,
category: p.Category,
}
if p.BDFGroup > 0 && p.BDFGroup < len(m) {
evt.ids = append(evt.ids, normalizeBDF(m[p.BDFGroup]))
}
if p.DevGroup > 0 && p.DevGroup < len(m) {
evt.ids = append(evt.ids, m[p.DevGroup])
}
return evt, true
}
return kmsgEvent{}, false
}
// normalizeBDF normalizes a PCIe BDF to the 4-part form "0000:c8:00.0".
func normalizeBDF(bdf string) string {
bdf = strings.ToLower(strings.TrimSpace(bdf))
if strings.Count(bdf, ":") == 1 {
return "0000:" + bdf
}
return bdf
}
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "..."
}
// isSATTarget returns true for task targets that run hardware acceptance tests.
func isSATTarget(target string) bool {
switch target {
case "nvidia", "nvidia-stress", "memory", "memory-stress", "storage",
"cpu", "sat-stress", "amd", "amd-mem", "amd-bandwidth", "amd-stress",
"platform-stress":
return true
}
return false
}