package webui

// Tests for the web-UI task queue: persistence/recovery across restarts,
// snapshot ordering, job-ID allocation, SSE log streaming, report/artifact
// generation, serial-console mirroring, and cancellation/panic handling.

import (
	"context"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"bee/audit/internal/app"
	"bee/audit/internal/platform"
)

// TestTaskQueuePersistsAndRecoversPendingTasks verifies the restart story:
// after persistLocked() + loadLocked() into a fresh queue, a pending task is
// re-queued with its params intact, while a task that was running at crash
// time is marked failed (with error message and done_at) instead of being
// re-queued.
func TestTaskQueuePersistsAndRecoversPendingTasks(t *testing.T) {
	dir := t.TempDir()
	q := &taskQueue{
		statePath: filepath.Join(dir, "tasks-state.json"),
		logsDir:   filepath.Join(dir, "tasks"),
		trigger:   make(chan struct{}, 1),
	}
	if err := os.MkdirAll(q.logsDir, 0755); err != nil {
		t.Fatal(err)
	}
	started := time.Now().Add(-time.Minute)
	// A task that was pending (not yet started) must be re-queued on restart.
	pendingTask := &Task{
		ID:        "task-pending",
		Name:      "Memory Burn-in",
		Target:    "memory-stress",
		Priority:  2,
		Status:    TaskPending,
		CreatedAt: time.Now().Add(-2 * time.Minute),
		params:    taskParams{Duration: 300, BurnProfile: "smoke"},
	}
	// A task that was running when bee-web crashed must NOT be re-queued —
	// its child processes (e.g. gpu-burn-worker) survive the restart in
	// their own process groups and can't be cancelled retroactively.
	runningTask := &Task{
		ID:        "task-running",
		Name:      "NVIDIA GPU Stress",
		Target:    "nvidia-stress",
		Priority:  1,
		Status:    TaskRunning,
		CreatedAt: time.Now().Add(-3 * time.Minute),
		StartedAt: &started,
		params:    taskParams{Duration: 86400},
	}
	// The test is single-goroutine, so calling the *Locked helpers without
	// taking q.mu is safe here.
	for _, task := range []*Task{pendingTask, runningTask} {
		q.tasks = append(q.tasks, task)
		q.assignTaskLogPathLocked(task)
	}
	q.persistLocked()
	// Simulate a process restart: a brand-new queue pointed at the same
	// state file and logs directory.
	recovered := &taskQueue{
		statePath: q.statePath,
		logsDir:   q.logsDir,
		trigger:   make(chan struct{}, 1),
	}
	recovered.loadLocked()
	if len(recovered.tasks) != 2 {
		t.Fatalf("tasks=%d want 2", len(recovered.tasks))
	}
	byID := map[string]*Task{}
	for i := range recovered.tasks {
		byID[recovered.tasks[i].ID] = recovered.tasks[i]
	}
	// Pending task must be re-queued as pending with params intact.
	p := byID["task-pending"]
	if p == nil {
		t.Fatal("task-pending not found")
	}
	if p.Status != TaskPending {
		t.Fatalf("pending task: status=%q want %q", p.Status, TaskPending)
	}
	if p.StartedAt != nil {
		t.Fatalf("pending task: started_at=%v want nil", p.StartedAt)
	}
	if p.params.Duration != 300 || p.params.BurnProfile != "smoke" {
		t.Fatalf("pending task: params=%+v", p.params)
	}
	if p.LogPath == "" {
		t.Fatal("pending task: expected log path")
	}
	// Running task must be marked failed, not re-queued, to prevent
	// launching duplicate workers (e.g. a second set of gpu-burn-workers).
	r := byID["task-running"]
	if r == nil {
		t.Fatal("task-running not found")
	}
	if r.Status != TaskFailed {
		t.Fatalf("running task: status=%q want %q", r.Status, TaskFailed)
	}
	if r.ErrMsg == "" {
		t.Fatal("running task: expected non-empty error message")
	}
	if r.DoneAt == nil {
		t.Fatal("running task: expected done_at to be set")
	}
}

