package webui

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"bee/audit/internal/app"
	"bee/audit/internal/platform"
	"bee/audit/internal/runtimeenv"
)

// taskRunnerState is the JSON record stored at <ArtifactsDir>/runner-state.json
// describing the process running a task: its PID, the last status it reported,
// an optional error message, and when the record was written.
type taskRunnerState struct {
	PID       int       `json:"pid"`
	Status    string    `json:"status"`
	Error     string    `json:"error,omitempty"`
	UpdatedAt time.Time `json:"updated_at"`
}

// taskRunnerStatePath returns the runner-state file location for t, or "" when
// t is nil or has no (non-blank) artifacts directory. An empty path means
// runner state is not persisted for that task.
func taskRunnerStatePath(t *Task) string {
	if t == nil || strings.TrimSpace(t.ArtifactsDir) == "" {
		return ""
	}
	return filepath.Join(t.ArtifactsDir, "runner-state.json")
}

// writeTaskRunnerState persists state to t's runner-state file, creating the
// artifacts directory if needed. It is a no-op when t has no state path.
//
// The JSON is written to a ".tmp" sibling and then renamed over the target, so
// a reader sees either the previous file or the complete new one (rename within
// a single directory is atomic on POSIX). On failure the temporary file is
// removed so no stale partial file is left behind.
func writeTaskRunnerState(t *Task, state taskRunnerState) error {
	path := taskRunnerStatePath(t)
	if path == "" {
		return nil
	}
	if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
		return err
	}
	data, err := json.MarshalIndent(state, "", " ")
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		// Best-effort cleanup; the write error is the one worth reporting.
		_ = os.Remove(tmp)
		return err
	}
	if err := os.Rename(tmp, path); err != nil {
		// Best-effort cleanup; the rename error is the one worth reporting.
		_ = os.Remove(tmp)
		return err
	}
	return nil
}

// readTaskRunnerState loads t's runner-state file. The boolean is false when
// there is no state path, the file cannot be read or is empty, or its contents
// do not decode as JSON; the underlying error is not reported.
func readTaskRunnerState(t *Task) (taskRunnerState, bool) {
	path := taskRunnerStatePath(t)
	if path == "" {
		return taskRunnerState{}, false
	}
	data, err := os.ReadFile(path)
	if err != nil || len(data) == 0 {
		return taskRunnerState{}, false
	}
	var state taskRunnerState
	if err := json.Unmarshal(data, &state); err != nil {
		return taskRunnerState{}, false
	}
	return state, true
}

// processAlive reports whether a process with the given PID exists.
//
// Sending signal 0 performs kill(2)'s existence and permission checks without
// delivering anything. EPERM therefore still means "the process exists" (it
// just belongs to someone we may not signal). Non-positive PIDs are rejected
// up front because kill(0, …) and kill(-N, …) address process groups rather
// than a single process.
func processAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	err := syscall.Kill(pid, 0)
	return err == nil || err == syscall.EPERM
}

// finalizeTaskForResult stamps t with the current time as DoneAt and sets its
// terminal status:
//   - cancelled: TaskCancelled with ErrMsg "aborted" (takes precedence),
//   - non-blank errMsg: TaskFailed with ErrMsg = errMsg,
//   - otherwise: TaskDone with ErrMsg cleared.
func finalizeTaskForResult(t *Task, errMsg string, cancelled bool) {
	now := time.Now()
	t.DoneAt = &now
	switch {
	case cancelled:
		t.Status = TaskCancelled
		t.ErrMsg = "aborted"
	case strings.TrimSpace(errMsg) != "":
		t.Status = TaskFailed
		t.ErrMsg = errMsg
	default:
		t.Status = TaskDone
		t.ErrMsg = ""
	}
}

