package webui

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sort"
	"sync"
	"time"
)

// Lifecycle states stored in Task.Status.
const (
	TaskPending   = "pending"
	TaskRunning   = "running"
	TaskDone      = "done"
	TaskFailed    = "failed"
	TaskCancelled = "cancelled"
)

// taskNames gives the display name for each known task target.
var taskNames = map[string]string{
	"nvidia":         "NVIDIA SAT",
	"memory":         "Memory SAT",
	"storage":        "Storage SAT",
	"cpu":            "CPU SAT",
	"amd":            "AMD GPU SAT",
	"amd-stress":     "AMD GPU Burn-in",
	"memory-stress":  "Memory Burn-in",
	"sat-stress":     "SAT Stress (stressapptest)",
	"audit":          "Audit",
	"install":        "Install to Disk",
	"install-to-ram": "Install to RAM",
}

// Task is a single queued unit of work. The exported fields form its JSON
// representation; the unexported ones are runtime-only state.
type Task struct {
	ID        string     `json:"id"`
	Name      string     `json:"name"`
	Target    string     `json:"target"`
	Priority  int        `json:"priority"` // larger values are picked first by nextPending
	Status    string     `json:"status"`   // one of the Task* constants
	CreatedAt time.Time  `json:"created_at"`
	StartedAt *time.Time `json:"started_at,omitempty"`
	DoneAt    *time.Time `json:"done_at,omitempty"`
	ErrMsg    string     `json:"error,omitempty"`

	// Runtime state; unexported, so encoding/json skips it.
	job    *jobState
	params taskParams
}

// taskParams carries the optional settings parsed from a run request.
type taskParams struct {
	Duration   int // NOTE(review): unit is not visible here (presumably seconds) — confirm
	DiagLevel  int
	GPUIndices []int
	Device     string // for install
}

// taskQueue holds the task list and feeds a single worker goroutine, which
// runs pending tasks one at a time in priority order.
type taskQueue struct {
	mu      sync.Mutex
	tasks   []*Task
	trigger chan struct{}   // wake-up signal for the worker
	opts    *HandlerOptions // set by startWorker
}

// globalQueue is the package-wide queue instance. Its trigger channel has a
// buffer of one so that a wake-up can be left pending without blocking.
var globalQueue = &taskQueue{trigger: make(chan struct{}, 1)}

// maxTaskHistory caps how many finished tasks prune keeps.
const maxTaskHistory = 50

// enqueue appends t to the queue, prunes old history, and wakes the worker.
func (q *taskQueue) enqueue(t *Task) {
	func() {
		q.mu.Lock()
		defer q.mu.Unlock()
		q.tasks = append(q.tasks, t)
		q.prune()
	}()

	// Non-blocking send: if a wake-up is already buffered, this one is dropped.
	select {
	case q.trigger <- struct{}{}:
	default:
	}
}