// TestNewTaskJobStateLoadsExistingLog verifies that newTaskJobState seeds the
// job's line buffer from an already-existing log file, so a subscriber sees
// the historical lines plus a live channel.
func TestNewTaskJobStateLoadsExistingLog(t *testing.T) {
	dir := t.TempDir()
	path := filepath.Join(dir, "task.log")
	if err := os.WriteFile(path, []byte("line1\nline2\n"), 0644); err != nil {
		t.Fatal(err)
	}
	j := newTaskJobState(path)
	existing, ch := j.subscribe()
	if ch == nil {
		t.Fatal("expected live subscription channel")
	}
	if len(existing) != 2 || existing[0] != "line1" || existing[1] != "line2" {
		t.Fatalf("existing=%v", existing)
	}
}

// TestTaskQueueSnapshotSortsNewestFirst verifies that snapshot() orders tasks
// by CreatedAt, newest first, regardless of status or priority.
func TestTaskQueueSnapshotSortsNewestFirst(t *testing.T) {
	now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
	q := &taskQueue{
		tasks: []*Task{
			{
				ID:        "old-running",
				Name:      "Old Running",
				Status:    TaskRunning,
				Priority:  10,
				CreatedAt: now.Add(-3 * time.Minute),
			},
			{
				ID:        "new-done",
				Name:      "New Done",
				Status:    TaskDone,
				Priority:  0,
				CreatedAt: now.Add(-1 * time.Minute),
			},
			{
				ID:        "mid-pending",
				Name:      "Mid Pending",
				Status:    TaskPending,
				Priority:  1,
				CreatedAt: now.Add(-2 * time.Minute),
			},
		},
	}
	got := q.snapshot()
	if len(got) != 3 {
		t.Fatalf("snapshot len=%d want 3", len(got))
	}
	if got[0].ID != "new-done" || got[1].ID != "mid-pending" || got[2].ID != "old-running" {
		t.Fatalf("snapshot order=%q,%q,%q", got[0].ID, got[1].ID, got[2].ID)
	}
}

// TestNewJobIDUsesTASKPrefixAndZeroPadding verifies newJobID's format:
// "TASK-" followed by a zero-padded counter, starting at 000. Global queue
// state and the job counter are saved and restored via t.Cleanup because
// globalQueue/jobCounter are shared package-level state.
func TestNewJobIDUsesTASKPrefixAndZeroPadding(t *testing.T) {
	globalQueue.mu.Lock()
	origTasks := globalQueue.tasks
	globalQueue.tasks = nil
	globalQueue.mu.Unlock()
	origCounter := jobCounter.Load()
	jobCounter.Store(0)
	t.Cleanup(func() {
		globalQueue.mu.Lock()
		globalQueue.tasks = origTasks
		globalQueue.mu.Unlock()
		jobCounter.Store(origCounter)
	})
	// The argument is ignored for ID generation — only the counter matters.
	if got := newJobID("ignored"); got != "TASK-000" {
		t.Fatalf("id=%q want TASK-000", got)
	}
	if got := newJobID("ignored"); got != "TASK-001" {
		t.Fatalf("id=%q want TASK-001", got)
	}
}

// TestTaskArtifactsDirStartsWithTaskNumber verifies that the artifacts
// directory name begins with the numeric portion of the task ID (e.g.
// TASK-007 -> "007_...").
func TestTaskArtifactsDirStartsWithTaskNumber(t *testing.T) {
	root := t.TempDir()
	task := &Task{
		ID:   "TASK-007",
		Name: "NVIDIA Benchmark",
	}
	got := filepath.Base(taskArtifactsDir(root, task, TaskDone))
	if !strings.HasPrefix(got, "007_") {
		t.Fatalf("artifacts dir=%q want prefix 007_", got)
	}
}

