8575cf06f8
RAID Controller Management previously hid any LSI drive that wasn't already Frgn/UGood/JBOD, and scoped VROC "free drives" from all system disks instead of the ones actually wired to the VROC controller's ports - drives attached directly to the CPU or another HBA could leak in. Now every drive is listed per its own controller, and LSI drives not already ready for array creation get a "Prepare" button that forces them to Unconfigured Good via storcli. Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
1597 lines
42 KiB
Go
1597 lines
42 KiB
Go
package webui
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime/debug"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"bee/audit/internal/app"
|
|
"bee/audit/internal/platform"
|
|
)
|
|
|
|
// Task statuses. A task is created as TaskPending, becomes TaskRunning when
// the worker picks it up, and ends in one of the terminal states TaskDone,
// TaskFailed or TaskCancelled.
const (
	TaskPending   = "pending"   // queued, waiting for the worker
	TaskRunning   = "running"   // currently executing
	TaskDone      = "done"      // finished without error
	TaskFailed    = "failed"    // finished with an error
	TaskCancelled = "cancelled" // cancelled before completion
)
|
|
|
|
// taskNames maps a task target to the human-readable name used for validate
// (SAT) and utility runs. taskDisplayName falls back to the raw target when a
// target has no entry here.
var taskNames = map[string]string{
	// NVIDIA validation, benchmarks and load generators.
	"nvidia":                 "NVIDIA SAT",
	"nvidia-targeted-stress": "NVIDIA Targeted Stress Validate (dcgmi diag targeted_stress)",
	"nvidia-targeted-power":  "NVIDIA Targeted Power (dcgmi diag targeted_power)",
	"nvidia-pulse":           "NVIDIA Pulse Test (dcgmi diag pulse_test)",
	"nvidia-compute":         "NVIDIA Max Compute Load (dcgmproftester)",
	"nvidia-interconnect":    "NVIDIA Interconnect Test (NCCL all_reduce_perf)",
	"nvidia-bandwidth":       "NVIDIA Bandwidth Test (NVBandwidth)",
	"nvidia-stress":          "NVIDIA GPU Stress",
	"nvidia-bench-perf":      "NVIDIA Bee Bench Perf",
	"nvidia-bench-power":     "NVIDIA Bee Bench Power",
	"nvidia-bench-autotune":  "NVIDIA Bee Bench Power Source Autotune",

	// AMD GPU.
	"amd":           "AMD GPU SAT",
	"amd-mem":       "AMD GPU MEM Integrity",
	"amd-bandwidth": "AMD GPU MEM Bandwidth",
	"amd-stress":    "AMD GPU Burn-in",

	// CPU, memory, storage and whole-platform load.
	"cpu":             "CPU SAT",
	"memory":          "Memory SAT",
	"memory-stress":   "Memory Burn-in",
	"sat-stress":      "SAT Stress (stressapptest)",
	"storage":         "Storage SAT",
	"platform-stress": "Platform Thermal Cycling",

	// Maintenance and utility tasks.
	"audit":          "Audit",
	"support-bundle": "Support Bundle",
	"install":        "Install to Disk",
	"install-to-ram": "Install to RAM",
	"nvme-format":    "NVMe Block Format Change",
}
|
|
|
|
// burnNames holds the burn-in display names that taskDisplayName uses instead
// of taskNames when a task was queued with a burn profile.
var burnNames = map[string]string{
	"amd":    "AMD GPU Burn-in",
	"cpu":    "CPU Burn-in",
	"memory": "Memory Burn-in",
	"nvidia": "NVIDIA Burn-in",
}
|
|
|
|
func nvidiaStressTaskName(loader string) string {
|
|
switch strings.TrimSpace(strings.ToLower(loader)) {
|
|
case platform.NvidiaStressLoaderJohn:
|
|
return "NVIDIA GPU Stress (John/OpenCL)"
|
|
case platform.NvidiaStressLoaderNCCL:
|
|
return "NVIDIA GPU Stress (NCCL)"
|
|
default:
|
|
return "NVIDIA GPU Stress (bee-gpu-burn)"
|
|
}
|
|
}
|
|
|
|
func taskDisplayName(target, profile, loader string) string {
|
|
name := taskNames[target]
|
|
if profile != "" {
|
|
if n, ok := burnNames[target]; ok {
|
|
name = n
|
|
}
|
|
}
|
|
if target == "nvidia-stress" {
|
|
name = nvidiaStressTaskName(loader)
|
|
}
|
|
if name == "" {
|
|
name = target
|
|
}
|
|
return name
|
|
}
|
|
|
|
// Task represents one unit of work in the queue.
//
// The exported fields are what the task API returns as JSON (see
// handleAPITasksList); the unexported runtime fields are never serialised.
type Task struct {
	ID             string     `json:"id"`
	Name           string     `json:"name"`                  // display name; presumably from taskDisplayName — confirm at the enqueue sites
	Target         string     `json:"target"`                // selects the work runTask performs (e.g. "nvidia", "cpu", "install")
	Priority       int        `json:"priority"`              // nextPending prefers higher values; ties go to the older CreatedAt
	Status         string     `json:"status"`                // one of TaskPending/TaskRunning/TaskDone/TaskFailed/TaskCancelled
	CreatedAt      time.Time  `json:"created_at"`
	StartedAt      *time.Time `json:"started_at,omitempty"`  // set when the worker starts the task
	DoneAt         *time.Time `json:"done_at,omitempty"`     // set when the task reaches a terminal status; cleared on start
	ElapsedSec     int        `json:"elapsed_sec,omitempty"` // computed per snapshot via taskElapsedSec; not persisted
	ErrMsg         string     `json:"error,omitempty"`
	LogPath        string     `json:"log_path,omitempty"`
	ArtifactsDir   string     `json:"artifacts_dir,omitempty"`
	ReportJSONPath string     `json:"report_json_path,omitempty"`
	ReportHTMLPath string     `json:"report_html_path,omitempty"`

	// runtime fields (not serialised)
	job       *jobState  // live job state used for log streaming (findJob, taskStreamSource)
	runnerPID int        // PID of the external runner process; 0 when none
	params    taskParams // request parameters consumed by runTask
}
|
|
|
|
// taskParams holds optional parameters parsed from the run request. Which
// fields are meaningful depends on Task.Target; unset fields keep their zero
// value and omitempty keeps them out of the JSON.
type taskParams struct {
	Duration          int    `json:"duration,omitempty"`            // run length in seconds; <= 0 means use a preset/default
	StressMode        bool   `json:"stress_mode,omitempty"`         // heavier variant (NVIDIA diag level 3, extended storage, longer CPU run, larger memory preset)
	GPUIndices        []int  `json:"gpu_indices,omitempty"`         // GPUs to include
	ExcludeGPUIndices []int  `json:"exclude_gpu_indices,omitempty"` // GPUs to skip
	StaggerGPUStart   bool   `json:"stagger_gpu_start,omitempty"`   // staggered NVIDIA ramp-up (see resolveNvidiaRampPlan)
	SizeMB            int    `json:"size_mb,omitempty"`             // passed to the NVIDIA benchmark runs
	Passes            int    `json:"passes,omitempty"`              // NOTE(review): not read by runTask; presumably used elsewhere — confirm
	Loader            string `json:"loader,omitempty"`              // nvidia-stress loader (see nvidiaStressTaskName)
	BurnProfile       string `json:"burn_profile,omitempty"`        // "smoke", "acceptance" or "overnight" preset
	BenchmarkProfile  string `json:"benchmark_profile,omitempty"`
	BenchmarkKind     string `json:"benchmark_kind,omitempty"` // passed to the power-source autotune run
	RunNCCL           bool   `json:"run_nccl,omitempty"`
	ParallelGPUs      bool   `json:"parallel_gpus,omitempty"`
	RampStep          int    `json:"ramp_step,omitempty"`
	RampTotal         int    `json:"ramp_total,omitempty"`
	RampRunID         string `json:"ramp_run_id,omitempty"`
	DisplayName       string `json:"display_name,omitempty"`
	Device            string `json:"device,omitempty"` // for install
	LBAF              int    `json:"lbaf,omitempty"`   // presumably the LBA format index for "nvme-format" — confirm

	PlatformComponents []string `json:"platform_components,omitempty"` // components exercised by platform-stress

	// The fields below are not read by runTask; presumably they are consumed
	// by other task handlers in this package — confirm.
	SAADmiChanges       []saaChange    `json:"saa_dmi_changes,omitempty"`
	FRUChanges          []fruChange    `json:"fru_changes,omitempty"`
	HuaweiElabelChanges []huaweiChange `json:"huawei_elabel_changes,omitempty"`
	RAIDController      int            `json:"raid_controller,omitempty"`
	RAIDDevices         []string       `json:"raid_devices,omitempty"`
	RAIDArrayName       string         `json:"raid_array_name,omitempty"`
	RAIDSlot            string         `json:"raid_slot,omitempty"`
}
|
|
|
|
// persistedTask is the on-disk form of a Task, presumably written to the
// queue's tasks-state.json by persistLocked — confirm. Unlike Task it carries
// the request Params and omits ElapsedSec and the runtime-only fields.
type persistedTask struct {
	ID             string     `json:"id"`
	Name           string     `json:"name"`
	Target         string     `json:"target"`
	Priority       int        `json:"priority"`
	Status         string     `json:"status"`
	CreatedAt      time.Time  `json:"created_at"`
	StartedAt      *time.Time `json:"started_at,omitempty"`
	DoneAt         *time.Time `json:"done_at,omitempty"`
	ErrMsg         string     `json:"error,omitempty"`
	LogPath        string     `json:"log_path,omitempty"`
	ArtifactsDir   string     `json:"artifacts_dir,omitempty"`
	ReportJSONPath string     `json:"report_json_path,omitempty"`
	ReportHTMLPath string     `json:"report_html_path,omitempty"`
	// NOTE(review): encoding/json does not apply omitempty to struct-typed
	// fields, so "params" is always written, even when every field is empty.
	Params taskParams `json:"params,omitempty"`
}
|
|
|
|
// burnPreset is the resolved default for a burn profile (see resolveBurnPreset).
type burnPreset struct {
	DurationSec int // default run length in seconds
}
|
|
|
|
// nvidiaRampSpec is the timing plan for an NVIDIA burn, produced by
// resolveNvidiaRampPlan. All values are in seconds.
type nvidiaRampSpec struct {
	DurationSec      int // hold time once every selected GPU is running ("post-ramp hold")
	StaggerSeconds   int // delay between successive GPU starts; 0 means no staggering
	TotalDurationSec int // planned end-to-end runtime including the ramp
}
|
|
|
|
// resolveMemoryValidatePreset returns the test size in MB and the pass count
// for a "memory" validate task.
//
// The profile is lower-cased and trimmed. "overnight" gives 1024 MB x 2,
// "acceptance" 1024 MB x 1 and "smoke" 256 MB x 1, whatever the stress flag.
// Without a recognised profile, stress mode uses 512 MB x 1 and normal mode
// 256 MB x 1.
func resolveMemoryValidatePreset(profile string, stress bool) (sizeMB, passes int) {
	sizeMB, passes = 256, 1
	switch strings.TrimSpace(strings.ToLower(profile)) {
	case "overnight":
		sizeMB, passes = 1024, 2
	case "acceptance":
		sizeMB = 1024
	case "smoke":
		// keep the 256 MB x 1 default, even in stress mode
	default:
		if stress {
			sizeMB = 512
		}
	}
	return sizeMB, passes
}
|
|
|
|
// taskMayLeaveOrphanWorkers reports whether target is one of the task kinds
// treated as possibly leaving worker processes behind after it ends. The
// target is lower-cased and trimmed before the lookup.
func taskMayLeaveOrphanWorkers(target string) bool {
	normalized := strings.TrimSpace(strings.ToLower(target))
	switch normalized {
	// NVIDIA diagnostics and load generators.
	case "nvidia", "nvidia-targeted-stress", "nvidia-targeted-power", "nvidia-pulse",
		"nvidia-bandwidth", "nvidia-stress", "nvidia-compute", "nvidia-bench-perf":
		return true
	// CPU, memory and platform stressors.
	case "memory", "memory-stress", "cpu", "sat-stress", "platform-stress":
		return true
	}
	return false
}
|
|
|
|
func resolveBurnPreset(profile string) burnPreset {
|
|
switch profile {
|
|
case "overnight":
|
|
return burnPreset{DurationSec: 8 * 60 * 60}
|
|
case "acceptance":
|
|
return burnPreset{DurationSec: 60 * 60}
|
|
default:
|
|
return burnPreset{DurationSec: 5 * 60}
|
|
}
|
|
}
|
|
|
|
func resolveNvidiaRampPlan(profile string, enabled bool, selected []int) (nvidiaRampSpec, error) {
|
|
base := resolveBurnPreset(profile).DurationSec
|
|
plan := nvidiaRampSpec{
|
|
DurationSec: base,
|
|
TotalDurationSec: base,
|
|
}
|
|
if !enabled {
|
|
return plan, nil
|
|
}
|
|
count := len(selected)
|
|
if count == 0 {
|
|
return nvidiaRampSpec{}, fmt.Errorf("staggered NVIDIA burn requires explicit GPU selection")
|
|
}
|
|
if count == 1 {
|
|
return plan, nil
|
|
}
|
|
|
|
switch profile {
|
|
case "acceptance":
|
|
plan.StaggerSeconds = 10 * 60
|
|
plan.TotalDurationSec = plan.DurationSec + plan.StaggerSeconds*(count-1)
|
|
case "overnight":
|
|
plan.StaggerSeconds = 60 * 60
|
|
plan.TotalDurationSec = 8 * 60 * 60
|
|
minTotal := count * 60 * 60
|
|
if plan.TotalDurationSec < minTotal {
|
|
plan.TotalDurationSec = minTotal
|
|
}
|
|
if plan.TotalDurationSec > 10*60*60 {
|
|
return nvidiaRampSpec{}, fmt.Errorf("overnight staggered NVIDIA burn supports at most 10 GPUs")
|
|
}
|
|
plan.DurationSec = plan.TotalDurationSec - plan.StaggerSeconds*(count-1)
|
|
default:
|
|
plan.StaggerSeconds = 2 * 60
|
|
plan.TotalDurationSec = plan.DurationSec + plan.StaggerSeconds*(count-1)
|
|
}
|
|
return plan, nil
|
|
}
|
|
|
|
func resolvePlatformStressPreset(profile string) platform.PlatformStressOptions {
|
|
acceptanceCycles := []platform.PlatformStressCycle{
|
|
{LoadSec: 85, IdleSec: 5},
|
|
{LoadSec: 80, IdleSec: 10},
|
|
{LoadSec: 55, IdleSec: 5},
|
|
{LoadSec: 60, IdleSec: 0},
|
|
{LoadSec: 100, IdleSec: 10},
|
|
{LoadSec: 145, IdleSec: 15},
|
|
{LoadSec: 190, IdleSec: 20},
|
|
{LoadSec: 235, IdleSec: 25},
|
|
{LoadSec: 280, IdleSec: 30},
|
|
{LoadSec: 325, IdleSec: 35},
|
|
{LoadSec: 370, IdleSec: 40},
|
|
{LoadSec: 415, IdleSec: 45},
|
|
{LoadSec: 460, IdleSec: 50},
|
|
{LoadSec: 510, IdleSec: 0},
|
|
}
|
|
|
|
switch profile {
|
|
case "overnight":
|
|
cycles := make([]platform.PlatformStressCycle, 0, len(acceptanceCycles)*8)
|
|
for range 8 {
|
|
cycles = append(cycles, acceptanceCycles...)
|
|
}
|
|
return platform.PlatformStressOptions{Cycles: cycles}
|
|
case "acceptance":
|
|
return platform.PlatformStressOptions{Cycles: acceptanceCycles}
|
|
default: // smoke
|
|
return platform.PlatformStressOptions{Cycles: []platform.PlatformStressCycle{
|
|
{LoadSec: 85, IdleSec: 5},
|
|
{LoadSec: 80, IdleSec: 10},
|
|
{LoadSec: 55, IdleSec: 5},
|
|
{LoadSec: 60, IdleSec: 0},
|
|
}}
|
|
}
|
|
}
|
|
|
|
// taskQueue manages a priority-ordered list of tasks and runs them one at a time.
type taskQueue struct {
	mu          sync.Mutex      // guards tasks and the per-task fields the queue mutates
	tasks       []*Task         // active tasks plus up to maxTaskHistory finished ones (see prune)
	trigger     chan struct{}   // wakes the worker; senders use a non-blocking send
	opts        *HandlerOptions // set by startWorker
	statePath   string          // <ExportDir>/tasks-state.json, set by startWorker
	logsDir     string          // <ExportDir>/tasks, set by startWorker
	started     bool            // true once the worker goroutine has been launched
	kmsgWatcher *kmsgWatcher    // optional; notified when SAT targets start and finish
}
|
|
|
|
// globalQueue is the process-wide task queue used by the HTTP handlers.
// trigger has a one-slot buffer: every wake-up uses a non-blocking send, so
// repeated wake-ups coalesce instead of blocking the sender.
var globalQueue = &taskQueue{trigger: make(chan struct{}, 1)}

