// Package webui: task-queue types and status constants for the web UI.
package webui

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"bee/audit/internal/app"
)

// Task statuses.
const (
	TaskPending   = "pending"   // queued, waiting for the worker
	TaskRunning   = "running"   // currently being executed by the worker
	TaskDone      = "done"      // finished without error
	TaskFailed    = "failed"    // finished with an error (see Task.ErrMsg)
	TaskCancelled = "cancelled" // cancelled while pending or running
)

// taskNames maps target → human-readable name.
// NOTE(review): "install" has a display name here, but runTask has no
// matching case — confirm whether disk install is dispatched elsewhere.
var taskNames = map[string]string{
	"nvidia":         "NVIDIA SAT",
	"memory":         "Memory SAT",
	"storage":        "Storage SAT",
	"cpu":            "CPU SAT",
	"amd":            "AMD GPU SAT",
	"amd-stress":     "AMD GPU Burn-in",
	"memory-stress":  "Memory Burn-in",
	"sat-stress":     "SAT Stress (stressapptest)",
	"audit":          "Audit",
	"install":        "Install to Disk",
	"install-to-ram": "Install to RAM",
}

// Task represents one unit of work in the queue.
type Task struct {
	ID     string `json:"id"`
	Name   string `json:"name"`
	Target string `json:"target"`
	// Priority: higher values run first (see nextPending).
	Priority  int        `json:"priority"`
	Status    string     `json:"status"`
	CreatedAt time.Time  `json:"created_at"`
	StartedAt *time.Time `json:"started_at,omitempty"`
	DoneAt    *time.Time `json:"done_at,omitempty"`
	ErrMsg    string     `json:"error,omitempty"`
	LogPath   string     `json:"log_path,omitempty"`

	// runtime fields (not serialised)
	job    *jobState  // live job while the task runs; used for log streaming and abort
	params taskParams // optional parameters from the run request (persisted separately)
}

// taskParams holds optional parameters parsed from the run request.
type taskParams struct {
	Duration    int    `json:"duration,omitempty"`
	DiagLevel   int    `json:"diag_level,omitempty"`
	GPUIndices  []int  `json:"gpu_indices,omitempty"`
	BurnProfile string `json:"burn_profile,omitempty"`
	DisplayName string `json:"display_name,omitempty"`
	Device      string `json:"device,omitempty"` // for install
}

// persistedTask is the on-disk JSON shape of a Task, exposing the
// otherwise-unexported run parameters for persistence.
type persistedTask struct {
	ID        string     `json:"id"`
	Name      string     `json:"name"`
	Target    string     `json:"target"`
	Priority  int        `json:"priority"`
	Status    string     `json:"status"`
	CreatedAt time.Time  `json:"created_at"`
	StartedAt *time.Time `json:"started_at,omitempty"`
	DoneAt    *time.Time `json:"done_at,omitempty"`
	ErrMsg    string     `json:"error,omitempty"`
	LogPath   string     `json:"log_path,omitempty"`
	Params    taskParams `json:"params,omitempty"`
}

// burnPreset bundles the knobs a burn profile controls.
type burnPreset struct {
	NvidiaDiag  int
	DurationSec int
}

// burnPresets holds the known named profiles; anything else falls back to
// the short default in resolveBurnPreset.
var burnPresets = map[string]burnPreset{
	"overnight":  {NvidiaDiag: 4, DurationSec: 8 * 60 * 60},
	"acceptance": {NvidiaDiag: 3, DurationSec: 60 * 60},
}

// resolveBurnPreset returns the preset for the named burn profile.
// Unknown or empty profiles get a quick default: diag level 1, 5 minutes.
func resolveBurnPreset(profile string) burnPreset {
	if preset, ok := burnPresets[profile]; ok {
		return preset
	}
	return burnPreset{NvidiaDiag: 1, DurationSec: 5 * 60}
}