// TestHandleAPITasksStreamReplaysPersistedLogWithoutLiveJob verifies that the
// SSE endpoint, for a finished task with no in-memory job, replays the
// persisted log file as "data:" events and terminates with an "event: done"
// frame.
func TestHandleAPITasksStreamReplaysPersistedLogWithoutLiveJob(t *testing.T) {
	dir := t.TempDir()
	logPath := filepath.Join(dir, "task.log")
	if err := os.WriteFile(logPath, []byte("line1\nline2\n"), 0644); err != nil {
		t.Fatal(err)
	}
	// Install a single done task into the shared global queue; restore the
	// original task list on cleanup.
	globalQueue.mu.Lock()
	origTasks := globalQueue.tasks
	globalQueue.tasks = []*Task{{
		ID:        "done-1",
		Name:      "Done Task",
		Status:    TaskDone,
		CreatedAt: time.Now(),
		LogPath:   logPath,
	}}
	globalQueue.mu.Unlock()
	t.Cleanup(func() {
		globalQueue.mu.Lock()
		globalQueue.tasks = origTasks
		globalQueue.mu.Unlock()
	})
	req := httptest.NewRequest(http.MethodGet, "/api/tasks/done-1/stream", nil)
	req.SetPathValue("id", "done-1")
	rec := httptest.NewRecorder()
	h := &handler{}
	h.handleAPITasksStream(rec, req)
	if rec.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
	}
	body := rec.Body.String()
	if !strings.Contains(body, "data: line1\n\n") || !strings.Contains(body, "data: line2\n\n") {
		t.Fatalf("body=%q", body)
	}
	if !strings.Contains(body, "event: done\n") {
		t.Fatalf("missing done event: %q", body)
	}
}

// TestHandleAPITasksStreamPendingTaskStartsSSEImmediately verifies that
// streaming a still-pending task does not block: the handler promptly emits a
// "Task is queued. Waiting for worker..." status line while the request
// context is alive, and returns once the context is cancelled.
//
// NOTE(review): this test polls rec.Body.String() from the test goroutine
// while the handler goroutine is writing to the same recorder.
// httptest.ResponseRecorder is not safe for concurrent use, so this is a
// data race under `go test -race`; a mutex-guarded ResponseWriter wrapper
// (covering Write and Flush) would be needed to fix it properly.
func TestHandleAPITasksStreamPendingTaskStartsSSEImmediately(t *testing.T) {
	globalQueue.mu.Lock()
	origTasks := globalQueue.tasks
	globalQueue.tasks = []*Task{{
		ID:        "pending-1",
		Name:      "Pending Task",
		Status:    TaskPending,
		CreatedAt: time.Now(),
	}}
	globalQueue.mu.Unlock()
	t.Cleanup(func() {
		globalQueue.mu.Lock()
		globalQueue.tasks = origTasks
		globalQueue.mu.Unlock()
	})
	// Cancelling ctx is how the test tells the long-lived SSE handler to
	// stop once the expected status line has been observed.
	ctx, cancel := context.WithCancel(context.Background())
	req := httptest.NewRequest(http.MethodGet, "/api/tasks/pending-1/stream", nil).WithContext(ctx)
	req.SetPathValue("id", "pending-1")
	rec := httptest.NewRecorder()
	done := make(chan struct{})
	go func() {
		h := &handler{}
		h.handleAPITasksStream(rec, req)
		close(done)
	}()
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if strings.Contains(rec.Body.String(), "Task is queued. Waiting for worker...") {
			cancel()
			<-done
			if rec.Code != http.StatusOK {
				t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
			}
			return
		}
		time.Sleep(20 * time.Millisecond)
	}
	cancel()
	<-done
	t.Fatalf("stream did not emit queued status promptly, body=%q", rec.Body.String())
}

