Files
bee/audit/internal/webui/tasks_test.go
Michael Chus bf6ecab4f0 Add per-precision benchmark phases, weighted TOPS scoring, and ECC tracking
- Split steady window into 6 equal slots: fp8/fp16/fp32/fp64/fp4 + combined
- Each precision phase runs bee-gpu-burn with --precision filter so PowerCVPct reflects single-kernel stability (not round-robin artifact)
- Add fp4 support in bee-gpu-stress.c for Blackwell (cc>=100) via existing CUDA_R_4F_E2M1 guard
- Weighted TOPS: fp64×2.0, fp32×1.0, fp16×0.5, fp8×0.25, fp4×0.125
- SyntheticScore = sum of weighted TOPS from per-precision phases
- MixedScore = sum from combined phase; MixedEfficiency = Mixed/Synthetic
- ComputeScore = SyntheticScore × (1 + MixedEfficiency × 0.3)
- ECC volatile counters sampled before/after each phase and overall
- DegradationReasons: ecc_uncorrected_errors, ecc_corrected_errors
- Report: per-precision stability table with ECC columns, methodology section
- Ramp-up history table redesign: GPU indices as columns, runs as rows

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 10:49:49 +03:00

822 lines
22 KiB
Go

package webui
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"
"bee/audit/internal/app"
"bee/audit/internal/platform"
)
func TestTaskQueuePersistsAndRecoversPendingTasks(t *testing.T) {
dir := t.TempDir()
q := &taskQueue{
statePath: filepath.Join(dir, "tasks-state.json"),
logsDir: filepath.Join(dir, "tasks"),
trigger: make(chan struct{}, 1),
}
if err := os.MkdirAll(q.logsDir, 0755); err != nil {
t.Fatal(err)
}
started := time.Now().Add(-time.Minute)
// A task that was pending (not yet started) must be re-queued on restart.
pendingTask := &Task{
ID: "task-pending",
Name: "Memory Burn-in",
Target: "memory-stress",
Priority: 2,
Status: TaskPending,
CreatedAt: time.Now().Add(-2 * time.Minute),
params: taskParams{Duration: 300, BurnProfile: "smoke"},
}
// A task that was running when bee-web crashed must NOT be re-queued —
// its child processes (e.g. gpu-burn-worker) survive the restart in
// their own process groups and can't be cancelled retroactively.
runningTask := &Task{
ID: "task-running",
Name: "NVIDIA GPU Stress",
Target: "nvidia-stress",
Priority: 1,
Status: TaskRunning,
CreatedAt: time.Now().Add(-3 * time.Minute),
StartedAt: &started,
params: taskParams{Duration: 86400},
}
for _, task := range []*Task{pendingTask, runningTask} {
q.tasks = append(q.tasks, task)
q.assignTaskLogPathLocked(task)
}
q.persistLocked()
recovered := &taskQueue{
statePath: q.statePath,
logsDir: q.logsDir,
trigger: make(chan struct{}, 1),
}
recovered.loadLocked()
if len(recovered.tasks) != 2 {
t.Fatalf("tasks=%d want 2", len(recovered.tasks))
}
byID := map[string]*Task{}
for i := range recovered.tasks {
byID[recovered.tasks[i].ID] = recovered.tasks[i]
}
// Pending task must be re-queued as pending with params intact.
p := byID["task-pending"]
if p == nil {
t.Fatal("task-pending not found")
}
if p.Status != TaskPending {
t.Fatalf("pending task: status=%q want %q", p.Status, TaskPending)
}
if p.StartedAt != nil {
t.Fatalf("pending task: started_at=%v want nil", p.StartedAt)
}
if p.params.Duration != 300 || p.params.BurnProfile != "smoke" {
t.Fatalf("pending task: params=%+v", p.params)
}
if p.LogPath == "" {
t.Fatal("pending task: expected log path")
}
// Running task must be marked failed, not re-queued, to prevent
// launching duplicate workers (e.g. a second set of gpu-burn-workers).
r := byID["task-running"]
if r == nil {
t.Fatal("task-running not found")
}
if r.Status != TaskFailed {
t.Fatalf("running task: status=%q want %q", r.Status, TaskFailed)
}
if r.ErrMsg == "" {
t.Fatal("running task: expected non-empty error message")
}
if r.DoneAt == nil {
t.Fatal("running task: expected done_at to be set")
}
}
func TestNewTaskJobStateLoadsExistingLog(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "task.log")
if err := os.WriteFile(path, []byte("line1\nline2\n"), 0644); err != nil {
t.Fatal(err)
}
j := newTaskJobState(path)
existing, ch := j.subscribe()
if ch == nil {
t.Fatal("expected live subscription channel")
}
if len(existing) != 2 || existing[0] != "line1" || existing[1] != "line2" {
t.Fatalf("existing=%v", existing)
}
}
func TestTaskQueueSnapshotSortsNewestFirst(t *testing.T) {
now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
q := &taskQueue{
tasks: []*Task{
{
ID: "old-running",
Name: "Old Running",
Status: TaskRunning,
Priority: 10,
CreatedAt: now.Add(-3 * time.Minute),
},
{
ID: "new-done",
Name: "New Done",
Status: TaskDone,
Priority: 0,
CreatedAt: now.Add(-1 * time.Minute),
},
{
ID: "mid-pending",
Name: "Mid Pending",
Status: TaskPending,
Priority: 1,
CreatedAt: now.Add(-2 * time.Minute),
},
},
}
got := q.snapshot()
if len(got) != 3 {
t.Fatalf("snapshot len=%d want 3", len(got))
}
if got[0].ID != "new-done" || got[1].ID != "mid-pending" || got[2].ID != "old-running" {
t.Fatalf("snapshot order=%q,%q,%q", got[0].ID, got[1].ID, got[2].ID)
}
}
func TestNewJobIDUsesTASKPrefixAndZeroPadding(t *testing.T) {
globalQueue.mu.Lock()
origTasks := globalQueue.tasks
globalQueue.tasks = nil
globalQueue.mu.Unlock()
origCounter := jobCounter.Load()
jobCounter.Store(0)
t.Cleanup(func() {
globalQueue.mu.Lock()
globalQueue.tasks = origTasks
globalQueue.mu.Unlock()
jobCounter.Store(origCounter)
})
if got := newJobID("ignored"); got != "TASK-000" {
t.Fatalf("id=%q want TASK-000", got)
}
if got := newJobID("ignored"); got != "TASK-001" {
t.Fatalf("id=%q want TASK-001", got)
}
}
func TestTaskArtifactsDirStartsWithTaskNumber(t *testing.T) {
root := t.TempDir()
task := &Task{
ID: "TASK-007",
Name: "NVIDIA Benchmark",
}
got := filepath.Base(taskArtifactsDir(root, task, TaskDone))
if !strings.HasPrefix(got, "007_") {
t.Fatalf("artifacts dir=%q want prefix 007_", got)
}
}
func TestHandleAPITasksStreamReplaysPersistedLogWithoutLiveJob(t *testing.T) {
dir := t.TempDir()
logPath := filepath.Join(dir, "task.log")
if err := os.WriteFile(logPath, []byte("line1\nline2\n"), 0644); err != nil {
t.Fatal(err)
}
globalQueue.mu.Lock()
origTasks := globalQueue.tasks
globalQueue.tasks = []*Task{{
ID: "done-1",
Name: "Done Task",
Status: TaskDone,
CreatedAt: time.Now(),
LogPath: logPath,
}}
globalQueue.mu.Unlock()
t.Cleanup(func() {
globalQueue.mu.Lock()
globalQueue.tasks = origTasks
globalQueue.mu.Unlock()
})
req := httptest.NewRequest(http.MethodGet, "/api/tasks/done-1/stream", nil)
req.SetPathValue("id", "done-1")
rec := httptest.NewRecorder()
h := &handler{}
h.handleAPITasksStream(rec, req)
if rec.Code != http.StatusOK {
t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
}
body := rec.Body.String()
if !strings.Contains(body, "data: line1\n\n") || !strings.Contains(body, "data: line2\n\n") {
t.Fatalf("body=%q", body)
}
if !strings.Contains(body, "event: done\n") {
t.Fatalf("missing done event: %q", body)
}
}
func TestHandleAPITasksStreamPendingTaskStartsSSEImmediately(t *testing.T) {
globalQueue.mu.Lock()
origTasks := globalQueue.tasks
globalQueue.tasks = []*Task{{
ID: "pending-1",
Name: "Pending Task",
Status: TaskPending,
CreatedAt: time.Now(),
}}
globalQueue.mu.Unlock()
t.Cleanup(func() {
globalQueue.mu.Lock()
globalQueue.tasks = origTasks
globalQueue.mu.Unlock()
})
ctx, cancel := context.WithCancel(context.Background())
req := httptest.NewRequest(http.MethodGet, "/api/tasks/pending-1/stream", nil).WithContext(ctx)
req.SetPathValue("id", "pending-1")
rec := httptest.NewRecorder()
done := make(chan struct{})
go func() {
h := &handler{}
h.handleAPITasksStream(rec, req)
close(done)
}()
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if strings.Contains(rec.Body.String(), "Task is queued. Waiting for worker...") {
cancel()
<-done
if rec.Code != http.StatusOK {
t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
}
return
}
time.Sleep(20 * time.Millisecond)
}
cancel()
<-done
t.Fatalf("stream did not emit queued status promptly, body=%q", rec.Body.String())
}
func TestFinalizeTaskRunCreatesReportFolderAndArtifacts(t *testing.T) {
dir := t.TempDir()
metricsPath := filepath.Join(dir, "metrics.db")
prevMetricsPath := taskReportMetricsDBPath
taskReportMetricsDBPath = metricsPath
t.Cleanup(func() { taskReportMetricsDBPath = prevMetricsPath })
db, err := openMetricsDB(metricsPath)
if err != nil {
t.Fatalf("openMetricsDB: %v", err)
}
base := time.Now().UTC().Add(-45 * time.Second)
if err := db.Write(platform.LiveMetricSample{
Timestamp: base,
CPULoadPct: 42,
MemLoadPct: 35,
PowerW: 510,
}); err != nil {
t.Fatalf("Write: %v", err)
}
_ = db.Close()
q := &taskQueue{
statePath: filepath.Join(dir, "tasks-state.json"),
logsDir: filepath.Join(dir, "tasks"),
trigger: make(chan struct{}, 1),
}
if err := os.MkdirAll(q.logsDir, 0755); err != nil {
t.Fatal(err)
}
started := time.Now().UTC().Add(-90 * time.Second)
task := &Task{
ID: "task-1",
Name: "CPU SAT",
Target: "cpu",
Status: TaskRunning,
CreatedAt: started.Add(-10 * time.Second),
StartedAt: &started,
}
q.assignTaskLogPathLocked(task)
appendJobLog(task.LogPath, "line-1")
job := newTaskJobState(task.LogPath)
job.finish("")
q.finalizeTaskRun(task, job)
if task.Status != TaskDone {
t.Fatalf("status=%q want %q", task.Status, TaskDone)
}
if !strings.Contains(filepath.Base(task.ArtifactsDir), "_done") {
t.Fatalf("artifacts dir=%q", task.ArtifactsDir)
}
if _, err := os.Stat(task.ReportJSONPath); err != nil {
t.Fatalf("report json: %v", err)
}
if _, err := os.Stat(task.ReportHTMLPath); err != nil {
t.Fatalf("report html: %v", err)
}
var report taskReport
data, err := os.ReadFile(task.ReportJSONPath)
if err != nil {
t.Fatalf("ReadFile(report.json): %v", err)
}
if err := json.Unmarshal(data, &report); err != nil {
t.Fatalf("Unmarshal(report.json): %v", err)
}
if report.ID != task.ID || report.Status != TaskDone {
t.Fatalf("report=%+v", report)
}
if len(report.Charts) == 0 {
t.Fatalf("expected charts in report, got none")
}
}
func TestWriteTaskReportArtifactsIncludesBenchmarkResultsForTask(t *testing.T) {
dir := t.TempDir()
metricsPath := filepath.Join(dir, "metrics.db")
prevMetricsPath := taskReportMetricsDBPath
taskReportMetricsDBPath = metricsPath
t.Cleanup(func() { taskReportMetricsDBPath = prevMetricsPath })
benchmarkDir := filepath.Join(dir, "bee-benchmark", "gpu-benchmark-20260406-120000")
if err := os.MkdirAll(benchmarkDir, 0755); err != nil {
t.Fatal(err)
}
result := platform.NvidiaBenchmarkResult{
GeneratedAt: time.Date(2026, time.April, 6, 12, 0, 0, 0, time.UTC),
BenchmarkProfile: "standard",
OverallStatus: "OK",
GPUs: []platform.BenchmarkGPUResult{
{
Index: 0,
Name: "NVIDIA H100 PCIe",
Scores: platform.BenchmarkScorecard{
CompositeScore: 1176.25,
},
},
},
}
raw, err := json.Marshal(result)
if err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(benchmarkDir, "result.json"), raw, 0644); err != nil {
t.Fatal(err)
}
artifactsDir := filepath.Join(dir, "tasks", "task-bench_done")
if err := os.MkdirAll(artifactsDir, 0755); err != nil {
t.Fatal(err)
}
task := &Task{
ID: "task-bench",
Name: "NVIDIA Benchmark",
Target: "nvidia-benchmark",
Status: TaskDone,
CreatedAt: time.Now().UTC().Add(-time.Minute),
ArtifactsDir: artifactsDir,
}
ensureTaskReportPaths(task)
logText := "line-1\nArchive: " + filepath.Join(dir, "bee-benchmark", "gpu-benchmark-20260406-120000.tar.gz") + "\n"
if err := os.WriteFile(task.LogPath, []byte(logText), 0644); err != nil {
t.Fatal(err)
}
if err := writeTaskReportArtifacts(task); err != nil {
t.Fatalf("writeTaskReportArtifacts: %v", err)
}
body, err := os.ReadFile(task.ReportHTMLPath)
if err != nil {
t.Fatalf("ReadFile(report.html): %v", err)
}
html := string(body)
for _, needle := range []string{
`Benchmark Results`,
`Composite score for this benchmark task.`,
`GPU 0`,
`1176.25`,
} {
if !strings.Contains(html, needle) {
t.Fatalf("report missing %q: %s", needle, html)
}
}
}
func TestTaskLifecycleMirrorsToSerialConsole(t *testing.T) {
var lines []string
prev := taskSerialWriteLine
taskSerialWriteLine = func(line string) { lines = append(lines, line) }
t.Cleanup(func() { taskSerialWriteLine = prev })
dir := t.TempDir()
q := &taskQueue{
statePath: filepath.Join(dir, "tasks-state.json"),
logsDir: filepath.Join(dir, "tasks"),
trigger: make(chan struct{}, 1),
}
task := &Task{
ID: "task-serial-1",
Name: "CPU SAT",
Target: "cpu",
Status: TaskPending,
CreatedAt: time.Now().UTC(),
}
q.enqueue(task)
started := time.Now().UTC()
task.Status = TaskRunning
task.StartedAt = &started
job := newTaskJobState(task.LogPath, taskSerialPrefix(task))
job.append("Starting CPU SAT...")
job.append("CPU stress duration: 60s")
job.finish("")
q.finalizeTaskRun(task, job)
joined := strings.Join(lines, "\n")
for _, needle := range []string{
"queued",
"Starting CPU SAT...",
"CPU stress duration: 60s",
"finished with status=done",
} {
if !strings.Contains(joined, needle) {
t.Fatalf("serial mirror missing %q in %q", needle, joined)
}
}
}
func TestResolveBurnPreset(t *testing.T) {
tests := []struct {
profile string
want burnPreset
}{
{profile: "smoke", want: burnPreset{DurationSec: 5 * 60}},
{profile: "acceptance", want: burnPreset{DurationSec: 60 * 60}},
{profile: "overnight", want: burnPreset{DurationSec: 8 * 60 * 60}},
{profile: "", want: burnPreset{DurationSec: 5 * 60}},
}
for _, tc := range tests {
if got := resolveBurnPreset(tc.profile); got != tc.want {
t.Fatalf("resolveBurnPreset(%q)=%+v want %+v", tc.profile, got, tc.want)
}
}
}
func TestResolveNvidiaRampPlan(t *testing.T) {
tests := []struct {
name string
profile string
enabled bool
selected []int
want nvidiaRampSpec
wantErr string
}{
{
name: "disabled uses base preset",
profile: "acceptance",
selected: []int{0, 1},
want: nvidiaRampSpec{DurationSec: 60 * 60, TotalDurationSec: 60 * 60},
},
{
name: "smoke ramp uses two minute steps",
profile: "smoke",
enabled: true,
selected: []int{0, 1, 2},
want: nvidiaRampSpec{DurationSec: 5 * 60, StaggerSeconds: 2 * 60, TotalDurationSec: 9 * 60},
},
{
name: "acceptance ramp uses ten minute steps",
profile: "acceptance",
enabled: true,
selected: []int{0, 1, 2},
want: nvidiaRampSpec{DurationSec: 60 * 60, StaggerSeconds: 10 * 60, TotalDurationSec: 80 * 60},
},
{
name: "overnight stays at eight hours when possible",
profile: "overnight",
enabled: true,
selected: []int{0, 1, 2},
want: nvidiaRampSpec{DurationSec: 6 * 60 * 60, StaggerSeconds: 60 * 60, TotalDurationSec: 8 * 60 * 60},
},
{
name: "overnight extends to keep one hour after final gpu",
profile: "overnight",
enabled: true,
selected: []int{0, 1, 2, 3, 4, 5, 6, 7, 8},
want: nvidiaRampSpec{DurationSec: 60 * 60, StaggerSeconds: 60 * 60, TotalDurationSec: 9 * 60 * 60},
},
{
name: "overnight rejects impossible gpu count",
profile: "overnight",
enabled: true,
selected: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
wantErr: "at most 10 GPUs",
},
{
name: "enabled requires explicit selection",
profile: "smoke",
enabled: true,
wantErr: "requires explicit GPU selection",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got, err := resolveNvidiaRampPlan(tc.profile, tc.enabled, tc.selected)
if tc.wantErr != "" {
if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
t.Fatalf("err=%v want substring %q", err, tc.wantErr)
}
return
}
if err != nil {
t.Fatalf("resolveNvidiaRampPlan error: %v", err)
}
if got != tc.want {
t.Fatalf("resolveNvidiaRampPlan(%q, %t, %v)=%+v want %+v", tc.profile, tc.enabled, tc.selected, got, tc.want)
}
})
}
}
func TestTaskDisplayNameUsesNvidiaStressLoader(t *testing.T) {
tests := []struct {
loader string
want string
}{
{loader: "", want: "NVIDIA GPU Stress (bee-gpu-burn)"},
{loader: "builtin", want: "NVIDIA GPU Stress (bee-gpu-burn)"},
{loader: "john", want: "NVIDIA GPU Stress (John/OpenCL)"},
{loader: "nccl", want: "NVIDIA GPU Stress (NCCL)"},
}
for _, tc := range tests {
if got := taskDisplayName("nvidia-stress", "acceptance", tc.loader); got != tc.want {
t.Fatalf("taskDisplayName(loader=%q)=%q want %q", tc.loader, got, tc.want)
}
}
}
func TestRunTaskHonorsCancel(t *testing.T) {
blocked := make(chan struct{})
released := make(chan struct{})
aRun := func(_ any, ctx context.Context, _ string, _ int, _ func(string)) (string, error) {
close(blocked)
select {
case <-ctx.Done():
close(released)
return "", ctx.Err()
case <-time.After(5 * time.Second):
close(released)
return "unexpected", nil
}
}
q := &taskQueue{
opts: &HandlerOptions{App: &app.App{}},
}
tk := &Task{
ID: "cpu-1",
Name: "CPU SAT",
Target: "cpu",
Status: TaskRunning,
CreatedAt: time.Now(),
params: taskParams{Duration: 60},
}
j := &jobState{}
ctx, cancel := context.WithCancel(context.Background())
j.cancel = cancel
tk.job = j
orig := runCPUAcceptancePackCtx
runCPUAcceptancePackCtx = func(_ *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
return aRun(nil, ctx, baseDir, durationSec, logFunc)
}
defer func() { runCPUAcceptancePackCtx = orig }()
done := make(chan struct{})
go func() {
q.runTask(tk, j, ctx)
close(done)
}()
<-blocked
j.abort()
select {
case <-released:
case <-time.After(2 * time.Second):
t.Fatal("task did not observe cancel")
}
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("runTask did not return after cancel")
}
}
func TestRunTaskUsesBurnProfileDurationForCPU(t *testing.T) {
var gotDuration int
q := &taskQueue{
opts: &HandlerOptions{App: &app.App{}},
}
tk := &Task{
ID: "cpu-burn-1",
Name: "CPU Burn-in",
Target: "cpu",
Status: TaskRunning,
CreatedAt: time.Now(),
params: taskParams{BurnProfile: "smoke"},
}
j := &jobState{}
orig := runCPUAcceptancePackCtx
runCPUAcceptancePackCtx = func(_ *app.App, _ context.Context, _ string, durationSec int, _ func(string)) (string, error) {
gotDuration = durationSec
return "/tmp/cpu-burn.tar.gz", nil
}
defer func() { runCPUAcceptancePackCtx = orig }()
q.runTask(tk, j, context.Background())
if gotDuration != 5*60 {
t.Fatalf("duration=%d want %d", gotDuration, 5*60)
}
}
func TestRunTaskBuildsSupportBundleWithoutApp(t *testing.T) {
dir := t.TempDir()
q := &taskQueue{
opts: &HandlerOptions{ExportDir: dir},
}
tk := &Task{
ID: "support-bundle-1",
Name: "Support Bundle",
Target: "support-bundle",
Status: TaskRunning,
CreatedAt: time.Now(),
}
j := &jobState{}
var gotExportDir string
orig := buildSupportBundle
buildSupportBundle = func(exportDir string) (string, error) {
gotExportDir = exportDir
return filepath.Join(exportDir, "bundle.tar.gz"), nil
}
defer func() { buildSupportBundle = orig }()
q.runTask(tk, j, context.Background())
if gotExportDir != dir {
t.Fatalf("exportDir=%q want %q", gotExportDir, dir)
}
if j.err != "" {
t.Fatalf("unexpected error: %q", j.err)
}
if !strings.Contains(strings.Join(j.lines, "\n"), "Archive: "+filepath.Join(dir, "bundle.tar.gz")) {
t.Fatalf("lines=%v", j.lines)
}
}
func TestTaskElapsedSecClampsInvalidStartedAt(t *testing.T) {
now := time.Date(2026, 4, 1, 19, 10, 0, 0, time.UTC)
created := time.Date(2026, 4, 1, 19, 4, 5, 0, time.UTC)
started := time.Time{}
task := &Task{
Status: TaskRunning,
CreatedAt: created,
StartedAt: &started,
}
if got := taskElapsedSec(task, now); got != 0 {
t.Fatalf("taskElapsedSec(zero start)=%d want 0", got)
}
stale := created.Add(-24 * time.Hour)
task.StartedAt = &stale
if got := taskElapsedSec(task, now); got != int(now.Sub(created).Seconds()) {
t.Fatalf("taskElapsedSec(stale start)=%d want %d", got, int(now.Sub(created).Seconds()))
}
}
func TestRunTaskInstallUsesSharedCommandStreaming(t *testing.T) {
q := &taskQueue{
opts: &HandlerOptions{},
}
tk := &Task{
ID: "install-1",
Name: "Install to Disk",
Target: "install",
Status: TaskRunning,
CreatedAt: time.Now(),
params: taskParams{Device: "/dev/sda"},
}
j := &jobState{}
var gotDevice string
var gotLogPath string
orig := installCommand
installCommand = func(ctx context.Context, device string, logPath string) *exec.Cmd {
gotDevice = device
gotLogPath = logPath
return exec.CommandContext(ctx, "sh", "-c", "printf 'line1\nline2\n'")
}
defer func() { installCommand = orig }()
q.runTask(tk, j, context.Background())
if gotDevice != "/dev/sda" {
t.Fatalf("device=%q want /dev/sda", gotDevice)
}
if gotLogPath == "" {
t.Fatal("expected install log path")
}
logs := strings.Join(j.lines, "\n")
if !strings.Contains(logs, "Install log: ") {
t.Fatalf("missing install log line: %v", j.lines)
}
if !strings.Contains(logs, "line1") || !strings.Contains(logs, "line2") {
t.Fatalf("missing streamed output: %v", j.lines)
}
if j.err != "" {
t.Fatalf("unexpected error: %q", j.err)
}
}
func TestExecuteTaskMarksPanicsAsFailedAndClosesKmsgWindow(t *testing.T) {
dir := t.TempDir()
q := &taskQueue{
opts: &HandlerOptions{App: &app.App{}},
statePath: filepath.Join(dir, "tasks-state.json"),
logsDir: filepath.Join(dir, "tasks"),
kmsgWatcher: newKmsgWatcher(nil),
}
tk := &Task{
ID: "cpu-panic-1",
Name: "CPU SAT",
Target: "cpu",
Status: TaskRunning,
CreatedAt: time.Now(),
}
j := &jobState{}
orig := runCPUAcceptancePackCtx
runCPUAcceptancePackCtx = func(_ *app.App, _ context.Context, _ string, _ int, _ func(string)) (string, error) {
panic("boom")
}
defer func() { runCPUAcceptancePackCtx = orig }()
q.executeTask(tk, j, context.Background())
if tk.Status != TaskFailed {
t.Fatalf("status=%q want %q", tk.Status, TaskFailed)
}
if tk.DoneAt == nil {
t.Fatal("expected done_at to be set")
}
if !strings.Contains(tk.ErrMsg, "task panic: boom") {
t.Fatalf("task error=%q", tk.ErrMsg)
}
if !strings.Contains(j.err, "task panic: boom") {
t.Fatalf("job error=%q", j.err)
}
q.kmsgWatcher.mu.Lock()
activeCount := q.kmsgWatcher.activeCount
window := q.kmsgWatcher.window
q.kmsgWatcher.mu.Unlock()
if activeCount != 0 {
t.Fatalf("activeCount=%d want 0", activeCount)
}
if window != nil {
t.Fatalf("expected kmsg window to be cleared, got %+v", window)
}
}