// Files
// bee/audit/internal/webui/task_runner.go
//
// 506 lines
// 14 KiB
// Go

package webui
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"bee/audit/internal/app"
"bee/audit/internal/platform"
"bee/audit/internal/runtimeenv"
)
// taskRunnerState is the JSON document stored in a task's runner-state.json
// file. It is written by writeTaskRunnerState and read back by
// readTaskRunnerState.
type taskRunnerState struct {
	// PID is the process ID recorded by the writer.
	PID int `json:"pid"`

	// Status is the task status string at the time of writing.
	Status string `json:"status"`

	// Error carries the task's error message; it is omitted from the JSON
	// when empty.
	Error string `json:"error,omitempty"`

	// UpdatedAt is the timestamp supplied by the writer for this snapshot.
	UpdatedAt time.Time `json:"updated_at"`
}
// taskRunnerStatePath returns the location of runner-state.json inside the
// task's artifacts directory. It returns "" when t is nil or when
// t.ArtifactsDir is empty or whitespace-only.
func taskRunnerStatePath(t *Task) string {
	if t == nil {
		return ""
	}
	dir := t.ArtifactsDir
	if strings.TrimSpace(dir) == "" {
		return ""
	}
	return filepath.Join(dir, "runner-state.json")
}
// writeTaskRunnerState persists state as indented JSON at the task's
// runner-state path, creating the parent directory when needed.
//
// The payload is written to "<path>.tmp" first and then renamed over the
// final path, so the final file is replaced in a single step. If the write or
// the rename fails, the temporary file is removed (best effort) so that no
// stale ".tmp" file is left behind in the artifacts directory.
//
// When the task has no state path (taskRunnerStatePath returns "") the call
// is a no-op and returns nil. Returned errors wrap the underlying error and
// name the step that failed.
func writeTaskRunnerState(t *Task, state taskRunnerState) error {
	path := taskRunnerStatePath(t)
	if path == "" {
		return nil
	}
	if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
		return fmt.Errorf("create runner state dir: %w", err)
	}
	data, err := json.MarshalIndent(state, "", " ")
	if err != nil {
		return fmt.Errorf("encode runner state: %w", err)
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		// A failed write may still have created a partial file; cleanup is
		// best effort, the write error is what the caller needs to see.
		_ = os.Remove(tmp)
		return fmt.Errorf("write runner state: %w", err)
	}
	if err := os.Rename(tmp, path); err != nil {
		// Do not leave the orphaned temp file next to the real state file.
		_ = os.Remove(tmp)
		return fmt.Errorf("publish runner state: %w", err)
	}
	return nil
}
// readTaskRunnerState loads the runner state stored for t.
//
// The boolean result is false, and the returned state is the zero value, when
// the task has no state path, the file cannot be read, the file is empty, or
// its contents are not valid JSON for taskRunnerState.
func readTaskRunnerState(t *Task) (taskRunnerState, bool) {
	var none taskRunnerState

	statePath := taskRunnerStatePath(t)
	if statePath == "" {
		return none, false
	}

	raw, readErr := os.ReadFile(statePath)
	if readErr != nil || len(raw) == 0 {
		return none, false
	}

	var decoded taskRunnerState
	if json.Unmarshal(raw, &decoded) != nil {
		return none, false
	}
	return decoded, true
}
// processAlive reports whether a process with the given pid appears to exist.
//
// Non-positive pids are never considered alive. Otherwise signal 0 is sent to
// the pid: a nil error or EPERM counts as alive, any other error as not alive.
func processAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	switch syscall.Kill(pid, 0) {
	case nil, syscall.EPERM:
		// EPERM is still treated as "alive": the probe was rejected for
		// permission reasons rather than because the pid is unknown.
		return true
	default:
		return false
	}
}
// finalizeTaskForResult stamps t.DoneAt with the current time and sets the
// terminal status and error message on t:
//
//   - cancelled            -> TaskCancelled with message "aborted"
//   - non-blank errMsg     -> TaskFailed with errMsg stored unchanged
//   - otherwise            -> TaskDone with an empty message
//
// Cancellation takes precedence over errMsg.
func finalizeTaskForResult(t *Task, errMsg string, cancelled bool) {
	finishedAt := time.Now()
	t.DoneAt = &finishedAt

	if cancelled {
		t.Status, t.ErrMsg = TaskCancelled, "aborted"
		return
	}
	if strings.TrimSpace(errMsg) != "" {
		t.Status, t.ErrMsg = TaskFailed, errMsg
		return
	}
	t.Status, t.ErrMsg = TaskDone, ""
}
// executeTaskWithOptions runs the workload selected by t.Target and reports
// progress and the final outcome through j.
//
// Progress lines are emitted with j.append. Every return path calls j.finish
// once: with "" on success, with "aborted" when the run returned an error and
// ctx is already cancelled, and with the error text otherwise. An unknown
// target finishes with "unknown target"; a nil opts finishes with
// "handler options not configured".
//
// Most targets require opts.App and fail with "app not configured" when it is
// nil; "support-bundle" and "install" do not use it.
func executeTaskWithOptions(opts *HandlerOptions, t *Task, j *jobState, ctx context.Context) {
if opts == nil {
j.append("ERROR: handler options not configured")
j.finish("handler options not configured")
return
}
a := opts.App
// Lines already present in j before this run appends anything are treated as
// a sign that the task is being resumed, and a recovery note is logged.
recovered := len(j.lines) > 0
j.append(fmt.Sprintf("Starting %s...", t.Name))
if recovered {
j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339)))
}
// archive holds the value returned by the selected pack runner (empty for
// targets that produce none); err holds the run's failure, if any.
var (
archive string
err error
)
switch t.Target {
case "nvidia":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Diagnostic level 2 by default, 3 in stress mode.
diagLevel := 2
if t.params.StressMode {
diagLevel = 3
}
// NOTE(review): diagLevel is always 2 or 3 at this point, so `diagLevel > 0`
// is always true and the else branch below is unreachable.
if len(t.params.GPUIndices) > 0 || diagLevel > 0 {
result, e := a.RunNvidiaAcceptancePackWithOptions(ctx, "", diagLevel, t.params.GPUIndices, j.append)
if e != nil {
err = e
} else {
archive = result.Body
}
} else {
archive, err = a.RunNvidiaAcceptancePack("", j.append)
}
case "nvidia-targeted-stress":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Non-positive durations fall back to 300 (presumably seconds — confirm
// against RunNvidiaTargetedStressValidatePack).
dur := t.params.Duration
if dur <= 0 {
dur = 300
}
archive, err = a.RunNvidiaTargetedStressValidatePack(ctx, "", dur, t.params.GPUIndices, j.append)
case "nvidia-bench-perf":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
archive, err = a.RunNvidiaBenchmarkCtx(ctx, "", platform.NvidiaBenchmarkOptions{
Profile: t.params.BenchmarkProfile,
SizeMB: t.params.SizeMB,
GPUIndices: t.params.GPUIndices,
ExcludeGPUIndices: t.params.ExcludeGPUIndices,
RunNCCL: t.params.RunNCCL,
ParallelGPUs: t.params.ParallelGPUs,
RampStep: t.params.RampStep,
RampTotal: t.params.RampTotal,
RampRunID: t.params.RampRunID,
}, j.append)
case "nvidia-bench-power":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Unlike most targets, this one passes an explicit base directory
// (app.DefaultBeeBenchPowerDir) instead of "".
archive, err = a.RunNvidiaPowerBenchCtx(ctx, app.DefaultBeeBenchPowerDir, platform.NvidiaBenchmarkOptions{
Profile: t.params.BenchmarkProfile,
GPUIndices: t.params.GPUIndices,
ExcludeGPUIndices: t.params.ExcludeGPUIndices,
RampStep: t.params.RampStep,
RampTotal: t.params.RampTotal,
RampRunID: t.params.RampRunID,
}, j.append)
case "nvidia-bench-autotune":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
archive, err = a.RunNvidiaPowerSourceAutotuneCtx(ctx, app.DefaultBeeBenchAutotuneDir, platform.NvidiaBenchmarkOptions{
Profile: t.params.BenchmarkProfile,
SizeMB: t.params.SizeMB,
}, t.params.BenchmarkKind, j.append)
case "nvidia-compute":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// An explicit positive Duration wins; otherwise the burn profile preset
// supplies the duration when a profile is set.
dur := t.params.Duration
if t.params.BurnProfile != "" && dur <= 0 {
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
}
rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
if planErr != nil {
err = planErr
break
}
// Last fallback: with staggered start and still no positive duration, use
// the ramp plan's DurationSec.
if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
dur = rampPlan.DurationSec
}
if rampPlan.StaggerSeconds > 0 {
j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
}
archive, err = a.RunNvidiaOfficialComputePack(ctx, "", dur, t.params.GPUIndices, rampPlan.StaggerSeconds, j.append)
case "nvidia-targeted-power":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Explicit Duration first, then the burn profile preset.
dur := t.params.Duration
if t.params.BurnProfile != "" && dur <= 0 {
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
}
archive, err = a.RunNvidiaTargetedPowerPack(ctx, "", dur, t.params.GPUIndices, j.append)
case "nvidia-pulse":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Explicit Duration first, then the burn profile preset.
dur := t.params.Duration
if t.params.BurnProfile != "" && dur <= 0 {
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
}
archive, err = a.RunNvidiaPulseTestPack(ctx, "", dur, t.params.GPUIndices, j.append)
case "nvidia-bandwidth":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
archive, err = a.RunNvidiaBandwidthPack(ctx, "", t.params.GPUIndices, j.append)
case "nvidia-interconnect":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
archive, err = a.RunNCCLTests(ctx, "", t.params.GPUIndices, j.append)
case "nvidia-stress":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Same duration and ramp-plan resolution as the "nvidia-compute" case.
dur := t.params.Duration
if t.params.BurnProfile != "" && dur <= 0 {
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
}
rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
if planErr != nil {
err = planErr
break
}
if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
dur = rampPlan.DurationSec
}
if rampPlan.StaggerSeconds > 0 {
j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
}
archive, err = runNvidiaStressPackCtx(a, ctx, "", platform.NvidiaStressOptions{
DurationSec: dur,
Loader: t.params.Loader,
GPUIndices: t.params.GPUIndices,
ExcludeGPUIndices: t.params.ExcludeGPUIndices,
StaggerSeconds: rampPlan.StaggerSeconds,
}, j.append)
case "memory":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Size and pass count come from the preset resolved for the burn profile
// and stress mode; the chosen values are logged before the run.
sizeMB, passes := resolveMemoryValidatePreset(t.params.BurnProfile, t.params.StressMode)
j.append(fmt.Sprintf("Memory validate preset: %d MB x %d pass(es)", sizeMB, passes))
archive, err = runMemoryAcceptancePackCtx(a, ctx, "", sizeMB, passes, j.append)
case "storage":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
archive, err = runStorageAcceptancePackCtx(a, ctx, "", t.params.StressMode, j.append)
case "cpu":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Duration resolution order: explicit Duration, burn profile preset, then a
// built-in default of 1800s in stress mode or 60s otherwise.
dur := t.params.Duration
if t.params.BurnProfile != "" && dur <= 0 {
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
}
if dur <= 0 {
if t.params.StressMode {
dur = 1800
} else {
dur = 60
}
}
j.append(fmt.Sprintf("CPU stress duration: %ds", dur))
archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append)
case "amd":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append)
case "amd-mem":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
archive, err = runAMDMemIntegrityPackCtx(a, ctx, "", j.append)
case "amd-bandwidth":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
archive, err = runAMDMemBandwidthPackCtx(a, ctx, "", j.append)
case "amd-stress":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Explicit Duration first, then the burn profile preset.
dur := t.params.Duration
if t.params.BurnProfile != "" && dur <= 0 {
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
}
archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append)
case "memory-stress":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Explicit Duration first, then the burn profile preset.
dur := t.params.Duration
if t.params.BurnProfile != "" && dur <= 0 {
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
}
archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append)
case "sat-stress":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Explicit Duration first, then the burn profile preset.
dur := t.params.Duration
if t.params.BurnProfile != "" && dur <= 0 {
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
}
archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append)
case "platform-stress":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// Start from the preset for the burn profile, then override the component
// selection with the task's own list.
runOpts := resolvePlatformStressPreset(t.params.BurnProfile)
runOpts.Components = t.params.PlatformComponents
archive, err = a.RunPlatformStress(ctx, "", runOpts, j.append)
case "audit":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
// The audit call takes no ctx and sets no archive; its result body is
// copied line by line into the job log.
result, e := a.RunAuditNow(opts.RuntimeMode)
if e != nil {
err = e
} else {
for _, line := range splitLines(result.Body) {
j.append(line)
}
}
case "support-bundle":
// Does not use opts.App, so there is no nil-app guard here.
j.append("Building support bundle...")
archive, err = buildSupportBundle(opts.ExportDir)
case "install":
// Does not use opts.App; requires a non-blank target device instead.
if strings.TrimSpace(t.params.Device) == "" {
err = fmt.Errorf("device is required")
break
}
installLogPath := platform.InstallLogPath(t.params.Device)
j.append("Install log: " + installLogPath)
err = streamCmdJob(j, installCommand(ctx, t.params.Device, installLogPath))
case "install-to-ram":
if a == nil {
err = fmt.Errorf("app not configured")
break
}
err = a.RunInstallToRAM(ctx, j.append)
default:
// Unknown targets finish immediately and skip the shared post-processing
// below.
j.append("ERROR: unknown target: " + t.Target)
j.finish("unknown target")
return
}
// Post-process the archive, if the run produced one.
if archive != "" {
archivePath := app.ExtractArchivePath(archive)
// A run that returned no error is still failed when the archive reports an
// overall status of FAILED; an existing error is never overwritten.
if err == nil && app.ReadSATOverallStatus(archivePath) == "FAILED" {
err = fmt.Errorf("SAT overall_status=FAILED (see summary.txt)")
}
// The status DB is updated whenever it is configured, regardless of err.
if opts.App != nil && opts.App.StatusDB != nil {
app.ApplySATResultToDB(opts.App.StatusDB, t.Target, archivePath)
}
}
if err != nil {
// When ctx is already cancelled, the failure is reported as an abort and
// the underlying error text is not logged.
if ctx.Err() != nil {
j.append("Aborted.")
j.finish("aborted")
} else {
j.append("ERROR: " + err.Error())
j.finish(err.Error())
}
return
}
if archive != "" {
j.append("Archive: " + archive)
}
j.finish("")
}
// loadPersistedTask reads the JSON array of persisted tasks stored at
// statePath and rebuilds the Task whose ID equals taskID.
//
// The first matching entry wins. Its fields are copied into a new Task and
// ensureTaskReportPaths is applied before the task is returned. Read and
// decode errors are returned unchanged; a missing ID yields a
// "task <id> not found" error.
func loadPersistedTask(statePath, taskID string) (*Task, error) {
	raw, err := os.ReadFile(statePath)
	if err != nil {
		return nil, err
	}

	var entries []persistedTask
	if err = json.Unmarshal(raw, &entries); err != nil {
		return nil, err
	}

	for i := range entries {
		src := entries[i]
		if src.ID == taskID {
			restored := &Task{
				ID:             src.ID,
				Name:           src.Name,
				Target:         src.Target,
				Priority:       src.Priority,
				Status:         src.Status,
				CreatedAt:      src.CreatedAt,
				StartedAt:      src.StartedAt,
				DoneAt:         src.DoneAt,
				ErrMsg:         src.ErrMsg,
				LogPath:        src.LogPath,
				ArtifactsDir:   src.ArtifactsDir,
				ReportJSONPath: src.ReportJSONPath,
				ReportHTMLPath: src.ReportHTMLPath,
				params:         src.Params,
			}
			ensureTaskReportPaths(restored)
			return restored, nil
		}
	}
	return nil, fmt.Errorf("task %s not found", taskID)
}
// RunPersistedTask loads the task identified by taskID from
// <exportDir>/tasks-state.json, executes it in the current process, and
// returns a process exit code.
//
// Exit codes:
//   - 2 when exportDir or taskID is blank
//   - 1 when the task cannot be loaded, the initial runner state cannot be
//     written, or the task ends with a non-empty error message (including
//     "aborted")
//   - 0 otherwise
//
// The runner state file is written before execution with the current PID and
// the running status, and again afterwards with the final status and error.
// SIGINT and SIGTERM cancel the context handed to the task. Diagnostics are
// printed to stderr; stdout is not used by this function.
func RunPersistedTask(exportDir, taskID string, stdout, stderr io.Writer) int {
	if strings.TrimSpace(exportDir) == "" || strings.TrimSpace(taskID) == "" {
		fmt.Fprintln(stderr, "bee task-run: --export-dir and --task-id are required")
		return 2
	}

	// A failed runtime detection is only logged; execution continues with
	// whatever mode the returned value carries.
	runtimeInfo, detectErr := runtimeenv.Detect("auto")
	if detectErr != nil {
		slog.Warn("resolve runtime for task-run", "err", detectErr)
	}
	opts := &HandlerOptions{
		ExportDir:   exportDir,
		App:         app.New(platform.New()),
		RuntimeMode: runtimeInfo.Mode,
	}

	task, loadErr := loadPersistedTask(filepath.Join(exportDir, "tasks-state.json"), taskID)
	if loadErr != nil {
		fmt.Fprintln(stderr, loadErr.Error())
		return 1
	}

	// Fill in fields a freshly queued task may not have yet.
	if task.StartedAt == nil || task.StartedAt.IsZero() {
		startedAt := time.Now()
		task.StartedAt = &startedAt
	}
	if task.Status == "" {
		task.Status = TaskRunning
	}

	// saveState records a snapshot for this process in the runner state file.
	saveState := func(status, errMsg string) error {
		return writeTaskRunnerState(task, taskRunnerState{
			PID:       os.Getpid(),
			Status:    status,
			Error:     errMsg,
			UpdatedAt: time.Now().UTC(),
		})
	}

	if err := saveState(TaskRunning, ""); err != nil {
		fmt.Fprintln(stderr, err.Error())
		return 1
	}

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	job := newTaskJobState(task.LogPath, taskSerialPrefix(task))
	executeTaskWithOptions(opts, task, job, ctx)

	// A cancelled context marks the task as cancelled regardless of job.err.
	interrupted := ctx.Err() != nil
	finalizeTaskForResult(task, job.err, interrupted)

	// Report generation failures are logged to the task log but do not change
	// the task result.
	if reportErr := writeTaskReportArtifacts(task); reportErr != nil {
		appendJobLog(task.LogPath, "WARN: task report generation failed: "+reportErr.Error())
	}
	job.closeLog()

	// A failure to record the final state is reported but does not affect the
	// exit code.
	if err := saveState(task.Status, task.ErrMsg); err != nil {
		fmt.Fprintln(stderr, err.Error())
	}

	if task.ErrMsg == "" {
		return 0
	}
	return 1
}