// TestFinalizeTaskRunCreatesReportFolderAndArtifacts verifies end-of-run
// finalization for a successful task: status becomes done, the artifacts dir
// carries a "_done" suffix, and both report.json (round-trippable, with
// charts) and report.html exist on disk. A seeded metrics DB provides the
// samples the report charts are built from.
func TestFinalizeTaskRunCreatesReportFolderAndArtifacts(t *testing.T) {
	dir := t.TempDir()
	metricsPath := filepath.Join(dir, "metrics.db")
	// Redirect the package-level metrics DB path to a temp file for the
	// duration of this test.
	prevMetricsPath := taskReportMetricsDBPath
	taskReportMetricsDBPath = metricsPath
	t.Cleanup(func() { taskReportMetricsDBPath = prevMetricsPath })
	db, err := openMetricsDB(metricsPath)
	if err != nil {
		t.Fatalf("openMetricsDB: %v", err)
	}
	// One sample inside the task's run window is enough to produce charts.
	base := time.Now().UTC().Add(-45 * time.Second)
	if err := db.Write(platform.LiveMetricSample{
		Timestamp:  base,
		CPULoadPct: 42,
		MemLoadPct: 35,
		PowerW:     510,
	}); err != nil {
		t.Fatalf("Write: %v", err)
	}
	_ = db.Close()
	q := &taskQueue{
		statePath: filepath.Join(dir, "tasks-state.json"),
		logsDir:   filepath.Join(dir, "tasks"),
		trigger:   make(chan struct{}, 1),
	}
	if err := os.MkdirAll(q.logsDir, 0755); err != nil {
		t.Fatal(err)
	}
	started := time.Now().UTC().Add(-90 * time.Second)
	task := &Task{
		ID:        "task-1",
		Name:      "CPU SAT",
		Target:    "cpu",
		Status:    TaskRunning,
		CreatedAt: started.Add(-10 * time.Second),
		StartedAt: &started,
	}
	q.assignTaskLogPathLocked(task)
	appendJobLog(task.LogPath, "line-1")
	job := newTaskJobState(task.LogPath)
	// finish("") marks the job as completed without an error.
	job.finish("")
	q.finalizeTaskRun(task, job)
	if task.Status != TaskDone {
		t.Fatalf("status=%q want %q", task.Status, TaskDone)
	}
	if !strings.Contains(filepath.Base(task.ArtifactsDir), "_done") {
		t.Fatalf("artifacts dir=%q", task.ArtifactsDir)
	}
	if _, err := os.Stat(task.ReportJSONPath); err != nil {
		t.Fatalf("report json: %v", err)
	}
	if _, err := os.Stat(task.ReportHTMLPath); err != nil {
		t.Fatalf("report html: %v", err)
	}
	var report taskReport
	data, err := os.ReadFile(task.ReportJSONPath)
	if err != nil {
		t.Fatalf("ReadFile(report.json): %v", err)
	}
	if err := json.Unmarshal(data, &report); err != nil {
		t.Fatalf("Unmarshal(report.json): %v", err)
	}
	if report.ID != task.ID || report.Status != TaskDone {
		t.Fatalf("report=%+v", report)
	}
	if len(report.Charts) == 0 {
		t.Fatalf("expected charts in report, got none")
	}
}

