feat(watchdog): hardware error monitor + unified component status store

- Add platform/error_patterns.go: pluggable table of kernel log patterns
  (NVIDIA/GPU, PCIe AER, storage I/O, MCE, EDAC) — extend by adding one struct
- Add app/component_status_db.go: persistent JSON store (component-status.json)
  keyed by "pcie:BDF", "storage:dev", "cpu:all", "memory:all"; OK never
  downgrades Warning or Critical
- Add webui/kmsg_watcher.go: goroutine reads /dev/kmsg during SAT tasks,
  writes Warning to DB for matched hardware errors
- Fix task status: overall_status=FAILED in summary.txt now marks task failed
- Audit routine overlays component DB statuses into bee-audit.json on every read

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mikhail Chusavitin
2026-04-02 19:20:59 +03:00
parent 99cece524c
commit fd722692a4
8 changed files with 774 additions and 13 deletions

View File

@@ -40,6 +40,8 @@ type App struct {
sat satRunner
runtime runtimeChecker
installer installer
// StatusDB is the unified component health store (nil if unavailable).
StatusDB *ComponentStatusDB
}
type ActionResult struct {
@@ -136,7 +138,7 @@ type runtimeChecker interface {
}
func New(platform *platform.System) *App {
return &App{
a := &App{
network: platform,
services: platform,
exports: platform,
@@ -145,6 +147,10 @@ func New(platform *platform.System) *App {
runtime: platform,
installer: platform,
}
if db, err := OpenComponentStatusDB(DefaultExportDir + "/component-status.json"); err == nil {
a.StatusDB = db
}
return a
}
// ApplySATOverlay parses a raw audit JSON, overlays the latest SAT results,
@@ -154,7 +160,7 @@ func ApplySATOverlay(auditJSON []byte) ([]byte, error) {
if err != nil {
return nil, err
}
applyLatestSATStatuses(&snap.Hardware, DefaultSATBaseDir)
applyLatestSATStatuses(&snap.Hardware, DefaultSATBaseDir, nil)
return json.MarshalIndent(snap, "", " ")
}
@@ -174,7 +180,7 @@ func (a *App) RunAudit(runtimeMode runtimeenv.Mode, output string) (string, erro
}
}
result := collector.Run(runtimeMode)
applyLatestSATStatuses(&result.Hardware, DefaultSATBaseDir)
applyLatestSATStatuses(&result.Hardware, DefaultSATBaseDir, a.StatusDB)
if health, err := ReadRuntimeHealth(DefaultRuntimeJSONPath); err == nil {
result.Runtime = &health
}

View File

@@ -0,0 +1,266 @@
package app
import (
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"
"time"
)
// ComponentStatusDB is a persistent, append-only store of hardware component health records.
// Records are keyed by component identity strings (e.g. "pcie:0000:c8:00.0", "storage:nvme0n1").
// Once a component is marked Warning or Critical, subsequent OK entries do not downgrade it —
// the component stays at the highest observed severity until explicitly reset.
type ComponentStatusDB struct {
path string
mu sync.Mutex
records map[string]*ComponentStatusRecord
}
// ComponentStatusRecord holds the current and historical health of one hardware component.
type ComponentStatusRecord struct {
ComponentKey string `json:"component_key"`
Status string `json:"status"` // "OK", "Warning", "Critical", "Unknown"
LastCheckedAt time.Time `json:"last_checked_at"`
LastChangedAt time.Time `json:"last_changed_at"`
ErrorSummary string `json:"error_summary,omitempty"`
History []ComponentStatusEntry `json:"history"`
}
// ComponentStatusEntry is one observation written to a component's history.
type ComponentStatusEntry struct {
At time.Time `json:"at"`
Status string `json:"status"`
Source string `json:"source"` // e.g. "sat:nvidia", "sat:memory", "watchdog:kmsg"
Detail string `json:"detail,omitempty"`
}
// OpenComponentStatusDB opens (or creates) the JSON status DB at path.
func OpenComponentStatusDB(path string) (*ComponentStatusDB, error) {
db := &ComponentStatusDB{
path: path,
records: make(map[string]*ComponentStatusRecord),
}
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, err
}
data, err := os.ReadFile(path)
if err != nil && !os.IsNotExist(err) {
return nil, err
}
if len(data) > 0 {
var records []ComponentStatusRecord
if err := json.Unmarshal(data, &records); err == nil {
for i := range records {
db.records[records[i].ComponentKey] = &records[i]
}
}
}
return db, nil
}
// Record writes one observation for the given component key.
// source is a short label like "sat:nvidia" or "watchdog:kmsg".
// status is "OK", "Warning", "Critical", or "Unknown".
// OK never downgrades an existing Warning or Critical status.
func (db *ComponentStatusDB) Record(key, source, status, detail string) {
if db == nil || strings.TrimSpace(key) == "" {
return
}
db.mu.Lock()
defer db.mu.Unlock()
now := time.Now().UTC()
rec, exists := db.records[key]
if !exists {
rec = &ComponentStatusRecord{ComponentKey: key}
db.records[key] = rec
}
rec.LastCheckedAt = now
entry := ComponentStatusEntry{At: now, Status: status, Source: source, Detail: detail}
rec.History = append(rec.History, entry)
// Status merge: OK never downgrades Warning/Critical.
newSev := componentSeverity(status)
curSev := componentSeverity(rec.Status)
if newSev > curSev {
rec.Status = status
rec.LastChangedAt = now
rec.ErrorSummary = detail
} else if rec.Status == "" {
rec.Status = status
rec.LastChangedAt = now
}
_ = db.saveLocked()
}
// Get returns the current record for a component key.
func (db *ComponentStatusDB) Get(key string) (ComponentStatusRecord, bool) {
if db == nil {
return ComponentStatusRecord{}, false
}
db.mu.Lock()
defer db.mu.Unlock()
r, ok := db.records[key]
if !ok {
return ComponentStatusRecord{}, false
}
return *r, true
}
// All returns a snapshot of all records.
func (db *ComponentStatusDB) All() []ComponentStatusRecord {
if db == nil {
return nil
}
db.mu.Lock()
defer db.mu.Unlock()
out := make([]ComponentStatusRecord, 0, len(db.records))
for _, r := range db.records {
out = append(out, *r)
}
return out
}
func (db *ComponentStatusDB) saveLocked() error {
records := make([]ComponentStatusRecord, 0, len(db.records))
for _, r := range db.records {
records = append(records, *r)
}
data, err := json.MarshalIndent(records, "", " ")
if err != nil {
return err
}
return os.WriteFile(db.path, data, 0644)
}
// componentSeverity returns a numeric severity so higher values win.
func componentSeverity(status string) int {
switch strings.TrimSpace(status) {
case "Critical":
return 3
case "Warning":
return 2
case "OK":
return 1
default:
return 0
}
}
// ApplySATResultToDB reads a SAT summary.txt from the run directory next to archivePath
// and writes component status records to db for the given SAT target.
// archivePath may be either a bare .tar.gz path or "Archive written to /path/foo.tar.gz".
func ApplySATResultToDB(db *ComponentStatusDB, target, archivePath string) {
if db == nil || strings.TrimSpace(archivePath) == "" {
return
}
archivePath = extractArchivePath(archivePath)
if archivePath == "" {
return
}
runDir := strings.TrimSuffix(archivePath, ".tar.gz")
data, err := os.ReadFile(filepath.Join(runDir, "summary.txt"))
if err != nil {
return
}
kv := parseSATKV(string(data))
overall := strings.ToUpper(strings.TrimSpace(kv["overall_status"]))
if overall == "" {
return
}
source := "sat:" + target
dbStatus := satStatusToDBStatus(overall)
// Map SAT target to component keys.
switch target {
case "nvidia", "amd", "nvidia-stress", "amd-stress", "amd-mem", "amd-bandwidth":
db.Record("pcie:gpu:"+target, source, dbStatus, target+" SAT: "+overall)
case "memory", "memory-stress", "sat-stress":
db.Record("memory:all", source, dbStatus, target+" SAT: "+overall)
case "cpu", "platform-stress":
db.Record("cpu:all", source, dbStatus, target+" SAT: "+overall)
case "storage":
// Try to record per-device if available in summary.
recordedAny := false
for key, val := range kv {
if !strings.HasSuffix(key, "_status") || key == "overall_status" {
continue
}
base := strings.TrimSuffix(key, "_status")
idx := strings.Index(base, "_")
if idx <= 0 {
continue
}
devName := base[:idx]
devStatus := satStatusToDBStatus(strings.ToUpper(strings.TrimSpace(val)))
db.Record("storage:"+devName, source, devStatus, "storage SAT: "+val)
recordedAny = true
}
if !recordedAny {
db.Record("storage:all", source, dbStatus, "storage SAT: "+overall)
}
}
}
func satStatusToDBStatus(overall string) string {
switch overall {
case "OK":
return "OK"
case "FAILED":
return "Warning"
case "PARTIAL", "UNSUPPORTED":
return "Unknown"
default:
return "Unknown"
}
}
// ExtractArchivePath extracts a bare .tar.gz path from a string that may be
// "Archive written to /path/foo.tar.gz" or already a bare path.
func ExtractArchivePath(s string) string {
return extractArchivePath(s)
}
// ReadSATOverallStatus reads the overall_status value from the summary.txt
// file located in the run directory alongside archivePath.
// Returns "" if the file cannot be read.
func ReadSATOverallStatus(archivePath string) string {
if strings.TrimSpace(archivePath) == "" {
return ""
}
runDir := strings.TrimSuffix(archivePath, ".tar.gz")
data, err := os.ReadFile(filepath.Join(runDir, "summary.txt"))
if err != nil {
return ""
}
kv := parseSATKV(string(data))
return strings.ToUpper(strings.TrimSpace(kv["overall_status"]))
}
func extractArchivePath(s string) string {
s = strings.TrimSpace(s)
if strings.HasSuffix(s, ".tar.gz") {
parts := strings.Fields(s)
if len(parts) > 0 {
return parts[len(parts)-1]
}
}
return s
}
func parseSATKV(raw string) map[string]string {
kv := make(map[string]string)
for _, line := range strings.Split(raw, "\n") {
k, v, ok := strings.Cut(strings.TrimSpace(line), "=")
if ok {
kv[strings.TrimSpace(k)] = strings.TrimSpace(v)
}
}
return kv
}

View File

@@ -9,7 +9,7 @@ import (
"bee/audit/internal/schema"
)
func applyLatestSATStatuses(snap *schema.HardwareSnapshot, baseDir string) {
func applyLatestSATStatuses(snap *schema.HardwareSnapshot, baseDir string, db *ComponentStatusDB) {
if snap == nil || strings.TrimSpace(baseDir) == "" {
return
}
@@ -28,6 +28,8 @@ func applyLatestSATStatuses(snap *schema.HardwareSnapshot, baseDir string) {
if summary, ok := loadLatestSATSummary(baseDir, "storage-"); ok {
applyStorageSAT(snap.Storage, summary)
}
// Apply unified component status DB — overlaid last so it can only upgrade severity.
applyComponentStatusDB(snap, db)
}
type satSummary struct {
@@ -206,6 +208,86 @@ func matchesGPUVendor(dev schema.HardwarePCIeDevice, vendor string) bool {
}
}
func applyComponentStatusDB(snap *schema.HardwareSnapshot, db *ComponentStatusDB) {
if snap == nil || db == nil {
return
}
for _, rec := range db.All() {
key := rec.ComponentKey
status := dbStatusToSATStatus(rec.Status)
if status == "" {
continue
}
detail := rec.ErrorSummary
ts := rec.LastChangedAt.UTC().Format("2006-01-02T15:04:05Z")
switch {
case strings.HasPrefix(key, "pcie:"):
bdf := strings.TrimPrefix(key, "pcie:")
bdf = strings.TrimPrefix(bdf, "gpu:") // strip sub-type if present
// bdf may be empty (e.g. "pcie:gpu:nvidia") — skip BDF matching
if sanitizeBDFForLookup(bdf) == "" {
break
}
normalized := sanitizeBDFForLookup(bdf)
for i := range snap.PCIeDevices {
if snap.PCIeDevices[i].BDF == nil {
continue
}
if sanitizeBDFForLookup(*snap.PCIeDevices[i].BDF) == normalized {
mergeComponentStatus(&snap.PCIeDevices[i].HardwareComponentStatus, ts, status, detail)
}
}
case strings.HasPrefix(key, "storage:"):
devName := strings.TrimPrefix(key, "storage:")
if devName == "all" {
for i := range snap.Storage {
mergeComponentStatus(&snap.Storage[i].HardwareComponentStatus, ts, status, detail)
}
} else {
for i := range snap.Storage {
linuxDev, _ := snap.Storage[i].Telemetry["linux_device"].(string)
if filepath.Base(strings.TrimSpace(linuxDev)) == devName {
mergeComponentStatus(&snap.Storage[i].HardwareComponentStatus, ts, status, detail)
}
}
}
case strings.HasPrefix(key, "memory:"):
for i := range snap.Memory {
mergeComponentStatus(&snap.Memory[i].HardwareComponentStatus, ts, status, detail)
}
case strings.HasPrefix(key, "cpu:"):
for i := range snap.CPUs {
mergeComponentStatus(&snap.CPUs[i].HardwareComponentStatus, ts, status, detail)
}
}
}
}
// dbStatusToSATStatus converts ComponentStatusDB status strings to the format
// expected by mergeComponentStatus (which uses "OK", "Warning", "Critical", "Unknown").
func dbStatusToSATStatus(s string) string {
switch strings.TrimSpace(s) {
case "OK", "Warning", "Critical", "Unknown":
return s
default:
return ""
}
}
// sanitizeBDFForLookup normalises a PCIe BDF address to a canonical lower-case form
// suitable for comparison. "c8:00.0" → "0000:c8:00.0"; already-full BDFs are left as-is.
func sanitizeBDFForLookup(bdf string) string {
bdf = strings.ToLower(strings.TrimSpace(bdf))
if bdf == "" || bdf == "gpu" || strings.ContainsAny(bdf, " \t") {
return ""
}
if strings.Count(bdf, ":") == 1 {
bdf = "0000:" + bdf
}
return bdf
}
func ptrString(v *string) string {
if v == nil {
return ""

View File

@@ -23,7 +23,7 @@ func TestApplyLatestSATStatusesMarksStorageByDevice(t *testing.T) {
usb := schema.HardwareStorage{Telemetry: map[string]any{"linux_device": "/dev/sda"}}
snap := schema.HardwareSnapshot{Storage: []schema.HardwareStorage{nvme, usb}}
applyLatestSATStatuses(&snap, baseDir)
applyLatestSATStatuses(&snap, baseDir, nil)
if snap.Storage[0].Status == nil || *snap.Storage[0].Status != "OK" {
t.Fatalf("nvme status=%v want OK", snap.Storage[0].Status)
@@ -53,7 +53,7 @@ func TestApplyLatestSATStatusesMarksAMDGPUs(t *testing.T) {
}},
}
applyLatestSATStatuses(&snap, baseDir)
applyLatestSATStatuses(&snap, baseDir, nil)
if snap.PCIeDevices[0].Status == nil || *snap.PCIeDevices[0].Status != "Critical" {
t.Fatalf("gpu status=%v want Critical", snap.PCIeDevices[0].Status)