package webui import ( "context" "encoding/json" "fmt" "net/http" "os" "os/exec" "path/filepath" "sort" "strings" "sync" "time" "bee/audit/internal/app" "bee/audit/internal/platform" ) // Task statuses. const ( TaskPending = "pending" TaskRunning = "running" TaskDone = "done" TaskFailed = "failed" TaskCancelled = "cancelled" ) // taskNames maps target → human-readable name for validate (SAT) runs. var taskNames = map[string]string{ "nvidia": "NVIDIA SAT", "nvidia-stress": "NVIDIA GPU Stress", "memory": "Memory SAT", "storage": "Storage SAT", "cpu": "CPU SAT", "amd": "AMD GPU SAT", "amd-mem": "AMD GPU MEM Integrity", "amd-bandwidth": "AMD GPU MEM Bandwidth", "amd-stress": "AMD GPU Burn-in", "memory-stress": "Memory Burn-in", "sat-stress": "SAT Stress (stressapptest)", "platform-stress": "Platform Thermal Cycling", "audit": "Audit", "support-bundle": "Support Bundle", "install": "Install to Disk", "install-to-ram": "Install to RAM", } // burnNames maps target → human-readable name when a burn profile is set. var burnNames = map[string]string{ "nvidia": "NVIDIA Burn-in", "memory": "Memory Burn-in", "cpu": "CPU Burn-in", "amd": "AMD GPU Burn-in", } func nvidiaStressTaskName(loader string) string { switch strings.TrimSpace(strings.ToLower(loader)) { case platform.NvidiaStressLoaderJohn: return "NVIDIA GPU Stress (John/OpenCL)" case platform.NvidiaStressLoaderNCCL: return "NVIDIA GPU Stress (NCCL)" default: return "NVIDIA GPU Stress (bee-gpu-burn)" } } func taskDisplayName(target, profile, loader string) string { name := taskNames[target] if profile != "" { if n, ok := burnNames[target]; ok { name = n } } if target == "nvidia-stress" { name = nvidiaStressTaskName(loader) } if name == "" { name = target } return name } // Task represents one unit of work in the queue. type Task struct { ID string `json:"id"` Name string `json:"name"` Target string `json:"target"` Priority int `json:"priority"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"` StartedAt *time.Time `json:"started_at,omitempty"` DoneAt *time.Time `json:"done_at,omitempty"` ElapsedSec int `json:"elapsed_sec,omitempty"` ErrMsg string `json:"error,omitempty"` LogPath string `json:"log_path,omitempty"` // runtime fields (not serialised) job *jobState params taskParams } // taskParams holds optional parameters parsed from the run request. type taskParams struct { Duration int `json:"duration,omitempty"` DiagLevel int `json:"diag_level,omitempty"` GPUIndices []int `json:"gpu_indices,omitempty"` ExcludeGPUIndices []int `json:"exclude_gpu_indices,omitempty"` Loader string `json:"loader,omitempty"` BurnProfile string `json:"burn_profile,omitempty"` DisplayName string `json:"display_name,omitempty"` Device string `json:"device,omitempty"` // for install PlatformComponents []string `json:"platform_components,omitempty"` } type persistedTask struct { ID string `json:"id"` Name string `json:"name"` Target string `json:"target"` Priority int `json:"priority"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"` StartedAt *time.Time `json:"started_at,omitempty"` DoneAt *time.Time `json:"done_at,omitempty"` ErrMsg string `json:"error,omitempty"` LogPath string `json:"log_path,omitempty"` Params taskParams `json:"params,omitempty"` } type burnPreset struct { NvidiaDiag int DurationSec int } func resolveBurnPreset(profile string) burnPreset { switch profile { case "overnight": return burnPreset{NvidiaDiag: 4, DurationSec: 8 * 60 * 60} case "acceptance": return burnPreset{NvidiaDiag: 3, DurationSec: 60 * 60} default: return burnPreset{NvidiaDiag: 1, DurationSec: 5 * 60} } } func resolvePlatformStressPreset(profile string) platform.PlatformStressOptions { switch profile { case "overnight": return platform.PlatformStressOptions{Cycles: []platform.PlatformStressCycle{ {LoadSec: 600, IdleSec: 120}, {LoadSec: 600, IdleSec: 60}, {LoadSec: 600, IdleSec: 30}, {LoadSec: 600, IdleSec: 120}, {LoadSec: 600, IdleSec: 60}, {LoadSec: 600, IdleSec: 30}, {LoadSec: 600, IdleSec: 120}, {LoadSec: 600, IdleSec: 60}, }} case "acceptance": return platform.PlatformStressOptions{Cycles: []platform.PlatformStressCycle{ {LoadSec: 300, IdleSec: 60}, {LoadSec: 300, IdleSec: 30}, {LoadSec: 300, IdleSec: 60}, {LoadSec: 300, IdleSec: 30}, }} default: // smoke return platform.PlatformStressOptions{Cycles: []platform.PlatformStressCycle{ {LoadSec: 90, IdleSec: 60}, {LoadSec: 90, IdleSec: 30}, }} } } // taskQueue manages a priority-ordered list of tasks and runs them one at a time. type taskQueue struct { mu sync.Mutex tasks []*Task trigger chan struct{} opts *HandlerOptions // set by startWorker statePath string logsDir string started bool kmsgWatcher *kmsgWatcher } var globalQueue = &taskQueue{trigger: make(chan struct{}, 1)} const maxTaskHistory = 50 var ( runMemoryAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) { return a.RunMemoryAcceptancePackCtx(ctx, baseDir, logFunc) } runStorageAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) { return a.RunStorageAcceptancePackCtx(ctx, baseDir, logFunc) } runCPUAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) { return a.RunCPUAcceptancePackCtx(ctx, baseDir, durationSec, logFunc) } runAMDAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) { return a.RunAMDAcceptancePackCtx(ctx, baseDir, logFunc) } runAMDMemIntegrityPackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) { return a.RunAMDMemIntegrityPackCtx(ctx, baseDir, logFunc) } runAMDMemBandwidthPackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) { return a.RunAMDMemBandwidthPackCtx(ctx, baseDir, logFunc) } runNvidiaStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, opts platform.NvidiaStressOptions, logFunc func(string)) (string, error) { return a.RunNvidiaStressPackCtx(ctx, baseDir, opts, logFunc) } runAMDStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) { return a.RunAMDStressPackCtx(ctx, baseDir, durationSec, logFunc) } runMemoryStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) { return a.RunMemoryStressPackCtx(ctx, baseDir, durationSec, logFunc) } runSATStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) { return a.RunSATStressPackCtx(ctx, baseDir, durationSec, logFunc) } buildSupportBundle = app.BuildSupportBundle installCommand = func(ctx context.Context, device string, logPath string) *exec.Cmd { return exec.CommandContext(ctx, "bee-install", device, logPath) } ) // enqueue adds a task to the queue and notifies the worker. func (q *taskQueue) enqueue(t *Task) { q.mu.Lock() q.assignTaskLogPathLocked(t) q.tasks = append(q.tasks, t) q.prune() q.persistLocked() q.mu.Unlock() select { case q.trigger <- struct{}{}: default: } } // prune removes oldest completed tasks beyond maxTaskHistory. func (q *taskQueue) prune() { var done []*Task var active []*Task for _, t := range q.tasks { switch t.Status { case TaskDone, TaskFailed, TaskCancelled: done = append(done, t) default: active = append(active, t) } } if len(done) > maxTaskHistory { done = done[len(done)-maxTaskHistory:] } q.tasks = append(active, done...) } // nextPending returns the highest-priority pending task (nil if none). func (q *taskQueue) nextPending() *Task { var best *Task for _, t := range q.tasks { if t.Status != TaskPending { continue } if best == nil || t.Priority > best.Priority || (t.Priority == best.Priority && t.CreatedAt.Before(best.CreatedAt)) { best = t } } return best } // findByID looks up a task by ID. func (q *taskQueue) findByID(id string) (*Task, bool) { q.mu.Lock() defer q.mu.Unlock() for _, t := range q.tasks { if t.ID == id { return t, true } } return nil, false } // findJob returns the jobState for a task ID (for SSE streaming compatibility). func (q *taskQueue) findJob(id string) (*jobState, bool) { t, ok := q.findByID(id) if !ok || t.job == nil { return nil, false } return t.job, true } type taskStreamSource struct { status string errMsg string logPath string job *jobState } func (q *taskQueue) taskStreamSource(id string) (taskStreamSource, bool) { q.mu.Lock() defer q.mu.Unlock() for _, t := range q.tasks { if t.ID != id { continue } return taskStreamSource{ status: t.Status, errMsg: t.ErrMsg, logPath: t.LogPath, job: t.job, }, true } return taskStreamSource{}, false } func (q *taskQueue) hasActiveTarget(target string) bool { q.mu.Lock() defer q.mu.Unlock() for _, t := range q.tasks { if t.Target != target { continue } if t.Status == TaskPending || t.Status == TaskRunning { return true } } return false } // snapshot returns a copy of all tasks sorted for display with newest tasks first. func (q *taskQueue) snapshot() []Task { q.mu.Lock() defer q.mu.Unlock() out := make([]Task, len(q.tasks)) for i, t := range q.tasks { out[i] = *t out[i].ElapsedSec = taskElapsedSec(&out[i], time.Now()) } sort.SliceStable(out, func(i, j int) bool { if !out[i].CreatedAt.Equal(out[j].CreatedAt) { return out[i].CreatedAt.After(out[j].CreatedAt) } si := statusOrder(out[i].Status) sj := statusOrder(out[j].Status) if si != sj { return si < sj } if out[i].Priority != out[j].Priority { return out[i].Priority > out[j].Priority } return out[i].Name < out[j].Name }) return out } func statusOrder(s string) int { switch s { case TaskRunning: return 0 case TaskPending: return 1 default: return 2 } } // startWorker launches the queue runner goroutine. func (q *taskQueue) startWorker(opts *HandlerOptions) { q.mu.Lock() q.opts = opts q.statePath = filepath.Join(opts.ExportDir, "tasks-state.json") q.logsDir = filepath.Join(opts.ExportDir, "tasks") _ = os.MkdirAll(q.logsDir, 0755) if !q.started { q.loadLocked() q.started = true go q.worker() } hasPending := q.nextPending() != nil q.mu.Unlock() if hasPending { select { case q.trigger <- struct{}{}: default: } } } func (q *taskQueue) worker() { for { <-q.trigger setCPUGovernor("performance") // Drain all pending tasks and start them in parallel. q.mu.Lock() var batch []*Task for { t := q.nextPending() if t == nil { break } now := time.Now() t.Status = TaskRunning t.StartedAt = &now t.DoneAt = nil t.ErrMsg = "" j := newTaskJobState(t.LogPath) t.job = j batch = append(batch, t) } if len(batch) > 0 { q.persistLocked() } q.mu.Unlock() var wg sync.WaitGroup for _, t := range batch { t := t j := t.job taskCtx, taskCancel := context.WithCancel(context.Background()) j.cancel = taskCancel wg.Add(1) go func() { defer wg.Done() if q.kmsgWatcher != nil && isSATTarget(t.Target) { q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target) } q.runTask(t, j, taskCtx) if q.kmsgWatcher != nil { q.kmsgWatcher.NotifyTaskFinished(t.ID) } q.mu.Lock() now2 := time.Now() t.DoneAt = &now2 if t.Status == TaskRunning { if j.err != "" { t.Status = TaskFailed t.ErrMsg = j.err } else { t.Status = TaskDone } } q.persistLocked() q.mu.Unlock() }() } wg.Wait() if len(batch) > 0 { q.mu.Lock() q.prune() q.persistLocked() q.mu.Unlock() } setCPUGovernor("powersave") } } // setCPUGovernor writes the given governor to all CPU scaling_governor sysfs files. // Silently ignores errors (e.g. when cpufreq is not available). func setCPUGovernor(governor string) { matches, err := filepath.Glob("/sys/devices/system/cpu/cpu*/cpufreq/scaling_governor") if err != nil || len(matches) == 0 { return } for _, path := range matches { _ = os.WriteFile(path, []byte(governor), 0644) } } // runTask executes the work for a task, writing output to j. func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) { if q.opts == nil { j.append("ERROR: handler options not configured") j.finish("handler options not configured") return } a := q.opts.App j.append(fmt.Sprintf("Starting %s...", t.Name)) if len(j.lines) > 0 { j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339))) } var ( archive string err error ) switch t.Target { case "nvidia": if a == nil { err = fmt.Errorf("app not configured") break } diagLevel := t.params.DiagLevel if t.params.BurnProfile != "" && diagLevel <= 0 { diagLevel = resolveBurnPreset(t.params.BurnProfile).NvidiaDiag } if len(t.params.GPUIndices) > 0 || diagLevel > 0 { result, e := a.RunNvidiaAcceptancePackWithOptions( ctx, "", diagLevel, t.params.GPUIndices, j.append, ) if e != nil { err = e } else { archive = result.Body } } else { archive, err = a.RunNvidiaAcceptancePack("", j.append) } case "nvidia-stress": if a == nil { err = fmt.Errorf("app not configured") break } dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = resolveBurnPreset(t.params.BurnProfile).DurationSec } archive, err = runNvidiaStressPackCtx(a, ctx, "", platform.NvidiaStressOptions{ DurationSec: dur, Loader: t.params.Loader, GPUIndices: t.params.GPUIndices, ExcludeGPUIndices: t.params.ExcludeGPUIndices, }, j.append) case "memory": if a == nil { err = fmt.Errorf("app not configured") break } archive, err = runMemoryAcceptancePackCtx(a, ctx, "", j.append) case "storage": if a == nil { err = fmt.Errorf("app not configured") break } archive, err = runStorageAcceptancePackCtx(a, ctx, "", j.append) case "cpu": if a == nil { err = fmt.Errorf("app not configured") break } dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = resolveBurnPreset(t.params.BurnProfile).DurationSec } if dur <= 0 { dur = 60 } j.append(fmt.Sprintf("CPU stress duration: %ds", dur)) archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append) case "amd": if a == nil { err = fmt.Errorf("app not configured") break } archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append) case "amd-mem": if a == nil { err = fmt.Errorf("app not configured") break } archive, err = runAMDMemIntegrityPackCtx(a, ctx, "", j.append) case "amd-bandwidth": if a == nil { err = fmt.Errorf("app not configured") break } archive, err = runAMDMemBandwidthPackCtx(a, ctx, "", j.append) case "amd-stress": if a == nil { err = fmt.Errorf("app not configured") break } dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = resolveBurnPreset(t.params.BurnProfile).DurationSec } archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append) case "memory-stress": if a == nil { err = fmt.Errorf("app not configured") break } dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = resolveBurnPreset(t.params.BurnProfile).DurationSec } archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append) case "sat-stress": if a == nil { err = fmt.Errorf("app not configured") break } dur := t.params.Duration if t.params.BurnProfile != "" && dur <= 0 { dur = resolveBurnPreset(t.params.BurnProfile).DurationSec } archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append) case "platform-stress": if a == nil { err = fmt.Errorf("app not configured") break } opts := resolvePlatformStressPreset(t.params.BurnProfile) opts.Components = t.params.PlatformComponents archive, err = a.RunPlatformStress(ctx, "", opts, j.append) case "audit": if a == nil { err = fmt.Errorf("app not configured") break } result, e := a.RunAuditNow(q.opts.RuntimeMode) if e != nil { err = e } else { for _, line := range splitLines(result.Body) { j.append(line) } } case "support-bundle": j.append("Building support bundle...") archive, err = buildSupportBundle(q.opts.ExportDir) case "install": if strings.TrimSpace(t.params.Device) == "" { err = fmt.Errorf("device is required") break } installLogPath := platform.InstallLogPath(t.params.Device) j.append("Install log: " + installLogPath) err = streamCmdJob(j, installCommand(ctx, t.params.Device, installLogPath)) case "install-to-ram": if a == nil { err = fmt.Errorf("app not configured") break } err = a.RunInstallToRAM(ctx, j.append) default: j.append("ERROR: unknown target: " + t.Target) j.finish("unknown target") return } // If the SAT archive was produced, check overall_status and write to component DB. if archive != "" { archivePath := app.ExtractArchivePath(archive) if err == nil { if app.ReadSATOverallStatus(archivePath) == "FAILED" { err = fmt.Errorf("SAT overall_status=FAILED (see summary.txt)") } } if db := q.statusDB(); db != nil { app.ApplySATResultToDB(db, t.Target, archivePath) } } if err != nil { if ctx.Err() != nil { j.append("Aborted.") j.finish("aborted") } else { j.append("ERROR: " + err.Error()) j.finish(err.Error()) } return } if archive != "" { j.append("Archive: " + archive) } j.finish("") } func (q *taskQueue) statusDB() *app.ComponentStatusDB { if q.opts == nil || q.opts.App == nil { return nil } return q.opts.App.StatusDB } func splitLines(s string) []string { var out []string for _, l := range splitNL(s) { if l != "" { out = append(out, l) } } return out } func splitNL(s string) []string { var out []string start := 0 for i, c := range s { if c == '\n' { out = append(out, s[start:i]) start = i + 1 } } out = append(out, s[start:]) return out } // ── HTTP handlers ───────────────────────────────────────────────────────────── func (h *handler) handleAPITasksList(w http.ResponseWriter, _ *http.Request) { tasks := globalQueue.snapshot() writeJSON(w, tasks) } func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") t, ok := globalQueue.findByID(id) if !ok { writeError(w, http.StatusNotFound, "task not found") return } globalQueue.mu.Lock() defer globalQueue.mu.Unlock() switch t.Status { case TaskPending: t.Status = TaskCancelled now := time.Now() t.DoneAt = &now globalQueue.persistLocked() writeJSON(w, map[string]string{"status": "cancelled"}) case TaskRunning: if t.job != nil { t.job.abort() } t.Status = TaskCancelled now := time.Now() t.DoneAt = &now globalQueue.persistLocked() writeJSON(w, map[string]string{"status": "cancelled"}) default: writeError(w, http.StatusConflict, "task is not running or pending") } } func (h *handler) handleAPITasksPriority(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") t, ok := globalQueue.findByID(id) if !ok { writeError(w, http.StatusNotFound, "task not found") return } var req struct { Delta int `json:"delta"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid body") return } globalQueue.mu.Lock() defer globalQueue.mu.Unlock() if t.Status != TaskPending { writeError(w, http.StatusConflict, "only pending tasks can be reprioritised") return } t.Priority += req.Delta globalQueue.persistLocked() writeJSON(w, map[string]int{"priority": t.Priority}) } func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request) { globalQueue.mu.Lock() now := time.Now() n := 0 for _, t := range globalQueue.tasks { switch t.Status { case TaskPending: t.Status = TaskCancelled t.DoneAt = &now n++ case TaskRunning: if t.job != nil { t.job.abort() } t.Status = TaskCancelled t.DoneAt = &now n++ } } globalQueue.persistLocked() globalQueue.mu.Unlock() writeJSON(w, map[string]int{"cancelled": n}) } func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Request) { // Cancel all queued/running tasks in the queue first. globalQueue.mu.Lock() now := time.Now() cancelled := 0 for _, t := range globalQueue.tasks { switch t.Status { case TaskPending: t.Status = TaskCancelled t.DoneAt = &now cancelled++ case TaskRunning: if t.job != nil { t.job.abort() } t.Status = TaskCancelled t.DoneAt = &now cancelled++ } } globalQueue.persistLocked() globalQueue.mu.Unlock() // Kill orphaned test worker processes at the OS level. killed := platform.KillTestWorkers() writeJSON(w, map[string]any{ "cancelled": cancelled, "killed": len(killed), "processes": killed, }) } func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") src, ok := globalQueue.taskStreamSource(id) if !ok { http.Error(w, "task not found", http.StatusNotFound) return } if src.job != nil { streamJob(w, r, src.job) return } if src.status == TaskDone || src.status == TaskFailed || src.status == TaskCancelled { j := newTaskJobState(src.logPath) j.finish(src.errMsg) streamJob(w, r, j) return } if !sseStart(w) { return } sseWrite(w, "", "Task is queued. Waiting for worker...") ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() for { select { case <-ticker.C: src, ok = globalQueue.taskStreamSource(id) if !ok { sseWrite(w, "done", "task not found") return } if src.job != nil { streamSubscribedJob(w, r, src.job) return } if src.status == TaskDone || src.status == TaskFailed || src.status == TaskCancelled { j := newTaskJobState(src.logPath) j.finish(src.errMsg) streamSubscribedJob(w, r, j) return } case <-r.Context().Done(): return } } } func (q *taskQueue) assignTaskLogPathLocked(t *Task) { if t.LogPath != "" || q.logsDir == "" || t.ID == "" { return } t.LogPath = filepath.Join(q.logsDir, t.ID+".log") } func (q *taskQueue) loadLocked() { if q.statePath == "" { return } data, err := os.ReadFile(q.statePath) if err != nil || len(data) == 0 { return } var persisted []persistedTask if err := json.Unmarshal(data, &persisted); err != nil { return } for _, pt := range persisted { t := &Task{ ID: pt.ID, Name: pt.Name, Target: pt.Target, Priority: pt.Priority, Status: pt.Status, CreatedAt: pt.CreatedAt, StartedAt: pt.StartedAt, DoneAt: pt.DoneAt, ErrMsg: pt.ErrMsg, LogPath: pt.LogPath, params: pt.Params, } q.assignTaskLogPathLocked(t) if t.Status == TaskRunning { // The task was interrupted by a bee-web restart. Child processes // (e.g. bee-gpu-burn-worker) survive the restart in their own // process groups and cannot be cancelled retroactively. Mark the // task as failed so the user can decide whether to re-run it // rather than blindly re-launching duplicate workers. now := time.Now() t.Status = TaskFailed t.DoneAt = &now t.ErrMsg = "interrupted by bee-web restart" } else if t.Status == TaskPending { t.StartedAt = nil t.DoneAt = nil t.ErrMsg = "" } q.tasks = append(q.tasks, t) } q.prune() q.persistLocked() } func (q *taskQueue) persistLocked() { if q.statePath == "" { return } state := make([]persistedTask, 0, len(q.tasks)) for _, t := range q.tasks { state = append(state, persistedTask{ ID: t.ID, Name: t.Name, Target: t.Target, Priority: t.Priority, Status: t.Status, CreatedAt: t.CreatedAt, StartedAt: t.StartedAt, DoneAt: t.DoneAt, ErrMsg: t.ErrMsg, LogPath: t.LogPath, Params: t.params, }) } data, err := json.MarshalIndent(state, "", " ") if err != nil { return } tmp := q.statePath + ".tmp" if err := os.WriteFile(tmp, data, 0644); err != nil { return } _ = os.Rename(tmp, q.statePath) } func taskElapsedSec(t *Task, now time.Time) int { if t == nil || t.StartedAt == nil || t.StartedAt.IsZero() { return 0 } start := *t.StartedAt if !t.CreatedAt.IsZero() && start.Before(t.CreatedAt) { start = t.CreatedAt } end := now if t.DoneAt != nil && !t.DoneAt.IsZero() { end = *t.DoneAt } if end.Before(start) { return 0 } return int(end.Sub(start).Round(time.Second) / time.Second) }