// TestWriteTaskReportArtifactsIncludesBenchmarkResultsForTask verifies that
// the HTML report for an nvidia-benchmark task pulls in the benchmark
// result.json found beside the archive mentioned in the task log ("Archive: "
// line) and renders the per-GPU scorecard, including the composite score.
func TestWriteTaskReportArtifactsIncludesBenchmarkResultsForTask(t *testing.T) {
	dir := t.TempDir()
	metricsPath := filepath.Join(dir, "metrics.db")
	prevMetricsPath := taskReportMetricsDBPath
	taskReportMetricsDBPath = metricsPath
	t.Cleanup(func() { taskReportMetricsDBPath = prevMetricsPath })
	// Lay out a benchmark output directory with a result.json, matching the
	// archive path referenced from the task log below.
	benchmarkDir := filepath.Join(dir, "bee-benchmark", "gpu-benchmark-20260406-120000")
	if err := os.MkdirAll(benchmarkDir, 0755); err != nil {
		t.Fatal(err)
	}
	result := platform.NvidiaBenchmarkResult{
		GeneratedAt:      time.Date(2026, time.April, 6, 12, 0, 0, 0, time.UTC),
		BenchmarkProfile: "standard",
		OverallStatus:    "OK",
		GPUs: []platform.BenchmarkGPUResult{
			{
				Index: 0,
				Name:  "NVIDIA H100 PCIe",
				Scores: platform.BenchmarkScorecard{
					CompositeScore: 1176.25,
				},
			},
		},
	}
	raw, err := json.Marshal(result)
	if err != nil {
		t.Fatal(err)
	}
	if err := os.WriteFile(filepath.Join(benchmarkDir, "result.json"), raw, 0644); err != nil {
		t.Fatal(err)
	}
	artifactsDir := filepath.Join(dir, "tasks", "task-bench_done")
	if err := os.MkdirAll(artifactsDir, 0755); err != nil {
		t.Fatal(err)
	}
	task := &Task{
		ID:           "task-bench",
		Name:         "NVIDIA Benchmark",
		Target:       "nvidia-benchmark",
		Status:       TaskDone,
		CreatedAt:    time.Now().UTC().Add(-time.Minute),
		ArtifactsDir: artifactsDir,
	}
	ensureTaskReportPaths(task)
	// The "Archive: <path>" log line is how the report writer locates the
	// benchmark output directory.
	logText := "line-1\nArchive: " + filepath.Join(dir, "bee-benchmark", "gpu-benchmark-20260406-120000.tar.gz") + "\n"
	if err := os.WriteFile(task.LogPath, []byte(logText), 0644); err != nil {
		t.Fatal(err)
	}
	if err := writeTaskReportArtifacts(task); err != nil {
		t.Fatalf("writeTaskReportArtifacts: %v", err)
	}
	body, err := os.ReadFile(task.ReportHTMLPath)
	if err != nil {
		t.Fatalf("ReadFile(report.html): %v", err)
	}
	html := string(body)
	for _, needle := range []string{
		`Benchmark Results`,
		`Composite score for this benchmark task.`,
		`GPU #0 — NVIDIA H100 PCIe`,
		`1176.25`,
	} {
		if !strings.Contains(html, needle) {
			t.Fatalf("report missing %q: %s", needle, html)
		}
	}
}

// TestTaskLifecycleMirrorsToSerialConsole verifies that queueing, log lines,
// and completion are all mirrored to the serial console via the
// taskSerialWriteLine seam, which is stubbed out to capture lines in memory.
func TestTaskLifecycleMirrorsToSerialConsole(t *testing.T) {
	var lines []string
	prev := taskSerialWriteLine
	taskSerialWriteLine = func(line string) { lines = append(lines, line) }
	t.Cleanup(func() { taskSerialWriteLine = prev })
	dir := t.TempDir()
	q := &taskQueue{
		statePath: filepath.Join(dir, "tasks-state.json"),
		logsDir:   filepath.Join(dir, "tasks"),
		trigger:   make(chan struct{}, 1),
	}
	task := &Task{
		ID:        "task-serial-1",
		Name:      "CPU SAT",
		Target:    "cpu",
		Status:    TaskPending,
		CreatedAt: time.Now().UTC(),
	}
	q.enqueue(task)
	// Drive the lifecycle by hand: pending -> running -> finished.
	started := time.Now().UTC()
	task.Status = TaskRunning
	task.StartedAt = &started
	job := newTaskJobState(task.LogPath, taskSerialPrefix(task))
	job.append("Starting CPU SAT...")
	job.append("CPU stress duration: 60s")
	job.finish("")
	q.finalizeTaskRun(task, job)
	joined := strings.Join(lines, "\n")
	for _, needle := range []string{
		"queued",
		"Starting CPU SAT...",
		"CPU stress duration: 60s",
		"finished with status=done",
	} {
		if !strings.Contains(joined, needle) {
			t.Fatalf("serial mirror missing %q in %q", needle, joined)
		}
	}
}

