diff --git a/audit/internal/webui/kmsg_watcher.go b/audit/internal/webui/kmsg_watcher.go index 4e4c41d..0672ddd 100644 --- a/audit/internal/webui/kmsg_watcher.go +++ b/audit/internal/webui/kmsg_watcher.go @@ -14,17 +14,17 @@ import ( ) // kmsgWatcher reads /dev/kmsg and accumulates hardware error events. -// During an active SAT task window it records matching lines; on task finish -// it writes Warning status records to the component status DB. +// It supports multiple concurrent SAT tasks: a shared event window is open +// while any SAT task is running, and flushed when all tasks complete. type kmsgWatcher struct { mu sync.Mutex - activeWindow *kmsgWindow + activeCount int // number of in-flight SAT tasks + window *kmsgWindow statusDB *app.ComponentStatusDB } type kmsgWindow struct { - taskID string - target string + targets []string // SAT targets running concurrently startedAt time.Time seen map[kmsgEventKey]bool events []kmsgEvent @@ -71,7 +71,7 @@ func (w *kmsgWatcher) run() { continue } w.mu.Lock() - if w.activeWindow != nil { + if w.window != nil { w.recordEvent(evt) } w.mu.Unlock() @@ -85,41 +85,49 @@ func (w *kmsgWatcher) run() { // Must be called with w.mu held. func (w *kmsgWatcher) recordEvent(evt kmsgEvent) { if len(evt.ids) == 0 { - // Events without a device ID (e.g. MCE) — deduplicate by category. key := kmsgEventKey{id: "", category: evt.category} - if !w.activeWindow.seen[key] { - w.activeWindow.seen[key] = true - w.activeWindow.events = append(w.activeWindow.events, evt) + if !w.window.seen[key] { + w.window.seen[key] = true + w.window.events = append(w.window.events, evt) } return } for _, id := range evt.ids { key := kmsgEventKey{id: id, category: evt.category} - if !w.activeWindow.seen[key] { - w.activeWindow.seen[key] = true - w.activeWindow.events = append(w.activeWindow.events, evt) + if !w.window.seen[key] { + w.window.seen[key] = true + w.window.events = append(w.window.events, evt) } } } -// NotifyTaskStarted opens a new event window for the given SAT task. +// NotifyTaskStarted increments the active task counter and opens a shared event window +// if this is the first task starting. func (w *kmsgWatcher) NotifyTaskStarted(taskID, target string) { w.mu.Lock() defer w.mu.Unlock() - w.activeWindow = &kmsgWindow{ - taskID: taskID, - target: target, - startedAt: time.Now(), - seen: make(map[kmsgEventKey]bool), + if w.activeCount == 0 { + w.window = &kmsgWindow{ + startedAt: time.Now(), + seen: make(map[kmsgEventKey]bool), + } + } + w.activeCount++ + if w.window != nil { + w.window.targets = append(w.window.targets, target) } } -// NotifyTaskFinished closes the event window and asynchronously writes status records. +// NotifyTaskFinished decrements the active task counter. When all tasks finish, +// it flushes the accumulated events to the status DB. func (w *kmsgWatcher) NotifyTaskFinished(taskID string) { w.mu.Lock() - window := w.activeWindow - if window != nil && window.taskID == taskID { - w.activeWindow = nil + w.activeCount-- + var window *kmsgWindow + if w.activeCount <= 0 { + w.activeCount = 0 + window = w.window + w.window = nil } w.mu.Unlock() @@ -164,7 +172,7 @@ func (w *kmsgWatcher) flushWindow(window *kmsgWindow) { } } for key, detail := range seen { - detail = "kernel error during " + window.target + " SAT: " + truncate(detail, 120) + detail = "kernel error during SAT (" + strings.Join(window.targets, ",") + "): " + truncate(detail, 120) w.statusDB.Record(key, source, "Warning", detail) } } diff --git a/audit/internal/webui/tasks.go b/audit/internal/webui/tasks.go index 7745501..039b324 100644 --- a/audit/internal/webui/tasks.go +++ b/audit/internal/webui/tasks.go @@ -393,11 +393,13 @@ func (q *taskQueue) worker() { for { <-q.trigger setCPUGovernor("performance") + + // Drain all pending tasks and start them in parallel. + q.mu.Lock() + var batch []*Task for { - q.mu.Lock() t := q.nextPending() if t == nil { - q.mu.Unlock() break } now := time.Now() @@ -406,37 +408,58 @@ func (q *taskQueue) worker() { t.DoneAt = nil t.ErrMsg = "" j := newTaskJobState(t.LogPath) - ctx, cancel := context.WithCancel(context.Background()) - j.cancel = cancel t.job = j + batch = append(batch, t) + } + if len(batch) > 0 { q.persistLocked() - q.mu.Unlock() + } + q.mu.Unlock() - if q.kmsgWatcher != nil && isSATTarget(t.Target) { - q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target) - } + var wg sync.WaitGroup + for _, t := range batch { + t := t + j := t.job + taskCtx, taskCancel := context.WithCancel(context.Background()) + j.cancel = taskCancel + wg.Add(1) + go func() { + defer wg.Done() - q.runTask(t, j, ctx) - - if q.kmsgWatcher != nil { - q.kmsgWatcher.NotifyTaskFinished(t.ID) - } - - q.mu.Lock() - now2 := time.Now() - t.DoneAt = &now2 - if t.Status == TaskRunning { // not cancelled externally - if j.err != "" { - t.Status = TaskFailed - t.ErrMsg = j.err - } else { - t.Status = TaskDone + if q.kmsgWatcher != nil && isSATTarget(t.Target) { + q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target) } - } + + q.runTask(t, j, taskCtx) + + if q.kmsgWatcher != nil { + q.kmsgWatcher.NotifyTaskFinished(t.ID) + } + + q.mu.Lock() + now2 := time.Now() + t.DoneAt = &now2 + if t.Status == TaskRunning { + if j.err != "" { + t.Status = TaskFailed + t.ErrMsg = j.err + } else { + t.Status = TaskDone + } + } + q.persistLocked() + q.mu.Unlock() + }() + } + wg.Wait() + + if len(batch) > 0 { + q.mu.Lock() q.prune() q.persistLocked() q.mu.Unlock() } + setCPUGovernor("powersave") } }