// executeTaskWithOptions runs the action selected by t.Target, streaming
// progress lines into j. Every path ends with exactly one j.finish call:
//   - j.finish("handler options not configured") when opts is nil,
//   - j.finish("unknown target") for an unrecognised t.Target,
//   - j.finish("aborted") when the action failed and ctx has been cancelled,
//   - j.finish(<error text>) for any other failure,
//   - j.finish("") on success.
//
// When the action produced an archive, its SAT overall status is checked (a
// "FAILED" status turns an otherwise successful run into a failure) and, if a
// status DB is configured, the SAT result is applied to it. If j already holds
// log lines on entry, the run is logged as a recovery after a bee-web restart.
func executeTaskWithOptions(opts *HandlerOptions, t *Task, j *jobState, ctx context.Context) {
	if opts == nil {
		j.append("ERROR: handler options not configured")
		j.finish("handler options not configured")
		return
	}
	a := opts.App
	// Pre-existing log lines mean an earlier bee-web process started this task.
	recovered := len(j.lines) > 0
	j.append(fmt.Sprintf("Starting %s...", t.Name))
	if recovered {
		j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339)))
	}

	var (
		archive string
		err     error
	)
	switch t.Target {
	case "nvidia":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		// Diagnostic level 2 normally, 3 in stress mode. The level is always
		// positive, so the options-based acceptance runner is used unconditionally.
		diagLevel := 2
		if t.params.StressMode {
			diagLevel = 3
		}
		result, e := a.RunNvidiaAcceptancePackWithOptions(ctx, "", diagLevel, t.params.GPUIndices, j.append)
		if e != nil {
			err = e
		} else {
			archive = result.Body
		}
	case "nvidia-targeted-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if dur <= 0 {
			dur = 300
		}
		archive, err = a.RunNvidiaTargetedStressValidatePack(ctx, "", dur, t.params.GPUIndices, j.append)
	case "nvidia-bench-perf":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = a.RunNvidiaBenchmarkCtx(ctx, "", platform.NvidiaBenchmarkOptions{
			Profile:           t.params.BenchmarkProfile,
			SizeMB:            t.params.SizeMB,
			GPUIndices:        t.params.GPUIndices,
			ExcludeGPUIndices: t.params.ExcludeGPUIndices,
			RunNCCL:           t.params.RunNCCL,
			ParallelGPUs:      t.params.ParallelGPUs,
			RampStep:          t.params.RampStep,
			RampTotal:         t.params.RampTotal,
			RampRunID:         t.params.RampRunID,
		}, j.append)
	case "nvidia-bench-power":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = a.RunNvidiaPowerBenchCtx(ctx, app.DefaultBeeBenchPowerDir, platform.NvidiaBenchmarkOptions{
			Profile:           t.params.BenchmarkProfile,
			GPUIndices:        t.params.GPUIndices,
			ExcludeGPUIndices: t.params.ExcludeGPUIndices,
			RampStep:          t.params.RampStep,
			RampTotal:         t.params.RampTotal,
			RampRunID:         t.params.RampRunID,
		}, j.append)
	case "nvidia-bench-autotune":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = a.RunNvidiaPowerSourceAutotuneCtx(ctx, app.DefaultBeeBenchAutotuneDir, platform.NvidiaBenchmarkOptions{
			Profile: t.params.BenchmarkProfile,
			SizeMB:  t.params.SizeMB,
		}, t.params.BenchmarkKind, j.append)
	case "nvidia-compute":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
		if planErr != nil {
			err = planErr
			break
		}
		// With a burn profile set, dur can only still be non-positive here if the
		// preset's DurationSec was; fall back to the ramp plan's duration then.
		// NOTE(review): confirm the preset is meant to take precedence over the
		// ramp plan when staggering is enabled.
		if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
			dur = rampPlan.DurationSec
		}
		if rampPlan.StaggerSeconds > 0 {
			j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
		}
		archive, err = a.RunNvidiaOfficialComputePack(ctx, "", dur, t.params.GPUIndices, rampPlan.StaggerSeconds, j.append)
	case "nvidia-targeted-power":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = a.RunNvidiaTargetedPowerPack(ctx, "", dur, t.params.GPUIndices, j.append)
	case "nvidia-pulse":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = a.RunNvidiaPulseTestPack(ctx, "", dur, t.params.GPUIndices, j.append)
	case "nvidia-bandwidth":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = a.RunNvidiaBandwidthPack(ctx, "", t.params.GPUIndices, j.append)
	case "nvidia-interconnect":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = a.RunNCCLTests(ctx, "", t.params.GPUIndices, j.append)
	case "nvidia-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
		if planErr != nil {
			err = planErr
			break
		}
		// Same fallback as "nvidia-compute": only used when the burn preset left
		// dur non-positive.
		if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
			dur = rampPlan.DurationSec
		}
		if rampPlan.StaggerSeconds > 0 {
			j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
		}
		archive, err = runNvidiaStressPackCtx(a, ctx, "", platform.NvidiaStressOptions{
			DurationSec:       dur,
			Loader:            t.params.Loader,
			GPUIndices:        t.params.GPUIndices,
			ExcludeGPUIndices: t.params.ExcludeGPUIndices,
			StaggerSeconds:    rampPlan.StaggerSeconds,
		}, j.append)
	case "memory":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		sizeMB, passes := resolveMemoryValidatePreset(t.params.BurnProfile, t.params.StressMode)
		j.append(fmt.Sprintf("Memory validate preset: %d MB x %d pass(es)", sizeMB, passes))
		archive, err = runMemoryAcceptancePackCtx(a, ctx, "", sizeMB, passes, j.append)
	case "storage":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = runStorageAcceptancePackCtx(a, ctx, "", t.params.StressMode, j.append)
	case "cpu":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		// Final defaults when neither an explicit duration nor a preset applied:
		// 30 minutes in stress mode, 1 minute otherwise.
		if dur <= 0 {
			if t.params.StressMode {
				dur = 1800
			} else {
				dur = 60
			}
		}
		j.append(fmt.Sprintf("CPU stress duration: %ds", dur))
		archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append)
	case "amd":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append)
	case "amd-mem":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = runAMDMemIntegrityPackCtx(a, ctx, "", j.append)
	case "amd-bandwidth":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		archive, err = runAMDMemBandwidthPackCtx(a, ctx, "", j.append)
	case "amd-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append)
	case "memory-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append)
	case "sat-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		dur := t.params.Duration
		if t.params.BurnProfile != "" && dur <= 0 {
			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
		}
		archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append)
	case "platform-stress":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		runOpts := resolvePlatformStressPreset(t.params.BurnProfile)
		runOpts.Components = t.params.PlatformComponents
		archive, err = a.RunPlatformStress(ctx, "", runOpts, j.append)
	case "audit":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		// The audit result is streamed into the job log; no archive is produced.
		result, e := a.RunAuditNow(opts.RuntimeMode)
		if e != nil {
			err = e
		} else {
			for _, line := range splitLines(result.Body) {
				j.append(line)
			}
		}
	case "support-bundle":
		// Does not need opts.App.
		j.append("Building support bundle...")
		archive, err = buildSupportBundle(opts.ExportDir)
	case "install":
		// Does not need opts.App; the install runs as an external command whose
		// output is streamed into j.
		if strings.TrimSpace(t.params.Device) == "" {
			err = fmt.Errorf("device is required")
			break
		}
		installLogPath := platform.InstallLogPath(t.params.Device)
		j.append("Install log: " + installLogPath)
		err = streamCmdJob(j, installCommand(ctx, t.params.Device, installLogPath))
	case "install-to-ram":
		if a == nil {
			err = fmt.Errorf("app not configured")
			break
		}
		err = a.RunInstallToRAM(ctx, j.append)
	default:
		j.append("ERROR: unknown target: " + t.Target)
		j.finish("unknown target")
		return
	}

	if archive != "" {
		archivePath := app.ExtractArchivePath(archive)
		// A run can return without error yet report failed checks in its SAT
		// summary; treat that as a task failure.
		if err == nil && app.ReadSATOverallStatus(archivePath) == "FAILED" {
			err = fmt.Errorf("SAT overall_status=FAILED (see summary.txt)")
		}
		if opts.App != nil && opts.App.StatusDB != nil {
			app.ApplySATResultToDB(opts.App.StatusDB, t.Target, archivePath)
		}
	}
	if err != nil {
		// A failure that coincides with cancellation is reported as an abort
		// rather than with the runner's error text.
		if ctx.Err() != nil {
			j.append("Aborted.")
			j.finish("aborted")
		} else {
			j.append("ERROR: " + err.Error())
			j.finish(err.Error())
		}
		return
	}
	if archive != "" {
		j.append("Archive: " + archive)
	}
	j.finish("")
}

