feat(tasks): run all queued tasks in parallel

Tasks are now started simultaneously when multiple are enqueued (e.g.
Run All). The worker drains all pending tasks at once and launches each
in its own goroutine, waiting via WaitGroup. kmsg watcher updated to
use a shared event window with a reference counter across concurrent tasks.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mikhail Chusavitin
2026-04-03 09:15:06 +03:00
parent 444a7d16cc
commit 295a19b93a
2 changed files with 79 additions and 48 deletions

View File

@@ -14,17 +14,17 @@ import (
) )
// kmsgWatcher reads /dev/kmsg and accumulates hardware error events. // kmsgWatcher reads /dev/kmsg and accumulates hardware error events.
// During an active SAT task window it records matching lines; on task finish // It supports multiple concurrent SAT tasks: a shared event window is open
// it writes Warning status records to the component status DB. // while any SAT task is running, and flushed when all tasks complete.
type kmsgWatcher struct { type kmsgWatcher struct {
mu sync.Mutex mu sync.Mutex
activeWindow *kmsgWindow activeCount int // number of in-flight SAT tasks
window *kmsgWindow
statusDB *app.ComponentStatusDB statusDB *app.ComponentStatusDB
} }
type kmsgWindow struct { type kmsgWindow struct {
taskID string targets []string // SAT targets running concurrently
target string
startedAt time.Time startedAt time.Time
seen map[kmsgEventKey]bool seen map[kmsgEventKey]bool
events []kmsgEvent events []kmsgEvent
@@ -71,7 +71,7 @@ func (w *kmsgWatcher) run() {
continue continue
} }
w.mu.Lock() w.mu.Lock()
if w.activeWindow != nil { if w.window != nil {
w.recordEvent(evt) w.recordEvent(evt)
} }
w.mu.Unlock() w.mu.Unlock()
@@ -85,41 +85,49 @@ func (w *kmsgWatcher) run() {
// Must be called with w.mu held. // Must be called with w.mu held.
func (w *kmsgWatcher) recordEvent(evt kmsgEvent) { func (w *kmsgWatcher) recordEvent(evt kmsgEvent) {
if len(evt.ids) == 0 { if len(evt.ids) == 0 {
// Events without a device ID (e.g. MCE) — deduplicate by category.
key := kmsgEventKey{id: "", category: evt.category} key := kmsgEventKey{id: "", category: evt.category}
if !w.activeWindow.seen[key] { if !w.window.seen[key] {
w.activeWindow.seen[key] = true w.window.seen[key] = true
w.activeWindow.events = append(w.activeWindow.events, evt) w.window.events = append(w.window.events, evt)
} }
return return
} }
for _, id := range evt.ids { for _, id := range evt.ids {
key := kmsgEventKey{id: id, category: evt.category} key := kmsgEventKey{id: id, category: evt.category}
if !w.activeWindow.seen[key] { if !w.window.seen[key] {
w.activeWindow.seen[key] = true w.window.seen[key] = true
w.activeWindow.events = append(w.activeWindow.events, evt) w.window.events = append(w.window.events, evt)
} }
} }
} }
// NotifyTaskStarted opens a new event window for the given SAT task. // NotifyTaskStarted increments the active task counter and opens a shared event window
// if this is the first task starting.
func (w *kmsgWatcher) NotifyTaskStarted(taskID, target string) { func (w *kmsgWatcher) NotifyTaskStarted(taskID, target string) {
w.mu.Lock() w.mu.Lock()
defer w.mu.Unlock() defer w.mu.Unlock()
w.activeWindow = &kmsgWindow{ if w.activeCount == 0 {
taskID: taskID, w.window = &kmsgWindow{
target: target, startedAt: time.Now(),
startedAt: time.Now(), seen: make(map[kmsgEventKey]bool),
seen: make(map[kmsgEventKey]bool), }
}
w.activeCount++
if w.window != nil {
w.window.targets = append(w.window.targets, target)
} }
} }
// NotifyTaskFinished closes the event window and asynchronously writes status records. // NotifyTaskFinished decrements the active task counter. When all tasks finish,
// it flushes the accumulated events to the status DB.
func (w *kmsgWatcher) NotifyTaskFinished(taskID string) { func (w *kmsgWatcher) NotifyTaskFinished(taskID string) {
w.mu.Lock() w.mu.Lock()
window := w.activeWindow w.activeCount--
if window != nil && window.taskID == taskID { var window *kmsgWindow
w.activeWindow = nil if w.activeCount <= 0 {
w.activeCount = 0
window = w.window
w.window = nil
} }
w.mu.Unlock() w.mu.Unlock()
@@ -164,7 +172,7 @@ func (w *kmsgWatcher) flushWindow(window *kmsgWindow) {
} }
} }
for key, detail := range seen { for key, detail := range seen {
detail = "kernel error during " + window.target + " SAT: " + truncate(detail, 120) detail = "kernel error during SAT (" + strings.Join(window.targets, ",") + "): " + truncate(detail, 120)
w.statusDB.Record(key, source, "Warning", detail) w.statusDB.Record(key, source, "Warning", detail)
} }
} }

