470 lines
12 KiB
Go
470 lines
12 KiB
Go
package webui
|
|
|
|
import (
	"context"
	"net/http"
	"net/http/httptest"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"bee/audit/internal/app"
)
|
|
|
|
func TestTaskQueuePersistsAndRecoversPendingTasks(t *testing.T) {
|
|
dir := t.TempDir()
|
|
q := &taskQueue{
|
|
statePath: filepath.Join(dir, "tasks-state.json"),
|
|
logsDir: filepath.Join(dir, "tasks"),
|
|
trigger: make(chan struct{}, 1),
|
|
}
|
|
if err := os.MkdirAll(q.logsDir, 0755); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
started := time.Now().Add(-time.Minute)
|
|
|
|
// A task that was pending (not yet started) must be re-queued on restart.
|
|
pendingTask := &Task{
|
|
ID: "task-pending",
|
|
Name: "Memory Burn-in",
|
|
Target: "memory-stress",
|
|
Priority: 2,
|
|
Status: TaskPending,
|
|
CreatedAt: time.Now().Add(-2 * time.Minute),
|
|
params: taskParams{Duration: 300, BurnProfile: "smoke"},
|
|
}
|
|
// A task that was running when bee-web crashed must NOT be re-queued —
|
|
// its child processes (e.g. gpu-burn-worker) survive the restart in
|
|
// their own process groups and can't be cancelled retroactively.
|
|
runningTask := &Task{
|
|
ID: "task-running",
|
|
Name: "NVIDIA GPU Stress",
|
|
Target: "nvidia-stress",
|
|
Priority: 1,
|
|
Status: TaskRunning,
|
|
CreatedAt: time.Now().Add(-3 * time.Minute),
|
|
StartedAt: &started,
|
|
params: taskParams{Duration: 86400},
|
|
}
|
|
for _, task := range []*Task{pendingTask, runningTask} {
|
|
q.tasks = append(q.tasks, task)
|
|
q.assignTaskLogPathLocked(task)
|
|
}
|
|
q.persistLocked()
|
|
|
|
recovered := &taskQueue{
|
|
statePath: q.statePath,
|
|
logsDir: q.logsDir,
|
|
trigger: make(chan struct{}, 1),
|
|
}
|
|
recovered.loadLocked()
|
|
|
|
if len(recovered.tasks) != 2 {
|
|
t.Fatalf("tasks=%d want 2", len(recovered.tasks))
|
|
}
|
|
|
|
byID := map[string]*Task{}
|
|
for i := range recovered.tasks {
|
|
byID[recovered.tasks[i].ID] = recovered.tasks[i]
|
|
}
|
|
|
|
// Pending task must be re-queued as pending with params intact.
|
|
p := byID["task-pending"]
|
|
if p == nil {
|
|
t.Fatal("task-pending not found")
|
|
}
|
|
if p.Status != TaskPending {
|
|
t.Fatalf("pending task: status=%q want %q", p.Status, TaskPending)
|
|
}
|
|
if p.StartedAt != nil {
|
|
t.Fatalf("pending task: started_at=%v want nil", p.StartedAt)
|
|
}
|
|
if p.params.Duration != 300 || p.params.BurnProfile != "smoke" {
|
|
t.Fatalf("pending task: params=%+v", p.params)
|
|
}
|
|
if p.LogPath == "" {
|
|
t.Fatal("pending task: expected log path")
|
|
}
|
|
|
|
// Running task must be marked failed, not re-queued, to prevent
|
|
// launching duplicate workers (e.g. a second set of gpu-burn-workers).
|
|
r := byID["task-running"]
|
|
if r == nil {
|
|
t.Fatal("task-running not found")
|
|
}
|
|
if r.Status != TaskFailed {
|
|
t.Fatalf("running task: status=%q want %q", r.Status, TaskFailed)
|
|
}
|
|
if r.ErrMsg == "" {
|
|
t.Fatal("running task: expected non-empty error message")
|
|
}
|
|
if r.DoneAt == nil {
|
|
t.Fatal("running task: expected done_at to be set")
|
|
}
|
|
}
|
|
|
|
func TestNewTaskJobStateLoadsExistingLog(t *testing.T) {
|
|
dir := t.TempDir()
|
|
path := filepath.Join(dir, "task.log")
|
|
if err := os.WriteFile(path, []byte("line1\nline2\n"), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
j := newTaskJobState(path)
|
|
existing, ch := j.subscribe()
|
|
if ch == nil {
|
|
t.Fatal("expected live subscription channel")
|
|
}
|
|
if len(existing) != 2 || existing[0] != "line1" || existing[1] != "line2" {
|
|
t.Fatalf("existing=%v", existing)
|
|
}
|
|
}
|
|
|
|
func TestTaskQueueSnapshotSortsNewestFirst(t *testing.T) {
|
|
now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
|
|
q := &taskQueue{
|
|
tasks: []*Task{
|
|
{
|
|
ID: "old-running",
|
|
Name: "Old Running",
|
|
Status: TaskRunning,
|
|
Priority: 10,
|
|
CreatedAt: now.Add(-3 * time.Minute),
|
|
},
|
|
{
|
|
ID: "new-done",
|
|
Name: "New Done",
|
|
Status: TaskDone,
|
|
Priority: 0,
|
|
CreatedAt: now.Add(-1 * time.Minute),
|
|
},
|
|
{
|
|
ID: "mid-pending",
|
|
Name: "Mid Pending",
|
|
Status: TaskPending,
|
|
Priority: 1,
|
|
CreatedAt: now.Add(-2 * time.Minute),
|
|
},
|
|
},
|
|
}
|
|
|
|
got := q.snapshot()
|
|
if len(got) != 3 {
|
|
t.Fatalf("snapshot len=%d want 3", len(got))
|
|
}
|
|
if got[0].ID != "new-done" || got[1].ID != "mid-pending" || got[2].ID != "old-running" {
|
|
t.Fatalf("snapshot order=%q,%q,%q", got[0].ID, got[1].ID, got[2].ID)
|
|
}
|
|
}
|
|
|
|
func TestHandleAPITasksStreamReplaysPersistedLogWithoutLiveJob(t *testing.T) {
|
|
dir := t.TempDir()
|
|
logPath := filepath.Join(dir, "task.log")
|
|
if err := os.WriteFile(logPath, []byte("line1\nline2\n"), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
globalQueue.mu.Lock()
|
|
origTasks := globalQueue.tasks
|
|
globalQueue.tasks = []*Task{{
|
|
ID: "done-1",
|
|
Name: "Done Task",
|
|
Status: TaskDone,
|
|
CreatedAt: time.Now(),
|
|
LogPath: logPath,
|
|
}}
|
|
globalQueue.mu.Unlock()
|
|
t.Cleanup(func() {
|
|
globalQueue.mu.Lock()
|
|
globalQueue.tasks = origTasks
|
|
globalQueue.mu.Unlock()
|
|
})
|
|
|
|
req := httptest.NewRequest(http.MethodGet, "/api/tasks/done-1/stream", nil)
|
|
req.SetPathValue("id", "done-1")
|
|
rec := httptest.NewRecorder()
|
|
|
|
h := &handler{}
|
|
h.handleAPITasksStream(rec, req)
|
|
|
|
if rec.Code != http.StatusOK {
|
|
t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
|
|
}
|
|
body := rec.Body.String()
|
|
if !strings.Contains(body, "data: line1\n\n") || !strings.Contains(body, "data: line2\n\n") {
|
|
t.Fatalf("body=%q", body)
|
|
}
|
|
if !strings.Contains(body, "event: done\n") {
|
|
t.Fatalf("missing done event: %q", body)
|
|
}
|
|
}
|
|
|
|
func TestHandleAPITasksStreamPendingTaskStartsSSEImmediately(t *testing.T) {
|
|
globalQueue.mu.Lock()
|
|
origTasks := globalQueue.tasks
|
|
globalQueue.tasks = []*Task{{
|
|
ID: "pending-1",
|
|
Name: "Pending Task",
|
|
Status: TaskPending,
|
|
CreatedAt: time.Now(),
|
|
}}
|
|
globalQueue.mu.Unlock()
|
|
t.Cleanup(func() {
|
|
globalQueue.mu.Lock()
|
|
globalQueue.tasks = origTasks
|
|
globalQueue.mu.Unlock()
|
|
})
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
req := httptest.NewRequest(http.MethodGet, "/api/tasks/pending-1/stream", nil).WithContext(ctx)
|
|
req.SetPathValue("id", "pending-1")
|
|
rec := httptest.NewRecorder()
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
h := &handler{}
|
|
h.handleAPITasksStream(rec, req)
|
|
close(done)
|
|
}()
|
|
|
|
deadline := time.Now().Add(2 * time.Second)
|
|
for time.Now().Before(deadline) {
|
|
if strings.Contains(rec.Body.String(), "Task is queued. Waiting for worker...") {
|
|
cancel()
|
|
<-done
|
|
if rec.Code != http.StatusOK {
|
|
t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
|
|
}
|
|
return
|
|
}
|
|
time.Sleep(20 * time.Millisecond)
|
|
}
|
|
cancel()
|
|
<-done
|
|
t.Fatalf("stream did not emit queued status promptly, body=%q", rec.Body.String())
|
|
}
|
|
|
|
func TestResolveBurnPreset(t *testing.T) {
|
|
tests := []struct {
|
|
profile string
|
|
want burnPreset
|
|
}{
|
|
{profile: "smoke", want: burnPreset{NvidiaDiag: 1, DurationSec: 5 * 60}},
|
|
{profile: "acceptance", want: burnPreset{NvidiaDiag: 3, DurationSec: 60 * 60}},
|
|
{profile: "overnight", want: burnPreset{NvidiaDiag: 4, DurationSec: 8 * 60 * 60}},
|
|
{profile: "", want: burnPreset{NvidiaDiag: 1, DurationSec: 5 * 60}},
|
|
}
|
|
for _, tc := range tests {
|
|
if got := resolveBurnPreset(tc.profile); got != tc.want {
|
|
t.Fatalf("resolveBurnPreset(%q)=%+v want %+v", tc.profile, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestTaskDisplayNameUsesNvidiaStressLoader(t *testing.T) {
|
|
tests := []struct {
|
|
loader string
|
|
want string
|
|
}{
|
|
{loader: "", want: "NVIDIA GPU Stress (bee-gpu-burn)"},
|
|
{loader: "builtin", want: "NVIDIA GPU Stress (bee-gpu-burn)"},
|
|
{loader: "john", want: "NVIDIA GPU Stress (John/OpenCL)"},
|
|
{loader: "nccl", want: "NVIDIA GPU Stress (NCCL)"},
|
|
}
|
|
for _, tc := range tests {
|
|
if got := taskDisplayName("nvidia-stress", "acceptance", tc.loader); got != tc.want {
|
|
t.Fatalf("taskDisplayName(loader=%q)=%q want %q", tc.loader, got, tc.want)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestRunTaskHonorsCancel(t *testing.T) {
|
|
blocked := make(chan struct{})
|
|
released := make(chan struct{})
|
|
aRun := func(_ any, ctx context.Context, _ string, _ int, _ func(string)) (string, error) {
|
|
close(blocked)
|
|
select {
|
|
case <-ctx.Done():
|
|
close(released)
|
|
return "", ctx.Err()
|
|
case <-time.After(5 * time.Second):
|
|
close(released)
|
|
return "unexpected", nil
|
|
}
|
|
}
|
|
|
|
q := &taskQueue{
|
|
opts: &HandlerOptions{App: &app.App{}},
|
|
}
|
|
tk := &Task{
|
|
ID: "cpu-1",
|
|
Name: "CPU SAT",
|
|
Target: "cpu",
|
|
Status: TaskRunning,
|
|
CreatedAt: time.Now(),
|
|
params: taskParams{Duration: 60},
|
|
}
|
|
j := &jobState{}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
j.cancel = cancel
|
|
tk.job = j
|
|
|
|
orig := runCPUAcceptancePackCtx
|
|
runCPUAcceptancePackCtx = func(_ *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
|
return aRun(nil, ctx, baseDir, durationSec, logFunc)
|
|
}
|
|
defer func() { runCPUAcceptancePackCtx = orig }()
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
q.runTask(tk, j, ctx)
|
|
close(done)
|
|
}()
|
|
|
|
<-blocked
|
|
j.abort()
|
|
|
|
select {
|
|
case <-released:
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("task did not observe cancel")
|
|
}
|
|
select {
|
|
case <-done:
|
|
case <-time.After(2 * time.Second):
|
|
t.Fatal("runTask did not return after cancel")
|
|
}
|
|
}
|
|
|
|
func TestRunTaskUsesBurnProfileDurationForCPU(t *testing.T) {
|
|
var gotDuration int
|
|
q := &taskQueue{
|
|
opts: &HandlerOptions{App: &app.App{}},
|
|
}
|
|
tk := &Task{
|
|
ID: "cpu-burn-1",
|
|
Name: "CPU Burn-in",
|
|
Target: "cpu",
|
|
Status: TaskRunning,
|
|
CreatedAt: time.Now(),
|
|
params: taskParams{BurnProfile: "smoke"},
|
|
}
|
|
j := &jobState{}
|
|
|
|
orig := runCPUAcceptancePackCtx
|
|
runCPUAcceptancePackCtx = func(_ *app.App, _ context.Context, _ string, durationSec int, _ func(string)) (string, error) {
|
|
gotDuration = durationSec
|
|
return "/tmp/cpu-burn.tar.gz", nil
|
|
}
|
|
defer func() { runCPUAcceptancePackCtx = orig }()
|
|
|
|
q.runTask(tk, j, context.Background())
|
|
|
|
if gotDuration != 5*60 {
|
|
t.Fatalf("duration=%d want %d", gotDuration, 5*60)
|
|
}
|
|
}
|
|
|
|
func TestRunTaskBuildsSupportBundleWithoutApp(t *testing.T) {
|
|
dir := t.TempDir()
|
|
q := &taskQueue{
|
|
opts: &HandlerOptions{ExportDir: dir},
|
|
}
|
|
tk := &Task{
|
|
ID: "support-bundle-1",
|
|
Name: "Support Bundle",
|
|
Target: "support-bundle",
|
|
Status: TaskRunning,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
j := &jobState{}
|
|
|
|
var gotExportDir string
|
|
orig := buildSupportBundle
|
|
buildSupportBundle = func(exportDir string) (string, error) {
|
|
gotExportDir = exportDir
|
|
return filepath.Join(exportDir, "bundle.tar.gz"), nil
|
|
}
|
|
defer func() { buildSupportBundle = orig }()
|
|
|
|
q.runTask(tk, j, context.Background())
|
|
|
|
if gotExportDir != dir {
|
|
t.Fatalf("exportDir=%q want %q", gotExportDir, dir)
|
|
}
|
|
if j.err != "" {
|
|
t.Fatalf("unexpected error: %q", j.err)
|
|
}
|
|
if !strings.Contains(strings.Join(j.lines, "\n"), "Archive: "+filepath.Join(dir, "bundle.tar.gz")) {
|
|
t.Fatalf("lines=%v", j.lines)
|
|
}
|
|
}
|
|
|
|
func TestTaskElapsedSecClampsInvalidStartedAt(t *testing.T) {
|
|
now := time.Date(2026, 4, 1, 19, 10, 0, 0, time.UTC)
|
|
created := time.Date(2026, 4, 1, 19, 4, 5, 0, time.UTC)
|
|
started := time.Time{}
|
|
task := &Task{
|
|
Status: TaskRunning,
|
|
CreatedAt: created,
|
|
StartedAt: &started,
|
|
}
|
|
if got := taskElapsedSec(task, now); got != 0 {
|
|
t.Fatalf("taskElapsedSec(zero start)=%d want 0", got)
|
|
}
|
|
|
|
stale := created.Add(-24 * time.Hour)
|
|
task.StartedAt = &stale
|
|
if got := taskElapsedSec(task, now); got != int(now.Sub(created).Seconds()) {
|
|
t.Fatalf("taskElapsedSec(stale start)=%d want %d", got, int(now.Sub(created).Seconds()))
|
|
}
|
|
}
|
|
|
|
func TestRunTaskInstallUsesSharedCommandStreaming(t *testing.T) {
|
|
q := &taskQueue{
|
|
opts: &HandlerOptions{},
|
|
}
|
|
tk := &Task{
|
|
ID: "install-1",
|
|
Name: "Install to Disk",
|
|
Target: "install",
|
|
Status: TaskRunning,
|
|
CreatedAt: time.Now(),
|
|
params: taskParams{Device: "/dev/sda"},
|
|
}
|
|
j := &jobState{}
|
|
|
|
var gotDevice string
|
|
var gotLogPath string
|
|
orig := installCommand
|
|
installCommand = func(ctx context.Context, device string, logPath string) *exec.Cmd {
|
|
gotDevice = device
|
|
gotLogPath = logPath
|
|
return exec.CommandContext(ctx, "sh", "-c", "printf 'line1\nline2\n'")
|
|
}
|
|
defer func() { installCommand = orig }()
|
|
|
|
q.runTask(tk, j, context.Background())
|
|
|
|
if gotDevice != "/dev/sda" {
|
|
t.Fatalf("device=%q want /dev/sda", gotDevice)
|
|
}
|
|
if gotLogPath == "" {
|
|
t.Fatal("expected install log path")
|
|
}
|
|
logs := strings.Join(j.lines, "\n")
|
|
if !strings.Contains(logs, "Install log: ") {
|
|
t.Fatalf("missing install log line: %v", j.lines)
|
|
}
|
|
if !strings.Contains(logs, "line1") || !strings.Contains(logs, "line2") {
|
|
t.Fatalf("missing streamed output: %v", j.lines)
|
|
}
|
|
if j.err != "" {
|
|
t.Fatalf("unexpected error: %q", j.err)
|
|
}
|
|
}
|