// TestResolveBurnPreset verifies the duration presets for each burn profile,
// including that the empty profile falls back to the 5-minute smoke preset.
func TestResolveBurnPreset(t *testing.T) {
	tests := []struct {
		profile string
		want    burnPreset
	}{
		{profile: "smoke", want: burnPreset{DurationSec: 5 * 60}},
		{profile: "acceptance", want: burnPreset{DurationSec: 60 * 60}},
		{profile: "overnight", want: burnPreset{DurationSec: 8 * 60 * 60}},
		{profile: "", want: burnPreset{DurationSec: 5 * 60}},
	}
	for _, tc := range tests {
		if got := resolveBurnPreset(tc.profile); got != tc.want {
			t.Fatalf("resolveBurnPreset(%q)=%+v want %+v", tc.profile, got, tc.want)
		}
	}
}

// TestTaskDisplayNameUsesNvidiaStressLoader verifies the display name for
// nvidia-stress tasks reflects the selected loader, with ""/"builtin" both
// mapping to the bee-gpu-burn label.
func TestTaskDisplayNameUsesNvidiaStressLoader(t *testing.T) {
	tests := []struct {
		loader string
		want   string
	}{
		{loader: "", want: "NVIDIA GPU Stress (bee-gpu-burn)"},
		{loader: "builtin", want: "NVIDIA GPU Stress (bee-gpu-burn)"},
		{loader: "john", want: "NVIDIA GPU Stress (John/OpenCL)"},
		{loader: "nccl", want: "NVIDIA GPU Stress (NCCL)"},
	}
	for _, tc := range tests {
		if got := taskDisplayName("nvidia-stress", "acceptance", tc.loader); got != tc.want {
			t.Fatalf("taskDisplayName(loader=%q)=%q want %q", tc.loader, got, tc.want)
		}
	}
}

// TestRunTaskHonorsCancel verifies that aborting a job propagates context
// cancellation into the (stubbed) CPU acceptance runner and that runTask
// returns promptly afterwards. Channels blocked/released/done sequence the
// test against the worker goroutine.
func TestRunTaskHonorsCancel(t *testing.T) {
	blocked := make(chan struct{})
	released := make(chan struct{})
	// Stub runner: signals when entered, then blocks until cancelled (or a
	// 5s safety timeout, which would indicate the cancel was lost).
	aRun := func(_ any, ctx context.Context, _ string, _ int, _ func(string)) (string, error) {
		close(blocked)
		select {
		case <-ctx.Done():
			close(released)
			return "", ctx.Err()
		case <-time.After(5 * time.Second):
			close(released)
			return "unexpected", nil
		}
	}
	q := &taskQueue{
		opts: &HandlerOptions{App: &app.App{}},
	}
	tk := &Task{
		ID:        "cpu-1",
		Name:      "CPU SAT",
		Target:    "cpu",
		Status:    TaskRunning,
		CreatedAt: time.Now(),
		params:    taskParams{Duration: 60},
	}
	j := &jobState{}
	ctx, cancel := context.WithCancel(context.Background())
	j.cancel = cancel
	tk.job = j
	orig := runCPUAcceptancePackCtx
	runCPUAcceptancePackCtx = func(_ *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
		return aRun(nil, ctx, baseDir, durationSec, logFunc)
	}
	defer func() { runCPUAcceptancePackCtx = orig }()
	done := make(chan struct{})
	go func() {
		q.runTask(tk, j, ctx)
		close(done)
	}()
	<-blocked
	// abort() is expected to fire j.cancel, unblocking the stub runner.
	j.abort()
	select {
	case <-released:
	case <-time.After(2 * time.Second):
		t.Fatal("task did not observe cancel")
	}
	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatal("runTask did not return after cancel")
	}
}

