fix(webui): prevent orphaned workers on restart, reduce metrics polling, add Kill Workers button

- tasks: mark TaskRunning tasks as TaskFailed on bee-web restart instead of
  re-queueing them — prevents duplicate gpu-burn-worker spawns when bee-web
  crashes mid-test (each restart was launching a new set of 8 workers on top
  of still-alive orphans from the previous crash)
- server: reduce metrics collector interval 1s→5s, grow ring buffer to 360
  samples (30 min); cuts nvidia-smi/ipmitool/sensors subprocess rate by 5×
- platform: add KillTestWorkers() — scans /proc and SIGKILLs bee-gpu-burn,
  stress-ng, stressapptest, memtester without relying on pkill/killall
- webui: add "Kill Workers" button next to Cancel All; calls
  POST /api/tasks/kill-workers which cancels the task queue then kills
  orphaned OS-level processes; shows toast with killed count
- metricsdb: sort GPU indices and fan/temp names after map iteration to fix
  non-deterministic sample reconstruction order (flaky test)
- server: fix chartYAxisNumber to use one decimal place for 1000–9999
  (e.g. "1,7к" instead of "2к") so Y-axis ticks are distinguishable

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Mikhail Chusavitin · 2026-04-02 10:13:43 +03:00
parent b2b0444131 · commit 1f750d3edd
7 changed files with 216 additions and 32 deletions

View File

@@ -6,6 +6,7 @@ import (
"io"
"os"
"path/filepath"
"sort"
"strconv"
"time"
@@ -217,7 +218,9 @@ func (m *MetricsDB) loadSamples(query string, args ...any) ([]platform.LiveMetri
}
}
- // Collect unique GPU indices and fan names from loaded data (preserve order)
+ // Collect unique GPU indices and fan/temp names from loaded data.
+ // Sort each list so that sample reconstruction is deterministic regardless
+ // of Go's non-deterministic map iteration order.
seenGPU := map[int]bool{}
var gpuIndices []int
for k := range gpuData {
@@ -226,6 +229,8 @@ func (m *MetricsDB) loadSamples(query string, args ...any) ([]platform.LiveMetri
gpuIndices = append(gpuIndices, k.idx)
}
}
+ sort.Ints(gpuIndices)
seenFan := map[string]bool{}
var fanNames []string
for k := range fanData {
@@ -234,6 +239,8 @@ func (m *MetricsDB) loadSamples(query string, args ...any) ([]platform.LiveMetri
fanNames = append(fanNames, k.name)
}
}
+ sort.Strings(fanNames)
seenTemp := map[string]bool{}
var tempNames []string
for k := range tempData {
@@ -242,6 +249,7 @@ func (m *MetricsDB) loadSamples(query string, args ...any) ([]platform.LiveMetri
tempNames = append(tempNames, k.name)
}
}
+ sort.Strings(tempNames)
samples := make([]platform.LiveMetricSample, len(sysRows))
for i, r := range sysRows {
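
For context on the fix above: Go's runtime deliberately randomizes map iteration order, so collecting keys with a bare range yields a different slice order from run to run. A minimal standalone illustration with hypothetical data (the real maps are keyed by struct keys like k.idx and k.name):

package main

import (
    "fmt"
    "sort"
)

func main() {
    // Stand-in for gpuData keyed by GPU index.
    gpuData := map[int]string{3: "d", 0: "a", 2: "c", 1: "b"}
    var gpuIndices []int
    for k := range gpuData {
        // Range order over a map may differ on every run.
        gpuIndices = append(gpuIndices, k)
    }
    sort.Ints(gpuIndices) // deterministic: always [0 1 2 3]
    fmt.Println(gpuIndices)
}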

View File

@@ -1577,8 +1577,10 @@ func renderInstall() string {
// ── Tasks ─────────────────────────────────────────────────────────────────────
func renderTasks() string {
- return `<div style="display:flex;align-items:center;gap:12px;margin-bottom:16px">
+ return `<div style="display:flex;align-items:center;gap:12px;margin-bottom:16px;flex-wrap:wrap">
<button class="btn btn-danger btn-sm" onclick="cancelAll()">Cancel All</button>
+ <button class="btn btn-sm" style="background:#b45309;color:#fff" onclick="killWorkers()" title="Send SIGKILL to all running test processes (bee-gpu-burn, stress-ng, stressapptest, memtester)">Kill Workers</button>
+ <span id="kill-toast" style="font-size:12px;color:var(--muted);display:none"></span>
<span style="font-size:12px;color:var(--muted)">Tasks run one at a time. Logs persist after navigation.</span>
</div>
<div class="card">
@@ -1639,6 +1641,21 @@ function cancelTask(id) {
function cancelAll() {
fetch('/api/tasks/cancel-all',{method:'POST'}).then(()=>loadTasks());
}
+ function killWorkers() {
+ if (!confirm('Send SIGKILL to all running test workers (bee-gpu-burn, stress-ng, stressapptest, memtester)?\n\nThis will also cancel all queued and running tasks.')) return;
+ fetch('/api/tasks/kill-workers',{method:'POST'})
+ .then(r=>r.json())
+ .then(d=>{
+ loadTasks();
+ var toast = document.getElementById('kill-toast');
+ var parts = [];
+ if (d.cancelled > 0) parts.push(d.cancelled+' task'+(d.cancelled===1?'':'s')+' cancelled');
+ if (d.killed > 0) parts.push(d.killed+' process'+(d.killed===1?'':'es')+' killed');
+ toast.textContent = parts.length ? parts.join(', ')+'.' : 'No processes found.';
+ toast.style.display = '';
+ setTimeout(()=>{ toast.style.display='none'; }, 5000);
+ });
+ }
function setPriority(id, delta) {
fetch('/api/tasks/'+id+'/priority',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({delta:delta})})
.then(()=>loadTasks());
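
For manual testing outside the UI, the endpoint can also be driven directly. A minimal Go client sketch (the localhost address is an assumption; the response keys match the handler shown in a later file):

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // POST with an empty body; the handler ignores the request payload.
    resp, err := http.Post("http://localhost:8080/api/tasks/kill-workers", "application/json", nil)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    body, _ := io.ReadAll(resp.Body)
    fmt.Println(string(body)) // e.g. {"cancelled":1,"killed":8,"processes":[...]}
}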

View File

@@ -128,7 +128,11 @@ type namedMetricsRing struct {
Ring *metricsRing
}
- const metricsChartWindow = 120
+ // metricsChartWindow is the number of samples kept in the live ring buffer.
+ // At metricsCollectInterval = 5 s this covers 30 minutes of live history.
+ const metricsChartWindow = 360
+ var metricsCollectInterval = 5 * time.Second
// pendingNetChange tracks a network state change awaiting confirmation.
type pendingNetChange struct {
@@ -238,6 +242,7 @@ func NewHandler(opts HandlerOptions) http.Handler {
// Tasks
mux.HandleFunc("GET /api/tasks", h.handleAPITasksList)
mux.HandleFunc("POST /api/tasks/cancel-all", h.handleAPITasksCancelAll)
+ mux.HandleFunc("POST /api/tasks/kill-workers", h.handleAPITasksKillWorkers)
mux.HandleFunc("POST /api/tasks/{id}/cancel", h.handleAPITasksCancel)
mux.HandleFunc("POST /api/tasks/{id}/priority", h.handleAPITasksPriority)
mux.HandleFunc("GET /api/tasks/{id}/stream", h.handleAPITasksStream)
@@ -301,7 +306,7 @@ func NewHandler(opts HandlerOptions) http.Handler {
func (h *handler) startMetricsCollector() {
go func() {
- ticker := time.NewTicker(1 * time.Second)
+ ticker := time.NewTicker(metricsCollectInterval)
defer ticker.Stop()
for range ticker.C {
sample := platform.SampleLiveMetrics()
@@ -1059,9 +1064,16 @@ func chartYAxisNumber(v float64) string {
v = -v
}
var out string
- if v >= 1000 {
+ switch {
+ case v >= 10000:
out = fmt.Sprintf("%dк", int((v+500)/1000))
- } else {
+ case v >= 1000:
+ // Use one decimal place so ticks like 1400, 1600, 1800 read as
+ // "1,4к", "1,6к", "1,8к" instead of the ambiguous "1к"/"2к".
+ s := fmt.Sprintf("%.1f", v/1000)
+ s = strings.TrimRight(strings.TrimRight(s, "0"), ".")
+ out = strings.ReplaceAll(s, ".", ",") + "к"
+ default:
out = fmt.Sprintf("%.0f", v)
}
if neg {
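
One edge of the %.1f branch worth calling out, since it explains the 9999 case in the test file below: values just under the 10000 cutoff round up and then lose the decimal entirely after trimming. A quick standalone check:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // 9999 takes the v >= 1000 branch, yet %.1f rounds 9.999 up to "10.0"...
    s := fmt.Sprintf("%.1f", 9999.0/1000)
    s = strings.TrimRight(strings.TrimRight(s, "0"), ".") // ...and trimming leaves "10"
    fmt.Println(strings.ReplaceAll(s, ".", ",") + "к")    // prints "10к"
}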

View File

@@ -175,10 +175,13 @@ func TestChartYAxisNumber(t *testing.T) {
}{
{in: 999, want: "999"},
{in: 1000, want: "1к"},
- {in: 1370, want: "1к"},
- {in: 1500, want: "2к"},
+ {in: 1370, want: "1,4к"},
+ {in: 1500, want: "1,5к"},
+ {in: 1700, want: "1,7к"},
+ {in: 2000, want: "2к"},
+ {in: 9999, want: "10к"},
{in: 10200, want: "10к"},
- {in: -1499, want: "-1к"},
+ {in: -1500, want: "-1,5к"},
}
for _, tc := range tests {
if got := chartYAxisNumber(tc.in); got != tc.want {

View File

@@ -716,6 +716,38 @@ func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request
writeJSON(w, map[string]int{"cancelled": n})
}
+ func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Request) {
+ // Cancel all queued/running tasks in the queue first.
+ globalQueue.mu.Lock()
+ now := time.Now()
+ cancelled := 0
+ for _, t := range globalQueue.tasks {
+ switch t.Status {
+ case TaskPending:
+ t.Status = TaskCancelled
+ t.DoneAt = &now
+ cancelled++
+ case TaskRunning:
+ if t.job != nil {
+ t.job.abort()
+ }
+ t.Status = TaskCancelled
+ t.DoneAt = &now
+ cancelled++
+ }
+ }
+ globalQueue.persistLocked()
+ globalQueue.mu.Unlock()
+ // Kill orphaned test worker processes at the OS level.
+ killed := platform.KillTestWorkers()
+ writeJSON(w, map[string]any{
+ "cancelled": cancelled,
+ "killed": len(killed),
+ "processes": killed,
+ })
+ }
func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
// Wait up to 5s for the task to get a job (it may be pending)
@@ -769,8 +801,17 @@ func (q *taskQueue) loadLocked() {
params: pt.Params,
}
q.assignTaskLogPathLocked(t)
- if t.Status == TaskPending || t.Status == TaskRunning {
- t.Status = TaskPending
+ if t.Status == TaskRunning {
+ // The task was interrupted by a bee-web restart. Child processes
+ // (e.g. bee-gpu-burn-worker) survive the restart in their own
+ // process groups and cannot be cancelled retroactively. Mark the
+ // task as failed so the user can decide whether to re-run it
+ // rather than blindly re-launching duplicate workers.
+ now := time.Now()
+ t.Status = TaskFailed
+ t.DoneAt = &now
+ t.ErrMsg = "interrupted by bee-web restart"
+ } else if t.Status == TaskPending {
t.StartedAt = nil
t.DoneAt = nil
t.ErrMsg = ""
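
The platform-side KillTestWorkers() named in the commit message lives in the seventh changed file, which is not expanded in this view. A sketch of the described approach under stated assumptions: matching is assumed to be a prefix match against /proc/<pid>/comm (which the kernel truncates to 15 characters), and the returned strings are assumed to be human-readable labels; the real implementation may match and report differently.

package platform

import (
    "fmt"
    "os"
    "path/filepath"
    "strconv"
    "strings"
    "syscall"
)

// Worker binaries named in the commit message.
var testWorkerPrefixes = []string{"bee-gpu-burn", "stress-ng", "stressapptest", "memtester"}

// KillTestWorkers scans /proc directly and SIGKILLs any matching test
// worker process, so it works even when pkill/killall are not installed.
func KillTestWorkers() []string {
    entries, err := os.ReadDir("/proc")
    if err != nil {
        return nil
    }
    var killed []string
    for _, e := range entries {
        pid, err := strconv.Atoi(e.Name())
        if err != nil {
            continue // not a process directory
        }
        comm, err := os.ReadFile(filepath.Join("/proc", e.Name(), "comm"))
        if err != nil {
            continue // process exited between ReadDir and ReadFile
        }
        name := strings.TrimSpace(string(comm))
        for _, prefix := range testWorkerPrefixes {
            if strings.HasPrefix(name, prefix) {
                if err := syscall.Kill(pid, syscall.SIGKILL); err == nil {
                    killed = append(killed, fmt.Sprintf("%s (pid %d)", name, pid))
                }
                break
            }
        }
    }
    return killed
}

A prefix match also covers spawned children such as bee-gpu-burn-worker, whose comm entry truncates to a string that still starts with "bee-gpu-burn".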

View File

@@ -24,21 +24,34 @@ func TestTaskQueuePersistsAndRecoversPendingTasks(t *testing.T) {
}
started := time.Now().Add(-time.Minute)
- task := &Task{
- ID: "task-1",
+ // A task that was pending (not yet started) must be re-queued on restart.
+ pendingTask := &Task{
+ ID: "task-pending",
Name: "Memory Burn-in",
Target: "memory-stress",
Priority: 2,
- Status: TaskRunning,
+ Status: TaskPending,
CreatedAt: time.Now().Add(-2 * time.Minute),
StartedAt: &started,
- params: taskParams{
- Duration: 300,
- BurnProfile: "smoke",
- },
+ params: taskParams{Duration: 300, BurnProfile: "smoke"},
}
+ // A task that was running when bee-web crashed must NOT be re-queued —
+ // its child processes (e.g. gpu-burn-worker) survive the restart in
+ // their own process groups and can't be cancelled retroactively.
+ runningTask := &Task{
+ ID: "task-running",
+ Name: "NVIDIA GPU Stress",
+ Target: "nvidia-stress",
+ Priority: 1,
+ Status: TaskRunning,
+ CreatedAt: time.Now().Add(-3 * time.Minute),
+ StartedAt: &started,
+ params: taskParams{Duration: 86400},
+ }
+ for _, task := range []*Task{pendingTask, runningTask} {
+ q.tasks = append(q.tasks, task)
+ q.assignTaskLogPathLocked(task)
+ }
- q.tasks = append(q.tasks, task)
- q.assignTaskLogPathLocked(task)
q.persistLocked()
recovered := &taskQueue{
@@ -48,21 +61,47 @@ func TestTaskQueuePersistsAndRecoversPendingTasks(t *testing.T) {
}
recovered.loadLocked()
- if len(recovered.tasks) != 1 {
- t.Fatalf("tasks=%d want 1", len(recovered.tasks))
+ if len(recovered.tasks) != 2 {
+ t.Fatalf("tasks=%d want 2", len(recovered.tasks))
}
- got := recovered.tasks[0]
- if got.Status != TaskPending {
- t.Fatalf("status=%q want %q", got.Status, TaskPending)
+ byID := map[string]*Task{}
+ for i := range recovered.tasks {
+ byID[recovered.tasks[i].ID] = recovered.tasks[i]
}
- if got.StartedAt != nil {
- t.Fatalf("started_at=%v want nil for recovered pending task", got.StartedAt)
+ // Pending task must be re-queued as pending with params intact.
+ p := byID["task-pending"]
+ if p == nil {
+ t.Fatal("task-pending not found")
}
- if got.params.Duration != 300 || got.params.BurnProfile != "smoke" {
- t.Fatalf("params=%+v", got.params)
+ if p.Status != TaskPending {
+ t.Fatalf("pending task: status=%q want %q", p.Status, TaskPending)
}
- if got.LogPath == "" {
- t.Fatal("expected log path")
+ if p.StartedAt != nil {
+ t.Fatalf("pending task: started_at=%v want nil", p.StartedAt)
}
+ if p.params.Duration != 300 || p.params.BurnProfile != "smoke" {
+ t.Fatalf("pending task: params=%+v", p.params)
+ }
+ if p.LogPath == "" {
+ t.Fatal("pending task: expected log path")
+ }
+ // Running task must be marked failed, not re-queued, to prevent
+ // launching duplicate workers (e.g. a second set of gpu-burn-workers).
+ r := byID["task-running"]
+ if r == nil {
+ t.Fatal("task-running not found")
+ }
+ if r.Status != TaskFailed {
+ t.Fatalf("running task: status=%q want %q", r.Status, TaskFailed)
+ }
+ if r.ErrMsg == "" {
+ t.Fatal("running task: expected non-empty error message")
+ }
+ if r.DoneAt == nil {
+ t.Fatal("running task: expected done_at to be set")
+ }
}