// prune removes oldest completed tasks beyond maxTaskHistory.
func (q *taskQueue) prune() { var done []*Task var active []*Task for _, t := range q.tasks { switch t.Status { case TaskDone, TaskFailed, TaskCancelled: done = append(done, t) default: active = append(active, t) } } if len(done) > maxTaskHistory { done = done[len(done)-maxTaskHistory:] } q.tasks = append(active, done...) } // nextPending returns the highest-priority pending task (nil if none). func (q *taskQueue) nextPending() *Task { var best *Task for _, t := range q.tasks { if t.Status != TaskPending { continue } if best == nil || t.Priority > best.Priority || (t.Priority == best.Priority && t.CreatedAt.Before(best.CreatedAt)) { best = t } } return best } // findByID looks up a task by ID. func (q *taskQueue) findByID(id string) (*Task, bool) { q.mu.Lock() defer q.mu.Unlock() for _, t := range q.tasks { if t.ID == id { return t, true } } return nil, false } // findJob returns the jobState for a task ID (for SSE streaming compatibility). func (q *taskQueue) findJob(id string) (*jobState, bool) { t, ok := q.findByID(id) if !ok || t.job == nil { return nil, false } return t.job, true } // snapshot returns a copy of all tasks sorted for display (running first, then pending by priority, then done by doneAt desc). func (q *taskQueue) snapshot() []Task { q.mu.Lock() defer q.mu.Unlock() out := make([]Task, len(q.tasks)) for i, t := range q.tasks { out[i] = *t } sort.SliceStable(out, func(i, j int) bool { si := statusOrder(out[i].Status) sj := statusOrder(out[j].Status) if si != sj { return si < sj } if out[i].Priority != out[j].Priority { return out[i].Priority > out[j].Priority } return out[i].CreatedAt.Before(out[j].CreatedAt) }) return out } func statusOrder(s string) int { switch s { case TaskRunning: return 0 case TaskPending: return 1 default: return 2 } } // startWorker launches the queue runner goroutine. 
func (q *taskQueue) startWorker(opts *HandlerOptions) { q.opts = opts go q.worker() } func (q *taskQueue) worker() { for { <-q.trigger for { q.mu.Lock() t := q.nextPending() if t == nil { q.mu.Unlock() break } now := time.Now() t.Status = TaskRunning t.StartedAt = &now j := &jobState{} ctx, cancel := context.WithCancel(context.Background()) j.cancel = cancel t.job = j q.mu.Unlock() q.runTask(t, j, ctx) q.mu.Lock() now2 := time.Now() t.DoneAt = &now2 if t.Status == TaskRunning { // not cancelled externally if j.err != "" { t.Status = TaskFailed t.ErrMsg = j.err } else { t.Status = TaskDone } } q.prune() q.mu.Unlock() } } } // runTask executes the work for a task, writing output to j. func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) { if q.opts == nil || q.opts.App == nil { j.append("ERROR: app not configured") j.finish("app not configured") return } a := q.opts.App j.append(fmt.Sprintf("Starting %s...", t.Name)) var ( archive string err error ) switch t.Target { case "nvidia": if len(t.params.GPUIndices) > 0 || t.params.DiagLevel > 0 { result, e := a.RunNvidiaAcceptancePackWithOptions( ctx, "", t.params.DiagLevel, t.params.GPUIndices, j.append, ) if e != nil { err = e } else { archive = result.Body } } else { archive, err = a.RunNvidiaAcceptancePack("", j.append) } case "memory": archive, err = a.RunMemoryAcceptancePack("", j.append) case "storage": archive, err = a.RunStorageAcceptancePack("", j.append) case "cpu": dur := t.params.Duration if dur <= 0 { dur = 60 } archive, err = a.RunCPUAcceptancePack("", dur, j.append) case "amd": archive, err = a.RunAMDAcceptancePack("", j.append) case "amd-stress": archive, err = a.RunAMDStressPack("", j.append) case "memory-stress": archive, err = a.RunMemoryStressPack("", j.append) case "sat-stress": archive, err = a.RunSATStressPack("", j.append) case "audit": result, e := a.RunAuditNow(q.opts.RuntimeMode) if e != nil { err = e } else { for _, line := range splitLines(result.Body) { j.append(line) } } 
case "install-to-ram": err = a.RunInstallToRAM(j.append) default: j.append("ERROR: unknown target: " + t.Target) j.finish("unknown target") return } if err != nil { if ctx.Err() != nil { j.append("Aborted.") j.finish("aborted") } else { j.append("ERROR: " + err.Error()) j.finish(err.Error()) } return } if archive != "" { j.append("Archive: " + archive) } j.finish("") } func splitLines(s string) []string { var out []string for _, l := range splitNL(s) { if l != "" { out = append(out, l) } } return out } func splitNL(s string) []string { var out []string start := 0 for i, c := range s { if c == '\n' { out = append(out, s[start:i]) start = i + 1 } } out = append(out, s[start:]) return out } // ── HTTP handlers ───────────────────────────────────────────────────────────── func (h *handler) handleAPITasksList(w http.ResponseWriter, _ *http.Request) { tasks := globalQueue.snapshot() writeJSON(w, tasks) } func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") t, ok := globalQueue.findByID(id) if !ok { writeError(w, http.StatusNotFound, "task not found") return } globalQueue.mu.Lock() defer globalQueue.mu.Unlock() switch t.Status { case TaskPending: t.Status = TaskCancelled now := time.Now() t.DoneAt = &now writeJSON(w, map[string]string{"status": "cancelled"}) case TaskRunning: if t.job != nil { t.job.abort() } t.Status = TaskCancelled now := time.Now() t.DoneAt = &now writeJSON(w, map[string]string{"status": "cancelled"}) default: writeError(w, http.StatusConflict, "task is not running or pending") } } func (h *handler) handleAPITasksPriority(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") t, ok := globalQueue.findByID(id) if !ok { writeError(w, http.StatusNotFound, "task not found") return } var req struct { Delta int `json:"delta"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid body") return } globalQueue.mu.Lock() defer 
globalQueue.mu.Unlock() if t.Status != TaskPending { writeError(w, http.StatusConflict, "only pending tasks can be reprioritised") return } t.Priority += req.Delta writeJSON(w, map[string]int{"priority": t.Priority}) } func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request) { globalQueue.mu.Lock() now := time.Now() n := 0 for _, t := range globalQueue.tasks { switch t.Status { case TaskPending: t.Status = TaskCancelled t.DoneAt = &now n++ case TaskRunning: if t.job != nil { t.job.abort() } t.Status = TaskCancelled t.DoneAt = &now n++ } } globalQueue.mu.Unlock() writeJSON(w, map[string]int{"cancelled": n}) } func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") // Wait up to 5s for the task to get a job (it may be pending) deadline := time.Now().Add(5 * time.Second) var j *jobState for time.Now().Before(deadline) { if jj, ok := globalQueue.findJob(id); ok { j = jj break } time.Sleep(200 * time.Millisecond) } if j == nil { http.Error(w, "task not found or not yet started", http.StatusNotFound) return } streamJob(w, r, j) }