// loadPersistedTask decodes the JSON array of persisted tasks stored at
// statePath and rebuilds the Task whose ID equals taskID, then calls
// ensureTaskReportPaths on it. It returns an error if the file cannot be read
// or decoded, or if no entry has that ID.
func loadPersistedTask(statePath, taskID string) (*Task, error) {
	data, err := os.ReadFile(statePath)
	if err != nil {
		return nil, err
	}
	var persisted []persistedTask
	if err := json.Unmarshal(data, &persisted); err != nil {
		return nil, err
	}
	for _, pt := range persisted {
		if pt.ID != taskID {
			continue
		}
		t := &Task{
			ID:             pt.ID,
			Name:           pt.Name,
			Target:         pt.Target,
			Priority:       pt.Priority,
			Status:         pt.Status,
			CreatedAt:      pt.CreatedAt,
			StartedAt:      pt.StartedAt,
			DoneAt:         pt.DoneAt,
			ErrMsg:         pt.ErrMsg,
			LogPath:        pt.LogPath,
			ArtifactsDir:   pt.ArtifactsDir,
			ReportJSONPath: pt.ReportJSONPath,
			ReportHTMLPath: pt.ReportHTMLPath,
			params:         pt.Params,
		}
		ensureTaskReportPaths(t)
		return t, nil
	}
	return nil, fmt.Errorf("task %s not found", taskID)
}

