649 lines
16 KiB
Go
649 lines
16 KiB
Go
package webui
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"bee/audit/internal/app"
|
|
)
|
|
|
|
// Task statuses.
|
|
const (
|
|
TaskPending = "pending"
|
|
TaskRunning = "running"
|
|
TaskDone = "done"
|
|
TaskFailed = "failed"
|
|
TaskCancelled = "cancelled"
|
|
)
|
|
|
|
// taskNames maps target → human-readable name.
|
|
var taskNames = map[string]string{
|
|
"nvidia": "NVIDIA SAT",
|
|
"memory": "Memory SAT",
|
|
"storage": "Storage SAT",
|
|
"cpu": "CPU SAT",
|
|
"amd": "AMD GPU SAT",
|
|
"amd-mem": "AMD GPU MEM Integrity",
|
|
"amd-bandwidth": "AMD GPU MEM Bandwidth",
|
|
"amd-stress": "AMD GPU Burn-in",
|
|
"memory-stress": "Memory Burn-in",
|
|
"sat-stress": "SAT Stress (stressapptest)",
|
|
"audit": "Audit",
|
|
"install": "Install to Disk",
|
|
"install-to-ram": "Install to RAM",
|
|
}
|
|
|
|
// Task represents one unit of work in the queue.
|
|
type Task struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Target string `json:"target"`
|
|
Priority int `json:"priority"`
|
|
Status string `json:"status"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
StartedAt *time.Time `json:"started_at,omitempty"`
|
|
DoneAt *time.Time `json:"done_at,omitempty"`
|
|
ErrMsg string `json:"error,omitempty"`
|
|
LogPath string `json:"log_path,omitempty"`
|
|
|
|
// runtime fields (not serialised)
|
|
job *jobState
|
|
params taskParams
|
|
}
|
|
|
|
// taskParams holds optional parameters parsed from the run request.
|
|
type taskParams struct {
|
|
Duration int `json:"duration,omitempty"`
|
|
DiagLevel int `json:"diag_level,omitempty"`
|
|
GPUIndices []int `json:"gpu_indices,omitempty"`
|
|
BurnProfile string `json:"burn_profile,omitempty"`
|
|
DisplayName string `json:"display_name,omitempty"`
|
|
Device string `json:"device,omitempty"` // for install
|
|
}
|
|
|
|
type persistedTask struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
Target string `json:"target"`
|
|
Priority int `json:"priority"`
|
|
Status string `json:"status"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
StartedAt *time.Time `json:"started_at,omitempty"`
|
|
DoneAt *time.Time `json:"done_at,omitempty"`
|
|
ErrMsg string `json:"error,omitempty"`
|
|
LogPath string `json:"log_path,omitempty"`
|
|
Params taskParams `json:"params,omitempty"`
|
|
}
|
|
|
|
type burnPreset struct {
|
|
NvidiaDiag int
|
|
DurationSec int
|
|
}
|
|
|
|
func resolveBurnPreset(profile string) burnPreset {
|
|
switch profile {
|
|
case "overnight":
|
|
return burnPreset{NvidiaDiag: 4, DurationSec: 8 * 60 * 60}
|
|
case "acceptance":
|
|
return burnPreset{NvidiaDiag: 3, DurationSec: 60 * 60}
|
|
default:
|
|
return burnPreset{NvidiaDiag: 1, DurationSec: 5 * 60}
|
|
}
|
|
}
|
|
|
|
// taskQueue manages a priority-ordered list of tasks and runs them one at a time.
|
|
type taskQueue struct {
|
|
mu sync.Mutex
|
|
tasks []*Task
|
|
trigger chan struct{}
|
|
opts *HandlerOptions // set by startWorker
|
|
statePath string
|
|
logsDir string
|
|
started bool
|
|
}
|
|
|
|
var globalQueue = &taskQueue{trigger: make(chan struct{}, 1)}
|
|
|
|
const maxTaskHistory = 50
|
|
|
|
var (
|
|
runMemoryAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
|
|
return a.RunMemoryAcceptancePackCtx(ctx, baseDir, logFunc)
|
|
}
|
|
runStorageAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
|
|
return a.RunStorageAcceptancePackCtx(ctx, baseDir, logFunc)
|
|
}
|
|
runCPUAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
|
return a.RunCPUAcceptancePackCtx(ctx, baseDir, durationSec, logFunc)
|
|
}
|
|
runAMDAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
|
|
return a.RunAMDAcceptancePackCtx(ctx, baseDir, logFunc)
|
|
}
|
|
runAMDMemIntegrityPackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
|
|
return a.RunAMDMemIntegrityPackCtx(ctx, baseDir, logFunc)
|
|
}
|
|
runAMDMemBandwidthPackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
|
|
return a.RunAMDMemBandwidthPackCtx(ctx, baseDir, logFunc)
|
|
}
|
|
runAMDStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
|
return a.RunAMDStressPackCtx(ctx, baseDir, durationSec, logFunc)
|
|
}
|
|
runMemoryStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
|
return a.RunMemoryStressPackCtx(ctx, baseDir, durationSec, logFunc)
|
|
}
|
|
runSATStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
|
return a.RunSATStressPackCtx(ctx, baseDir, durationSec, logFunc)
|
|
}
|
|
)
|
|
|
|
// enqueue adds a task to the queue and notifies the worker.
|
|
func (q *taskQueue) enqueue(t *Task) {
|
|
q.mu.Lock()
|
|
q.assignTaskLogPathLocked(t)
|
|
q.tasks = append(q.tasks, t)
|
|
q.prune()
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
select {
|
|
case q.trigger <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// prune removes oldest completed tasks beyond maxTaskHistory.
|
|
func (q *taskQueue) prune() {
|
|
var done []*Task
|
|
var active []*Task
|
|
for _, t := range q.tasks {
|
|
switch t.Status {
|
|
case TaskDone, TaskFailed, TaskCancelled:
|
|
done = append(done, t)
|
|
default:
|
|
active = append(active, t)
|
|
}
|
|
}
|
|
if len(done) > maxTaskHistory {
|
|
done = done[len(done)-maxTaskHistory:]
|
|
}
|
|
q.tasks = append(active, done...)
|
|
}
|
|
|
|
// nextPending returns the highest-priority pending task (nil if none).
|
|
func (q *taskQueue) nextPending() *Task {
|
|
var best *Task
|
|
for _, t := range q.tasks {
|
|
if t.Status != TaskPending {
|
|
continue
|
|
}
|
|
if best == nil || t.Priority > best.Priority ||
|
|
(t.Priority == best.Priority && t.CreatedAt.Before(best.CreatedAt)) {
|
|
best = t
|
|
}
|
|
}
|
|
return best
|
|
}
|
|
|
|
// findByID looks up a task by ID.
|
|
func (q *taskQueue) findByID(id string) (*Task, bool) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
for _, t := range q.tasks {
|
|
if t.ID == id {
|
|
return t, true
|
|
}
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
// findJob returns the jobState for a task ID (for SSE streaming compatibility).
|
|
func (q *taskQueue) findJob(id string) (*jobState, bool) {
|
|
t, ok := q.findByID(id)
|
|
if !ok || t.job == nil {
|
|
return nil, false
|
|
}
|
|
return t.job, true
|
|
}
|
|
|
|
func (q *taskQueue) hasActiveTarget(target string) bool {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
for _, t := range q.tasks {
|
|
if t.Target != target {
|
|
continue
|
|
}
|
|
if t.Status == TaskPending || t.Status == TaskRunning {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// snapshot returns a copy of all tasks sorted for display (running first, then pending by priority, then done by doneAt desc).
|
|
func (q *taskQueue) snapshot() []Task {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
out := make([]Task, len(q.tasks))
|
|
for i, t := range q.tasks {
|
|
out[i] = *t
|
|
}
|
|
sort.SliceStable(out, func(i, j int) bool {
|
|
si := statusOrder(out[i].Status)
|
|
sj := statusOrder(out[j].Status)
|
|
if si != sj {
|
|
return si < sj
|
|
}
|
|
if out[i].Priority != out[j].Priority {
|
|
return out[i].Priority > out[j].Priority
|
|
}
|
|
return out[i].CreatedAt.Before(out[j].CreatedAt)
|
|
})
|
|
return out
|
|
}
|
|
|
|
func statusOrder(s string) int {
|
|
switch s {
|
|
case TaskRunning:
|
|
return 0
|
|
case TaskPending:
|
|
return 1
|
|
default:
|
|
return 2
|
|
}
|
|
}
|
|
|
|
// startWorker launches the queue runner goroutine.
|
|
func (q *taskQueue) startWorker(opts *HandlerOptions) {
|
|
q.mu.Lock()
|
|
q.opts = opts
|
|
q.statePath = filepath.Join(opts.ExportDir, "tasks-state.json")
|
|
q.logsDir = filepath.Join(opts.ExportDir, "tasks")
|
|
_ = os.MkdirAll(q.logsDir, 0755)
|
|
if !q.started {
|
|
q.loadLocked()
|
|
q.started = true
|
|
go q.worker()
|
|
}
|
|
hasPending := q.nextPending() != nil
|
|
q.mu.Unlock()
|
|
if hasPending {
|
|
select {
|
|
case q.trigger <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *taskQueue) worker() {
|
|
for {
|
|
<-q.trigger
|
|
setCPUGovernor("performance")
|
|
for {
|
|
q.mu.Lock()
|
|
t := q.nextPending()
|
|
if t == nil {
|
|
q.mu.Unlock()
|
|
break
|
|
}
|
|
now := time.Now()
|
|
t.Status = TaskRunning
|
|
t.StartedAt = &now
|
|
t.DoneAt = nil
|
|
t.ErrMsg = ""
|
|
j := newTaskJobState(t.LogPath)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
j.cancel = cancel
|
|
t.job = j
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
|
|
q.runTask(t, j, ctx)
|
|
|
|
q.mu.Lock()
|
|
now2 := time.Now()
|
|
t.DoneAt = &now2
|
|
if t.Status == TaskRunning { // not cancelled externally
|
|
if j.err != "" {
|
|
t.Status = TaskFailed
|
|
t.ErrMsg = j.err
|
|
} else {
|
|
t.Status = TaskDone
|
|
}
|
|
}
|
|
q.prune()
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
}
|
|
setCPUGovernor("powersave")
|
|
}
|
|
}
|
|
|
|
// setCPUGovernor writes the given governor to all CPU scaling_governor sysfs files.
|
|
// Silently ignores errors (e.g. when cpufreq is not available).
|
|
func setCPUGovernor(governor string) {
|
|
matches, err := filepath.Glob("/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor")
|
|
if err != nil || len(matches) == 0 {
|
|
return
|
|
}
|
|
for _, path := range matches {
|
|
_ = os.WriteFile(path, []byte(governor), 0644)
|
|
}
|
|
}
|
|
|
|
// runTask executes the work for a task, writing output to j.
|
|
func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) {
|
|
if q.opts == nil || q.opts.App == nil {
|
|
j.append("ERROR: app not configured")
|
|
j.finish("app not configured")
|
|
return
|
|
}
|
|
a := q.opts.App
|
|
|
|
j.append(fmt.Sprintf("Starting %s...", t.Name))
|
|
if len(j.lines) > 0 {
|
|
j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339)))
|
|
}
|
|
|
|
var (
|
|
archive string
|
|
err error
|
|
)
|
|
|
|
switch t.Target {
|
|
case "nvidia":
|
|
diagLevel := t.params.DiagLevel
|
|
if t.params.BurnProfile != "" && diagLevel <= 0 {
|
|
diagLevel = resolveBurnPreset(t.params.BurnProfile).NvidiaDiag
|
|
}
|
|
if len(t.params.GPUIndices) > 0 || diagLevel > 0 {
|
|
result, e := a.RunNvidiaAcceptancePackWithOptions(
|
|
ctx, "", diagLevel, t.params.GPUIndices, j.append,
|
|
)
|
|
if e != nil {
|
|
err = e
|
|
} else {
|
|
archive = result.Body
|
|
}
|
|
} else {
|
|
archive, err = a.RunNvidiaAcceptancePack("", j.append)
|
|
}
|
|
case "memory":
|
|
archive, err = runMemoryAcceptancePackCtx(a, ctx, "", j.append)
|
|
case "storage":
|
|
archive, err = runStorageAcceptancePackCtx(a, ctx, "", j.append)
|
|
case "cpu":
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
if dur <= 0 {
|
|
dur = 60
|
|
}
|
|
archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append)
|
|
case "amd":
|
|
archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append)
|
|
case "amd-mem":
|
|
archive, err = runAMDMemIntegrityPackCtx(a, ctx, "", j.append)
|
|
case "amd-bandwidth":
|
|
archive, err = runAMDMemBandwidthPackCtx(a, ctx, "", j.append)
|
|
case "amd-stress":
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append)
|
|
case "memory-stress":
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append)
|
|
case "sat-stress":
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append)
|
|
case "audit":
|
|
result, e := a.RunAuditNow(q.opts.RuntimeMode)
|
|
if e != nil {
|
|
err = e
|
|
} else {
|
|
for _, line := range splitLines(result.Body) {
|
|
j.append(line)
|
|
}
|
|
}
|
|
case "install-to-ram":
|
|
err = a.RunInstallToRAM(ctx, j.append)
|
|
default:
|
|
j.append("ERROR: unknown target: " + t.Target)
|
|
j.finish("unknown target")
|
|
return
|
|
}
|
|
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
j.append("Aborted.")
|
|
j.finish("aborted")
|
|
} else {
|
|
j.append("ERROR: " + err.Error())
|
|
j.finish(err.Error())
|
|
}
|
|
return
|
|
}
|
|
if archive != "" {
|
|
j.append("Archive: " + archive)
|
|
}
|
|
j.finish("")
|
|
}
|
|
|
|
func splitLines(s string) []string {
|
|
var out []string
|
|
for _, l := range splitNL(s) {
|
|
if l != "" {
|
|
out = append(out, l)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func splitNL(s string) []string {
|
|
var out []string
|
|
start := 0
|
|
for i, c := range s {
|
|
if c == '\n' {
|
|
out = append(out, s[start:i])
|
|
start = i + 1
|
|
}
|
|
}
|
|
out = append(out, s[start:])
|
|
return out
|
|
}
|
|
|
|
// ── HTTP handlers ─────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPITasksList(w http.ResponseWriter, _ *http.Request) {
|
|
tasks := globalQueue.snapshot()
|
|
writeJSON(w, tasks)
|
|
}
|
|
|
|
func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
t, ok := globalQueue.findByID(id)
|
|
if !ok {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
globalQueue.mu.Lock()
|
|
defer globalQueue.mu.Unlock()
|
|
switch t.Status {
|
|
case TaskPending:
|
|
t.Status = TaskCancelled
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
globalQueue.persistLocked()
|
|
writeJSON(w, map[string]string{"status": "cancelled"})
|
|
case TaskRunning:
|
|
if t.job != nil {
|
|
t.job.abort()
|
|
}
|
|
t.Status = TaskCancelled
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
globalQueue.persistLocked()
|
|
writeJSON(w, map[string]string{"status": "cancelled"})
|
|
default:
|
|
writeError(w, http.StatusConflict, "task is not running or pending")
|
|
}
|
|
}
|
|
|
|
func (h *handler) handleAPITasksPriority(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
t, ok := globalQueue.findByID(id)
|
|
if !ok {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
var req struct {
|
|
Delta int `json:"delta"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid body")
|
|
return
|
|
}
|
|
globalQueue.mu.Lock()
|
|
defer globalQueue.mu.Unlock()
|
|
if t.Status != TaskPending {
|
|
writeError(w, http.StatusConflict, "only pending tasks can be reprioritised")
|
|
return
|
|
}
|
|
t.Priority += req.Delta
|
|
globalQueue.persistLocked()
|
|
writeJSON(w, map[string]int{"priority": t.Priority})
|
|
}
|
|
|
|
func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request) {
|
|
globalQueue.mu.Lock()
|
|
now := time.Now()
|
|
n := 0
|
|
for _, t := range globalQueue.tasks {
|
|
switch t.Status {
|
|
case TaskPending:
|
|
t.Status = TaskCancelled
|
|
t.DoneAt = &now
|
|
n++
|
|
case TaskRunning:
|
|
if t.job != nil {
|
|
t.job.abort()
|
|
}
|
|
t.Status = TaskCancelled
|
|
t.DoneAt = &now
|
|
n++
|
|
}
|
|
}
|
|
globalQueue.persistLocked()
|
|
globalQueue.mu.Unlock()
|
|
writeJSON(w, map[string]int{"cancelled": n})
|
|
}
|
|
|
|
func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
// Wait up to 5s for the task to get a job (it may be pending)
|
|
deadline := time.Now().Add(5 * time.Second)
|
|
var j *jobState
|
|
for time.Now().Before(deadline) {
|
|
if jj, ok := globalQueue.findJob(id); ok {
|
|
j = jj
|
|
break
|
|
}
|
|
time.Sleep(200 * time.Millisecond)
|
|
}
|
|
if j == nil {
|
|
http.Error(w, "task not found or not yet started", http.StatusNotFound)
|
|
return
|
|
}
|
|
streamJob(w, r, j)
|
|
}
|
|
|
|
func (q *taskQueue) assignTaskLogPathLocked(t *Task) {
|
|
if t.LogPath != "" || q.logsDir == "" || t.ID == "" {
|
|
return
|
|
}
|
|
t.LogPath = filepath.Join(q.logsDir, t.ID+".log")
|
|
}
|
|
|
|
func (q *taskQueue) loadLocked() {
|
|
if q.statePath == "" {
|
|
return
|
|
}
|
|
data, err := os.ReadFile(q.statePath)
|
|
if err != nil || len(data) == 0 {
|
|
return
|
|
}
|
|
var persisted []persistedTask
|
|
if err := json.Unmarshal(data, &persisted); err != nil {
|
|
return
|
|
}
|
|
for _, pt := range persisted {
|
|
t := &Task{
|
|
ID: pt.ID,
|
|
Name: pt.Name,
|
|
Target: pt.Target,
|
|
Priority: pt.Priority,
|
|
Status: pt.Status,
|
|
CreatedAt: pt.CreatedAt,
|
|
StartedAt: pt.StartedAt,
|
|
DoneAt: pt.DoneAt,
|
|
ErrMsg: pt.ErrMsg,
|
|
LogPath: pt.LogPath,
|
|
params: pt.Params,
|
|
}
|
|
q.assignTaskLogPathLocked(t)
|
|
if t.Status == TaskPending || t.Status == TaskRunning {
|
|
t.Status = TaskPending
|
|
t.DoneAt = nil
|
|
t.ErrMsg = ""
|
|
}
|
|
q.tasks = append(q.tasks, t)
|
|
}
|
|
q.prune()
|
|
q.persistLocked()
|
|
}
|
|
|
|
func (q *taskQueue) persistLocked() {
|
|
if q.statePath == "" {
|
|
return
|
|
}
|
|
state := make([]persistedTask, 0, len(q.tasks))
|
|
for _, t := range q.tasks {
|
|
state = append(state, persistedTask{
|
|
ID: t.ID,
|
|
Name: t.Name,
|
|
Target: t.Target,
|
|
Priority: t.Priority,
|
|
Status: t.Status,
|
|
CreatedAt: t.CreatedAt,
|
|
StartedAt: t.StartedAt,
|
|
DoneAt: t.DoneAt,
|
|
ErrMsg: t.ErrMsg,
|
|
LogPath: t.LogPath,
|
|
Params: t.params,
|
|
})
|
|
}
|
|
data, err := json.MarshalIndent(state, "", " ")
|
|
if err != nil {
|
|
return
|
|
}
|
|
tmp := q.statePath + ".tmp"
|
|
if err := os.WriteFile(tmp, data, 0644); err != nil {
|
|
return
|
|
}
|
|
_ = os.Rename(tmp, q.statePath)
|
|
}
|