Files
bee/audit/internal/webui/tasks.go

1304 lines
35 KiB
Go

package webui
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"os/exec"
"path/filepath"
"runtime/debug"
"sort"
"strings"
"sync"
"time"
"bee/audit/internal/app"
"bee/audit/internal/platform"
)
// Task statuses. The queue code in this file moves a task from TaskPending to
// TaskRunning when the worker picks it up, and then to TaskDone, TaskFailed or
// TaskCancelled; prune treats those last three as completed states.
const (
// TaskPending marks a task that is queued and not yet started.
TaskPending = "pending"
// TaskRunning marks the task currently being executed by the worker.
TaskRunning = "running"
// TaskDone marks a task whose job finished without an error message.
TaskDone = "done"
// TaskFailed marks a task whose job finished with an error message, or
// that was found in the running state when persisted state was loaded.
TaskFailed = "failed"
// TaskCancelled marks a task cancelled through one of the HTTP handlers.
TaskCancelled = "cancelled"
)
// taskNames maps a task target to its default human-readable name.
// taskDisplayName consults it first; burnNames and nvidiaStressTaskName may
// then override the result, and targets missing here fall back to the raw
// target string.
var taskNames = map[string]string{
"nvidia": "NVIDIA SAT",
"nvidia-targeted-stress": "NVIDIA Targeted Stress Validate (dcgmi diag targeted_stress)",
"nvidia-benchmark": "NVIDIA Benchmark",
"nvidia-compute": "NVIDIA Max Compute Load (dcgmproftester)",
"nvidia-targeted-power": "NVIDIA Targeted Power (dcgmi diag targeted_power)",
"nvidia-pulse": "NVIDIA Pulse Test (dcgmi diag pulse_test)",
"nvidia-interconnect": "NVIDIA Interconnect Test (NCCL all_reduce_perf)",
"nvidia-bandwidth": "NVIDIA Bandwidth Test (NVBandwidth)",
"nvidia-stress": "NVIDIA GPU Stress",
"memory": "Memory SAT",
"storage": "Storage SAT",
"cpu": "CPU SAT",
"amd": "AMD GPU SAT",
"amd-mem": "AMD GPU MEM Integrity",
"amd-bandwidth": "AMD GPU MEM Bandwidth",
"amd-stress": "AMD GPU Burn-in",
"memory-stress": "Memory Burn-in",
"sat-stress": "SAT Stress (stressapptest)",
"platform-stress": "Platform Thermal Cycling",
"audit": "Audit",
"support-bundle": "Support Bundle",
"install": "Install to Disk",
"install-to-ram": "Install to RAM",
}
// burnNames maps target → human-readable name when a burn profile is set.
// taskDisplayName uses an entry from this table instead of taskNames whenever
// the profile argument is non-empty and the target has an entry here.
var burnNames = map[string]string{
"nvidia": "NVIDIA Burn-in",
"memory": "Memory Burn-in",
"cpu": "CPU Burn-in",
"amd": "AMD GPU Burn-in",
}
// nvidiaStressTaskName returns the display name of an NVIDIA GPU stress task
// for the given loader. The loader is lower-cased and trimmed before it is
// compared with the platform loader constants; anything that matches neither
// the John nor the NCCL loader is reported as bee-gpu-burn.
func nvidiaStressTaskName(loader string) string {
	normalized := strings.TrimSpace(strings.ToLower(loader))
	if normalized == platform.NvidiaStressLoaderJohn {
		return "NVIDIA GPU Stress (John/OpenCL)"
	}
	if normalized == platform.NvidiaStressLoaderNCCL {
		return "NVIDIA GPU Stress (NCCL)"
	}
	return "NVIDIA GPU Stress (bee-gpu-burn)"
}
func taskDisplayName(target, profile, loader string) string {
name := taskNames[target]
if profile != "" {
if n, ok := burnNames[target]; ok {
name = n
}
}
if target == "nvidia-stress" {
name = nvidiaStressTaskName(loader)
}
if name == "" {
name = target
}
return name
}
// Task represents one unit of work in the queue.
//
// Exported fields form the JSON shape returned by the task list endpoint.
// The unexported fields are runtime-only on this type; params is persisted
// separately through persistedTask.
type Task struct {
ID string `json:"id"`
Name string `json:"name"`
Target string `json:"target"`
// Priority orders pending tasks: nextPending prefers the higher value.
Priority int `json:"priority"`
// Status holds one of the Task* status constants.
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
DoneAt *time.Time `json:"done_at,omitempty"`
// ElapsedSec is filled in by snapshot via taskElapsedSec; it is not part
// of persistedTask.
ElapsedSec int `json:"elapsed_sec,omitempty"`
ErrMsg string `json:"error,omitempty"`
LogPath string `json:"log_path,omitempty"`
ArtifactsDir string `json:"artifacts_dir,omitempty"`
ReportJSONPath string `json:"report_json_path,omitempty"`
ReportHTMLPath string `json:"report_html_path,omitempty"`
// runtime fields (not serialised)
// job is the live job state attached by the worker when the task starts.
job *jobState
// params carries the request parameters consumed by runTask.
params taskParams
}
// taskParams holds optional parameters parsed from the run request.
// runTask reads these per target; not every field is used by every target.
type taskParams struct {
// Duration is passed to the runners as a duration in seconds; when it is
// not positive, runTask may substitute a burn-profile preset.
Duration int `json:"duration,omitempty"`
StressMode bool `json:"stress_mode,omitempty"`
GPUIndices []int `json:"gpu_indices,omitempty"`
ExcludeGPUIndices []int `json:"exclude_gpu_indices,omitempty"`
// StaggerGPUStart enables the ramp plan computed by resolveNvidiaRampPlan.
StaggerGPUStart bool `json:"stagger_gpu_start,omitempty"`
SizeMB int `json:"size_mb,omitempty"`
Passes int `json:"passes,omitempty"`
Loader string `json:"loader,omitempty"`
// BurnProfile selects a preset; this file recognises "overnight" and
// "acceptance" and treats every other value as the default preset.
BurnProfile string `json:"burn_profile,omitempty"`
BenchmarkProfile string `json:"benchmark_profile,omitempty"`
RunNCCL bool `json:"run_nccl,omitempty"`
ParallelGPUs bool `json:"parallel_gpus,omitempty"`
DisplayName string `json:"display_name,omitempty"`
Device string `json:"device,omitempty"` // for install
PlatformComponents []string `json:"platform_components,omitempty"`
}
// persistedTask is the on-disk form of a Task as written by persistLocked
// and read back by loadLocked. Unlike Task it serialises the request
// parameters (Params) and carries neither the live job nor ElapsedSec.
type persistedTask struct {
ID string `json:"id"`
Name string `json:"name"`
Target string `json:"target"`
Priority int `json:"priority"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
DoneAt *time.Time `json:"done_at,omitempty"`
ErrMsg string `json:"error,omitempty"`
LogPath string `json:"log_path,omitempty"`
ArtifactsDir string `json:"artifacts_dir,omitempty"`
ReportJSONPath string `json:"report_json_path,omitempty"`
ReportHTMLPath string `json:"report_html_path,omitempty"`
Params taskParams `json:"params,omitempty"`
}
// burnPreset is the result of resolving a burn profile name; see
// resolveBurnPreset.
type burnPreset struct {
// DurationSec is the preset run time in seconds.
DurationSec int
}
// nvidiaRampSpec describes the timing of an NVIDIA burn run as computed by
// resolveNvidiaRampPlan. All values are in seconds.
type nvidiaRampSpec struct {
// DurationSec is the duration handed to the runner; runTask logs it as
// the "post-ramp hold".
DurationSec int
// StaggerSeconds is the delay between successive GPU starts; zero means
// no staggering.
StaggerSeconds int
// TotalDurationSec is the overall runtime including the ramp.
TotalDurationSec int
}
// resolveBurnPreset maps a burn profile name to its preset duration:
// "overnight" is 8 hours, "acceptance" is 1 hour, and any other value
// (including the empty string) is 5 minutes.
func resolveBurnPreset(profile string) burnPreset {
	const (
		minute = 60
		hour   = 60 * minute
	)
	seconds := 5 * minute
	if profile == "overnight" {
		seconds = 8 * hour
	} else if profile == "acceptance" {
		seconds = hour
	}
	return burnPreset{DurationSec: seconds}
}
// resolveNvidiaRampPlan computes the timing plan for an NVIDIA burn run.
//
// With staggering disabled, or with exactly one selected GPU, the plan is the
// plain burn preset with no stagger. With staggering enabled and no GPUs
// selected an error is returned. Otherwise:
//   - "acceptance": 10-minute stagger, total = preset + stagger*(count-1)
//   - "overnight": 1-hour stagger, total = max(8h, count hours), rejected
//     above 10 hours; the per-run duration is what remains after the ramp
//   - anything else: 2-minute stagger, total = preset + stagger*(count-1)
func resolveNvidiaRampPlan(profile string, enabled bool, selected []int) (nvidiaRampSpec, error) {
	const (
		minute = 60
		hour   = 60 * minute
	)
	preset := resolveBurnPreset(profile).DurationSec
	plan := nvidiaRampSpec{DurationSec: preset, TotalDurationSec: preset}
	if !enabled {
		return plan, nil
	}

	gpus := len(selected)
	switch gpus {
	case 0:
		return nvidiaRampSpec{}, fmt.Errorf("staggered NVIDIA burn requires explicit GPU selection")
	case 1:
		return plan, nil
	}
	rampSteps := gpus - 1

	if profile == "overnight" {
		total := 8 * hour
		if floor := gpus * hour; total < floor {
			total = floor
		}
		if total > 10*hour {
			return nvidiaRampSpec{}, fmt.Errorf("overnight staggered NVIDIA burn supports at most 10 GPUs")
		}
		plan.StaggerSeconds = hour
		plan.TotalDurationSec = total
		plan.DurationSec = total - plan.StaggerSeconds*rampSteps
		return plan, nil
	}

	plan.StaggerSeconds = 2 * minute
	if profile == "acceptance" {
		plan.StaggerSeconds = 10 * minute
	}
	plan.TotalDurationSec = plan.DurationSec + plan.StaggerSeconds*rampSteps
	return plan, nil
}
// resolvePlatformStressPreset builds the load/idle cycle list for a platform
// stress run. "acceptance" uses the 14-cycle table below, "overnight" repeats
// that table eight times, and every other profile gets the short four-cycle
// smoke sequence.
func resolvePlatformStressPreset(profile string) platform.PlatformStressOptions {
	if profile != "overnight" && profile != "acceptance" {
		// smoke
		smoke := []platform.PlatformStressCycle{
			{LoadSec: 85, IdleSec: 5},
			{LoadSec: 80, IdleSec: 10},
			{LoadSec: 55, IdleSec: 5},
			{LoadSec: 60, IdleSec: 0},
		}
		return platform.PlatformStressOptions{Cycles: smoke}
	}

	acceptance := []platform.PlatformStressCycle{
		{LoadSec: 85, IdleSec: 5},
		{LoadSec: 80, IdleSec: 10},
		{LoadSec: 55, IdleSec: 5},
		{LoadSec: 60, IdleSec: 0},
		{LoadSec: 100, IdleSec: 10},
		{LoadSec: 145, IdleSec: 15},
		{LoadSec: 190, IdleSec: 20},
		{LoadSec: 235, IdleSec: 25},
		{LoadSec: 280, IdleSec: 30},
		{LoadSec: 325, IdleSec: 35},
		{LoadSec: 370, IdleSec: 40},
		{LoadSec: 415, IdleSec: 45},
		{LoadSec: 460, IdleSec: 50},
		{LoadSec: 510, IdleSec: 0},
	}
	if profile == "acceptance" {
		return platform.PlatformStressOptions{Cycles: acceptance}
	}

	// overnight: the acceptance table back-to-back, eight times.
	const repeats = 8
	overnight := make([]platform.PlatformStressCycle, 0, len(acceptance)*repeats)
	for i := 0; i < repeats; i++ {
		overnight = append(overnight, acceptance...)
	}
	return platform.PlatformStressOptions{Cycles: overnight}
}
// taskQueue manages a priority-ordered list of tasks and runs them one at a time.
// Methods with the "Locked" suffix are called with mu already held.
type taskQueue struct {
// mu guards tasks and the task fields mutated by the worker and handlers.
mu sync.Mutex
tasks []*Task
// trigger wakes the worker; senders use a non-blocking send.
trigger chan struct{}
opts *HandlerOptions // set by startWorker
// statePath is the JSON file used by persistLocked/loadLocked; empty
// disables persistence.
statePath string
// logsDir is the root directory under which per-task artifact folders
// are created.
logsDir string
// started records that the worker goroutine has been launched.
started bool
// kmsgWatcher, when non-nil, is notified about SAT task start/finish.
kmsgWatcher *kmsgWatcher
}
// globalQueue is the package-wide task queue used by the HTTP handlers. Its
// trigger channel has capacity 1, so one wake-up can be left pending.
var globalQueue = &taskQueue{trigger: make(chan struct{}, 1)}
// maxTaskHistory is the maximum number of completed (done, failed or
// cancelled) tasks that prune keeps in the queue.
const maxTaskHistory = 50
// Package-level function variables through which runTask invokes the app
// runners, the support-bundle builder and the installer command. Each default
// simply forwards to the corresponding app/exec call.
// NOTE(review): presumably these exist so tests can swap in fakes — confirm.
var (
runMemoryAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, sizeMB, passes int, logFunc func(string)) (string, error) {
return a.RunMemoryAcceptancePackCtx(ctx, baseDir, sizeMB, passes, logFunc)
}
runStorageAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, extended bool, logFunc func(string)) (string, error) {
return a.RunStorageAcceptancePackCtx(ctx, baseDir, extended, logFunc)
}
runCPUAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
return a.RunCPUAcceptancePackCtx(ctx, baseDir, durationSec, logFunc)
}
runAMDAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
return a.RunAMDAcceptancePackCtx(ctx, baseDir, logFunc)
}
runAMDMemIntegrityPackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
return a.RunAMDMemIntegrityPackCtx(ctx, baseDir, logFunc)
}
runAMDMemBandwidthPackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
return a.RunAMDMemBandwidthPackCtx(ctx, baseDir, logFunc)
}
runNvidiaStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, opts platform.NvidiaStressOptions, logFunc func(string)) (string, error) {
return a.RunNvidiaStressPackCtx(ctx, baseDir, opts, logFunc)
}
runAMDStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
return a.RunAMDStressPackCtx(ctx, baseDir, durationSec, logFunc)
}
runMemoryStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
return a.RunMemoryStressPackCtx(ctx, baseDir, durationSec, logFunc)
}
runSATStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
return a.RunSATStressPackCtx(ctx, baseDir, durationSec, logFunc)
}
buildSupportBundle = app.BuildSupportBundle
// installCommand builds the "bee-install <device> <logPath>" command bound
// to ctx.
installCommand = func(ctx context.Context, device string, logPath string) *exec.Cmd {
return exec.CommandContext(ctx, "bee-install", device, logPath)
}
)
// enqueue adds a task to the queue and notifies the worker.
//
// While holding q.mu it assigns the task's log/artifact paths, appends the
// task, prunes old completed tasks and persists the queue. After releasing
// the lock it emits a "queued" serial event and wakes the worker.
func (q *taskQueue) enqueue(t *Task) {
q.mu.Lock()
q.assignTaskLogPathLocked(t)
q.tasks = append(q.tasks, t)
q.prune()
q.persistLocked()
q.mu.Unlock()
taskSerialEvent(t, "queued")
// Non-blocking send: if a wake-up is already buffered, drop this one.
select {
case q.trigger <- struct{}{}:
default:
}
}
// prune trims completed-task history to maxTaskHistory entries.
//
// Tasks are split into completed (done/failed/cancelled) and everything else;
// only the last maxTaskHistory completed tasks, in slice order, are kept.
// The queue is rebuilt with the non-completed tasks first, followed by the
// retained completed ones. Callers hold q.mu.
func (q *taskQueue) prune() {
	var finished, live []*Task
	for _, task := range q.tasks {
		if task.Status == TaskDone || task.Status == TaskFailed || task.Status == TaskCancelled {
			finished = append(finished, task)
			continue
		}
		live = append(live, task)
	}
	if excess := len(finished) - maxTaskHistory; excess > 0 {
		finished = finished[excess:]
	}
	q.tasks = append(live, finished...)
}
// nextPending picks the pending task to run next, or nil when none is
// pending. A higher Priority wins; among equal priorities the task with the
// earlier CreatedAt wins. Callers hold q.mu.
func (q *taskQueue) nextPending() *Task {
	var chosen *Task
	for _, cand := range q.tasks {
		if cand.Status != TaskPending {
			continue
		}
		// Cases are evaluated in order, so chosen is only dereferenced once
		// it is known to be non-nil.
		switch {
		case chosen == nil,
			cand.Priority > chosen.Priority,
			cand.Priority == chosen.Priority && cand.CreatedAt.Before(chosen.CreatedAt):
			chosen = cand
		}
	}
	return chosen
}
// findByID returns the queued task with the given ID and true, or nil and
// false when no task matches. The lookup runs under q.mu; the returned
// pointer is the live task, not a copy.
func (q *taskQueue) findByID(id string) (*Task, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i := range q.tasks {
		if task := q.tasks[i]; task.ID == id {
			return task, true
		}
	}
	return nil, false
}
// findJob returns the jobState attached to the task with the given ID (for
// SSE streaming compatibility). It reports false when the task is unknown or
// has no job attached.
func (q *taskQueue) findJob(id string) (*jobState, bool) {
	task, found := q.findByID(id)
	if found && task.job != nil {
		return task.job, true
	}
	return nil, false
}
// taskStreamSource is a point-in-time copy of the task fields the stream
// handler needs, taken under the queue lock by (*taskQueue).taskStreamSource.
type taskStreamSource struct {
status string
errMsg string
logPath string
// job is the task's live job state, or nil when none is attached.
job *jobState
}
// taskStreamSource captures, under q.mu, the status, error message, log path
// and job of the task with the given ID. The boolean is false (with a zero
// value) when no such task exists.
func (q *taskQueue) taskStreamSource(id string) (taskStreamSource, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, task := range q.tasks {
		if task.ID == id {
			src := taskStreamSource{
				status:  task.Status,
				errMsg:  task.ErrMsg,
				logPath: task.LogPath,
				job:     task.job,
			}
			return src, true
		}
	}
	return taskStreamSource{}, false
}
// hasActiveTarget reports whether any task for the given target is currently
// pending or running. The scan runs under q.mu.
func (q *taskQueue) hasActiveTarget(target string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, task := range q.tasks {
		active := task.Status == TaskPending || task.Status == TaskRunning
		if task.Target == target && active {
			return true
		}
	}
	return false
}
// snapshot returns value copies of all tasks, with ElapsedSec filled in,
// sorted for display: newest CreatedAt first, then running before pending
// before everything else, then higher priority first, then by name.
func (q *taskQueue) snapshot() []Task {
	q.mu.Lock()
	defer q.mu.Unlock()

	copies := make([]Task, 0, len(q.tasks))
	for _, src := range q.tasks {
		c := *src
		c.ElapsedSec = taskElapsedSec(&c, time.Now())
		copies = append(copies, c)
	}

	sort.SliceStable(copies, func(a, b int) bool {
		x, y := &copies[a], &copies[b]
		switch {
		case !x.CreatedAt.Equal(y.CreatedAt):
			return x.CreatedAt.After(y.CreatedAt)
		case statusOrder(x.Status) != statusOrder(y.Status):
			return statusOrder(x.Status) < statusOrder(y.Status)
		case x.Priority != y.Priority:
			return x.Priority > y.Priority
		default:
			return x.Name < y.Name
		}
	})
	return copies
}
// statusOrder ranks a status for display sorting: running is 0, pending is 1
// and every other status is 2.
func statusOrder(s string) int {
	if s == TaskRunning {
		return 0
	}
	if s == TaskPending {
		return 1
	}
	return 2
}
// startWorker launches the queue runner goroutine.
//
// It records opts, derives the state file and logs directory from
// opts.ExportDir and creates the logs directory (errors ignored). On the
// first call only, it loads persisted state and starts the worker through
// goRecoverLoop. On every call it wakes the worker if a pending task exists.
func (q *taskQueue) startWorker(opts *HandlerOptions) {
q.mu.Lock()
q.opts = opts
q.statePath = filepath.Join(opts.ExportDir, "tasks-state.json")
q.logsDir = filepath.Join(opts.ExportDir, "tasks")
_ = os.MkdirAll(q.logsDir, 0755)
if !q.started {
q.loadLocked()
q.started = true
goRecoverLoop("task worker", 2*time.Second, q.worker)
}
hasPending := q.nextPending() != nil
q.mu.Unlock()
if hasPending {
// Non-blocking send: a wake-up already buffered is enough.
select {
case q.trigger <- struct{}{}:
default:
}
}
}
// worker is the queue runner loop. It blocks on q.trigger; on each wake-up it
// sets the CPU governor to "performance", runs pending tasks one at a time
// until none remain, and restores "powersave" when the batch ends.
func (q *taskQueue) worker() {
for {
<-q.trigger
func() {
setCPUGovernor("performance")
// Deferred inside the closure so the governor is restored at the end
// of each batch, including when a panic unwinds through it.
defer setCPUGovernor("powersave")
for {
q.mu.Lock()
t := q.nextPending()
if t == nil {
q.prune()
q.persistLocked()
q.mu.Unlock()
return
}
// Mark the task running and attach a fresh job state while still
// holding the lock, then persist before releasing it.
now := time.Now()
t.Status = TaskRunning
t.StartedAt = &now
t.DoneAt = nil
t.ErrMsg = ""
j := newTaskJobState(t.LogPath, taskSerialPrefix(t))
t.job = j
q.persistLocked()
q.mu.Unlock()
// Each task gets its own cancellable context; the cancel func is
// stored on the job.
taskCtx, taskCancel := context.WithCancel(context.Background())
j.cancel = taskCancel
q.executeTask(t, j, taskCtx)
taskCancel()
q.mu.Lock()
q.prune()
q.persistLocked()
q.mu.Unlock()
}
}()
}
}
// executeTask runs a single task with panic recovery and bookkeeping.
//
// Deferred functions run in reverse order of registration: first the panic
// recovery (which logs the panic and finishes the job with an error), then
// the kmsg watcher "finished" notification, and last finalizeTaskRun.
func (q *taskQueue) executeTask(t *Task, j *jobState, ctx context.Context) {
// Set only once the watcher was actually told the task started, so the
// matching "finished" notification is sent exactly in that case.
startedKmsgWatch := false
defer q.finalizeTaskRun(t, j)
defer func() {
if startedKmsgWatch && q.kmsgWatcher != nil {
q.kmsgWatcher.NotifyTaskFinished(t.ID)
}
}()
defer func() {
if rec := recover(); rec != nil {
msg := fmt.Sprintf("task panic: %v", rec)
slog.Error("task panic",
"task_id", t.ID,
"target", t.Target,
"panic", fmt.Sprint(rec),
"stack", string(debug.Stack()),
)
j.append("ERROR: " + msg)
j.finish(msg)
}
}()
// Only SAT targets are reported to the kmsg watcher.
if q.kmsgWatcher != nil && isSATTarget(t.Target) {
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
startedKmsgWatch = true
}
q.runTask(t, j, ctx)
}
// finalizeTaskRun records the outcome of a finished task.
//
// Under q.mu it stamps DoneAt, converts a still-running status to failed
// (when the job carries an error) or done, moves the artifact paths to their
// final location and persists the queue. A status changed elsewhere (for
// example to cancelled) is left as is. After unlocking it writes the report
// artifacts and emits a "finished" serial event.
func (q *taskQueue) finalizeTaskRun(t *Task, j *jobState) {
q.mu.Lock()
now := time.Now()
t.DoneAt = &now
if t.Status == TaskRunning {
if j.err != "" {
t.Status = TaskFailed
t.ErrMsg = j.err
} else {
t.Status = TaskDone
t.ErrMsg = ""
}
}
q.finalizeTaskArtifactPathsLocked(t)
q.persistLocked()
q.mu.Unlock()
// NOTE(review): t is read below without q.mu held — confirm no handler
// can mutate it concurrently at this point.
if err := writeTaskReportArtifacts(t); err != nil {
// Report generation is best-effort: the failure is only logged.
appendJobLog(t.LogPath, "WARN: task report generation failed: "+err.Error())
}
if t.ErrMsg != "" {
taskSerialEvent(t, "finished with status="+t.Status+" error="+t.ErrMsg)
return
}
taskSerialEvent(t, "finished with status="+t.Status)
}
// setCPUGovernor writes governor into every per-CPU scaling_governor sysfs
// file matched by the glob below. It is best-effort: a glob error, an empty
// match list (e.g. cpufreq not available) and individual write failures are
// all ignored.
func setCPUGovernor(governor string) {
	const pattern = "/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor"
	paths, err := filepath.Glob(pattern)
	if err != nil {
		return
	}
	payload := []byte(governor)
	for i := range paths {
		_ = os.WriteFile(paths[i], payload, 0644)
	}
}
// runTask executes the work for a task, writing output to j.
//
// It dispatches on t.Target to the matching runner. Most runners return an
// archive string; when one is produced, its SAT overall status is checked
// (a "FAILED" status turns a successful run into an error) and the result is
// applied to the component status DB when one is configured. The job is
// finished with "" on success, "aborted" when the run failed after ctx was
// cancelled, or the error text otherwise.
//
// All targets except "support-bundle" and "install" require q.opts.App and
// fail with "app not configured" when it is nil.
func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) {
	if q.opts == nil {
		j.append("ERROR: handler options not configured")
		j.finish("handler options not configured")
		return
	}
	a := q.opts.App

	// Decide whether this is a resumed job BEFORE appending the start banner:
	// checking afterwards would see the banner itself and always report a
	// recovery.
	// NOTE(review): this relies on j.lines already holding earlier log lines
	// when the job state was created for a pre-existing log — confirm.
	recovered := len(j.lines) > 0
	j.append(fmt.Sprintf("Starting %s...", t.Name))
	if recovered {
		j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339)))
	}

	var (
		archive string
		err     error
	)
	switch t.Target {
	case "nvidia":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		// Stress mode selects diagnostic level 3, otherwise level 2. The
		// level is therefore always positive, so the options-based runner is
		// used unconditionally.
		diagLevel := 2
		if t.params.StressMode {
			diagLevel = 3
		}
		result, e := a.RunNvidiaAcceptancePackWithOptions(
			ctx, "", diagLevel, t.params.GPUIndices, j.append,
		)
		if e != nil {
			err = e
		} else {
			archive = result.Body
		}
	case "nvidia-targeted-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if dur <= 0 {
			dur = 300
		}
		archive, err = a.RunNvidiaTargetedStressValidatePack(ctx, "", dur, t.params.GPUIndices, j.append)
	case "nvidia-benchmark":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = a.RunNvidiaBenchmarkCtx(ctx, "", platform.NvidiaBenchmarkOptions{
			Profile:           t.params.BenchmarkProfile,
			SizeMB:            t.params.SizeMB,
			GPUIndices:        t.params.GPUIndices,
			ExcludeGPUIndices: t.params.ExcludeGPUIndices,
			RunNCCL:           t.params.RunNCCL,
			ParallelGPUs:      t.params.ParallelGPUs,
		}, j.append)
	case "nvidia-compute":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
		if planErr != nil {
			err = planErr
			break
		}
		// NOTE(review): with a burn profile set, dur is already positive
		// here (every preset is > 0), so this branch looks unreachable —
		// confirm whether rampPlan.DurationSec was meant to take precedence.
		if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
			dur = rampPlan.DurationSec
		}
		if rampPlan.StaggerSeconds > 0 {
			j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
		}
		archive, err = a.RunNvidiaOfficialComputePack(ctx, "", dur, t.params.GPUIndices, rampPlan.StaggerSeconds, j.append)
	case "nvidia-targeted-power":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = a.RunNvidiaTargetedPowerPack(ctx, "", dur, t.params.GPUIndices, j.append)
	case "nvidia-pulse":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = a.RunNvidiaPulseTestPack(ctx, "", dur, t.params.GPUIndices, j.append)
	case "nvidia-bandwidth":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = a.RunNvidiaBandwidthPack(ctx, "", t.params.GPUIndices, j.append)
	case "nvidia-interconnect":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		// The interconnect test is the stress pack pinned to the NCCL loader.
		archive, err = runNvidiaStressPackCtx(a, ctx, "", platform.NvidiaStressOptions{
			DurationSec: dur,
			Loader:      platform.NvidiaStressLoaderNCCL,
			GPUIndices:  t.params.GPUIndices,
		}, j.append)
	case "nvidia-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
		if planErr != nil {
			err = planErr
			break
		}
		// NOTE(review): same apparently unreachable override as in the
		// "nvidia-compute" case — confirm intent.
		if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
			dur = rampPlan.DurationSec
		}
		if rampPlan.StaggerSeconds > 0 {
			j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
		}
		archive, err = runNvidiaStressPackCtx(a, ctx, "", platform.NvidiaStressOptions{
			DurationSec:       dur,
			Loader:            t.params.Loader,
			GPUIndices:        t.params.GPUIndices,
			ExcludeGPUIndices: t.params.ExcludeGPUIndices,
			StaggerSeconds:    rampPlan.StaggerSeconds,
		}, j.append)
	case "memory":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		sizeMB, passes := 256, 1
		if t.params.StressMode {
			sizeMB, passes = 1024, 3
		}
		archive, err = runMemoryAcceptancePackCtx(a, ctx, "", sizeMB, passes, j.append)
	case "storage":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = runStorageAcceptancePackCtx(a, ctx, "", t.params.StressMode, j.append)
	case "cpu":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		// No explicit duration and no burn profile: fall back to the
		// built-in stress / quick defaults.
		if dur <= 0 {
			if t.params.StressMode {
				dur = 1800
			} else {
				dur = 60
			}
		}
		j.append(fmt.Sprintf("CPU stress duration: %ds", dur))
		archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append)
	case "amd":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append)
	case "amd-mem":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = runAMDMemIntegrityPackCtx(a, ctx, "", j.append)
	case "amd-bandwidth":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = runAMDMemBandwidthPackCtx(a, ctx, "", j.append)
	case "amd-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append)
	case "memory-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append)
	case "sat-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append)
	case "platform-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		opts := resolvePlatformStressPreset(t.params.BurnProfile)
		opts.Components = t.params.PlatformComponents
		archive, err = a.RunPlatformStress(ctx, "", opts, j.append)
	case "audit":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		result, e := a.RunAuditNow(q.opts.RuntimeMode)
		if e != nil {
			err = e
		} else {
			// The audit produces text rather than an archive; replay its
			// non-empty lines into the job log.
			for _, line := range splitLines(result.Body) {
				j.append(line)
			}
		}
	case "support-bundle":
		j.append("Building support bundle...")
		archive, err = buildSupportBundle(q.opts.ExportDir)
	case "install":
		if strings.TrimSpace(t.params.Device) == "" {
			err = fmt.Errorf("device is required")
			break
		}
		installLogPath := platform.InstallLogPath(t.params.Device)
		j.append("Install log: " + installLogPath)
		err = streamCmdJob(j, installCommand(ctx, t.params.Device, installLogPath))
	case "install-to-ram":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		err = a.RunInstallToRAM(ctx, j.append)
	default:
		j.append("ERROR: unknown target: " + t.Target)
		j.finish("unknown target")
		return
	}

	// If the SAT archive was produced, check overall_status and write to component DB.
	if archive != "" {
		archivePath := app.ExtractArchivePath(archive)
		if err == nil {
			if app.ReadSATOverallStatus(archivePath) == "FAILED" {
				err = fmt.Errorf("SAT overall_status=FAILED (see summary.txt)")
			}
		}
		// The DB is updated even when the run returned an error, as long as
		// an archive exists.
		if db := q.statusDB(); db != nil {
			app.ApplySATResultToDB(db, t.Target, archivePath)
		}
	}

	if err != nil {
		// A cancelled context means the failure is reported as an abort
		// rather than as the runner's own error.
		if ctx.Err() != nil {
			j.append("Aborted.")
			j.finish("aborted")
		} else {
			j.append("ERROR: " + err.Error())
			j.finish(err.Error())
		}
		return
	}
	if archive != "" {
		j.append("Archive: " + archive)
	}
	j.finish("")
}
// statusDB returns the component status DB of the configured app, or nil
// when either the handler options or the app are not set.
func (q *taskQueue) statusDB() *app.ComponentStatusDB {
	if q.opts != nil && q.opts.App != nil {
		return q.opts.App.StatusDB
	}
	return nil
}
// splitLines splits s on newlines and returns only the non-empty pieces.
// The result is nil when s contains no non-empty line.
func splitLines(s string) []string {
	var kept []string
	pieces := splitNL(s)
	for i := range pieces {
		if piece := pieces[i]; piece != "" {
			kept = append(kept, piece)
		}
	}
	return kept
}
// splitNL splits s at every '\n' and returns all pieces, including empty
// ones. The result always has at least one element: an empty input yields a
// single empty string, and a trailing newline yields a trailing empty string.
// Carriage returns are not treated specially.
func splitNL(s string) []string {
	// strings.Split has exactly the semantics of the previous hand-rolled
	// scan, including the empty-input and trailing-separator cases.
	return strings.Split(s, "\n")
}
// ── HTTP handlers ─────────────────────────────────────────────────────────────
// handleAPITasksList responds with the current display snapshot of the
// global task queue as JSON.
func (h *handler) handleAPITasksList(w http.ResponseWriter, _ *http.Request) {
	writeJSON(w, globalQueue.snapshot())
}
// handleAPITasksCancel cancels the task named by the "id" path value.
//
// Responses: 404 when the task is unknown, 409 when it is neither pending
// nor running, otherwise {"status":"cancelled"}. A running task has its job
// aborted before being marked cancelled.
//
// The queue lock is held only while the task is mutated and persisted; the
// HTTP response is written after it is released so a slow client cannot
// stall the queue.
func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) {
	id := r.PathValue("id")
	t, ok := globalQueue.findByID(id)
	if !ok {
		writeError(w, http.StatusNotFound, "task not found")
		return
	}

	cancelled := func() bool {
		globalQueue.mu.Lock()
		defer globalQueue.mu.Unlock()
		switch t.Status {
		case TaskPending:
			// Nothing is running yet; just mark it below.
		case TaskRunning:
			if t.job != nil {
				t.job.abort()
			}
		default:
			return false
		}
		now := time.Now()
		t.Status = TaskCancelled
		t.DoneAt = &now
		globalQueue.persistLocked()
		taskSerialEvent(t, "finished with status="+t.Status)
		return true
	}()

	if !cancelled {
		writeError(w, http.StatusConflict, "task is not running or pending")
		return
	}
	writeJSON(w, map[string]string{"status": "cancelled"})
}
// handleAPITasksPriority adjusts the priority of a pending task by the
// "delta" given in the JSON request body.
//
// Responses: 404 when the task is unknown, 400 when the body cannot be
// decoded, 409 when the task is not pending, otherwise {"priority": <new>}.
//
// The queue lock covers only the status check, the update and the persist;
// the response is written after it is released so a slow client cannot stall
// the queue.
func (h *handler) handleAPITasksPriority(w http.ResponseWriter, r *http.Request) {
	id := r.PathValue("id")
	t, ok := globalQueue.findByID(id)
	if !ok {
		writeError(w, http.StatusNotFound, "task not found")
		return
	}
	var req struct {
		Delta int `json:"delta"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid body")
		return
	}

	globalQueue.mu.Lock()
	pending := t.Status == TaskPending
	priority := 0
	if pending {
		t.Priority += req.Delta
		// Copy the value out while the lock is held so the response does
		// not read the task after unlocking.
		priority = t.Priority
		globalQueue.persistLocked()
	}
	globalQueue.mu.Unlock()

	if !pending {
		writeError(w, http.StatusConflict, "only pending tasks can be reprioritised")
		return
	}
	writeJSON(w, map[string]int{"priority": priority})
}
// handleAPITasksCancelAll cancels every pending or running task in the
// global queue and responds with {"cancelled": <count>}. Running tasks have
// their job aborted first. All cancelled tasks share the same DoneAt.
func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request) {
	globalQueue.mu.Lock()
	stamp := time.Now()
	count := 0
	for _, task := range globalQueue.tasks {
		if task.Status != TaskPending && task.Status != TaskRunning {
			continue
		}
		if task.Status == TaskRunning && task.job != nil {
			task.job.abort()
		}
		task.Status = TaskCancelled
		task.DoneAt = &stamp
		taskSerialEvent(task, "finished with status="+task.Status)
		count++
	}
	globalQueue.persistLocked()
	globalQueue.mu.Unlock()

	writeJSON(w, map[string]int{"cancelled": count})
}
// handleAPITasksKillWorkers cancels every pending or running task in the
// global queue and then asks the platform layer to kill test worker
// processes. The response reports the number of cancelled tasks, the number
// of killed processes and the value returned by platform.KillTestWorkers.
func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Request) {
	// Step 1: cancel all queued/running tasks in the queue.
	globalQueue.mu.Lock()
	stamp := time.Now()
	count := 0
	for _, task := range globalQueue.tasks {
		if task.Status != TaskPending && task.Status != TaskRunning {
			continue
		}
		if task.Status == TaskRunning && task.job != nil {
			task.job.abort()
		}
		task.Status = TaskCancelled
		task.DoneAt = &stamp
		taskSerialEvent(task, "finished with status="+task.Status)
		count++
	}
	globalQueue.persistLocked()
	globalQueue.mu.Unlock()

	// Step 2: kill orphaned test worker processes at the OS level.
	killed := platform.KillTestWorkers()

	response := map[string]any{
		"cancelled": count,
		"killed":    len(killed),
		"processes": killed,
	}
	writeJSON(w, response)
}
// handleAPITasksStream streams the log of the task named by the "id" path
// value.
//
// If the task already has a job attached, that job is streamed. If the task
// is in a completed state without a job, a job state is rebuilt from the log
// path, finished with the stored error message and streamed. Otherwise the
// task is still queued: the handler starts the SSE response, announces that
// it is waiting, and polls every 200ms until a job appears, the task
// completes, the task disappears or the request context is done.
func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
src, ok := globalQueue.taskStreamSource(id)
if !ok {
http.Error(w, "task not found", http.StatusNotFound)
return
}
if src.job != nil {
streamJob(w, r, src.job)
return
}
if src.status == TaskDone || src.status == TaskFailed || src.status == TaskCancelled {
j := newTaskJobState(src.logPath)
j.finish(src.errMsg)
streamJob(w, r, j)
return
}
if !sseStart(w) {
return
}
sseWrite(w, "", "Task is queued. Waiting for worker...")
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
src, ok = globalQueue.taskStreamSource(id)
if !ok {
// The task vanished from the queue while waiting.
sseWrite(w, "done", "task not found")
return
}
// NOTE(review): streamSubscribedJob is used here instead of streamJob,
// presumably because the SSE response was already started above —
// confirm.
if src.job != nil {
streamSubscribedJob(w, r, src.job)
return
}
if src.status == TaskDone || src.status == TaskFailed || src.status == TaskCancelled {
j := newTaskJobState(src.logPath)
j.finish(src.errMsg)
streamSubscribedJob(w, r, j)
return
}
case <-r.Context().Done():
return
}
}
}
// assignTaskLogPathLocked fills in the task's artifact and log paths via
// ensureTaskArtifactPathsLocked. It does nothing when the queue has no logs
// directory or the task has no ID. Callers hold q.mu.
func (q *taskQueue) assignTaskLogPathLocked(t *Task) {
	if q.logsDir != "" && t.ID != "" {
		q.ensureTaskArtifactPathsLocked(t)
	}
}
// loadLocked restores the queue from the state file, then prunes and
// persists it. A missing, unreadable, empty or undecodable state file leaves
// the queue untouched. Callers hold q.mu.
//
// Tasks persisted as running are marked failed with the message
// "interrupted by bee-web restart"; tasks persisted as pending have their
// start/done timestamps and error message cleared.
func (q *taskQueue) loadLocked() {
	if q.statePath == "" {
		return
	}
	raw, readErr := os.ReadFile(q.statePath)
	if readErr != nil || len(raw) == 0 {
		return
	}
	var saved []persistedTask
	if json.Unmarshal(raw, &saved) != nil {
		return
	}

	for i := range saved {
		pt := &saved[i]
		task := &Task{
			ID:             pt.ID,
			Name:           pt.Name,
			Target:         pt.Target,
			Priority:       pt.Priority,
			Status:         pt.Status,
			CreatedAt:      pt.CreatedAt,
			StartedAt:      pt.StartedAt,
			DoneAt:         pt.DoneAt,
			ErrMsg:         pt.ErrMsg,
			LogPath:        pt.LogPath,
			ArtifactsDir:   pt.ArtifactsDir,
			ReportJSONPath: pt.ReportJSONPath,
			ReportHTMLPath: pt.ReportHTMLPath,
			params:         pt.Params,
		}
		// Paths are assigned before the status is rewritten below.
		q.assignTaskLogPathLocked(task)

		switch task.Status {
		case TaskRunning:
			// The task was interrupted by a bee-web restart. Child processes
			// (e.g. bee-gpu-burn-worker) survive the restart in their own
			// process groups and cannot be cancelled retroactively. Mark the
			// task as failed so the user can decide whether to re-run it
			// rather than blindly re-launching duplicate workers.
			interruptedAt := time.Now()
			task.Status = TaskFailed
			task.DoneAt = &interruptedAt
			task.ErrMsg = "interrupted by bee-web restart"
		case TaskPending:
			task.StartedAt = nil
			task.DoneAt = nil
			task.ErrMsg = ""
		}
		q.tasks = append(q.tasks, task)
	}

	q.prune()
	q.persistLocked()
}
// persistLocked writes the queue to the state file as indented JSON. It
// writes to "<statePath>.tmp" first and then renames that file over the
// state file. Callers hold q.mu.
//
// Persistence stays best-effort (no error is returned), but failures are now
// logged as warnings instead of being dropped silently, and a temporary file
// left behind by a failed write or rename is removed.
func (q *taskQueue) persistLocked() {
	if q.statePath == "" {
		return
	}
	state := make([]persistedTask, 0, len(q.tasks))
	for _, t := range q.tasks {
		state = append(state, persistedTask{
			ID:             t.ID,
			Name:           t.Name,
			Target:         t.Target,
			Priority:       t.Priority,
			Status:         t.Status,
			CreatedAt:      t.CreatedAt,
			StartedAt:      t.StartedAt,
			DoneAt:         t.DoneAt,
			ErrMsg:         t.ErrMsg,
			LogPath:        t.LogPath,
			ArtifactsDir:   t.ArtifactsDir,
			ReportJSONPath: t.ReportJSONPath,
			ReportHTMLPath: t.ReportHTMLPath,
			Params:         t.params,
		})
	}
	data, err := json.MarshalIndent(state, "", " ")
	if err != nil {
		slog.Warn("task queue state not persisted: encode failed", "path", q.statePath, "err", err)
		return
	}
	tmp := q.statePath + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		slog.Warn("task queue state not persisted: write failed", "path", tmp, "err", err)
		_ = os.Remove(tmp) // best-effort cleanup of a possibly partial file
		return
	}
	if err := os.Rename(tmp, q.statePath); err != nil {
		slog.Warn("task queue state not persisted: rename failed", "path", q.statePath, "err", err)
		_ = os.Remove(tmp) // best-effort cleanup; the previous state file is untouched
	}
}
// taskElapsedSec returns the task's run time in whole seconds, rounded to the
// nearest second.
//
// The interval starts at StartedAt (or at CreatedAt if StartedAt lies before
// it) and ends at DoneAt when set, otherwise at now. It returns 0 for a nil
// task, a task that has not started, or an interval that would be negative.
func taskElapsedSec(t *Task, now time.Time) int {
	if t == nil {
		return 0
	}
	began := t.StartedAt
	if began == nil || began.IsZero() {
		return 0
	}

	from := *began
	if created := t.CreatedAt; !created.IsZero() && from.Before(created) {
		from = created
	}
	until := now
	if done := t.DoneAt; done != nil && !done.IsZero() {
		until = *done
	}

	if until.Before(from) {
		return 0
	}
	return int(until.Sub(from).Round(time.Second) / time.Second)
}
// taskFolderStatus normalises a status for use in an artifact folder name.
// The input is lower-cased and trimmed; running, done, failed and cancelled
// pass through, and anything else (including empty) becomes pending.
func taskFolderStatus(status string) string {
	normalized := strings.TrimSpace(strings.ToLower(status))
	if normalized == TaskRunning ||
		normalized == TaskDone ||
		normalized == TaskFailed ||
		normalized == TaskCancelled {
		return normalized
	}
	return TaskPending
}
// sanitizeTaskFolderPart turns s into a folder-name fragment made only of
// ASCII lowercase letters, digits and single dashes.
//
// The input is lower-cased and trimmed; every run of other characters
// collapses to one dash, and leading/trailing dashes are dropped. An input
// with no usable characters yields "task".
func sanitizeTaskFolderPart(s string) string {
	const fallback = "task"
	lowered := strings.TrimSpace(strings.ToLower(s))
	if lowered == "" {
		return fallback
	}

	var sb strings.Builder
	// gap records that at least one separator character was seen since the
	// last kept character; the dash is emitted lazily so that leading and
	// trailing separators never reach the output.
	gap := false
	for _, r := range lowered {
		keep := (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9')
		if !keep {
			gap = true
			continue
		}
		if gap && sb.Len() > 0 {
			sb.WriteByte('-')
		}
		gap = false
		sb.WriteRune(r)
	}

	if sb.Len() == 0 {
		return fallback
	}
	return sb.String()
}
// taskArtifactsDir builds the artifact directory path for a task:
// <root>/<number prefix>_<sanitised name>_<folder status>. It returns "" when
// root is blank or t is nil.
func taskArtifactsDir(root string, t *Task, status string) string {
	if t == nil || strings.TrimSpace(root) == "" {
		return ""
	}
	folder := taskFolderNumberPrefix(t.ID) + "_" + sanitizeTaskFolderPart(t.Name) + "_" + taskFolderStatus(status)
	return filepath.Join(root, folder)
}
// taskFolderNumberPrefix derives the leading component of a task's artifact
// folder name from its ID. IDs of the form "TASK-" followed by exactly three
// ASCII digits yield those digits; any other ID is passed through
// sanitizeTaskFolderPart, with "000" as the fallback for an empty result.
func taskFolderNumberPrefix(taskID string) string {
	id := strings.TrimSpace(taskID)
	if rest, ok := strings.CutPrefix(id, "TASK-"); ok {
		num := strings.TrimSpace(rest)
		// Trimming the digit set from both ends leaves nothing only when
		// every character is an ASCII digit.
		if len(num) == 3 && strings.Trim(num, "0123456789") == "" {
			return num
		}
	}
	if cleaned := sanitizeTaskFolderPart(id); cleaned != "" {
		return cleaned
	}
	return "000"
}
// ensureTaskReportPaths points the task's report paths (and, where
// applicable, its log path) into its artifacts directory. It does nothing
// for a nil task or a blank ArtifactsDir.
//
// The log path is only rewritten when it is empty or already named
// "task.log"; any other log path is left alone. The JSON and HTML report
// paths are always rewritten.
func ensureTaskReportPaths(t *Task) {
	if t == nil {
		return
	}
	dir := t.ArtifactsDir
	if strings.TrimSpace(dir) == "" {
		return
	}
	const logName = "task.log"
	if t.LogPath == "" || filepath.Base(t.LogPath) == logName {
		t.LogPath = filepath.Join(dir, logName)
	}
	t.ReportJSONPath = filepath.Join(dir, "report.json")
	t.ReportHTMLPath = filepath.Join(dir, "report.html")
}
// ensureTaskArtifactPathsLocked makes sure the task has an artifacts
// directory (derived from its current status when not yet set), creates that
// directory on disk on a best-effort basis, and refreshes the log/report
// paths. It does nothing for a nil task, a blank logs directory or a blank
// task ID. Callers hold q.mu.
func (q *taskQueue) ensureTaskArtifactPathsLocked(t *Task) {
	blank := func(s string) bool { return strings.TrimSpace(s) == "" }
	if t == nil || blank(q.logsDir) || blank(t.ID) {
		return
	}
	if blank(t.ArtifactsDir) {
		t.ArtifactsDir = taskArtifactsDir(q.logsDir, t, t.Status)
	}
	if dir := t.ArtifactsDir; dir != "" {
		_ = os.MkdirAll(dir, 0755)
	}
	ensureTaskReportPaths(t)
}
// finalizeTaskArtifactPathsLocked moves the task's artifacts directory to the
// name that reflects its current status and refreshes the log/report paths.
// It does nothing for a nil task, a blank logs directory or a blank task ID.
// Callers hold q.mu.
//
// If the destination directory does not exist yet, the current directory is
// renamed to it. When that rename fails, the task keeps pointing at its
// existing directory (and a warning is logged) rather than at a destination
// that was never created. If the destination already exists, the task is
// switched to it without moving anything, as before.
func (q *taskQueue) finalizeTaskArtifactPathsLocked(t *Task) {
	if t == nil || strings.TrimSpace(q.logsDir) == "" || strings.TrimSpace(t.ID) == "" {
		return
	}
	q.ensureTaskArtifactPathsLocked(t)
	dstDir := taskArtifactsDir(q.logsDir, t, t.Status)
	if dstDir == "" {
		return
	}
	if t.ArtifactsDir != "" && t.ArtifactsDir != dstDir {
		moved := true
		// Any Stat error is treated as "destination absent", matching the
		// previous behaviour.
		if _, statErr := os.Stat(dstDir); statErr != nil {
			if err := os.Rename(t.ArtifactsDir, dstDir); err != nil {
				moved = false
				slog.Warn("task artifacts directory not renamed",
					"task_id", t.ID,
					"from", t.ArtifactsDir,
					"to", dstDir,
					"err", err,
				)
			}
		}
		if moved {
			t.ArtifactsDir = dstDir
		}
	}
	ensureTaskReportPaths(t)
}