// RunPersistedTask runs the task identified by taskID from
// <exportDir>/tasks-state.json in the current process and returns a process
// exit code:
//   - 2 when exportDir or taskID is blank,
//   - 1 when the task cannot be loaded, the initial runner state cannot be
//     written, or the task ends failed or aborted,
//   - 0 when the task completes successfully.
//
// Runner state (this PID, status, error) is written to the task's artifacts
// directory before and after execution; a failure to write the final state is
// printed to stderr but does not affect the exit code. SIGINT and SIGTERM
// cancel the context passed to the task. Diagnostics go to stderr; stdout is
// currently unused.
func RunPersistedTask(exportDir, taskID string, stdout, stderr io.Writer) int {
	if strings.TrimSpace(exportDir) == "" || strings.TrimSpace(taskID) == "" {
		fmt.Fprintln(stderr, "bee task-run: --export-dir and --task-id are required")
		return 2
	}
	// Runtime detection is best-effort: a failure is only logged and the run
	// continues with whatever Detect returned.
	runtimeInfo, err := runtimeenv.Detect("auto")
	if err != nil {
		slog.Warn("resolve runtime for task-run", "err", err)
	}
	opts := &HandlerOptions{
		ExportDir:   exportDir,
		App:         app.New(platform.New()),
		RuntimeMode: runtimeInfo.Mode,
	}

	statePath := filepath.Join(exportDir, "tasks-state.json")
	task, err := loadPersistedTask(statePath, taskID)
	if err != nil {
		fmt.Fprintln(stderr, err.Error())
		return 1
	}
	if task.StartedAt == nil || task.StartedAt.IsZero() {
		now := time.Now()
		task.StartedAt = &now
	}
	if task.Status == "" {
		task.Status = TaskRunning
	}
	if err := writeTaskRunnerState(task, taskRunnerState{
		PID:       os.Getpid(),
		Status:    TaskRunning,
		UpdatedAt: time.Now().UTC(),
	}); err != nil {
		fmt.Fprintln(stderr, err.Error())
		return 1
	}

	// While registered, SIGINT/SIGTERM cancel ctx instead of terminating the
	// process, so runners that honor ctx can stop and the outcome is recorded.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	j := newTaskJobState(task.LogPath, taskSerialPrefix(task))
	executeTaskWithOptions(opts, task, j, ctx)
	finalizeTaskForResult(task, j.err, ctx.Err() != nil)
	if err := writeTaskReportArtifacts(task); err != nil {
		appendJobLog(task.LogPath, "WARN: task report generation failed: "+err.Error())
	}
	j.closeLog()

	if err := writeTaskRunnerState(task, taskRunnerState{
		PID:       os.Getpid(),
		Status:    task.Status,
		Error:     task.ErrMsg,
		UpdatedAt: time.Now().UTC(),
	}); err != nil {
		fmt.Fprintln(stderr, err.Error())
	}
	if task.ErrMsg != "" {
		return 1
	}
	return 0
}