From 1f750d3eddf664d85c16dc7777948593237ee3ae Mon Sep 17 00:00:00 2001 From: Mikhail Chusavitin Date: Thu, 2 Apr 2026 10:13:43 +0300 Subject: [PATCH] fix(webui): prevent orphaned workers on restart, reduce metrics polling, add Kill Workers button MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - tasks: mark TaskRunning tasks as TaskFailed on bee-web restart instead of re-queueing them — prevents duplicate gpu-burn-worker spawns when bee-web crashes mid-test (each restart was launching a new set of 8 workers on top of still-alive orphans from the previous crash) - server: reduce metrics collector interval 1s→5s, grow ring buffer to 360 samples (30 min); cuts nvidia-smi/ipmitool/sensors subprocess rate by 5× - platform: add KillTestWorkers() — scans /proc and SIGKILLs bee-gpu-burn, stress-ng, stressapptest, memtester without relying on pkill/killall - webui: add "Kill Workers" button next to Cancel All; calls POST /api/tasks/kill-workers which cancels the task queue then kills orphaned OS-level processes; shows toast with killed count - metricsdb: sort GPU indices and fan/temp names after map iteration to fix non-deterministic sample reconstruction order (flaky test) - server: fix chartYAxisNumber to use one decimal place for 1000–9999 (e.g. 
"1,7к" instead of "2к") so Y-axis ticks are distinguishable Co-Authored-By: Claude Sonnet 4.6 --- audit/internal/platform/kill_workers.go | 64 +++++++++++++++++++ audit/internal/webui/metricsdb.go | 10 ++- audit/internal/webui/pages.go | 19 +++++- audit/internal/webui/server.go | 20 ++++-- audit/internal/webui/server_test.go | 9 ++- audit/internal/webui/tasks.go | 45 +++++++++++++- audit/internal/webui/tasks_test.go | 81 ++++++++++++++++++------- 7 files changed, 216 insertions(+), 32 deletions(-) create mode 100644 audit/internal/platform/kill_workers.go diff --git a/audit/internal/platform/kill_workers.go b/audit/internal/platform/kill_workers.go new file mode 100644 index 0000000..ce0f65c --- /dev/null +++ b/audit/internal/platform/kill_workers.go @@ -0,0 +1,64 @@ +package platform + +import ( + "fmt" + "os" + "strconv" + "strings" + "syscall" +) + +// workerPatterns are substrings matched against /proc/<pid>/cmdline to identify +// bee test worker processes that should be killed by KillTestWorkers. +var workerPatterns = []string{ + "bee-gpu-burn", + "stress-ng", + "stressapptest", + "memtester", +} + +// KilledProcess describes a process that was sent SIGKILL. +type KilledProcess struct { + PID int `json:"pid"` + Name string `json:"name"` +} + +// KillTestWorkers scans /proc for running test worker processes and sends +// SIGKILL to each one found. It returns a list of killed processes. +// Errors for individual processes (e.g. already exited) are silently ignored. +func KillTestWorkers() []KilledProcess { + entries, err := os.ReadDir("/proc") + if err != nil { + return nil + } + + var killed []KilledProcess + for _, e := range entries { + if !e.IsDir() { + continue + } + pid, err := strconv.Atoi(e.Name()) + if err != nil { + continue + } + cmdline, err := os.ReadFile(fmt.Sprintf("/proc/%d/cmdline", pid)) + if err != nil { + continue + } + // /proc/*/cmdline uses NUL bytes as argument separators. 
+ args := strings.SplitN(strings.ReplaceAll(string(cmdline), "\x00", " "), " ", 2) + exe := strings.TrimSpace(args[0]) + base := exe + if idx := strings.LastIndexByte(exe, '/'); idx >= 0 { + base = exe[idx+1:] + } + for _, pat := range workerPatterns { + if strings.Contains(base, pat) || strings.Contains(exe, pat) { + _ = syscall.Kill(pid, syscall.SIGKILL) + killed = append(killed, KilledProcess{PID: pid, Name: base}) + break + } + } + } + return killed +} diff --git a/audit/internal/webui/metricsdb.go b/audit/internal/webui/metricsdb.go index 3039e69..090f3bd 100644 --- a/audit/internal/webui/metricsdb.go +++ b/audit/internal/webui/metricsdb.go @@ -6,6 +6,7 @@ import ( "io" "os" "path/filepath" + "sort" "strconv" "time" @@ -217,7 +218,9 @@ func (m *MetricsDB) loadSamples(query string, args ...any) ([]platform.LiveMetri } } - // Collect unique GPU indices and fan names from loaded data (preserve order) + // Collect unique GPU indices and fan/temp names from loaded data. + // Sort each list so that sample reconstruction is deterministic regardless + // of Go's non-deterministic map iteration order. 
seenGPU := map[int]bool{} var gpuIndices []int for k := range gpuData { @@ -226,6 +229,8 @@ func (m *MetricsDB) loadSamples(query string, args ...any) ([]platform.LiveMetri gpuIndices = append(gpuIndices, k.idx) } } + sort.Ints(gpuIndices) + seenFan := map[string]bool{} var fanNames []string for k := range fanData { @@ -234,6 +239,8 @@ func (m *MetricsDB) loadSamples(query string, args ...any) ([]platform.LiveMetri fanNames = append(fanNames, k.name) } } + sort.Strings(fanNames) + seenTemp := map[string]bool{} var tempNames []string for k := range tempData { @@ -242,6 +249,7 @@ func (m *MetricsDB) loadSamples(query string, args ...any) ([]platform.LiveMetri tempNames = append(tempNames, k.name) } } + sort.Strings(tempNames) samples := make([]platform.LiveMetricSample, len(sysRows)) for i, r := range sysRows { diff --git a/audit/internal/webui/pages.go b/audit/internal/webui/pages.go index 7026a48..136ee2f 100644 --- a/audit/internal/webui/pages.go +++ b/audit/internal/webui/pages.go @@ -1577,8 +1577,10 @@ func renderInstall() string { // ── Tasks ───────────────────────────────────────────────────────────────────── func renderTasks() string { - return `
+ return `
+ + Tasks run one at a time. Logs persist after navigation.
@@ -1639,6 +1641,21 @@ function cancelTask(id) { function cancelAll() { fetch('/api/tasks/cancel-all',{method:'POST'}).then(()=>loadTasks()); } +function killWorkers() { + if (!confirm('Send SIGKILL to all running test workers (bee-gpu-burn, stress-ng, stressapptest, memtester)?\n\nThis will also cancel all queued and running tasks.')) return; + fetch('/api/tasks/kill-workers',{method:'POST'}) + .then(r=>r.json()) + .then(d=>{ + loadTasks(); + var toast = document.getElementById('kill-toast'); + var parts = []; + if (d.cancelled > 0) parts.push(d.cancelled+' task'+(d.cancelled===1?'':'s')+' cancelled'); + if (d.killed > 0) parts.push(d.killed+' process'+(d.killed===1?'':'es')+' killed'); + toast.textContent = parts.length ? parts.join(', ')+'.' : 'No processes found.'; + toast.style.display = ''; + setTimeout(()=>{ toast.style.display='none'; }, 5000); + }); +} function setPriority(id, delta) { fetch('/api/tasks/'+id+'/priority',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({delta:delta})}) .then(()=>loadTasks()); diff --git a/audit/internal/webui/server.go b/audit/internal/webui/server.go index 39daedf..d9d022b 100644 --- a/audit/internal/webui/server.go +++ b/audit/internal/webui/server.go @@ -128,7 +128,11 @@ type namedMetricsRing struct { Ring *metricsRing } -const metricsChartWindow = 120 +// metricsChartWindow is the number of samples kept in the live ring buffer. +// At metricsCollectInterval = 5 s this covers 30 minutes of live history. +const metricsChartWindow = 360 + +var metricsCollectInterval = 5 * time.Second // pendingNetChange tracks a network state change awaiting confirmation. 
type pendingNetChange struct { @@ -238,6 +242,7 @@ func NewHandler(opts HandlerOptions) http.Handler { // Tasks mux.HandleFunc("GET /api/tasks", h.handleAPITasksList) mux.HandleFunc("POST /api/tasks/cancel-all", h.handleAPITasksCancelAll) + mux.HandleFunc("POST /api/tasks/kill-workers", h.handleAPITasksKillWorkers) mux.HandleFunc("POST /api/tasks/{id}/cancel", h.handleAPITasksCancel) mux.HandleFunc("POST /api/tasks/{id}/priority", h.handleAPITasksPriority) mux.HandleFunc("GET /api/tasks/{id}/stream", h.handleAPITasksStream) @@ -301,7 +306,7 @@ func NewHandler(opts HandlerOptions) http.Handler { func (h *handler) startMetricsCollector() { go func() { - ticker := time.NewTicker(1 * time.Second) + ticker := time.NewTicker(metricsCollectInterval) defer ticker.Stop() for range ticker.C { sample := platform.SampleLiveMetrics() @@ -1059,9 +1064,16 @@ func chartYAxisNumber(v float64) string { v = -v } var out string - if v >= 1000 { + switch { + case v >= 10000: out = fmt.Sprintf("%dк", int((v+500)/1000)) - } else { + case v >= 1000: + // Use one decimal place so ticks like 1400, 1600, 1800 read as + // "1,4к", "1,6к", "1,8к" instead of the ambiguous "1к"/"2к". 
+ s := fmt.Sprintf("%.1f", v/1000) + s = strings.TrimRight(strings.TrimRight(s, "0"), ".") + out = strings.ReplaceAll(s, ".", ",") + "к" + default: out = fmt.Sprintf("%.0f", v) } if neg { diff --git a/audit/internal/webui/server_test.go b/audit/internal/webui/server_test.go index 927d5a6..e563876 100644 --- a/audit/internal/webui/server_test.go +++ b/audit/internal/webui/server_test.go @@ -175,10 +175,13 @@ func TestChartYAxisNumber(t *testing.T) { }{ {in: 999, want: "999"}, {in: 1000, want: "1к"}, - {in: 1370, want: "1к"}, - {in: 1500, want: "2к"}, + {in: 1370, want: "1,4к"}, + {in: 1500, want: "1,5к"}, + {in: 1700, want: "1,7к"}, + {in: 2000, want: "2к"}, + {in: 9999, want: "10к"}, {in: 10200, want: "10к"}, - {in: -1499, want: "-1к"}, + {in: -1500, want: "-1,5к"}, } for _, tc := range tests { if got := chartYAxisNumber(tc.in); got != tc.want { diff --git a/audit/internal/webui/tasks.go b/audit/internal/webui/tasks.go index 82d14eb..4341740 100644 --- a/audit/internal/webui/tasks.go +++ b/audit/internal/webui/tasks.go @@ -716,6 +716,38 @@ func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request writeJSON(w, map[string]int{"cancelled": n}) } +func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Request) { + // Cancel all queued/running tasks in the queue first. + globalQueue.mu.Lock() + now := time.Now() + cancelled := 0 + for _, t := range globalQueue.tasks { + switch t.Status { + case TaskPending: + t.Status = TaskCancelled + t.DoneAt = &now + cancelled++ + case TaskRunning: + if t.job != nil { + t.job.abort() + } + t.Status = TaskCancelled + t.DoneAt = &now + cancelled++ + } + } + globalQueue.persistLocked() + globalQueue.mu.Unlock() + + // Kill orphaned test worker processes at the OS level. 
+ killed := platform.KillTestWorkers() + writeJSON(w, map[string]any{ + "cancelled": cancelled, + "killed": len(killed), + "processes": killed, + }) +} + func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") // Wait up to 5s for the task to get a job (it may be pending) @@ -769,8 +801,17 @@ func (q *taskQueue) loadLocked() { params: pt.Params, } q.assignTaskLogPathLocked(t) - if t.Status == TaskPending || t.Status == TaskRunning { - t.Status = TaskPending + if t.Status == TaskRunning { + // The task was interrupted by a bee-web restart. Child processes + // (e.g. bee-gpu-burn-worker) survive the restart in their own + // process groups and cannot be cancelled retroactively. Mark the + // task as failed so the user can decide whether to re-run it + // rather than blindly re-launching duplicate workers. + now := time.Now() + t.Status = TaskFailed + t.DoneAt = &now + t.ErrMsg = "interrupted by bee-web restart" + } else if t.Status == TaskPending { t.StartedAt = nil t.DoneAt = nil t.ErrMsg = "" diff --git a/audit/internal/webui/tasks_test.go b/audit/internal/webui/tasks_test.go index 584b100..8ba6788 100644 --- a/audit/internal/webui/tasks_test.go +++ b/audit/internal/webui/tasks_test.go @@ -24,21 +24,34 @@ func TestTaskQueuePersistsAndRecoversPendingTasks(t *testing.T) { } started := time.Now().Add(-time.Minute) - task := &Task{ - ID: "task-1", + + // A task that was pending (not yet started) must be re-queued on restart. + pendingTask := &Task{ + ID: "task-pending", Name: "Memory Burn-in", Target: "memory-stress", Priority: 2, - Status: TaskRunning, + Status: TaskPending, CreatedAt: time.Now().Add(-2 * time.Minute), - StartedAt: &started, - params: taskParams{ - Duration: 300, - BurnProfile: "smoke", - }, + params: taskParams{Duration: 300, BurnProfile: "smoke"}, + } + // A task that was running when bee-web crashed must NOT be re-queued — + // its child processes (e.g. 
gpu-burn-worker) survive the restart in + // their own process groups and can't be cancelled retroactively. + runningTask := &Task{ + ID: "task-running", + Name: "NVIDIA GPU Stress", + Target: "nvidia-stress", + Priority: 1, + Status: TaskRunning, + CreatedAt: time.Now().Add(-3 * time.Minute), + StartedAt: &started, + params: taskParams{Duration: 86400}, + } + for _, task := range []*Task{pendingTask, runningTask} { + q.tasks = append(q.tasks, task) + q.assignTaskLogPathLocked(task) } - q.tasks = append(q.tasks, task) - q.assignTaskLogPathLocked(task) q.persistLocked() recovered := &taskQueue{ @@ -48,21 +61,47 @@ func TestTaskQueuePersistsAndRecoversPendingTasks(t *testing.T) { } recovered.loadLocked() - if len(recovered.tasks) != 1 { - t.Fatalf("tasks=%d want 1", len(recovered.tasks)) + if len(recovered.tasks) != 2 { + t.Fatalf("tasks=%d want 2", len(recovered.tasks)) } - got := recovered.tasks[0] - if got.Status != TaskPending { - t.Fatalf("status=%q want %q", got.Status, TaskPending) + + byID := map[string]*Task{} + for i := range recovered.tasks { + byID[recovered.tasks[i].ID] = recovered.tasks[i] } - if got.StartedAt != nil { - t.Fatalf("started_at=%v want nil for recovered pending task", got.StartedAt) + + // Pending task must be re-queued as pending with params intact. 
+ p := byID["task-pending"] + if p == nil { + t.Fatal("task-pending not found") } - if got.params.Duration != 300 || got.params.BurnProfile != "smoke" { - t.Fatalf("params=%+v", got.params) + if p.Status != TaskPending { + t.Fatalf("pending task: status=%q want %q", p.Status, TaskPending) } - if got.LogPath == "" { - t.Fatal("expected log path") + if p.StartedAt != nil { + t.Fatalf("pending task: started_at=%v want nil", p.StartedAt) + } + if p.params.Duration != 300 || p.params.BurnProfile != "smoke" { + t.Fatalf("pending task: params=%+v", p.params) + } + if p.LogPath == "" { + t.Fatal("pending task: expected log path") + } + + // Running task must be marked failed, not re-queued, to prevent + // launching duplicate workers (e.g. a second set of gpu-burn-workers). + r := byID["task-running"] + if r == nil { + t.Fatal("task-running not found") + } + if r.Status != TaskFailed { + t.Fatalf("running task: status=%q want %q", r.Status, TaskFailed) + } + if r.ErrMsg == "" { + t.Fatal("running task: expected non-empty error message") + } + if r.DoneAt == nil { + t.Fatal("running task: expected done_at to be set") } }