diff --git a/audit/cmd/bee/main.go b/audit/cmd/bee/main.go
index 1e84410..a839bd3 100644
--- a/audit/cmd/bee/main.go
+++ b/audit/cmd/bee/main.go
@@ -71,6 +71,8 @@ func run(args []string, stdout, stderr io.Writer) (exitCode int) {
 		return runSAT(args[1:], stdout, stderr)
 	case "benchmark":
 		return runBenchmark(args[1:], stdout, stderr)
+	case "bee-worker":
+		return runBeeWorker(args[1:], stdout, stderr)
 	case "version", "--version", "-version":
 		fmt.Fprintln(stdout, Version)
 		return 0
@@ -90,6 +92,7 @@ func printRootUsage(w io.Writer) {
  bee web --listen :80 [--audit-path `+app.DefaultAuditJSONPath+`]
  bee sat nvidia|memory|storage|cpu [--duration <seconds>]
  bee benchmark nvidia [--profile standard|stability|overnight]
+ bee bee-worker --export-dir `+app.DefaultExportDir+` --task-id TASK-001
  bee version
  bee help [command]`)
 }
@@ -110,6 +113,8 @@ func runHelp(args []string, stdout, stderr io.Writer) int {
 		return runSAT([]string{"--help"}, stdout, stderr)
 	case "benchmark":
 		return runBenchmark([]string{"--help"}, stdout, stderr)
+	case "bee-worker":
+		return runBeeWorker([]string{"--help"}, stdout, stderr)
 	case "version":
 		fmt.Fprintln(stdout, "usage: bee version")
 		return 0
@@ -462,6 +467,28 @@ func runBenchmark(args []string, stdout, stderr io.Writer) int {
 	return 0
 }
 
+func runBeeWorker(args []string, stdout, stderr io.Writer) int {
+	fs := flag.NewFlagSet("bee-worker", flag.ContinueOnError)
+	fs.SetOutput(stderr)
+	exportDir := fs.String("export-dir", app.DefaultExportDir, "directory with task state and artifacts")
+	taskID := fs.String("task-id", "", "task identifier, e.g. TASK-001")
+	fs.Usage = func() {
+		fmt.Fprintf(stderr, "usage: bee bee-worker --export-dir %s --task-id TASK-001\n", app.DefaultExportDir)
+		fs.PrintDefaults()
+	}
+	if err := fs.Parse(args); err != nil {
+		if err == flag.ErrHelp {
+			return 0
+		}
+		return 2
+	}
+	if fs.NArg() != 0 {
+		fs.Usage()
+		return 2
+	}
+	return webui.RunPersistedTask(*exportDir, *taskID, stdout, stderr)
+}
+
 func parseBenchmarkIndexCSV(raw string) ([]int, error) {
 	raw = strings.TrimSpace(raw)
 	if raw == "" {
diff --git a/audit/internal/platform/kill_workers.go b/audit/internal/platform/kill_workers.go
index 6687756..1b890a8 100644
--- a/audit/internal/platform/kill_workers.go
+++ b/audit/internal/platform/kill_workers.go
@@ -18,6 +18,7 @@ var workerPatterns = []string{
 	"stress-ng",
 	"stressapptest",
 	"memtester",
+	"nvbandwidth",
 	// DCGM diagnostic workers — nvvs is spawned by dcgmi diag and survives
 	// if dcgmi is killed mid-run, leaving the GPU occupied (DCGM_ST_IN_USE).
 	"nvvs",
@@ -71,13 +72,19 @@ func KillTestWorkers() []KilledProcess {
 		if idx := strings.LastIndexByte(exe, '/'); idx >= 0 {
 			base = exe[idx+1:]
 		}
-		for _, pat := range workerPatterns {
-			if strings.Contains(base, pat) || strings.Contains(exe, pat) {
-				_ = syscall.Kill(pid, syscall.SIGKILL)
-				killed = append(killed, KilledProcess{PID: pid, Name: base})
-				break
-			}
+		if shouldKillWorkerProcess(exe, base) {
+			_ = syscall.Kill(pid, syscall.SIGKILL)
+			killed = append(killed, KilledProcess{PID: pid, Name: base})
 		}
 	}
 	return killed
 }
+
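+// shouldKillWorkerProcess reports whether the executable path or its basename
+// matches one of the known test-worker patterns.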
"nvvs", @@ -71,13 +72,19 @@ func KillTestWorkers() []KilledProcess { if idx := strings.LastIndexByte(exe, '/'); idx >= 0 { base = exe[idx+1:] } - for _, pat := range workerPatterns { - if strings.Contains(base, pat) || strings.Contains(exe, pat) { - _ = syscall.Kill(pid, syscall.SIGKILL) - killed = append(killed, KilledProcess{PID: pid, Name: base}) - break - } + if shouldKillWorkerProcess(exe, base) { + _ = syscall.Kill(pid, syscall.SIGKILL) + killed = append(killed, KilledProcess{PID: pid, Name: base}) } } return killed } + +func shouldKillWorkerProcess(exe, base string) bool { + for _, pat := range workerPatterns { + if strings.Contains(base, pat) || strings.Contains(exe, pat) { + return true + } + } + return false +} diff --git a/audit/internal/platform/kill_workers_test.go b/audit/internal/platform/kill_workers_test.go new file mode 100644 index 0000000..e8537fd --- /dev/null +++ b/audit/internal/platform/kill_workers_test.go @@ -0,0 +1,39 @@ +package platform + +import "testing" + +func TestShouldKillWorkerProcess(t *testing.T) { + tests := []struct { + name string + exe string + base string + want bool + }{ + { + name: "nvbandwidth executable", + exe: "/usr/libexec/datacenter-gpu-manager-4/plugins/cuda13/nvbandwidth", + base: "nvbandwidth", + want: true, + }, + { + name: "dcgmi executable", + exe: "/usr/bin/dcgmi", + base: "dcgmi", + want: true, + }, + { + name: "unrelated process", + exe: "/usr/bin/bash", + base: "bash", + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := shouldKillWorkerProcess(tt.exe, tt.base); got != tt.want { + t.Fatalf("shouldKillWorkerProcess(%q, %q)=%v want %v", tt.exe, tt.base, got, tt.want) + } + }) + } +} diff --git a/audit/internal/webui/api.go b/audit/internal/webui/api.go index e5a53ca..036c6e7 100644 --- a/audit/internal/webui/api.go +++ b/audit/internal/webui/api.go @@ -806,15 +806,14 @@ func (h *handler) handleAPISATAbort(w http.ResponseWriter, r *http.Request) { now := time.Now() t.DoneAt = &now case TaskRunning: - if t.job != nil { - t.job.abort() + if t.job == nil || !t.job.abort() { + globalQueue.mu.Unlock() + writeJSON(w, map[string]string{"status": "not_running"}) + return } - if taskMayLeaveOrphanWorkers(t.Target) { - platform.KillTestWorkers() - } - t.Status = TaskCancelled - now := time.Now() - t.DoneAt = &now + globalQueue.mu.Unlock() + writeJSON(w, map[string]string{"status": "aborting"}) + return } globalQueue.mu.Unlock() writeJSON(w, map[string]string{"status": "aborted"}) diff --git a/audit/internal/webui/jobs.go b/audit/internal/webui/jobs.go index 5719091..496537a 100644 --- a/audit/internal/webui/jobs.go +++ b/audit/internal/webui/jobs.go @@ -20,7 +20,7 @@ type jobState struct { cancel func() // optional cancel function; nil if job is not cancellable logPath string serialPrefix string - logFile *os.File // kept open for the task lifetime to avoid per-line open/close + logFile *os.File // kept open for the task lifetime to avoid per-line open/close logBuf *bufio.Writer } @@ -53,13 +53,21 @@ func (j *jobState) abort() bool { } func (j *jobState) append(line string) { + j.appendWithOptions(line, true, true) +} + +func (j *jobState) appendFromLog(line string) { + j.appendWithOptions(line, false, false) +} + +func (j *jobState) appendWithOptions(line string, persistLog, serialMirror bool) { j.mu.Lock() defer j.mu.Unlock() j.lines = append(j.lines, line) - if j.logPath != "" { + if persistLog && j.logPath != "" { j.writeLogLineLocked(line) } - if j.serialPrefix != "" { + if serialMirror 
&& j.serialPrefix != "" { taskSerialWriteLine(j.serialPrefix + line) } for _, ch := range j.subs { diff --git a/audit/internal/webui/page_install_tasks.go b/audit/internal/webui/page_install_tasks.go index 0f63851..b13fbcc 100644 --- a/audit/internal/webui/page_install_tasks.go +++ b/audit/internal/webui/page_install_tasks.go @@ -207,7 +207,7 @@ func renderInstall() string { func renderTasks() string { return `
+func (j *jobState) appendWithOptions(line string, persistLog, serialMirror bool) {
 	j.mu.Lock()
 	defer j.mu.Unlock()
 	j.lines = append(j.lines, line)
-	if j.logPath != "" {
+	if persistLog && j.logPath != "" {
 		j.writeLogLineLocked(line)
 	}
-	if j.serialPrefix != "" {
+	if serialMirror && j.serialPrefix != "" {
 		taskSerialWriteLine(j.serialPrefix + line)
 	}
 	for _, ch := range j.subs {
diff --git a/audit/internal/webui/page_install_tasks.go b/audit/internal/webui/page_install_tasks.go
index 0f63851..b13fbcc 100644
--- a/audit/internal/webui/page_install_tasks.go
+++ b/audit/internal/webui/page_install_tasks.go
@@ -207,7 +207,7 @@ func renderInstall() string {
 func renderTasks() string {
 	return `
-
+Open a task to view its saved logs and charts.
@@ -289,7 +289,7 @@ function cancelAll() {
   fetch('/api/tasks/cancel-all',{method:'POST'}).then(()=>loadTasks());
 }
 function killWorkers() {
-  if (!confirm('Send SIGKILL to all running test workers (bee-gpu-burn, stress-ng, stressapptest, memtester)?\n\nThis will also cancel all queued and running tasks.')) return;
+  if (!confirm('Abort all queued/running tasks and kill orphaned test workers (bee-gpu-burn, dcgmi, nvvs, nvbandwidth, stress-ng, stressapptest, memtester)?\n\nRunning bee-worker processes will first be asked to stop gracefully; orphaned test processes will then be killed.')) return;
   fetch('/api/tasks/kill-workers',{method:'POST'})
     .then(r=>r.json())
     .then(d=>{
diff --git a/audit/internal/webui/task_runner.go b/audit/internal/webui/task_runner.go
new file mode 100644
index 0000000..feb2424
--- /dev/null
+++ b/audit/internal/webui/task_runner.go
@@ -0,0 +1,505 @@
+package webui
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log/slog"
+	"os"
+	"os/signal"
+	"path/filepath"
+	"strings"
+	"syscall"
+	"time"
+
+	"bee/audit/internal/app"
+	"bee/audit/internal/platform"
+	"bee/audit/internal/runtimeenv"
+)
+
+// taskRunnerState is the handshake written to runner-state.json in the task's
+// artifacts directory by the bee-worker process; bee-web reads it to reconcile
+// task status across restarts of either process.
+type taskRunnerState struct {
+	PID       int       `json:"pid"`
+	Status    string    `json:"status"`
+	Error     string    `json:"error,omitempty"`
+	UpdatedAt time.Time `json:"updated_at"`
+}
+
+func taskRunnerStatePath(t *Task) string {
+	if t == nil || strings.TrimSpace(t.ArtifactsDir) == "" {
+		return ""
+	}
+	return filepath.Join(t.ArtifactsDir, "runner-state.json")
+}
+
+// writeTaskRunnerState persists the state atomically (write to a temp file,
+// then rename) so a concurrent reader never observes a partial file.
+func writeTaskRunnerState(t *Task, state taskRunnerState) error {
+	path := taskRunnerStatePath(t)
+	if path == "" {
+		return nil
+	}
+	if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
+		return err
+	}
+	data, err := json.MarshalIndent(state, "", "  ")
+	if err != nil {
+		return err
+	}
+	tmp := path + ".tmp"
+	if err := os.WriteFile(tmp, data, 0644); err != nil {
+		return err
+	}
+	return os.Rename(tmp, path)
+}
+
+func readTaskRunnerState(t *Task) (taskRunnerState, bool) {
+	path := taskRunnerStatePath(t)
+	if path == "" {
+		return taskRunnerState{}, false
+	}
+	data, err := os.ReadFile(path)
+	if err != nil || len(data) == 0 {
+		return taskRunnerState{}, false
+	}
+	var state taskRunnerState
+	if err := json.Unmarshal(data, &state); err != nil {
+		return taskRunnerState{}, false
+	}
+	return state, true
+}
+
+// processAlive probes the PID with signal 0, which performs the existence and
+// permission checks without delivering a signal; EPERM still means the
+// process exists (it just belongs to another user).
+func processAlive(pid int) bool {
+	if pid <= 0 {
+		return false
+	}
+	err := syscall.Kill(pid, 0)
+	return err == nil || err == syscall.EPERM
+}
+
+func finalizeTaskForResult(t *Task, errMsg string, cancelled bool) {
+	now := time.Now()
+	t.DoneAt = &now
+	switch {
+	case cancelled:
+		t.Status = TaskCancelled
+		t.ErrMsg = "aborted"
+	case strings.TrimSpace(errMsg) != "":
+		t.Status = TaskFailed
+		t.ErrMsg = errMsg
+	default:
+		t.Status = TaskDone
+		t.ErrMsg = ""
+	}
+}
+
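+// executeTaskWithOptions runs a single task to completion inside the current
+// process, appending progress to j and finishing it with the task's result.
+// It is invoked by the bee-worker child process, not by bee-web itself.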
+func executeTaskWithOptions(opts *HandlerOptions, t *Task, j *jobState, ctx context.Context) {
+	if opts == nil {
+		j.append("ERROR: handler options not configured")
+		j.finish("handler options not configured")
+		return
+	}
+	a := opts.App
+
+	recovered := len(j.lines) > 0
+	j.append(fmt.Sprintf("Starting %s...", t.Name))
+	if recovered {
+		j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339)))
+	}
+
+	var (
+		archive string
+		err     error
+	)
+
+	switch t.Target {
+	case "nvidia":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		diagLevel := 2
+		if t.params.StressMode {
+			diagLevel = 3
+		}
+		if len(t.params.GPUIndices) > 0 || diagLevel > 0 {
+			result, e := a.RunNvidiaAcceptancePackWithOptions(ctx, "", diagLevel, t.params.GPUIndices, j.append)
+			if e != nil {
+				err = e
+			} else {
+				archive = result.Body
+			}
+		} else {
+			archive, err = a.RunNvidiaAcceptancePack("", j.append)
+		}
+	case "nvidia-targeted-stress":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if dur <= 0 {
+			dur = 300
+		}
+		archive, err = a.RunNvidiaTargetedStressValidatePack(ctx, "", dur, t.params.GPUIndices, j.append)
+	case "nvidia-bench-perf":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = a.RunNvidiaBenchmarkCtx(ctx, "", platform.NvidiaBenchmarkOptions{
+			Profile:           t.params.BenchmarkProfile,
+			SizeMB:            t.params.SizeMB,
+			GPUIndices:        t.params.GPUIndices,
+			ExcludeGPUIndices: t.params.ExcludeGPUIndices,
+			RunNCCL:           t.params.RunNCCL,
+			ParallelGPUs:      t.params.ParallelGPUs,
+			RampStep:          t.params.RampStep,
+			RampTotal:         t.params.RampTotal,
+			RampRunID:         t.params.RampRunID,
+		}, j.append)
+	case "nvidia-bench-power":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = a.RunNvidiaPowerBenchCtx(ctx, app.DefaultBeeBenchPowerDir, platform.NvidiaBenchmarkOptions{
+			Profile:           t.params.BenchmarkProfile,
+			GPUIndices:        t.params.GPUIndices,
+			ExcludeGPUIndices: t.params.ExcludeGPUIndices,
+			RampStep:          t.params.RampStep,
+			RampTotal:         t.params.RampTotal,
+			RampRunID:         t.params.RampRunID,
+		}, j.append)
+	case "nvidia-bench-autotune":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = a.RunNvidiaPowerSourceAutotuneCtx(ctx, app.DefaultBeeBenchAutotuneDir, platform.NvidiaBenchmarkOptions{
+			Profile: t.params.BenchmarkProfile,
+			SizeMB:  t.params.SizeMB,
+		}, t.params.BenchmarkKind, j.append)
+	case "nvidia-compute":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if t.params.BurnProfile != "" && dur <= 0 {
+			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
+		}
+		rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
+		if planErr != nil {
+			err = planErr
+			break
+		}
+		if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
+			dur = rampPlan.DurationSec
+		}
+		if rampPlan.StaggerSeconds > 0 {
+			j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
+		}
+		archive, err = a.RunNvidiaOfficialComputePack(ctx, "", dur, t.params.GPUIndices, rampPlan.StaggerSeconds, j.append)
+	case "nvidia-targeted-power":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if t.params.BurnProfile != "" && dur <= 0 {
+			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
+		}
+		archive, err = a.RunNvidiaTargetedPowerPack(ctx, "", dur, t.params.GPUIndices, j.append)
+	case "nvidia-pulse":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if t.params.BurnProfile != "" && dur <= 0 {
+			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
+		}
+		archive, err = a.RunNvidiaPulseTestPack(ctx, "", dur, t.params.GPUIndices, j.append)
+	case "nvidia-bandwidth":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = a.RunNvidiaBandwidthPack(ctx, "", t.params.GPUIndices, j.append)
+	case "nvidia-interconnect":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = a.RunNCCLTests(ctx, "", t.params.GPUIndices, j.append)
+	case "nvidia-stress":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if t.params.BurnProfile != "" && dur <= 0 {
+			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
+		}
+		rampPlan, planErr := resolveNvidiaRampPlan(t.params.BurnProfile, t.params.StaggerGPUStart, t.params.GPUIndices)
+		if planErr != nil {
+			err = planErr
+			break
+		}
+		if t.params.BurnProfile != "" && t.params.StaggerGPUStart && dur <= 0 {
+			dur = rampPlan.DurationSec
+		}
+		if rampPlan.StaggerSeconds > 0 {
+			j.append(fmt.Sprintf("NVIDIA staggered ramp-up enabled: %ds per GPU; post-ramp hold: %ds; total runtime: %ds", rampPlan.StaggerSeconds, dur, rampPlan.TotalDurationSec))
+		}
+		archive, err = runNvidiaStressPackCtx(a, ctx, "", platform.NvidiaStressOptions{
+			DurationSec:       dur,
+			Loader:            t.params.Loader,
+			GPUIndices:        t.params.GPUIndices,
+			ExcludeGPUIndices: t.params.ExcludeGPUIndices,
+			StaggerSeconds:    rampPlan.StaggerSeconds,
+		}, j.append)
+	case "memory":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		sizeMB, passes := resolveMemoryValidatePreset(t.params.BurnProfile, t.params.StressMode)
+		j.append(fmt.Sprintf("Memory validate preset: %d MB x %d pass(es)", sizeMB, passes))
+		archive, err = runMemoryAcceptancePackCtx(a, ctx, "", sizeMB, passes, j.append)
+	case "storage":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = runStorageAcceptancePackCtx(a, ctx, "", t.params.StressMode, j.append)
+	case "cpu":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if t.params.BurnProfile != "" && dur <= 0 {
+			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
+		}
+		if dur <= 0 {
+			if t.params.StressMode {
+				dur = 1800
+			} else {
+				dur = 60
+			}
+		}
+		j.append(fmt.Sprintf("CPU stress duration: %ds", dur))
+		archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append)
+	case "amd":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append)
+	case "amd-mem":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = runAMDMemIntegrityPackCtx(a, ctx, "", j.append)
+	case "amd-bandwidth":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		archive, err = runAMDMemBandwidthPackCtx(a, ctx, "", j.append)
+	case "amd-stress":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if t.params.BurnProfile != "" && dur <= 0 {
+			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
+		}
+		archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append)
+	case "memory-stress":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if t.params.BurnProfile != "" && dur <= 0 {
+			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
+		}
+		archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append)
+	case "sat-stress":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		dur := t.params.Duration
+		if t.params.BurnProfile != "" && dur <= 0 {
+			dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
+		}
+		archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append)
+	case "platform-stress":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		runOpts := resolvePlatformStressPreset(t.params.BurnProfile)
+		runOpts.Components = t.params.PlatformComponents
+		archive, err = a.RunPlatformStress(ctx, "", runOpts, j.append)
+	case "audit":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		result, e := a.RunAuditNow(opts.RuntimeMode)
+		if e != nil {
+			err = e
+		} else {
+			for _, line := range splitLines(result.Body) {
+				j.append(line)
+			}
+		}
+	case "support-bundle":
+		j.append("Building support bundle...")
+		archive, err = buildSupportBundle(opts.ExportDir)
+	case "install":
+		if strings.TrimSpace(t.params.Device) == "" {
+			err = fmt.Errorf("device is required")
+			break
+		}
+		installLogPath := platform.InstallLogPath(t.params.Device)
+		j.append("Install log: " + installLogPath)
+		err = streamCmdJob(j, installCommand(ctx, t.params.Device, installLogPath))
+	case "install-to-ram":
+		if a == nil {
+			err = fmt.Errorf("app not configured")
+			break
+		}
+		err = a.RunInstallToRAM(ctx, j.append)
+	default:
+		j.append("ERROR: unknown target: " + t.Target)
+		j.finish("unknown target")
+		return
+	}
+
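+	// A pack can finish its run yet still fail acceptance: treat
+	// overall_status=FAILED in the archive summary as a task error, and
+	// record the SAT result in the status DB either way.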
+	if archive != "" {
+		archivePath := app.ExtractArchivePath(archive)
+		if err == nil && app.ReadSATOverallStatus(archivePath) == "FAILED" {
+			err = fmt.Errorf("SAT overall_status=FAILED (see summary.txt)")
+		}
+		if opts.App != nil && opts.App.StatusDB != nil {
+			app.ApplySATResultToDB(opts.App.StatusDB, t.Target, archivePath)
+		}
+	}
+
+	if err != nil {
+		if ctx.Err() != nil {
+			j.append("Aborted.")
+			j.finish("aborted")
+		} else {
+			j.append("ERROR: " + err.Error())
+			j.finish(err.Error())
+		}
+		return
+	}
+	if archive != "" {
+		j.append("Archive: " + archive)
+	}
+	j.finish("")
+}
+
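+// loadPersistedTask reads tasks-state.json and rebuilds the single task with
+// the given ID, including its non-serialised parameters.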
+func loadPersistedTask(statePath, taskID string) (*Task, error) {
+	data, err := os.ReadFile(statePath)
+	if err != nil {
+		return nil, err
+	}
+	var persisted []persistedTask
+	if err := json.Unmarshal(data, &persisted); err != nil {
+		return nil, err
+	}
+	for _, pt := range persisted {
+		if pt.ID != taskID {
+			continue
+		}
+		t := &Task{
+			ID:             pt.ID,
+			Name:           pt.Name,
+			Target:         pt.Target,
+			Priority:       pt.Priority,
+			Status:         pt.Status,
+			CreatedAt:      pt.CreatedAt,
+			StartedAt:      pt.StartedAt,
+			DoneAt:         pt.DoneAt,
+			ErrMsg:         pt.ErrMsg,
+			LogPath:        pt.LogPath,
+			ArtifactsDir:   pt.ArtifactsDir,
+			ReportJSONPath: pt.ReportJSONPath,
+			ReportHTMLPath: pt.ReportHTMLPath,
+			params:         pt.Params,
+		}
+		ensureTaskReportPaths(t)
+		return t, nil
+	}
+	return nil, fmt.Errorf("task %s not found", taskID)
+}
+
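+// RunPersistedTask is the entry point of the `bee bee-worker` subcommand. It
+// loads the task from the export directory, executes it, and reports progress
+// through the task log and runner-state.json rather than through bee-web.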
+func RunPersistedTask(exportDir, taskID string, stdout, stderr io.Writer) int {
+	if strings.TrimSpace(exportDir) == "" || strings.TrimSpace(taskID) == "" {
+		fmt.Fprintln(stderr, "bee bee-worker: --export-dir and --task-id are required")
+		return 2
+	}
+
+	runtimeInfo, err := runtimeenv.Detect("auto")
+	if err != nil {
+		slog.Warn("resolve runtime for bee-worker", "err", err)
+	}
+	opts := &HandlerOptions{
+		ExportDir:   exportDir,
+		App:         app.New(platform.New()),
+		RuntimeMode: runtimeInfo.Mode,
+	}
+	statePath := filepath.Join(exportDir, "tasks-state.json")
+	task, err := loadPersistedTask(statePath, taskID)
+	if err != nil {
+		fmt.Fprintln(stderr, err.Error())
+		return 1
+	}
+	if task.StartedAt == nil || task.StartedAt.IsZero() {
+		now := time.Now()
+		task.StartedAt = &now
+	}
+	if task.Status == "" {
+		task.Status = TaskRunning
+	}
+	if err := writeTaskRunnerState(task, taskRunnerState{
+		PID:       os.Getpid(),
+		Status:    TaskRunning,
+		UpdatedAt: time.Now().UTC(),
+	}); err != nil {
+		fmt.Fprintln(stderr, err.Error())
+		return 1
+	}
+
+	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+	defer cancel()
+
+	j := newTaskJobState(task.LogPath, taskSerialPrefix(task))
+	executeTaskWithOptions(opts, task, j, ctx)
+	finalizeTaskForResult(task, j.err, ctx.Err() != nil)
+	if err := writeTaskReportArtifacts(task); err != nil {
+		appendJobLog(task.LogPath, "WARN: task report generation failed: "+err.Error())
+	}
+	j.closeLog()
+	if err := writeTaskRunnerState(task, taskRunnerState{
+		PID:       os.Getpid(),
+		Status:    task.Status,
+		Error:     task.ErrMsg,
+		UpdatedAt: time.Now().UTC(),
+	}); err != nil {
+		fmt.Fprintln(stderr, err.Error())
+	}
+	if task.ErrMsg != "" {
+		return 1
+	}
+	return 0
+}
diff --git a/audit/internal/webui/tasks.go b/audit/internal/webui/tasks.go
index 6672369..631aa09 100644
--- a/audit/internal/webui/tasks.go
+++ b/audit/internal/webui/tasks.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"log/slog"
 	"net/http"
 	"os"
@@ -13,6 +14,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"syscall"
 	"time"
 
 	"bee/audit/internal/app"
@@ -110,8 +112,9 @@ type Task struct {
 	ReportHTMLPath string `json:"report_html_path,omitempty"`
 
 	// runtime fields (not serialised)
-	job    *jobState
-	params taskParams
+	job       *jobState
+	runnerPID int
+	params    taskParams
 }
 
 // taskParams holds optional parameters parsed from the run request.
@@ -328,6 +331,13 @@ var (
 	installCommand = func(ctx context.Context, device string, logPath string) *exec.Cmd {
 		return exec.CommandContext(ctx, "bee-install", device, logPath)
 	}
+	externalTaskRunnerCommand = func(exportDir, taskID string) (*exec.Cmd, error) {
+		exe, err := os.Executable()
+		if err != nil {
+			return nil, err
+		}
+		return exec.Command(exe, "bee-worker", "--export-dir", exportDir, "--task-id", taskID), nil
+	}
 )
 
 // enqueue adds a task to the queue and notifies the worker.
@@ -365,6 +375,11 @@ func (q *taskQueue) prune() {
 
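+// Only one task runs at a time: while any task is marked running (including a
+// task recovered from a previous bee-web process), nothing else is dispatched.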
 // nextPending returns the highest-priority pending task (nil if none).
 func (q *taskQueue) nextPending() *Task {
+	for _, t := range q.tasks {
+		if t.Status == TaskRunning {
+			return nil
+		}
+	}
 	var best *Task
 	for _, t := range q.tasks {
 		if t.Status != TaskPending {
@@ -484,6 +499,7 @@ func (q *taskQueue) startWorker(opts *HandlerOptions) {
 	if !q.started {
 		q.loadLocked()
 		q.started = true
+		q.resumeRunningTasksLocked()
 		goRecoverLoop("task worker", 2*time.Second, q.worker)
 	}
 	hasPending := q.nextPending() != nil
@@ -517,15 +533,12 @@ func (q *taskQueue) worker() {
 		t.StartedAt = &now
 		t.DoneAt = nil
 		t.ErrMsg = ""
-		j := newTaskJobState(t.LogPath, taskSerialPrefix(t))
+		j := newTaskJobState(t.LogPath)
 		t.job = j
 		q.persistLocked()
 		q.mu.Unlock()
 
-		taskCtx, taskCancel := context.WithCancel(context.Background())
-		j.cancel = taskCancel
-		q.executeTask(t, j, taskCtx)
-		taskCancel()
+		q.runTaskExternal(t, j)
 
 		q.mu.Lock()
 		q.prune()
@@ -537,6 +550,207 @@ func (q *taskQueue) worker() {
 	}
 }
 
+// resumeRunningTasksLocked re-attaches cancel controls and a liveness monitor
+// to tasks that were already running under an external bee-worker before this
+// bee-web process started.
+func (q *taskQueue) resumeRunningTasksLocked() {
+	for _, t := range q.tasks {
+		if t.Status != TaskRunning {
+			continue
+		}
+		if t.job == nil {
+			t.job = newTaskJobState(t.LogPath)
+		}
+		q.attachExternalTaskControlsLocked(t, t.job)
+		q.startRecoveredTaskMonitorLocked(t, t.job)
+	}
+}
+
+// attachExternalTaskControlsLocked wires job cancellation to the external
+// runner: abort sends SIGTERM so bee-worker can shut down gracefully.
+func (q *taskQueue) attachExternalTaskControlsLocked(t *Task, j *jobState) {
+	if t == nil || j == nil {
+		return
+	}
+	j.cancel = func() {
+		pid := t.runnerPID
+		if pid <= 0 {
+			if state, ok := readTaskRunnerState(t); ok {
+				pid = state.PID
+			}
+		}
+		if pid > 0 {
+			_ = syscall.Kill(pid, syscall.SIGTERM)
+		}
+	}
+}
+
+// startRecoveredTaskMonitorLocked tails the task log and polls the recovered
+// runner PID until it exits, then reconciles the final task state.
+func (q *taskQueue) startRecoveredTaskMonitorLocked(t *Task, j *jobState) {
+	if t == nil || j == nil || t.runnerPID <= 0 {
+		return
+	}
+	goRecoverOnce("task runner monitor", func() {
+		stopTail := make(chan struct{})
+		doneTail := make(chan struct{})
+		go q.followTaskLog(t, j, stopTail, doneTail)
+		for processAlive(t.runnerPID) {
+			time.Sleep(500 * time.Millisecond)
+		}
+		close(stopTail)
+		<-doneTail
+		q.finishExternalTask(t, j, nil)
+	})
+}
+
+// runTaskExternal launches `bee bee-worker` for the task, streams its log back
+// into the job, and reconciles the final state once the child exits.
+func (q *taskQueue) runTaskExternal(t *Task, j *jobState) {
+	stopTail := make(chan struct{})
+	doneTail := make(chan struct{})
+	defer func() {
+		close(stopTail)
+		<-doneTail
+	}()
+	go q.followTaskLog(t, j, stopTail, doneTail)
+
+	cmd, err := externalTaskRunnerCommand(q.opts.ExportDir, t.ID)
+	if err != nil {
+		j.appendFromLog("ERROR: " + err.Error())
+		q.finishExternalTask(t, j, err)
+		return
+	}
+	if err := cmd.Start(); err != nil {
+		j.appendFromLog("ERROR: " + err.Error())
+		q.finishExternalTask(t, j, err)
+		return
+	}
+
+	q.mu.Lock()
+	t.runnerPID = cmd.Process.Pid
+	q.attachExternalTaskControlsLocked(t, j)
+	q.persistLocked()
+	q.mu.Unlock()
+
+	waitErr := cmd.Wait()
+	// Give the log tailer one more tick to drain the last lines before the
+	// final state is reconciled.
+	time.Sleep(200 * time.Millisecond)
+	q.finishExternalTask(t, j, waitErr)
+}
+
+// followTaskLog tails the task's log file, forwarding whole lines to the job
+// until stop is closed; any trailing partial line is flushed on shutdown.
+func (q *taskQueue) followTaskLog(t *Task, j *jobState, stop <-chan struct{}, done chan<- struct{}) {
+	defer close(done)
+	path := ""
+	if t != nil {
+		path = t.LogPath
+	}
+	if strings.TrimSpace(path) == "" {
+		return
+	}
+	offset := int64(0)
+	if info, err := os.Stat(path); err == nil {
+		offset = info.Size()
+	}
+	var partial string
+	ticker := time.NewTicker(250 * time.Millisecond)
+	defer ticker.Stop()
+	flush := func() {
+		data, newOffset, err := readTaskLogDelta(path, offset)
+		if err != nil || len(data) == 0 {
+			offset = newOffset
+			return
+		}
+		offset = newOffset
+		text := partial + strings.ReplaceAll(string(data), "\r\n", "\n")
+		lines := strings.Split(text, "\n")
+		partial = lines[len(lines)-1]
+		for _, line := range lines[:len(lines)-1] {
+			if line == "" {
+				continue
+			}
+			j.appendFromLog(line)
+		}
+	}
+	for {
+		select {
+		case <-ticker.C:
+			flush()
+		case <-stop:
+			flush()
+			if strings.TrimSpace(partial) != "" {
+				j.appendFromLog(partial)
+			}
+			return
+		}
+	}
+}
+
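+// readTaskLogDelta reads up to 1 MiB of new bytes from path starting at
+// offset; if the file shrank (rotated or truncated), reading restarts from
+// the beginning.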
+func readTaskLogDelta(path string, offset int64) ([]byte, int64, error) {
+	f, err := os.Open(path)
+	if err != nil {
+		return nil, offset, err
+	}
+	defer f.Close()
+	info, err := f.Stat()
+	if err != nil {
+		return nil, offset, err
+	}
+	if info.Size() < offset {
+		offset = 0
+	}
+	if _, err := f.Seek(offset, io.SeekStart); err != nil {
+		return nil, offset, err
+	}
+	data, err := io.ReadAll(io.LimitReader(f, 1<<20))
+	return data, offset + int64(len(data)), err
+}
+
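+// finishExternalTask records the task's final state once the external runner
+// is gone, preferring the status from runner-state.json over the process exit
+// error, and falling back to a generic failure if neither is available.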
+func (q *taskQueue) finishExternalTask(t *Task, j *jobState, waitErr error) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	if t.Status == TaskDone || t.Status == TaskFailed || t.Status == TaskCancelled {
+		if j != nil && !j.isDone() {
+			j.finish(t.ErrMsg)
+			j.closeLog()
+		}
+		select {
+		case q.trigger <- struct{}{}:
+		default:
+		}
+		return
+	}
+
+	state, ok := readTaskRunnerState(t)
+	switch {
+	case ok && state.Status != TaskRunning:
+		t.Status = state.Status
+		t.ErrMsg = state.Error
+		now := state.UpdatedAt
+		if now.IsZero() {
+			now = time.Now()
+		}
+		t.DoneAt = &now
+	case waitErr != nil:
+		now := time.Now()
+		t.Status = TaskFailed
+		t.ErrMsg = waitErr.Error()
+		t.DoneAt = &now
+	default:
+		now := time.Now()
+		t.Status = TaskFailed
+		t.ErrMsg = "task runner exited without final state"
+		t.DoneAt = &now
+	}
+	t.runnerPID = 0
+	q.finalizeTaskArtifactPathsLocked(t)
+	q.persistLocked()
+
+	if j != nil && !j.isDone() {
+		j.finish(t.ErrMsg)
+		j.closeLog()
+	}
+	if t.ErrMsg != "" {
+		taskSerialEvent(t, "finished with status="+t.Status+" error="+t.ErrMsg)
+	} else {
+		taskSerialEvent(t, "finished with status="+t.Status)
+	}
+	select {
+	case q.trigger <- struct{}{}:
+	default:
+	}
+}
+
 func (q *taskQueue) executeTask(t *Task, j *jobState, ctx context.Context) {
 	startedKmsgWatch := false
 	defer q.finalizeTaskRun(t, j)
@@ -985,15 +1199,11 @@ func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) {
 		taskSerialEvent(t, "finished with status="+t.Status)
 		writeJSON(w, map[string]string{"status": "cancelled"})
 	case TaskRunning:
-		if t.job != nil {
-			t.job.abort()
+		if t.job == nil || !t.job.abort() {
+			writeError(w, http.StatusConflict, "task is not cancellable")
+			return
 		}
-		t.Status = TaskCancelled
-		now := time.Now()
-		t.DoneAt = &now
-		globalQueue.persistLocked()
-		taskSerialEvent(t, "finished with status="+t.Status)
-		writeJSON(w, map[string]string{"status": "cancelled"})
+		writeJSON(w, map[string]string{"status": "aborting"})
 	default:
 		writeError(w, http.StatusConflict, "task is not running or pending")
 	}
@@ -1039,12 +1249,6 @@ func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request
 			if t.job != nil {
 				t.job.abort()
 			}
-			if taskMayLeaveOrphanWorkers(t.Target) {
-				platform.KillTestWorkers()
-			}
-			t.Status = TaskCancelled
-			t.DoneAt = &now
-			taskSerialEvent(t, "finished with status="+t.Status)
 			n++
 		}
 	}
@@ -1175,18 +1379,29 @@ func (q *taskQueue) loadLocked() {
 		}
 		q.assignTaskLogPathLocked(t)
 		if t.Status == TaskRunning {
-			// The task was interrupted by a bee-web restart. Child processes
-			// (e.g. bee-gpu-burn-worker, dcgmi/nvvs) survive the restart in
-			// their own process groups. Kill any matching stale workers before
-			// marking the task failed so the next GPU test does not inherit a
-			// busy DCGM slot or duplicate workers.
-			if taskMayLeaveOrphanWorkers(t.Target) {
-				_ = platform.KillTestWorkers()
+			state, ok := readTaskRunnerState(t)
+			switch {
+			case ok && state.Status == TaskRunning && processAlive(state.PID):
+				t.runnerPID = state.PID
+				t.job = newTaskJobState(t.LogPath)
+			case ok && state.Status != TaskRunning:
+				t.runnerPID = state.PID
+				t.Status = state.Status
+				t.ErrMsg = state.Error
+				now := state.UpdatedAt
+				if now.IsZero() {
+					now = time.Now()
+				}
+				t.DoneAt = &now
+			default:
+				if taskMayLeaveOrphanWorkers(t.Target) {
+					_ = platform.KillTestWorkers()
+				}
+				now := time.Now()
+				t.Status = TaskFailed
+				t.DoneAt = &now
+				t.ErrMsg = "interrupted by bee-web restart"
 			}
-			now := time.Now()
-			t.Status = TaskFailed
-			t.DoneAt = &now
-			t.ErrMsg = "interrupted by bee-web restart"
 		} else if t.Status == TaskPending {
 			t.StartedAt = nil
 			t.DoneAt = nil
diff --git a/scripts/deploy.sh b/scripts/deploy.sh
index 38ee9ed..a551d7b 100755
--- a/scripts/deploy.sh
+++ b/scripts/deploy.sh
@@ -47,6 +47,13 @@ echo "==> Building the binary..."
 )
 echo " OK: $(ls -lh "${LOCAL_BIN}" | awk '{print $5, $9}')"
 
+LOCAL_SHA="$(shasum -a 256 "${LOCAL_BIN}" | awk '{print $1}')"
+REMOTE_SHA="$("${SSH_CMD[@]}" "$REMOTE" "if [ -f '${REMOTE_BIN}' ] && command -v sha256sum >/dev/null 2>&1; then sha256sum '${REMOTE_BIN}' | awk '{print \$1}'; fi" 2>/dev/null || true)"
+if [[ -n "${REMOTE_SHA}" && "${LOCAL_SHA}" == "${REMOTE_SHA}" ]]; then
+  echo "==> Binary unchanged (${LOCAL_SHA}); skipping copy and service restart."
+  exit 0
+fi
+
 # --- Deploy ---
 echo "==> Copying to ${REMOTE}..."
 "${SCP_CMD[@]}" "${LOCAL_BIN}" "${REMOTE}:/tmp/bee-new"