// TestRunTaskUsesBurnProfileDurationForCPU verifies that when a CPU task
// carries only a BurnProfile (no explicit Duration), runTask passes the
// profile's preset duration (smoke = 5 minutes) to the acceptance runner.
func TestRunTaskUsesBurnProfileDurationForCPU(t *testing.T) {
	var gotDuration int
	q := &taskQueue{
		opts: &HandlerOptions{App: &app.App{}},
	}
	tk := &Task{
		ID:        "cpu-burn-1",
		Name:      "CPU Burn-in",
		Target:    "cpu",
		Status:    TaskRunning,
		CreatedAt: time.Now(),
		params:    taskParams{BurnProfile: "smoke"},
	}
	j := &jobState{}
	orig := runCPUAcceptancePackCtx
	runCPUAcceptancePackCtx = func(_ *app.App, _ context.Context, _ string, durationSec int, _ func(string)) (string, error) {
		gotDuration = durationSec
		return "/tmp/cpu-burn.tar.gz", nil
	}
	defer func() { runCPUAcceptancePackCtx = orig }()
	q.runTask(tk, j, context.Background())
	if gotDuration != 5*60 {
		t.Fatalf("duration=%d want %d", gotDuration, 5*60)
	}
}

// TestRunTaskBuildsSupportBundleWithoutApp verifies a support-bundle task
// works with no App configured: the stubbed buildSupportBundle receives the
// handler's ExportDir, the job records no error, and an "Archive: <path>"
// line is appended to the job log.
func TestRunTaskBuildsSupportBundleWithoutApp(t *testing.T) {
	dir := t.TempDir()
	q := &taskQueue{
		opts: &HandlerOptions{ExportDir: dir},
	}
	tk := &Task{
		ID:        "support-bundle-1",
		Name:      "Support Bundle",
		Target:    "support-bundle",
		Status:    TaskRunning,
		CreatedAt: time.Now(),
	}
	j := &jobState{}
	var gotExportDir string
	orig := buildSupportBundle
	buildSupportBundle = func(exportDir string) (string, error) {
		gotExportDir = exportDir
		return filepath.Join(exportDir, "bundle.tar.gz"), nil
	}
	defer func() { buildSupportBundle = orig }()
	q.runTask(tk, j, context.Background())
	if gotExportDir != dir {
		t.Fatalf("exportDir=%q want %q", gotExportDir, dir)
	}
	if j.err != "" {
		t.Fatalf("unexpected error: %q", j.err)
	}
	if !strings.Contains(strings.Join(j.lines, "\n"), "Archive: "+filepath.Join(dir, "bundle.tar.gz")) {
		t.Fatalf("lines=%v", j.lines)
	}
}

// TestTaskElapsedSecClampsInvalidStartedAt verifies elapsed-time clamping: a
// zero StartedAt yields 0, and a StartedAt earlier than CreatedAt is treated
// as CreatedAt (elapsed measured from creation instead).
func TestTaskElapsedSecClampsInvalidStartedAt(t *testing.T) {
	now := time.Date(2026, 4, 1, 19, 10, 0, 0, time.UTC)
	created := time.Date(2026, 4, 1, 19, 4, 5, 0, time.UTC)
	started := time.Time{}
	task := &Task{
		Status:    TaskRunning,
		CreatedAt: created,
		StartedAt: &started,
	}
	if got := taskElapsedSec(task, now); got != 0 {
		t.Fatalf("taskElapsedSec(zero start)=%d want 0", got)
	}
	stale := created.Add(-24 * time.Hour)
	task.StartedAt = &stale
	if got := taskElapsedSec(task, now); got != int(now.Sub(created).Seconds()) {
		t.Fatalf("taskElapsedSec(stale start)=%d want %d", got, int(now.Sub(created).Seconds()))
	}
}