// maxTaskHistory is how many finished (done/failed/cancelled) tasks prune keeps.
const maxTaskHistory = 50
|
|
|
|
// Package-level hooks around the app/exec entry points used by the task
// runners. They are variables rather than direct calls, presumably so tests
// can substitute fakes — confirm against the test files.
var (
	runMemoryAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, sizeMB, passes int, logFunc func(string)) (string, error) {
		return a.RunMemoryAcceptancePackCtx(ctx, baseDir, sizeMB, passes, logFunc)
	}
	runStorageAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, extended bool, logFunc func(string)) (string, error) {
		return a.RunStorageAcceptancePackCtx(ctx, baseDir, extended, logFunc)
	}
	runCPUAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
		return a.RunCPUAcceptancePackCtx(ctx, baseDir, durationSec, logFunc)
	}
	runAMDAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
		return a.RunAMDAcceptancePackCtx(ctx, baseDir, logFunc)
	}
	runAMDMemIntegrityPackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
		return a.RunAMDMemIntegrityPackCtx(ctx, baseDir, logFunc)
	}
	runAMDMemBandwidthPackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
		return a.RunAMDMemBandwidthPackCtx(ctx, baseDir, logFunc)
	}
	runNvidiaStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, opts platform.NvidiaStressOptions, logFunc func(string)) (string, error) {
		return a.RunNvidiaStressPackCtx(ctx, baseDir, opts, logFunc)
	}
	runAMDStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
		return a.RunAMDStressPackCtx(ctx, baseDir, durationSec, logFunc)
	}
	runMemoryStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
		return a.RunMemoryStressPackCtx(ctx, baseDir, durationSec, logFunc)
	}
	runSATStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
		return a.RunSATStressPackCtx(ctx, baseDir, durationSec, logFunc)
	}
	buildSupportBundle = app.BuildSupportBundle

	// installCommand builds the "bee-install <device> <logPath>" command for
	// the "install" target. exec.CommandContext kills the process if ctx is
	// cancelled.
	installCommand = func(ctx context.Context, device string, logPath string) *exec.Cmd {
		return exec.CommandContext(ctx, "bee-install", device, logPath)
	}

	// externalTaskRunnerCommand re-executes the current binary as
	// "bee-worker --export-dir <dir> --task-id <id>" to run one task out of
	// process. It is not bound to a context; cancellation instead goes through
	// the SIGTERM hook installed by attachExternalTaskControlsLocked.
	externalTaskRunnerCommand = func(exportDir, taskID string) (*exec.Cmd, error) {
		exe, err := os.Executable()
		if err != nil {
			return nil, err
		}
		return exec.Command(exe, "bee-worker", "--export-dir", exportDir, "--task-id", taskID), nil
	}
)
|
|
|
|
// enqueue adds a task to the queue and notifies the worker.
|
|
func (q *taskQueue) enqueue(t *Task) {
|
|
q.mu.Lock()
|
|
q.assignTaskLogPathLocked(t)
|
|
q.tasks = append(q.tasks, t)
|
|
q.prune()
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
taskSerialEvent(t, "queued")
|
|
select {
|
|
case q.trigger <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// prune removes oldest completed tasks beyond maxTaskHistory.
|
|
func (q *taskQueue) prune() {
|
|
var done []*Task
|
|
var active []*Task
|
|
for _, t := range q.tasks {
|
|
switch t.Status {
|
|
case TaskDone, TaskFailed, TaskCancelled:
|
|
done = append(done, t)
|
|
default:
|
|
active = append(active, t)
|
|
}
|
|
}
|
|
if len(done) > maxTaskHistory {
|
|
done = done[len(done)-maxTaskHistory:]
|
|
}
|
|
q.tasks = append(active, done...)
|
|
}
|
|
|
|
// nextPending returns the highest-priority pending task (nil if none).
|
|
func (q *taskQueue) nextPending() *Task {
|
|
for _, t := range q.tasks {
|
|
if t.Status == TaskRunning {
|
|
return nil
|
|
}
|
|
}
|
|
var best *Task
|
|
for _, t := range q.tasks {
|
|
if t.Status != TaskPending {
|
|
continue
|
|
}
|
|
if best == nil || t.Priority > best.Priority ||
|
|
(t.Priority == best.Priority && t.CreatedAt.Before(best.CreatedAt)) {
|
|
best = t
|
|
}
|
|
}
|
|
return best
|
|
}
|
|
|
|
// findByID looks up a task by ID.
|
|
func (q *taskQueue) findByID(id string) (*Task, bool) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
for _, t := range q.tasks {
|
|
if t.ID == id {
|
|
return t, true
|
|
}
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
// findJob returns the jobState for a task ID (for SSE streaming compatibility).
|
|
func (q *taskQueue) findJob(id string) (*jobState, bool) {
|
|
t, ok := q.findByID(id)
|
|
if !ok || t.job == nil {
|
|
return nil, false
|
|
}
|
|
return t.job, true
|
|
}
|
|
|
|
// taskStreamSource is a copy of the task fields needed to stream a task's
// log, taken under q.mu by taskQueue.taskStreamSource.
type taskStreamSource struct {
	status  string    // Task.Status at the time of the copy
	errMsg  string    // Task.ErrMsg at the time of the copy
	logPath string    // Task.LogPath
	job     *jobState // live job state; nil until the worker (or recovery) attaches one
}
|
|
|
|
func (q *taskQueue) taskStreamSource(id string) (taskStreamSource, bool) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
for _, t := range q.tasks {
|
|
if t.ID != id {
|
|
continue
|
|
}
|
|
return taskStreamSource{
|
|
status: t.Status,
|
|
errMsg: t.ErrMsg,
|
|
logPath: t.LogPath,
|
|
job: t.job,
|
|
}, true
|
|
}
|
|
return taskStreamSource{}, false
|
|
}
|
|
|
|
func (q *taskQueue) hasActiveTarget(target string) bool {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
for _, t := range q.tasks {
|
|
if t.Target != target {
|
|
continue
|
|
}
|
|
if t.Status == TaskPending || t.Status == TaskRunning {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// snapshot returns a copy of all tasks sorted for display with newest tasks first.
|
|
func (q *taskQueue) snapshot() []Task {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
out := make([]Task, len(q.tasks))
|
|
for i, t := range q.tasks {
|
|
out[i] = *t
|
|
out[i].ElapsedSec = taskElapsedSec(&out[i], time.Now())
|
|
}
|
|
sort.SliceStable(out, func(i, j int) bool {
|
|
if !out[i].CreatedAt.Equal(out[j].CreatedAt) {
|
|
return out[i].CreatedAt.After(out[j].CreatedAt)
|
|
}
|
|
si := statusOrder(out[i].Status)
|
|
sj := statusOrder(out[j].Status)
|
|
if si != sj {
|
|
return si < sj
|
|
}
|
|
if out[i].Priority != out[j].Priority {
|
|
return out[i].Priority > out[j].Priority
|
|
}
|
|
return out[i].Name < out[j].Name
|
|
})
|
|
return out
|
|
}
|
|
|
|
func statusOrder(s string) int {
|
|
switch s {
|
|
case TaskRunning:
|
|
return 0
|
|
case TaskPending:
|
|
return 1
|
|
default:
|
|
return 2
|
|
}
|
|
}
|
|
|
|
// startWorker launches the queue runner goroutine.
|
|
func (q *taskQueue) startWorker(opts *HandlerOptions) {
|
|
q.mu.Lock()
|
|
q.opts = opts
|
|
q.statePath = filepath.Join(opts.ExportDir, "tasks-state.json")
|
|
q.logsDir = filepath.Join(opts.ExportDir, "tasks")
|
|
_ = os.MkdirAll(q.logsDir, 0755)
|
|
if !q.started {
|
|
q.loadLocked()
|
|
q.started = true
|
|
q.resumeRunningTasksLocked()
|
|
goRecoverLoop("task worker", 2*time.Second, q.worker)
|
|
}
|
|
hasPending := q.nextPending() != nil
|
|
q.mu.Unlock()
|
|
if hasPending {
|
|
select {
|
|
case q.trigger <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *taskQueue) worker() {
|
|
for {
|
|
<-q.trigger
|
|
func() {
|
|
setCPUGovernor("performance")
|
|
defer setCPUGovernor("powersave")
|
|
|
|
for {
|
|
q.mu.Lock()
|
|
t := q.nextPending()
|
|
if t == nil {
|
|
q.prune()
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
return
|
|
}
|
|
now := time.Now()
|
|
t.Status = TaskRunning
|
|
t.StartedAt = &now
|
|
t.DoneAt = nil
|
|
t.ErrMsg = ""
|
|
j := newTaskJobState(t.LogPath)
|
|
t.job = j
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
|
|
q.runTaskExternal(t, j)
|
|
|
|
q.mu.Lock()
|
|
q.prune()
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
}
|
|
}()
|
|
|
|
}
|
|
}
|
|
|
|
func (q *taskQueue) resumeRunningTasksLocked() {
|
|
for _, t := range q.tasks {
|
|
if t.Status != TaskRunning {
|
|
continue
|
|
}
|
|
if t.job == nil {
|
|
t.job = newTaskJobState(t.LogPath)
|
|
}
|
|
q.attachExternalTaskControlsLocked(t, t.job)
|
|
q.startRecoveredTaskMonitorLocked(t, t.job)
|
|
}
|
|
}
|
|
|
|
func (q *taskQueue) attachExternalTaskControlsLocked(t *Task, j *jobState) {
|
|
if t == nil || j == nil {
|
|
return
|
|
}
|
|
j.cancel = func() {
|
|
pid := t.runnerPID
|
|
if pid <= 0 {
|
|
if state, ok := readTaskRunnerState(t); ok {
|
|
pid = state.PID
|
|
}
|
|
}
|
|
if pid > 0 {
|
|
_ = syscall.Kill(pid, syscall.SIGTERM)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *taskQueue) startRecoveredTaskMonitorLocked(t *Task, j *jobState) {
|
|
if t == nil || j == nil || t.runnerPID <= 0 {
|
|
return
|
|
}
|
|
goRecoverOnce("task runner monitor", func() {
|
|
stopTail := make(chan struct{})
|
|
doneTail := make(chan struct{})
|
|
go q.followTaskLog(t, j, stopTail, doneTail)
|
|
for processAlive(t.runnerPID) {
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
close(stopTail)
|
|
<-doneTail
|
|
q.finishExternalTask(t, j, nil)
|
|
})
|
|
}
|
|
|
|
func (q *taskQueue) runTaskExternal(t *Task, j *jobState) {
|
|
startedKmsgWatch := false
|
|
if q.kmsgWatcher != nil && isSATTarget(t.Target) {
|
|
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
|
|
startedKmsgWatch = true
|
|
}
|
|
defer func() {
|
|
if startedKmsgWatch && q.kmsgWatcher != nil {
|
|
q.kmsgWatcher.NotifyTaskFinished(t.ID)
|
|
}
|
|
}()
|
|
|
|
stopTail := make(chan struct{})
|
|
doneTail := make(chan struct{})
|
|
defer func() {
|
|
close(stopTail)
|
|
<-doneTail
|
|
}()
|
|
go q.followTaskLog(t, j, stopTail, doneTail)
|
|
|
|
cmd, err := externalTaskRunnerCommand(q.opts.ExportDir, t.ID)
|
|
if err != nil {
|
|
j.appendFromLog("ERROR: " + err.Error())
|
|
q.finishExternalTask(t, j, err)
|
|
return
|
|
}
|
|
if err := cmd.Start(); err != nil {
|
|
j.appendFromLog("ERROR: " + err.Error())
|
|
q.finishExternalTask(t, j, err)
|
|
return
|
|
}
|
|
|
|
q.mu.Lock()
|
|
t.runnerPID = cmd.Process.Pid
|
|
q.attachExternalTaskControlsLocked(t, j)
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
|
|
waitErr := cmd.Wait()
|
|
time.Sleep(200 * time.Millisecond)
|
|
q.finishExternalTask(t, j, waitErr)
|
|
}
|
|
|
|
func (q *taskQueue) followTaskLog(t *Task, j *jobState, stop <-chan struct{}, done chan<- struct{}) {
|
|
defer close(done)
|
|
path := ""
|
|
if t != nil {
|
|
path = t.LogPath
|
|
}
|
|
if strings.TrimSpace(path) == "" {
|
|
return
|
|
}
|
|
offset := int64(0)
|
|
if info, err := os.Stat(path); err == nil {
|
|
offset = info.Size()
|
|
}
|
|
var partial string
|
|
ticker := time.NewTicker(250 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
flush := func() {
|
|
data, newOffset, err := readTaskLogDelta(path, offset)
|
|
if err != nil || len(data) == 0 {
|
|
offset = newOffset
|
|
return
|
|
}
|
|
offset = newOffset
|
|
text := partial + strings.ReplaceAll(string(data), "\r\n", "\n")
|
|
lines := strings.Split(text, "\n")
|
|
partial = lines[len(lines)-1]
|
|
for _, line := range lines[:len(lines)-1] {
|
|
if line == "" {
|
|
continue
|
|
}
|
|
j.appendFromLog(line)
|
|
}
|
|
}
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
flush()
|
|
case <-stop:
|
|
flush()
|
|
if strings.TrimSpace(partial) != "" {
|
|
j.appendFromLog(partial)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// readTaskLogDelta reads up to 1 MiB of path starting at byte offset. It
// returns the bytes read and the offset just past them.
//
// If the file is now shorter than offset (it was truncated or replaced), the
// read restarts at the beginning. When opening or stat-ing fails, the
// original offset is returned unchanged. When seeking fails, the offset the
// read was attempted from is returned.
func readTaskLogDelta(path string, offset int64) ([]byte, int64, error) {
	const chunkLimit = 1 << 20

	file, err := os.Open(path)
	if err != nil {
		return nil, offset, err
	}
	defer file.Close()

	stat, err := file.Stat()
	if err != nil {
		return nil, offset, err
	}

	start := offset
	if stat.Size() < start {
		start = 0 // file shrank: read it again from the top
	}
	if _, err := file.Seek(start, io.SeekStart); err != nil {
		return nil, start, err
	}

	chunk, err := io.ReadAll(io.LimitReader(file, chunkLimit))
	return chunk, start + int64(len(chunk)), err
}
|
|
|
|
// finishExternalTask records the final outcome of an externally run task and
// wakes the worker. It takes q.mu itself, so callers must not hold it.
//
// If t already has a terminal status (set elsewhere before the runner
// exited), only the job is closed out. Otherwise the final status comes from,
// in order of precedence:
//  1. the runner's recorded state (readTaskRunnerState), if it reports a
//     non-running status;
//  2. waitErr, if non-nil (runner failed to start or exited with an error);
//  3. otherwise a generic "task runner exited without final state" failure.
//     Recovered tasks also end up here when the runner left no final state,
//     because their monitor always passes a nil waitErr.
//
// Afterwards the runner PID is cleared, artifact paths are finalised, the
// queue is persisted, the job is finished and a serial event is emitted, all
// while q.mu is held.
func (q *taskQueue) finishExternalTask(t *Task, j *jobState, waitErr error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	// Already terminal: just make sure the job is closed out, then wake the worker.
	if t.Status == TaskDone || t.Status == TaskFailed || t.Status == TaskCancelled {
		if j != nil && !j.isDone() {
			j.finish(t.ErrMsg)
			j.closeLog()
		}
		select {
		case q.trigger <- struct{}{}:
		default:
		}
		return
	}

	state, ok := readTaskRunnerState(t)
	switch {
	case ok && state.Status != TaskRunning:
		// The runner recorded its own final status; trust it.
		t.Status = state.Status
		t.ErrMsg = state.Error
		now := state.UpdatedAt
		if now.IsZero() {
			now = time.Now()
		}
		t.DoneAt = &now
	case waitErr != nil:
		// No final state recorded, but the runner reported an error.
		now := time.Now()
		t.Status = TaskFailed
		t.ErrMsg = waitErr.Error()
		t.DoneAt = &now
	default:
		// No final state and no error to report.
		now := time.Now()
		t.Status = TaskFailed
		t.ErrMsg = "task runner exited without final state"
		t.DoneAt = &now
	}
	t.runnerPID = 0
	q.finalizeTaskArtifactPathsLocked(t)
	q.persistLocked()

	if j != nil && !j.isDone() {
		j.finish(t.ErrMsg)
		j.closeLog()
	}
	if t.ErrMsg != "" {
		taskSerialEvent(t, "finished with status="+t.Status+" error="+t.ErrMsg)
	} else {
		taskSerialEvent(t, "finished with status="+t.Status)
	}
	// Wake the worker (non-blocking) so it can pick up the next pending task.
	select {
	case q.trigger <- struct{}{}:
	default:
	}
}
|
|
|
|
// executeTask runs t in-process via runTask and always finalises it
// afterwards.
//
// The deferred calls run in reverse order of registration:
//  1. a panic in runTask is recovered, logged with its stack via slog, and
//     passed to j.finish as the job's error;
//  2. the kmsg watcher is told the task finished, but only if it was told
//     the task started (SAT targets);
//  3. finalizeTaskRun derives the terminal status, persists the queue and
//     writes the report artifacts.
//
// NOTE(review): ctx is the third parameter rather than the first, contrary
// to Go convention; kept as-is for caller compatibility.
func (q *taskQueue) executeTask(t *Task, j *jobState, ctx context.Context) {
	startedKmsgWatch := false
	defer q.finalizeTaskRun(t, j)
	defer func() {
		if startedKmsgWatch && q.kmsgWatcher != nil {
			q.kmsgWatcher.NotifyTaskFinished(t.ID)
		}
	}()
	defer func() {
		if rec := recover(); rec != nil {
			msg := fmt.Sprintf("task panic: %v", rec)
			slog.Error("task panic",
				"task_id", t.ID,
				"target", t.Target,
				"panic", fmt.Sprint(rec),
				"stack", string(debug.Stack()),
			)
			j.append("ERROR: " + msg)
			j.finish(msg)
		}
	}()

	if q.kmsgWatcher != nil && isSATTarget(t.Target) {
		q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
		startedKmsgWatch = true
	}

	q.runTask(t, j, ctx)
}
|
|
|
|
func (q *taskQueue) finalizeTaskRun(t *Task, j *jobState) {
|
|
q.mu.Lock()
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
if t.Status == TaskRunning {
|
|
if j.err != "" {
|
|
t.Status = TaskFailed
|
|
t.ErrMsg = j.err
|
|
} else {
|
|
t.Status = TaskDone
|
|
t.ErrMsg = ""
|
|
}
|
|
}
|
|
q.finalizeTaskArtifactPathsLocked(t)
|
|
q.persistLocked()
|
|
q.mu.Unlock()
|
|
|
|
if err := writeTaskReportArtifacts(t); err != nil {
|
|
appendJobLog(t.LogPath, "WARN: task report generation failed: "+err.Error())
|
|
}
|
|
j.closeLog()
|
|
if t.ErrMsg != "" {
|
|
taskSerialEvent(t, "finished with status="+t.Status+" error="+t.ErrMsg)
|
|
return
|
|
}
|
|
taskSerialEvent(t, "finished with status="+t.Status)
|
|
}
|
|
|
|
// setCPUGovernor writes governor to every CPU's cpufreq scaling_governor file
// in sysfs. It is best effort: it does nothing if no such files exist (e.g.
// cpufreq unavailable) and ignores individual write failures.
func setCPUGovernor(governor string) {
	paths, _ := filepath.Glob("/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor")
	payload := []byte(governor)
	for _, p := range paths {
		_ = os.WriteFile(p, payload, 0644)
	}
}
|
|
|
|
// runTask executes the work for a task, writing output to j.
|
|
func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) {
|
|
if q.opts == nil {
|
|
j.append("ERROR: handler options not configured")
|
|
j.finish("handler options not configured")
|
|
return
|
|
}
|
|
a := q.opts.App
|
|
|
|
recovered := len(j.lines) > 0
|
|
j.append(fmt.Sprintf("Starting %s...", t.Name))
|
|
if recovered {
|
|
j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339)))
|
|
}
|
|
|
|
var (
|
|
archive string
|
|
err error
|
|
)
|
|
|
|
switch t.Target {
|
|
case "nvidia":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
diagLevel := 2
|
|
if t.params.StressMode {
|
|
diagLevel = 3
|
|
}
|
|
if len(t.params.GPUIndices) > 0 || diagLevel > 0 {
|
|
result, e := a.RunNvidiaAcceptancePackWithOptions(
|
|
ctx, "", diagLevel, t.params.GPUIndices, j.append,
|
|
)
|
|
if e != nil {
|
|
err = e
|
|
} else {
|
|
archive = result.Body
|
|
}
|
|
} else {
|
|
archive, err = a.RunNvidiaAcceptancePack("", j.append)
|
|
}
|
|
case "nvidia-targeted-stress":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if dur <= 0 {
|
|
dur = 300
|
|
}
|
|
archive, err = a.RunNvidiaTargetedStressValidatePack(ctx, "", dur, t.params.GPUIndices, j.append)
|
|
case "nvidia-bench-perf":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = a.RunNvidiaBenchmarkCtx(ctx, "", platform.NvidiaBenchmarkOptions{
|
|
Profile: t.params.BenchmarkProfile,
|
|
SizeMB: t.params.SizeMB,
|
|
GPUIndices: t.params.GPUIndices,
|
|
ExcludeGPUIndices: t.params.ExcludeGPUIndices,
|
|
RunNCCL: t.params.RunNCCL,
|
|
ParallelGPUs: t.params.ParallelGPUs,
|
|
RampStep: t.params.RampStep,
|
|
RampTotal: t.params.RampTotal,
|
|
RampRunID: t.params.RampRunID,
|
|
}, j.append)
|
|
case "nvidia-bench-power":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = a.RunNvidiaPowerBenchCtx(ctx, app.DefaultBeeBenchPowerDir, platform.NvidiaBenchmarkOptions{
|
|
Profile: t.params.BenchmarkProfile,
|
|
GPUIndices: t.params.GPUIndices,
|
|
ExcludeGPUIndices: t.params.ExcludeGPUIndices,
|
|
RampStep: t.params.RampStep,
|
|
RampTotal: t.params.RampTotal,
|
|
RampRunID: t.params.RampRunID,
|
|
}, j.append)
|
|
case "nvidia-bench-autotune":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = a.RunNvidiaPowerSourceAutotuneCtx(ctx, app.DefaultBeeBenchAutotuneDir, platform.NvidiaBenchmarkOptions{
|
|
Profile: t.params.BenchmarkProfile,
|
|
SizeMB: t.params.SizeMB,
|
|
}, t.params.BenchmarkKind, j.append)
|
|
case "nvidia-compute":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
|
|
if planErr != nil {
|
|
err = planErr
|
|
break
|
|
}
|
|
if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
|
|
dur = rampPlan.DurationSec
|
|
}
|
|
if rampPlan.StaggerSeconds > 0 {
|
|
j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
|
|
}
|
|
archive, err = a.RunNvidiaOfficialComputePack(ctx, "", dur, t.params.GPUIndices, rampPlan.StaggerSeconds, j.append)
|
|
case "nvidia-targeted-power":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
archive, err = a.RunNvidiaTargetedPowerPack(ctx, "", dur, t.params.GPUIndices, j.append)
|
|
case "nvidia-pulse":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
archive, err = a.RunNvidiaPulseTestPack(ctx, "", dur, t.params.GPUIndices, j.append)
|
|
case "nvidia-bandwidth":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = a.RunNvidiaBandwidthPack(ctx, "", t.params.GPUIndices, j.append)
|
|
case "nvidia-interconnect":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = a.RunNCCLTests(ctx, "", t.params.GPUIndices, j.append)
|
|
case "nvidia-stress":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
|
|
if planErr != nil {
|
|
err = planErr
|
|
break
|
|
}
|
|
if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
|
|
dur = rampPlan.DurationSec
|
|
}
|
|
if rampPlan.StaggerSeconds > 0 {
|
|
j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
|
|
}
|
|
archive, err = runNvidiaStressPackCtx(a, ctx, "", platform.NvidiaStressOptions{
|
|
DurationSec: dur,
|
|
Loader: t.params.Loader,
|
|
GPUIndices: t.params.GPUIndices,
|
|
ExcludeGPUIndices: t.params.ExcludeGPUIndices,
|
|
StaggerSeconds: rampPlan.StaggerSeconds,
|
|
}, j.append)
|
|
case "memory":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
sizeMB, passes := resolveMemoryValidatePreset(t.params.BurnProfile, t.params.StressMode)
|
|
j.append(fmt.Sprintf("Memory validate preset: %d MB x %d pass(es)", sizeMB, passes))
|
|
archive, err = runMemoryAcceptancePackCtx(a, ctx, "", sizeMB, passes, j.append)
|
|
case "storage":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = runStorageAcceptancePackCtx(a, ctx, "", t.params.StressMode, j.append)
|
|
case "cpu":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
if dur <= 0 {
|
|
if t.params.StressMode {
|
|
dur = 1800
|
|
} else {
|
|
dur = 60
|
|
}
|
|
}
|
|
j.append(fmt.Sprintf("CPU stress duration: %ds", dur))
|
|
archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append)
|
|
case "amd":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append)
|
|
case "amd-mem":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = runAMDMemIntegrityPackCtx(a, ctx, "", j.append)
|
|
case "amd-bandwidth":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
archive, err = runAMDMemBandwidthPackCtx(a, ctx, "", j.append)
|
|
case "amd-stress":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append)
|
|
case "memory-stress":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append)
|
|
case "sat-stress":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
dur := t.params.Duration
|
|
if t.params.BurnProfile != "" && dur <= 0 {
|
|
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
|
}
|
|
archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append)
|
|
case "platform-stress":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
opts := resolvePlatformStressPreset(t.params.BurnProfile)
|
|
opts.Components = t.params.PlatformComponents
|
|
archive, err = a.RunPlatformStress(ctx, "", opts, j.append)
|
|
case "audit":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
result, e := a.RunAuditNow(q.opts.RuntimeMode)
|
|
if e != nil {
|
|
err = e
|
|
} else {
|
|
for _, line := range splitLines(result.Body) {
|
|
j.append(line)
|
|
}
|
|
}
|
|
case "support-bundle":
|
|
j.append("Building support bundle...")
|
|
archive, err = buildSupportBundle(q.opts.ExportDir)
|
|
case "install":
|
|
if strings.TrimSpace(t.params.Device) == "" {
|
|
err = fmt.Errorf("device is required")
|
|
break
|
|
}
|
|
installLogPath := platform.InstallLogPath(t.params.Device)
|
|
j.append("Install log: " + installLogPath)
|
|
err = streamCmdJob(j, installCommand(ctx, t.params.Device, installLogPath))
|
|
case "install-to-ram":
|
|
if a == nil {
|
|
err = fmt.Errorf("app not configured")
|
|
break
|
|
}
|
|
err = a.RunInstallToRAM(ctx, j.append)
|
|
default:
|
|
j.append("ERROR: unknown target: " + t.Target)
|
|
j.finish("unknown target")
|
|
return
|
|
}
|
|
|
|
// If the SAT archive was produced, check overall_status and write to component DB.
|
|
if archive != "" {
|
|
archivePath := app.ExtractArchivePath(archive)
|
|
if err == nil {
|
|
if app.ReadSATOverallStatus(archivePath) == "FAILED" {
|
|
err = fmt.Errorf("SAT overall_status=FAILED (see summary.txt)")
|
|
}
|
|
}
|
|
if db := q.statusDB(); db != nil {
|
|
app.ApplySATResultToDB(db, t.Target, archivePath)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
j.append("Aborted.")
|
|
j.finish("aborted")
|
|
} else {
|
|
j.append("ERROR: " + err.Error())
|
|
j.finish(err.Error())
|
|
}
|
|
return
|
|
}
|
|
if archive != "" {
|
|
j.append("Archive: " + archive)
|
|
}
|
|
j.finish("")
|
|
}
|
|
|
|
func (q *taskQueue) statusDB() *app.ComponentStatusDB {
|
|
if q.opts == nil || q.opts.App == nil {
|
|
return nil
|
|
}
|
|
return q.opts.App.StatusDB
|
|
}
|
|
|
|
// splitLines splits s on '\n' and returns only the non-empty pieces, in order.
// It returns nil when s has no non-empty lines. A '\r' before a '\n' is kept
// as part of the line (CRLF input is not normalised).
func splitLines(s string) []string {
	var lines []string
	for _, line := range strings.Split(s, "\n") {
		if line == "" {
			continue
		}
		lines = append(lines, line)
	}
	return lines
}
|
|
|
|
// splitNL splits s at every '\n', keeping empty segments. The result always
// has len = (number of '\n' in s) + 1, so "" yields [""] and "a\n" yields
// ["a", ""].
func splitNL(s string) []string {
	return strings.Split(s, "\n")
}
|
|
|
|
// ── HTTP handlers ─────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPITasksList(w http.ResponseWriter, _ *http.Request) {
|
|
tasks := globalQueue.snapshot()
|
|
writeJSON(w, tasks)
|
|
}
|
|
|
|
func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
t, ok := globalQueue.findByID(id)
|
|
if !ok {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
globalQueue.mu.Lock()
|
|
defer globalQueue.mu.Unlock()
|
|
switch t.Status {
|
|
case TaskPending:
|
|
t.Status = TaskCancelled
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
globalQueue.persistLocked()
|
|
taskSerialEvent(t, "finished with status="+t.Status)
|
|
writeJSON(w, map[string]string{"status": "cancelled"})
|
|
case TaskRunning:
|
|
if t.job == nil || !t.job.abort() {
|
|
writeError(w, http.StatusConflict, "task is not cancellable")
|
|
return
|
|
}
|
|
writeJSON(w, map[string]string{"status": "aborting"})
|
|
default:
|
|
writeError(w, http.StatusConflict, "task is not running or pending")
|
|
}
|
|
}
|
|
|
|
func (h *handler) handleAPITasksPriority(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
t, ok := globalQueue.findByID(id)
|
|
if !ok {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
var req struct {
|
|
Delta int `json:"delta"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid body")
|
|
return
|
|
}
|
|
globalQueue.mu.Lock()
|
|
defer globalQueue.mu.Unlock()
|
|
if t.Status != TaskPending {
|
|
writeError(w, http.StatusConflict, "only pending tasks can be reprioritised")
|
|
return
|
|
}
|
|
t.Priority += req.Delta
|
|
globalQueue.persistLocked()
|
|
writeJSON(w, map[string]int{"priority": t.Priority})
|
|
}
|
|
|
|
func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request) {
|
|
globalQueue.mu.Lock()
|
|
now := time.Now()
|
|
n := 0
|
|
for _, t := range globalQueue.tasks {
|
|
switch t.Status {
|
|
case TaskPending:
|
|
t.Status = TaskCancelled
|
|
t.DoneAt = &now
|
|
taskSerialEvent(t, "finished with status="+t.Status)
|
|
n++
|
|
case TaskRunning:
|
|
if t.job != nil {
|
|
t.job.abort()
|
|
}
|
|
n++
|
|
}
|
|
}
|
|
globalQueue.persistLocked()
|
|
globalQueue.mu.Unlock()
|
|
writeJSON(w, map[string]int{"cancelled": n})
|
|
}
|
|
|
|
func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Request) {
|
|
// Cancel all queued/running tasks in the queue first.
|
|
globalQueue.mu.Lock()
|
|
now := time.Now()
|
|
cancelled := 0
|
|
for _, t := range globalQueue.tasks {
|
|
switch t.Status {
|
|
case TaskPending:
|
|
t.Status = TaskCancelled
|
|
t.DoneAt = &now
|
|
taskSerialEvent(t, "finished with status="+t.Status)
|
|
cancelled++
|
|
case TaskRunning:
|
|
if t.job != nil {
|
|
t.job.abort()
|
|
}
|
|
if taskMayLeaveOrphanWorkers(t.Target) {
|
|
platform.KillTestWorkers()
|
|
}
|
|
t.Status = TaskCancelled
|
|
t.DoneAt = &now
|
|
taskSerialEvent(t, "finished with status="+t.Status)
|
|
cancelled++
|
|
}
|
|
}
|
|
globalQueue.persistLocked()
|
|
globalQueue.mu.Unlock()
|
|
|
|
// Kill orphaned test worker processes at the OS level.
|
|
killed := platform.KillTestWorkers()
|
|
writeJSON(w, map[string]any{
|
|
"cancelled": cancelled,
|
|
"killed": len(killed),
|
|
"processes": killed,
|
|
})
|
|
}
|
|
|
|
func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
src, ok := globalQueue.taskStreamSource(id)
|
|
if !ok {
|
|
http.Error(w, "task not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
if src.job != nil {
|
|
streamJob(w, r, src.job)
|
|
return
|
|
}
|
|
if src.status == TaskDone || src.status == TaskFailed || src.status == TaskCancelled {
|
|
j := newTaskJobState(src.logPath)
|
|
j.finish(src.errMsg)
|
|
streamJob(w, r, j)
|
|
return
|
|
}
|
|
if !sseStart(w) {
|
|
return
|
|
}
|
|
sseWrite(w, "", "Task is queued. Waiting for worker...")
|
|
ticker := time.NewTicker(200 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
src, ok = globalQueue.taskStreamSource(id)
|
|
if !ok {
|
|
sseWrite(w, "done", "task not found")
|
|
return
|
|
}
|
|
if src.job != nil {
|
|
streamSubscribedJob(w, r, src.job)
|
|
return
|
|
}
|
|
if src.status == TaskDone || src.status == TaskFailed || src.status == TaskCancelled {
|
|
j := newTaskJobState(src.logPath)
|
|
j.finish(src.errMsg)
|
|
streamSubscribedJob(w, r, j)
|
|
return
|
|
}
|
|
case <-r.Context().Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *taskQueue) assignTaskLogPathLocked(t *Task) {
|
|
if q.logsDir == "" || t.ID == "" {
|
|
return
|
|
}
|
|
q.ensureTaskArtifactPathsLocked(t)
|
|
}
|
|
|
|
func (q *taskQueue) loadLocked() {
|
|
if q.statePath == "" {
|
|
return
|
|
}
|
|
data, err := os.ReadFile(q.statePath)
|
|
if err != nil || len(data) == 0 {
|
|
return
|
|
}
|
|
var persisted []persistedTask
|
|
if err := json.Unmarshal(data, &persisted); err != nil {
|
|
return
|
|
}
|
|
for _, pt := range persisted {
|
|
t := &Task{
|
|
ID: pt.ID,
|
|
Name: pt.Name,
|
|
Target: pt.Target,
|
|
Priority: pt.Priority,
|
|
Status: pt.Status,
|
|
CreatedAt: pt.CreatedAt,
|
|
StartedAt: pt.StartedAt,
|
|
DoneAt: pt.DoneAt,
|
|
ErrMsg: pt.ErrMsg,
|
|
LogPath: pt.LogPath,
|
|
ArtifactsDir: pt.ArtifactsDir,
|
|
ReportJSONPath: pt.ReportJSONPath,
|
|
ReportHTMLPath: pt.ReportHTMLPath,
|
|
params: pt.Params,
|
|
}
|
|
q.assignTaskLogPathLocked(t)
|
|
if t.Status == TaskRunning {
|
|
state, ok := readTaskRunnerState(t)
|
|
switch {
|
|
case ok && state.Status == TaskRunning && processAlive(state.PID):
|
|
t.runnerPID = state.PID
|
|
t.job = newTaskJobState(t.LogPath)
|
|
case ok && state.Status != TaskRunning:
|
|
t.runnerPID = state.PID
|
|
t.Status = state.Status
|
|
t.ErrMsg = state.Error
|
|
now := state.UpdatedAt
|
|
if now.IsZero() {
|
|
now = time.Now()
|
|
}
|
|
t.DoneAt = &now
|
|
default:
|
|
if taskMayLeaveOrphanWorkers(t.Target) {
|
|
_ = platform.KillTestWorkers()
|
|
}
|
|
now := time.Now()
|
|
t.Status = TaskFailed
|
|
t.DoneAt = &now
|
|
t.ErrMsg = "interrupted by bee-web restart"
|
|
}
|
|
} else if t.Status == TaskPending {
|
|
t.StartedAt = nil
|
|
t.DoneAt = nil
|
|
t.ErrMsg = ""
|
|
}
|
|
q.tasks = append(q.tasks, t)
|
|
}
|
|
q.prune()
|
|
q.persistLocked()
|
|
}
|
|
|
|
func (q *taskQueue) persistLocked() {
|
|
if q.statePath == "" {
|
|
return
|
|
}
|
|
state := make([]persistedTask, 0, len(q.tasks))
|
|
for _, t := range q.tasks {
|
|
state = append(state, persistedTask{
|
|
ID: t.ID,
|
|
Name: t.Name,
|
|
Target: t.Target,
|
|
Priority: t.Priority,
|
|
Status: t.Status,
|
|
CreatedAt: t.CreatedAt,
|
|
StartedAt: t.StartedAt,
|
|
DoneAt: t.DoneAt,
|
|
ErrMsg: t.ErrMsg,
|
|
LogPath: t.LogPath,
|
|
ArtifactsDir: t.ArtifactsDir,
|
|
ReportJSONPath: t.ReportJSONPath,
|
|
ReportHTMLPath: t.ReportHTMLPath,
|
|
Params: t.params,
|
|
})
|
|
}
|
|
data, err := json.MarshalIndent(state, "", " ")
|
|
if err != nil {
|
|
return
|
|
}
|
|
tmp := q.statePath + ".tmp"
|
|
if err := os.WriteFile(tmp, data, 0644); err != nil {
|
|
return
|
|
}
|
|
_ = os.Rename(tmp, q.statePath)
|
|
}
|
|
|
|
func taskElapsedSec(t *Task, now time.Time) int {
|
|
if t == nil || t.StartedAt == nil || t.StartedAt.IsZero() {
|
|
return 0
|
|
}
|
|
start := *t.StartedAt
|
|
if !t.CreatedAt.IsZero() && start.Before(t.CreatedAt) {
|
|
start = t.CreatedAt
|
|
}
|
|
end := now
|
|
if t.DoneAt != nil && !t.DoneAt.IsZero() {
|
|
end = *t.DoneAt
|
|
}
|
|
if end.Before(start) {
|
|
return 0
|
|
}
|
|
return int(end.Sub(start).Round(time.Second) / time.Second)
|
|
}
|
|
|
|
func taskFolderStatus(status string) string {
|
|
status = strings.TrimSpace(strings.ToLower(status))
|
|
switch status {
|
|
case TaskRunning, TaskDone, TaskFailed, TaskCancelled:
|
|
return status
|
|
default:
|
|
return TaskPending
|
|
}
|
|
}
|
|
|
|
// sanitizeTaskFolderPart turns s into a folder-name fragment. The input is
// lower-cased and trimmed. Only ASCII letters and digits are kept. Each run of
// other characters between kept characters becomes a single '-', and leading
// or trailing runs are dropped. Returns "task" when nothing remains.
func sanitizeTaskFolderPart(s string) string {
	var sb strings.Builder
	gap := false
	for _, r := range strings.TrimSpace(strings.ToLower(s)) {
		if !(('a' <= r && r <= 'z') || ('0' <= r && r <= '9')) {
			gap = true
			continue
		}
		// Emit the separator lazily so no leading/trailing '-' is ever written.
		if gap && sb.Len() > 0 {
			sb.WriteByte('-')
		}
		gap = false
		sb.WriteRune(r)
	}
	if sb.Len() == 0 {
		return "task"
	}
	return sb.String()
}
|
|
|
|
func taskArtifactsDir(root string, t *Task, status string) string {
|
|
if strings.TrimSpace(root) == "" || t == nil {
|
|
return ""
|
|
}
|
|
prefix := taskFolderNumberPrefix(t.ID)
|
|
return filepath.Join(root, fmt.Sprintf("%s_%s_%s", prefix, sanitizeTaskFolderPart(t.Name), taskFolderStatus(status)))
|
|
}
|
|
|
|
func taskFolderNumberPrefix(taskID string) string {
|
|
taskID = strings.TrimSpace(taskID)
|
|
if strings.HasPrefix(taskID, "TASK-") && len(taskID) >= len("TASK-000") {
|
|
num := strings.TrimSpace(strings.TrimPrefix(taskID, "TASK-"))
|
|
if len(num) == 3 {
|
|
allDigits := true
|
|
for _, r := range num {
|
|
if r < '0' || r > '9' {
|
|
allDigits = false
|
|
break
|
|
}
|
|
}
|
|
if allDigits {
|
|
return num
|
|
}
|
|
}
|
|
}
|
|
fallback := sanitizeTaskFolderPart(taskID)
|
|
if fallback == "" {
|
|
return "000"
|
|
}
|
|
return fallback
|
|
}
|
|
|
|
func ensureTaskReportPaths(t *Task) {
|
|
if t == nil || strings.TrimSpace(t.ArtifactsDir) == "" {
|
|
return
|
|
}
|
|
if t.LogPath == "" || filepath.Base(t.LogPath) == "task.log" {
|
|
t.LogPath = filepath.Join(t.ArtifactsDir, "task.log")
|
|
}
|
|
t.ReportJSONPath = filepath.Join(t.ArtifactsDir, "report.json")
|
|
t.ReportHTMLPath = filepath.Join(t.ArtifactsDir, "report.html")
|
|
}
|
|
|
|
func (q *taskQueue) ensureTaskArtifactPathsLocked(t *Task) {
|
|
if t == nil || strings.TrimSpace(q.logsDir) == "" || strings.TrimSpace(t.ID) == "" {
|
|
return
|
|
}
|
|
if strings.TrimSpace(t.ArtifactsDir) == "" {
|
|
t.ArtifactsDir = taskArtifactsDir(q.logsDir, t, t.Status)
|
|
}
|
|
if t.ArtifactsDir != "" {
|
|
_ = os.MkdirAll(t.ArtifactsDir, 0755)
|
|
}
|
|
ensureTaskReportPaths(t)
|
|
}
|
|
|
|
func (q *taskQueue) finalizeTaskArtifactPathsLocked(t *Task) {
|
|
if t == nil || strings.TrimSpace(q.logsDir) == "" || strings.TrimSpace(t.ID) == "" {
|
|
return
|
|
}
|
|
q.ensureTaskArtifactPathsLocked(t)
|
|
dstDir := taskArtifactsDir(q.logsDir, t, t.Status)
|
|
if dstDir == "" {
|
|
return
|
|
}
|
|
if t.ArtifactsDir != "" && t.ArtifactsDir != dstDir {
|
|
if _, err := os.Stat(dstDir); err != nil {
|
|
_ = os.Rename(t.ArtifactsDir, dstDir)
|
|
}
|
|
t.ArtifactsDir = dstDir
|
|
}
|
|
ensureTaskReportPaths(t)
|
|
}
|