View File

@@ -393,11 +393,13 @@ func (q *taskQueue) worker() {
for { for {
<-q.trigger <-q.trigger
setCPUGovernor("performance") setCPUGovernor("performance")
// Drain all pending tasks and start them in parallel.
q.mu.Lock()
var batch []*Task
for { for {
q.mu.Lock()
t := q.nextPending() t := q.nextPending()
if t == nil { if t == nil {
q.mu.Unlock()
break break
} }
now := time.Now() now := time.Now()
@@ -406,37 +408,58 @@ func (q *taskQueue) worker() {
t.DoneAt = nil t.DoneAt = nil
t.ErrMsg = "" t.ErrMsg = ""
j := newTaskJobState(t.LogPath) j := newTaskJobState(t.LogPath)
ctx, cancel := context.WithCancel(context.Background())
j.cancel = cancel
t.job = j t.job = j
batch = append(batch, t)
}
if len(batch) > 0 {
q.persistLocked() q.persistLocked()
q.mu.Unlock() }
q.mu.Unlock()
if q.kmsgWatcher != nil && isSATTarget(t.Target) { var wg sync.WaitGroup
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target) for _, t := range batch {
} t := t
j := t.job
taskCtx, taskCancel := context.WithCancel(context.Background())
j.cancel = taskCancel
wg.Add(1)
go func() {
defer wg.Done()
q.runTask(t, j, ctx) if q.kmsgWatcher != nil && isSATTarget(t.Target) {
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
if q.kmsgWatcher != nil {
q.kmsgWatcher.NotifyTaskFinished(t.ID)
}
q.mu.Lock()
now2 := time.Now()
t.DoneAt = &now2
if t.Status == TaskRunning { // not cancelled externally
if j.err != "" {
t.Status = TaskFailed
t.ErrMsg = j.err
} else {
t.Status = TaskDone
} }
}
q.runTask(t, j, taskCtx)
if q.kmsgWatcher != nil {
q.kmsgWatcher.NotifyTaskFinished(t.ID)
}
q.mu.Lock()
now2 := time.Now()
t.DoneAt = &now2
if t.Status == TaskRunning {
if j.err != "" {
t.Status = TaskFailed
t.ErrMsg = j.err
} else {
t.Status = TaskDone
}
}
q.persistLocked()
q.mu.Unlock()
}()
}
wg.Wait()
if len(batch) > 0 {
q.mu.Lock()
q.prune() q.prune()
q.persistLocked() q.persistLocked()
q.mu.Unlock() q.mu.Unlock()
} }
setCPUGovernor("powersave") setCPUGovernor("powersave")
} }
} }