// TestRunTaskInstallUsesSharedCommandStreaming verifies the install task path:
// the stubbed installCommand receives the device and a non-empty log path,
// and the command's stdout is streamed line-by-line into the job log along
// with an "Install log: " line, with no job error.
func TestRunTaskInstallUsesSharedCommandStreaming(t *testing.T) {
	q := &taskQueue{
		opts: &HandlerOptions{},
	}
	tk := &Task{
		ID:        "install-1",
		Name:      "Install to Disk",
		Target:    "install",
		Status:    TaskRunning,
		CreatedAt: time.Now(),
		params:    taskParams{Device: "/dev/sda"},
	}
	j := &jobState{}
	var gotDevice string
	var gotLogPath string
	orig := installCommand
	// Replace the real installer with a trivial shell command that emits two
	// lines, so the streaming plumbing can be observed safely.
	installCommand = func(ctx context.Context, device string, logPath string) *exec.Cmd {
		gotDevice = device
		gotLogPath = logPath
		return exec.CommandContext(ctx, "sh", "-c", "printf 'line1\nline2\n'")
	}
	defer func() { installCommand = orig }()
	q.runTask(tk, j, context.Background())
	if gotDevice != "/dev/sda" {
		t.Fatalf("device=%q want /dev/sda", gotDevice)
	}
	if gotLogPath == "" {
		t.Fatal("expected install log path")
	}
	logs := strings.Join(j.lines, "\n")
	if !strings.Contains(logs, "Install log: ") {
		t.Fatalf("missing install log line: %v", j.lines)
	}
	if !strings.Contains(logs, "line1") || !strings.Contains(logs, "line2") {
		t.Fatalf("missing streamed output: %v", j.lines)
	}
	if j.err != "" {
		t.Fatalf("unexpected error: %q", j.err)
	}
}

// TestExecuteTaskMarksPanicsAsFailedAndClosesKmsgWindow verifies panic
// containment in executeTask: a panicking runner leaves the task failed with
// "task panic: boom" recorded on both task and job, done_at set, and the
// kmsg watcher's active count and window reset to their idle state.
func TestExecuteTaskMarksPanicsAsFailedAndClosesKmsgWindow(t *testing.T) {
	dir := t.TempDir()
	q := &taskQueue{
		opts:        &HandlerOptions{App: &app.App{}},
		statePath:   filepath.Join(dir, "tasks-state.json"),
		logsDir:     filepath.Join(dir, "tasks"),
		kmsgWatcher: newKmsgWatcher(nil),
	}
	tk := &Task{
		ID:        "cpu-panic-1",
		Name:      "CPU SAT",
		Target:    "cpu",
		Status:    TaskRunning,
		CreatedAt: time.Now(),
	}
	j := &jobState{}
	orig := runCPUAcceptancePackCtx
	runCPUAcceptancePackCtx = func(_ *app.App, _ context.Context, _ string, _ int, _ func(string)) (string, error) {
		panic("boom")
	}
	defer func() { runCPUAcceptancePackCtx = orig }()
	q.executeTask(tk, j, context.Background())
	if tk.Status != TaskFailed {
		t.Fatalf("status=%q want %q", tk.Status, TaskFailed)
	}
	if tk.DoneAt == nil {
		t.Fatal("expected done_at to be set")
	}
	if !strings.Contains(tk.ErrMsg, "task panic: boom") {
		t.Fatalf("task error=%q", tk.ErrMsg)
	}
	if !strings.Contains(j.err, "task panic: boom") {
		t.Fatalf("job error=%q", j.err)
	}
	q.kmsgWatcher.mu.Lock()
	activeCount := q.kmsgWatcher.activeCount
	window := q.kmsgWatcher.window
	q.kmsgWatcher.mu.Unlock()
	if activeCount != 0 {
		t.Fatalf("activeCount=%d want 0", activeCount)
	}
	if window != nil {
		t.Fatalf("expected kmsg window to be cleared, got %+v", window)
	}
}