Tasks are now started simultaneously when multiple are enqueued (e.g. Run All). The worker drains all pending tasks at once and launches each in its own goroutine, waiting via WaitGroup. kmsg watcher updated to use a shared event window with a reference counter across concurrent tasks. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
239 lines
5.6 KiB
Go
package webui
|
|
|
|
import (
	"bufio"
	"errors"
	"io"
	"log/slog"
	"os"
	"strings"
	"sync"
	"syscall"
	"time"
	"unicode/utf8"

	"bee/audit/internal/app"
	"bee/audit/internal/platform"
)
|
|
|
|
// kmsgWatcher reads /dev/kmsg and accumulates hardware error events.
// It supports multiple concurrent SAT tasks: a shared event window is open
// while any SAT task is running, and flushed when all tasks complete.
type kmsgWatcher struct {
	mu          sync.Mutex             // guards activeCount and window
	activeCount int                    // number of in-flight SAT tasks
	window      *kmsgWindow            // nil when no SAT task is running
	statusDB    *app.ComponentStatusDB // sink for flushed events; flushWindow is a no-op when nil
}
|
|
|
|
// kmsgWindow collects the kmsg events observed while one or more SAT tasks run.
// A single window is shared by all concurrently running tasks.
type kmsgWindow struct {
	targets   []string              // SAT targets running concurrently
	startedAt time.Time             // when the first task of this window started
	seen      map[kmsgEventKey]bool // dedup set of (id, category) pairs already recorded
	events    []kmsgEvent           // recorded events, in arrival order
}
|
|
|
|
// kmsgEventKey identifies an event for deduplication within a window.
type kmsgEventKey struct {
	id       string // BDF or device name; empty for events with no extracted ids
	category string // pattern category (e.g. "gpu", "pcie", "storage", "memory")
}
|
|
|
|
// kmsgEvent is a single hardware-error line parsed from /dev/kmsg.
type kmsgEvent struct {
	timestamp time.Time // wall-clock time the line was parsed (not the kernel timestamp)
	raw       string    // message text with the kmsg "<prio>,<seq>,<usec>,-;" prefix stripped
	ids       []string  // BDF addresses or device names extracted
	category  string    // category of the pattern that matched
}
|
|
|
|
func newKmsgWatcher(statusDB *app.ComponentStatusDB) *kmsgWatcher {
|
|
return &kmsgWatcher{statusDB: statusDB}
|
|
}
|
|
|
|
// start launches the background kmsg reading goroutine.
// There is no stop mechanism: the goroutine runs until the /dev/kmsg
// stream fails (or for the lifetime of the process).
func (w *kmsgWatcher) start() {
	go w.run()
}
|
|
|
|
func (w *kmsgWatcher) run() {
|
|
f, err := os.Open("/dev/kmsg")
|
|
if err != nil {
|
|
slog.Warn("kmsg watcher unavailable", "err", err)
|
|
return
|
|
}
|
|
defer f.Close()
|
|
|
|
// Best-effort seek to end so we only capture events from now forward.
|
|
_, _ = f.Seek(0, io.SeekEnd)
|
|
|
|
scanner := bufio.NewScanner(f)
|
|
scanner.Buffer(make([]byte, 64*1024), 64*1024)
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
evt, ok := parseKmsgLine(line)
|
|
if !ok {
|
|
continue
|
|
}
|
|
w.mu.Lock()
|
|
if w.window != nil {
|
|
w.recordEvent(evt)
|
|
}
|
|
w.mu.Unlock()
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
slog.Warn("kmsg watcher stopped", "err", err)
|
|
}
|
|
}
|
|
|
|
// recordEvent appends evt to the active window, deduplicating by (id, category).
|
|
// Must be called with w.mu held.
|
|
func (w *kmsgWatcher) recordEvent(evt kmsgEvent) {
|
|
if len(evt.ids) == 0 {
|
|
key := kmsgEventKey{id: "", category: evt.category}
|
|
if !w.window.seen[key] {
|
|
w.window.seen[key] = true
|
|
w.window.events = append(w.window.events, evt)
|
|
}
|
|
return
|
|
}
|
|
for _, id := range evt.ids {
|
|
key := kmsgEventKey{id: id, category: evt.category}
|
|
if !w.window.seen[key] {
|
|
w.window.seen[key] = true
|
|
w.window.events = append(w.window.events, evt)
|
|
}
|
|
}
|
|
}
|
|
|
|
// NotifyTaskStarted increments the active task counter and opens a shared event window
|
|
// if this is the first task starting.
|
|
func (w *kmsgWatcher) NotifyTaskStarted(taskID, target string) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
if w.activeCount == 0 {
|
|
w.window = &kmsgWindow{
|
|
startedAt: time.Now(),
|
|
seen: make(map[kmsgEventKey]bool),
|
|
}
|
|
}
|
|
w.activeCount++
|
|
if w.window != nil {
|
|
w.window.targets = append(w.window.targets, target)
|
|
}
|
|
}
|
|
|
|
// NotifyTaskFinished decrements the active task counter. When all tasks finish,
|
|
// it flushes the accumulated events to the status DB.
|
|
func (w *kmsgWatcher) NotifyTaskFinished(taskID string) {
|
|
w.mu.Lock()
|
|
w.activeCount--
|
|
var window *kmsgWindow
|
|
if w.activeCount <= 0 {
|
|
w.activeCount = 0
|
|
window = w.window
|
|
w.window = nil
|
|
}
|
|
w.mu.Unlock()
|
|
|
|
if window == nil || len(window.events) == 0 {
|
|
return
|
|
}
|
|
go w.flushWindow(window)
|
|
}
|
|
|
|
func (w *kmsgWatcher) flushWindow(window *kmsgWindow) {
|
|
if w.statusDB == nil {
|
|
return
|
|
}
|
|
source := "watchdog:kmsg"
|
|
// Collect unique component keys from events.
|
|
seen := map[string]string{} // componentKey → first raw line
|
|
for _, evt := range window.events {
|
|
if len(evt.ids) == 0 {
|
|
// MCE or un-identified error.
|
|
key := "cpu:all"
|
|
if evt.category == "memory" {
|
|
key = "memory:all"
|
|
}
|
|
if _, exists := seen[key]; !exists {
|
|
seen[key] = evt.raw
|
|
}
|
|
continue
|
|
}
|
|
for _, id := range evt.ids {
|
|
var key string
|
|
switch evt.category {
|
|
case "gpu", "pcie":
|
|
key = "pcie:" + normalizeBDF(id)
|
|
case "storage":
|
|
key = "storage:" + id
|
|
default:
|
|
key = "pcie:" + normalizeBDF(id)
|
|
}
|
|
if _, exists := seen[key]; !exists {
|
|
seen[key] = evt.raw
|
|
}
|
|
}
|
|
}
|
|
for key, detail := range seen {
|
|
detail = "kernel error during SAT (" + strings.Join(window.targets, ",") + "): " + truncate(detail, 120)
|
|
w.statusDB.Record(key, source, "Warning", detail)
|
|
}
|
|
}
|
|
|
|
// parseKmsgLine parses a single /dev/kmsg line and returns an event if it matches
|
|
// any pattern in platform.HardwareErrorPatterns.
|
|
// kmsg format: "<priority>,<sequence>,<timestamp_usec>,-;message text"
|
|
func parseKmsgLine(raw string) (kmsgEvent, bool) {
|
|
msg := raw
|
|
if idx := strings.Index(raw, ";"); idx >= 0 {
|
|
msg = strings.TrimSpace(raw[idx+1:])
|
|
}
|
|
if msg == "" {
|
|
return kmsgEvent{}, false
|
|
}
|
|
|
|
for _, p := range platform.HardwareErrorPatterns {
|
|
m := p.Re.FindStringSubmatch(msg)
|
|
if m == nil {
|
|
continue
|
|
}
|
|
evt := kmsgEvent{
|
|
timestamp: time.Now(),
|
|
raw: msg,
|
|
category: p.Category,
|
|
}
|
|
if p.BDFGroup > 0 && p.BDFGroup < len(m) {
|
|
evt.ids = append(evt.ids, normalizeBDF(m[p.BDFGroup]))
|
|
}
|
|
if p.DevGroup > 0 && p.DevGroup < len(m) {
|
|
evt.ids = append(evt.ids, m[p.DevGroup])
|
|
}
|
|
return evt, true
|
|
}
|
|
return kmsgEvent{}, false
|
|
}
|
|
|
|
// normalizeBDF normalizes a PCIe BDF to the 4-part form "0000:c8:00.0".
func normalizeBDF(bdf string) string {
	cleaned := strings.ToLower(strings.TrimSpace(bdf))
	// A 2-part "bus:dev.fn" form gets the default PCI domain prepended;
	// anything else is returned as-is.
	if strings.Count(cleaned, ":") != 1 {
		return cleaned
	}
	return "0000:" + cleaned
}
|
|
|
|
// truncate returns s unchanged if it fits in max bytes; otherwise it cuts
// s down and appends "...". The cut is backed up to a UTF-8 rune boundary
// so the result is never invalid UTF-8 (the naive s[:max] can split a
// multi-byte rune). A negative max is clamped to 0.
func truncate(s string, max int) string {
	if len(s) <= max {
		return s
	}
	if max < 0 {
		max = 0
	}
	// Back up past any continuation bytes so we don't split a rune.
	for max > 0 && !utf8.RuneStart(s[max]) {
		max--
	}
	return s[:max] + "..."
}
|
|
|
|
// isSATTarget returns true for task targets that run hardware acceptance tests.
func isSATTarget(target string) bool {
	satTargets := [...]string{
		"nvidia", "nvidia-stress", "memory", "memory-stress", "storage",
		"cpu", "sat-stress", "amd", "amd-mem", "amd-bandwidth", "amd-stress",
		"platform-stress",
	}
	for _, known := range satTargets {
		if known == target {
			return true
		}
	}
	return false
}
|