// taskQueue manages a priority-ordered list of tasks and runs them one at a time.
type taskQueue struct { mu sync.Mutex tasks []*Task trigger chan struct{} opts *HandlerOptions // set by startWorker statePath string logsDir string started bool } var globalQueue = &taskQueue{trigger: make(chan struct{}, 1)} const maxTaskHistory = 50 var ( runMemoryAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) { return a.RunMemoryAcceptancePackCtx(ctx, baseDir, logFunc) } runStorageAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) { return a.RunStorageAcceptancePackCtx(ctx, baseDir, logFunc) } runCPUAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) { return a.RunCPUAcceptancePackCtx(ctx, baseDir, durationSec, logFunc) } runAMDAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) { return a.RunAMDAcceptancePackCtx(ctx, baseDir, logFunc) } runAMDStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) { return a.RunAMDStressPackCtx(ctx, baseDir, durationSec, logFunc) } runMemoryStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) { return a.RunMemoryStressPackCtx(ctx, baseDir, durationSec, logFunc) } runSATStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) { return a.RunSATStressPackCtx(ctx, baseDir, durationSec, logFunc) } ) // enqueue adds a task to the queue and notifies the worker. func (q *taskQueue) enqueue(t *Task) { q.mu.Lock() q.assignTaskLogPathLocked(t) q.tasks = append(q.tasks, t) q.prune() q.persistLocked() q.mu.Unlock() select { case q.trigger <- struct{}{}: default: } } // prune removes oldest completed tasks beyond maxTaskHistory. 
func (q *taskQueue) prune() { var done []*Task var active []*Task for _, t := range q.tasks { switch t.Status { case TaskDone, TaskFailed, TaskCancelled: done = append(done, t) default: active = append(active, t) } } if len(done) > maxTaskHistory { done = done[len(done)-maxTaskHistory:] } q.tasks = append(active, done...) } // nextPending returns the highest-priority pending task (nil if none). func (q *taskQueue) nextPending() *Task { var best *Task for _, t := range q.tasks { if t.Status != TaskPending { continue } if best == nil || t.Priority > best.Priority || (t.Priority == best.Priority && t.CreatedAt.Before(best.CreatedAt)) { best = t } } return best } // findByID looks up a task by ID. func (q *taskQueue) findByID(id string) (*Task, bool) { q.mu.Lock() defer q.mu.Unlock() for _, t := range q.tasks { if t.ID == id { return t, true } } return nil, false } // findJob returns the jobState for a task ID (for SSE streaming compatibility). func (q *taskQueue) findJob(id string) (*jobState, bool) { t, ok := q.findByID(id) if !ok || t.job == nil { return nil, false } return t.job, true } func (q *taskQueue) hasActiveTarget(target string) bool { q.mu.Lock() defer q.mu.Unlock() for _, t := range q.tasks { if t.Target != target { continue } if t.Status == TaskPending || t.Status == TaskRunning { return true } } return false } // snapshot returns a copy of all tasks sorted for display (running first, then pending by priority, then done by doneAt desc). 
func (q *taskQueue) snapshot() []Task {
	q.mu.Lock()
	defer q.mu.Unlock()
	// Copy tasks by value so callers cannot mutate queue state.
	out := make([]Task, len(q.tasks))
	for i, t := range q.tasks {
		out[i] = *t
	}
	// Display order: running < pending < finished (statusOrder); within a
	// group, higher priority first, then older first. Note finished tasks are
	// also ordered by CreatedAt, not DoneAt.
	sort.SliceStable(out, func(i, j int) bool {
		si := statusOrder(out[i].Status)
		sj := statusOrder(out[j].Status)
		if si != sj {
			return si < sj
		}
		if out[i].Priority != out[j].Priority {
			return out[i].Priority > out[j].Priority
		}
		return out[i].CreatedAt.Before(out[j].CreatedAt)
	})
	return out
}

// statusOrder maps a status to its display rank: running, pending, finished.
func statusOrder(s string) int {
	switch s {
	case TaskRunning:
		return 0
	case TaskPending:
		return 1
	default:
		return 2
	}
}

// startWorker launches the queue runner goroutine.
// Safe to call more than once: state load and goroutine launch happen only on
// the first call; later calls refresh opts/paths and re-trigger the worker if
// pending tasks exist.
func (q *taskQueue) startWorker(opts *HandlerOptions) {
	q.mu.Lock()
	q.opts = opts
	q.statePath = filepath.Join(opts.ExportDir, "tasks-state.json")
	q.logsDir = filepath.Join(opts.ExportDir, "tasks")
	// Best-effort: if this fails, per-task log files may be unwritable.
	_ = os.MkdirAll(q.logsDir, 0755)
	if !q.started {
		q.loadLocked()
		q.started = true
		go q.worker()
	}
	hasPending := q.nextPending() != nil
	q.mu.Unlock()
	if hasPending {
		// Non-blocking send: one buffered signal is enough to wake the worker.
		select {
		case q.trigger <- struct{}{}:
		default:
		}
	}
}

// worker is the queue runner loop: on each trigger it drains all pending
// tasks one at a time, switching the CPU governor to "performance" while
// working and back to "powersave" when idle.
func (q *taskQueue) worker() {
	for {
		<-q.trigger
		setCPUGovernor("performance")
		for {
			q.mu.Lock()
			t := q.nextPending()
			if t == nil {
				q.mu.Unlock()
				break
			}
			// Transition to running under the lock, clearing any stale
			// completion fields from a previous (recovered) run.
			now := time.Now()
			t.Status = TaskRunning
			t.StartedAt = &now
			t.DoneAt = nil
			t.ErrMsg = ""
			j := newTaskJobState(t.LogPath)
			ctx, cancel := context.WithCancel(context.Background())
			// NOTE(review): cancel is handed to the job; presumably invoked
			// via j.abort()/finish() — verify the context is always released.
			j.cancel = cancel
			t.job = j
			q.persistLocked()
			// Run without holding the lock so cancel/priority handlers can
			// take it while the task executes.
			q.mu.Unlock()
			q.runTask(t, j, ctx)
			q.mu.Lock()
			now2 := time.Now()
			t.DoneAt = &now2
			if t.Status == TaskRunning { // not cancelled externally
				if j.err != "" {
					t.Status = TaskFailed
					t.ErrMsg = j.err
				} else {
					t.Status = TaskDone
				}
			}
			q.prune()
			q.persistLocked()
			q.mu.Unlock()
		}
		setCPUGovernor("powersave")
	}
}

// setCPUGovernor writes the given governor to all CPU scaling_governor sysfs files.
// Silently ignores errors (e.g. when cpufreq is not available).
func setCPUGovernor(governor string) { matches, err := filepath.Glob("/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor") if err != nil || len(matches) == 0 { return } for _, path := range matches { _ = os.WriteFile(path, []byte(governor), 0644) } } // runTask executes the work for a task, writing output to j. func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) { if q.opts == nil || q.opts.App == nil { j.append("ERROR: app not configured") j.finish("app not configured") return } a := q.opts.App j.append(fmt.Sprintf("Starting %s...", t.Name)) if len(j.lines) > 0 { j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339))) } var ( archive string err error ) switch t.Target { case "nvidia": diagLevel := t.params.DiagLevel if t.params.BurnProfile != "" && diagLevel <= 0 { diagLevel = resolveBurnPreset(t.params.BurnProfile).NvidiaDiag } if len(t.params.GPUIndices) > 0 || diagLevel > 0 { result, e := a.RunNvidiaAcceptancePackWithOptions( ctx, "", diagLevel, t.params.GPUIndices, j.append, ) if e != nil { err = e } else { archive = result.Body } } else { archive, err = a.RunNvidiaAcceptancePack("", j.append) } case "memory": archive, err = runMemoryAcceptancePackCtx(a, ctx, "", j.append) case "storage": archive, err = runStorageAcceptancePackCtx(a, ctx, "", j.append) case "cpu": dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = resolveBurnPreset(t.params.BurnProfile).DurationSec } if dur <= 0 { dur = 60 } archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append) case "amd": archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append) case "amd-stress": dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = resolveBurnPreset(t.params.BurnProfile).DurationSec } archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append) case "memory-stress": dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = 
resolveBurnPreset(t.params.BurnProfile).DurationSec } archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append) case "sat-stress": dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = resolveBurnPreset(t.params.BurnProfile).DurationSec } archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append) case "audit": result, e := a.RunAuditNow(q.opts.RuntimeMode) if e != nil { err = e } else { for _, line := range splitLines(result.Body) { j.append(line) } } case "install-to-ram": err = a.RunInstallToRAM(ctx, j.append) default: j.append("ERROR: unknown target: " + t.Target) j.finish("unknown target") return } if err != nil { if ctx.Err() != nil { j.append("Aborted.") j.finish("aborted") } else { j.append("ERROR: " + err.Error()) j.finish(err.Error()) } return } if archive != "" { j.append("Archive: " + archive) } j.finish("") } func splitLines(s string) []string { var out []string for _, l := range splitNL(s) { if l != "" { out = append(out, l) } } return out } func splitNL(s string) []string { var out []string start := 0 for i, c := range s { if c == '\n' { out = append(out, s[start:i]) start = i + 1 } } out = append(out, s[start:]) return out } // ── HTTP handlers ───────────────────────────────────────────────────────────── func (h *handler) handleAPITasksList(w http.ResponseWriter, _ *http.Request) { tasks := globalQueue.snapshot() writeJSON(w, tasks) } func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") t, ok := globalQueue.findByID(id) if !ok { writeError(w, http.StatusNotFound, "task not found") return } globalQueue.mu.Lock() defer globalQueue.mu.Unlock() switch t.Status { case TaskPending: t.Status = TaskCancelled now := time.Now() t.DoneAt = &now globalQueue.persistLocked() writeJSON(w, map[string]string{"status": "cancelled"}) case TaskRunning: if t.job != nil { t.job.abort() } t.Status = TaskCancelled now := time.Now() t.DoneAt = &now globalQueue.persistLocked() writeJSON(w, 
map[string]string{"status": "cancelled"}) default: writeError(w, http.StatusConflict, "task is not running or pending") } } func (h *handler) handleAPITasksPriority(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") t, ok := globalQueue.findByID(id) if !ok { writeError(w, http.StatusNotFound, "task not found") return } var req struct { Delta int `json:"delta"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid body") return } globalQueue.mu.Lock() defer globalQueue.mu.Unlock() if t.Status != TaskPending { writeError(w, http.StatusConflict, "only pending tasks can be reprioritised") return } t.Priority += req.Delta globalQueue.persistLocked() writeJSON(w, map[string]int{"priority": t.Priority}) } func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request) { globalQueue.mu.Lock() now := time.Now() n := 0 for _, t := range globalQueue.tasks { switch t.Status { case TaskPending: t.Status = TaskCancelled t.DoneAt = &now n++ case TaskRunning: if t.job != nil { t.job.abort() } t.Status = TaskCancelled t.DoneAt = &now n++ } } globalQueue.persistLocked() globalQueue.mu.Unlock() writeJSON(w, map[string]int{"cancelled": n}) } func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") // Wait up to 5s for the task to get a job (it may be pending) deadline := time.Now().Add(5 * time.Second) var j *jobState for time.Now().Before(deadline) { if jj, ok := globalQueue.findJob(id); ok { j = jj break } time.Sleep(200 * time.Millisecond) } if j == nil { http.Error(w, "task not found or not yet started", http.StatusNotFound) return } streamJob(w, r, j) } func (q *taskQueue) assignTaskLogPathLocked(t *Task) { if t.LogPath != "" || q.logsDir == "" || t.ID == "" { return } t.LogPath = filepath.Join(q.logsDir, t.ID+".log") } func (q *taskQueue) loadLocked() { if q.statePath == "" { return } data, err := os.ReadFile(q.statePath) if err != nil || 
len(data) == 0 { return } var persisted []persistedTask if err := json.Unmarshal(data, &persisted); err != nil { return } for _, pt := range persisted { t := &Task{ ID: pt.ID, Name: pt.Name, Target: pt.Target, Priority: pt.Priority, Status: pt.Status, CreatedAt: pt.CreatedAt, StartedAt: pt.StartedAt, DoneAt: pt.DoneAt, ErrMsg: pt.ErrMsg, LogPath: pt.LogPath, params: pt.Params, } q.assignTaskLogPathLocked(t) if t.Status == TaskPending || t.Status == TaskRunning { t.Status = TaskPending t.DoneAt = nil t.ErrMsg = "" } q.tasks = append(q.tasks, t) } q.prune() q.persistLocked() } func (q *taskQueue) persistLocked() { if q.statePath == "" { return } state := make([]persistedTask, 0, len(q.tasks)) for _, t := range q.tasks { state = append(state, persistedTask{ ID: t.ID, Name: t.Name, Target: t.Target, Priority: t.Priority, Status: t.Status, CreatedAt: t.CreatedAt, StartedAt: t.StartedAt, DoneAt: t.DoneAt, ErrMsg: t.ErrMsg, LogPath: t.LogPath, Params: t.params, }) } data, err := json.MarshalIndent(state, "", " ") if err != nil { return } tmp := q.statePath + ".tmp" if err := os.WriteFile(tmp, data, 0644); err != nil { return } _ = os.Rename(tmp, q.statePath) }