From 295a19b93aa60ccc6d7795960bfebe3a8e79b4b2 Mon Sep 17 00:00:00 2001 From: Mikhail Chusavitin Date: Fri, 3 Apr 2026 09:15:06 +0300 Subject: [PATCH] feat(tasks): run all queued tasks in parallel Tasks are now started simultaneously when multiple are enqueued (e.g. Run All). The worker drains all pending tasks at once and launches each in its own goroutine, waiting via WaitGroup. kmsg watcher updated to use a shared event window with a reference counter across concurrent tasks. Co-Authored-By: Claude Sonnet 4.6 --- audit/internal/webui/kmsg_watcher.go | 56 ++++++++++++---------- audit/internal/webui/tasks.go | 71 ++++++++++++++++++---------- 2 files changed, 79 insertions(+), 48 deletions(-) diff --git a/audit/internal/webui/kmsg_watcher.go b/audit/internal/webui/kmsg_watcher.go index 4e4c41d..0672ddd 100644 --- a/audit/internal/webui/kmsg_watcher.go +++ b/audit/internal/webui/kmsg_watcher.go @@ -14,17 +14,17 @@ import ( ) // kmsgWatcher reads /dev/kmsg and accumulates hardware error events. -// During an active SAT task window it records matching lines; on task finish -// it writes Warning status records to the component status DB. +// It supports multiple concurrent SAT tasks: a shared event window is open +// while any SAT task is running, and flushed when all tasks complete. type kmsgWatcher struct { mu sync.Mutex - activeWindow *kmsgWindow + activeCount int // number of in-flight SAT tasks + window *kmsgWindow statusDB *app.ComponentStatusDB } type kmsgWindow struct { - taskID string - target string + targets []string // SAT targets running concurrently startedAt time.Time seen map[kmsgEventKey]bool events []kmsgEvent @@ -71,7 +71,7 @@ func (w *kmsgWatcher) run() { continue } w.mu.Lock() - if w.activeWindow != nil { + if w.window != nil { w.recordEvent(evt) } w.mu.Unlock() @@ -85,41 +85,49 @@ func (w *kmsgWatcher) run() { // Must be called with w.mu held. 
func (w *kmsgWatcher) recordEvent(evt kmsgEvent) { if len(evt.ids) == 0 { - // Events without a device ID (e.g. MCE) — deduplicate by category. key := kmsgEventKey{id: "", category: evt.category} - if !w.activeWindow.seen[key] { - w.activeWindow.seen[key] = true - w.activeWindow.events = append(w.activeWindow.events, evt) + if !w.window.seen[key] { + w.window.seen[key] = true + w.window.events = append(w.window.events, evt) } return } for _, id := range evt.ids { key := kmsgEventKey{id: id, category: evt.category} - if !w.activeWindow.seen[key] { - w.activeWindow.seen[key] = true - w.activeWindow.events = append(w.activeWindow.events, evt) + if !w.window.seen[key] { + w.window.seen[key] = true + w.window.events = append(w.window.events, evt) } } } -// NotifyTaskStarted opens a new event window for the given SAT task. +// NotifyTaskStarted increments the active task counter and opens a shared event window +// if this is the first task starting. func (w *kmsgWatcher) NotifyTaskStarted(taskID, target string) { w.mu.Lock() defer w.mu.Unlock() - w.activeWindow = &kmsgWindow{ - taskID: taskID, - target: target, - startedAt: time.Now(), - seen: make(map[kmsgEventKey]bool), + if w.activeCount == 0 { + w.window = &kmsgWindow{ + startedAt: time.Now(), + seen: make(map[kmsgEventKey]bool), + } + } + w.activeCount++ + if w.window != nil { + w.window.targets = append(w.window.targets, target) } } -// NotifyTaskFinished closes the event window and asynchronously writes status records. +// NotifyTaskFinished decrements the active task counter. When all tasks finish, +// it flushes the accumulated events to the status DB. 
func (w *kmsgWatcher) NotifyTaskFinished(taskID string) { w.mu.Lock() - window := w.activeWindow - if window != nil && window.taskID == taskID { - w.activeWindow = nil + w.activeCount-- + var window *kmsgWindow + if w.activeCount <= 0 { + w.activeCount = 0 + window = w.window + w.window = nil } w.mu.Unlock() @@ -164,7 +172,7 @@ func (w *kmsgWatcher) flushWindow(window *kmsgWindow) { } } for key, detail := range seen { - detail = "kernel error during " + window.target + " SAT: " + truncate(detail, 120) + detail = "kernel error during SAT (" + strings.Join(window.targets, ",") + "): " + truncate(detail, 120) w.statusDB.Record(key, source, "Warning", detail) } } diff --git a/audit/internal/webui/tasks.go b/audit/internal/webui/tasks.go index 7745501..039b324 100644 --- a/audit/internal/webui/tasks.go +++ b/audit/internal/webui/tasks.go @@ -393,11 +393,13 @@ func (q *taskQueue) worker() { for { <-q.trigger setCPUGovernor("performance") + + // Drain all pending tasks and start them in parallel. 
+ q.mu.Lock() + var batch []*Task for { - q.mu.Lock() t := q.nextPending() if t == nil { - q.mu.Unlock() break } now := time.Now() @@ -406,37 +408,58 @@ func (q *taskQueue) worker() { t.DoneAt = nil t.ErrMsg = "" j := newTaskJobState(t.LogPath) - ctx, cancel := context.WithCancel(context.Background()) - j.cancel = cancel t.job = j + batch = append(batch, t) + } + if len(batch) > 0 { q.persistLocked() - q.mu.Unlock() + } + q.mu.Unlock() - if q.kmsgWatcher != nil && isSATTarget(t.Target) { - q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target) - } + var wg sync.WaitGroup + for _, t := range batch { + t := t + j := t.job + taskCtx, taskCancel := context.WithCancel(context.Background()) + j.cancel = taskCancel + wg.Add(1) + go func() { + defer wg.Done() - q.runTask(t, j, ctx) - - if q.kmsgWatcher != nil { - q.kmsgWatcher.NotifyTaskFinished(t.ID) - } - - q.mu.Lock() - now2 := time.Now() - t.DoneAt = &now2 - if t.Status == TaskRunning { // not cancelled externally - if j.err != "" { - t.Status = TaskFailed - t.ErrMsg = j.err - } else { - t.Status = TaskDone + if q.kmsgWatcher != nil && isSATTarget(t.Target) { + q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target) } - } + + q.runTask(t, j, taskCtx) + + if q.kmsgWatcher != nil && isSATTarget(t.Target) { // balance the SAT-only NotifyTaskStarted above + q.kmsgWatcher.NotifyTaskFinished(t.ID) + } + + q.mu.Lock() + now2 := time.Now() + t.DoneAt = &now2 + if t.Status == TaskRunning { // not cancelled externally + if j.err != "" { + t.Status = TaskFailed + t.ErrMsg = j.err + } else { + t.Status = TaskDone + } + } + q.persistLocked() + q.mu.Unlock() + }() + } + wg.Wait() + + if len(batch) > 0 { + q.mu.Lock() q.prune() q.persistLocked() q.mu.Unlock() } + setCPUGovernor("powersave") } }