release: v3.1
This commit is contained in:
@@ -10,6 +10,8 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"bee/audit/internal/app"
|
||||
)
|
||||
|
||||
// Task statuses.
|
||||
@@ -23,10 +25,10 @@ const (
|
||||
|
||||
// taskNames maps target → human-readable name.
|
||||
var taskNames = map[string]string{
|
||||
"nvidia": "NVIDIA SAT",
|
||||
"memory": "Memory SAT",
|
||||
"storage": "Storage SAT",
|
||||
"cpu": "CPU SAT",
|
||||
"nvidia": "NVIDIA SAT",
|
||||
"memory": "Memory SAT",
|
||||
"storage": "Storage SAT",
|
||||
"cpu": "CPU SAT",
|
||||
"amd": "AMD GPU SAT",
|
||||
"amd-stress": "AMD GPU Burn-in",
|
||||
"memory-stress": "Memory Burn-in",
|
||||
@@ -47,6 +49,7 @@ type Task struct {
|
||||
StartedAt *time.Time `json:"started_at,omitempty"`
|
||||
DoneAt *time.Time `json:"done_at,omitempty"`
|
||||
ErrMsg string `json:"error,omitempty"`
|
||||
LogPath string `json:"log_path,omitempty"`
|
||||
|
||||
// runtime fields (not serialised)
|
||||
job *jobState
|
||||
@@ -55,29 +58,90 @@ type Task struct {
|
||||
|
||||
// taskParams holds optional parameters parsed from the run request.
|
||||
type taskParams struct {
|
||||
Duration int
|
||||
DiagLevel int
|
||||
GPUIndices []int
|
||||
Device string // for install
|
||||
Duration int `json:"duration,omitempty"`
|
||||
DiagLevel int `json:"diag_level,omitempty"`
|
||||
GPUIndices []int `json:"gpu_indices,omitempty"`
|
||||
BurnProfile string `json:"burn_profile,omitempty"`
|
||||
DisplayName string `json:"display_name,omitempty"`
|
||||
Device string `json:"device,omitempty"` // for install
|
||||
}
|
||||
|
||||
type persistedTask struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Target string `json:"target"`
|
||||
Priority int `json:"priority"`
|
||||
Status string `json:"status"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
StartedAt *time.Time `json:"started_at,omitempty"`
|
||||
DoneAt *time.Time `json:"done_at,omitempty"`
|
||||
ErrMsg string `json:"error,omitempty"`
|
||||
LogPath string `json:"log_path,omitempty"`
|
||||
Params taskParams `json:"params,omitempty"`
|
||||
}
|
||||
|
||||
type burnPreset struct {
|
||||
NvidiaDiag int
|
||||
DurationSec int
|
||||
}
|
||||
|
||||
func resolveBurnPreset(profile string) burnPreset {
|
||||
switch profile {
|
||||
case "overnight":
|
||||
return burnPreset{NvidiaDiag: 4, DurationSec: 8 * 60 * 60}
|
||||
case "acceptance":
|
||||
return burnPreset{NvidiaDiag: 3, DurationSec: 60 * 60}
|
||||
default:
|
||||
return burnPreset{NvidiaDiag: 1, DurationSec: 5 * 60}
|
||||
}
|
||||
}
|
||||
|
||||
// taskQueue manages a priority-ordered list of tasks and runs them one at a time.
|
||||
type taskQueue struct {
|
||||
mu sync.Mutex
|
||||
tasks []*Task
|
||||
trigger chan struct{}
|
||||
opts *HandlerOptions // set by startWorker
|
||||
mu sync.Mutex
|
||||
tasks []*Task
|
||||
trigger chan struct{}
|
||||
opts *HandlerOptions // set by startWorker
|
||||
statePath string
|
||||
logsDir string
|
||||
started bool
|
||||
}
|
||||
|
||||
var globalQueue = &taskQueue{trigger: make(chan struct{}, 1)}
|
||||
|
||||
const maxTaskHistory = 50
|
||||
|
||||
var (
|
||||
runMemoryAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
|
||||
return a.RunMemoryAcceptancePackCtx(ctx, baseDir, logFunc)
|
||||
}
|
||||
runStorageAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
|
||||
return a.RunStorageAcceptancePackCtx(ctx, baseDir, logFunc)
|
||||
}
|
||||
runCPUAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
||||
return a.RunCPUAcceptancePackCtx(ctx, baseDir, durationSec, logFunc)
|
||||
}
|
||||
runAMDAcceptancePackCtx = func(a *app.App, ctx context.Context, baseDir string, logFunc func(string)) (string, error) {
|
||||
return a.RunAMDAcceptancePackCtx(ctx, baseDir, logFunc)
|
||||
}
|
||||
runAMDStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
||||
return a.RunAMDStressPackCtx(ctx, baseDir, durationSec, logFunc)
|
||||
}
|
||||
runMemoryStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
||||
return a.RunMemoryStressPackCtx(ctx, baseDir, durationSec, logFunc)
|
||||
}
|
||||
runSATStressPackCtx = func(a *app.App, ctx context.Context, baseDir string, durationSec int, logFunc func(string)) (string, error) {
|
||||
return a.RunSATStressPackCtx(ctx, baseDir, durationSec, logFunc)
|
||||
}
|
||||
)
|
||||
|
||||
// enqueue adds a task to the queue and notifies the worker.
|
||||
func (q *taskQueue) enqueue(t *Task) {
|
||||
q.mu.Lock()
|
||||
q.assignTaskLogPathLocked(t)
|
||||
q.tasks = append(q.tasks, t)
|
||||
q.prune()
|
||||
q.persistLocked()
|
||||
q.mu.Unlock()
|
||||
select {
|
||||
case q.trigger <- struct{}{}:
|
||||
@@ -139,6 +203,20 @@ func (q *taskQueue) findJob(id string) (*jobState, bool) {
|
||||
return t.job, true
|
||||
}
|
||||
|
||||
func (q *taskQueue) hasActiveTarget(target string) bool {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
for _, t := range q.tasks {
|
||||
if t.Target != target {
|
||||
continue
|
||||
}
|
||||
if t.Status == TaskPending || t.Status == TaskRunning {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// snapshot returns a copy of all tasks sorted for display (running first, then pending by priority, then done by doneAt desc).
|
||||
func (q *taskQueue) snapshot() []Task {
|
||||
q.mu.Lock()
|
||||
@@ -174,8 +252,24 @@ func statusOrder(s string) int {
|
||||
|
||||
// startWorker launches the queue runner goroutine.
|
||||
func (q *taskQueue) startWorker(opts *HandlerOptions) {
|
||||
q.mu.Lock()
|
||||
q.opts = opts
|
||||
go q.worker()
|
||||
q.statePath = filepath.Join(opts.ExportDir, "tasks-state.json")
|
||||
q.logsDir = filepath.Join(opts.ExportDir, "tasks")
|
||||
_ = os.MkdirAll(q.logsDir, 0755)
|
||||
if !q.started {
|
||||
q.loadLocked()
|
||||
q.started = true
|
||||
go q.worker()
|
||||
}
|
||||
hasPending := q.nextPending() != nil
|
||||
q.mu.Unlock()
|
||||
if hasPending {
|
||||
select {
|
||||
case q.trigger <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *taskQueue) worker() {
|
||||
@@ -192,10 +286,13 @@ func (q *taskQueue) worker() {
|
||||
now := time.Now()
|
||||
t.Status = TaskRunning
|
||||
t.StartedAt = &now
|
||||
j := &jobState{}
|
||||
t.DoneAt = nil
|
||||
t.ErrMsg = ""
|
||||
j := newTaskJobState(t.LogPath)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
j.cancel = cancel
|
||||
t.job = j
|
||||
q.persistLocked()
|
||||
q.mu.Unlock()
|
||||
|
||||
q.runTask(t, j, ctx)
|
||||
@@ -212,6 +309,7 @@ func (q *taskQueue) worker() {
|
||||
}
|
||||
}
|
||||
q.prune()
|
||||
q.persistLocked()
|
||||
q.mu.Unlock()
|
||||
}
|
||||
setCPUGovernor("powersave")
|
||||
@@ -240,6 +338,9 @@ func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) {
|
||||
a := q.opts.App
|
||||
|
||||
j.append(fmt.Sprintf("Starting %s...", t.Name))
|
||||
if len(j.lines) > 0 {
|
||||
j.append(fmt.Sprintf("Recovered after bee-web restart at %s", time.Now().UTC().Format(time.RFC3339)))
|
||||
}
|
||||
|
||||
var (
|
||||
archive string
|
||||
@@ -248,9 +349,13 @@ func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) {
|
||||
|
||||
switch t.Target {
|
||||
case "nvidia":
|
||||
if len(t.params.GPUIndices) > 0 || t.params.DiagLevel > 0 {
|
||||
diagLevel := t.params.DiagLevel
|
||||
if t.params.BurnProfile != "" && diagLevel <= 0 {
|
||||
diagLevel = resolveBurnPreset(t.params.BurnProfile).NvidiaDiag
|
||||
}
|
||||
if len(t.params.GPUIndices) > 0 || diagLevel > 0 {
|
||||
result, e := a.RunNvidiaAcceptancePackWithOptions(
|
||||
ctx, "", t.params.DiagLevel, t.params.GPUIndices, j.append,
|
||||
ctx, "", diagLevel, t.params.GPUIndices, j.append,
|
||||
)
|
||||
if e != nil {
|
||||
err = e
|
||||
@@ -261,23 +366,38 @@ func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) {
|
||||
archive, err = a.RunNvidiaAcceptancePack("", j.append)
|
||||
}
|
||||
case "memory":
|
||||
archive, err = a.RunMemoryAcceptancePack("", j.append)
|
||||
archive, err = runMemoryAcceptancePackCtx(a, ctx, "", j.append)
|
||||
case "storage":
|
||||
archive, err = a.RunStorageAcceptancePack("", j.append)
|
||||
archive, err = runStorageAcceptancePackCtx(a, ctx, "", j.append)
|
||||
case "cpu":
|
||||
dur := t.params.Duration
|
||||
if t.params.BurnProfile != "" && dur <= 0 {
|
||||
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
||||
}
|
||||
if dur <= 0 {
|
||||
dur = 60
|
||||
}
|
||||
archive, err = a.RunCPUAcceptancePack("", dur, j.append)
|
||||
archive, err = runCPUAcceptancePackCtx(a, ctx, "", dur, j.append)
|
||||
case "amd":
|
||||
archive, err = a.RunAMDAcceptancePack("", j.append)
|
||||
archive, err = runAMDAcceptancePackCtx(a, ctx, "", j.append)
|
||||
case "amd-stress":
|
||||
archive, err = a.RunAMDStressPack("", j.append)
|
||||
dur := t.params.Duration
|
||||
if t.params.BurnProfile != "" && dur <= 0 {
|
||||
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
||||
}
|
||||
archive, err = runAMDStressPackCtx(a, ctx, "", dur, j.append)
|
||||
case "memory-stress":
|
||||
archive, err = a.RunMemoryStressPack("", j.append)
|
||||
dur := t.params.Duration
|
||||
if t.params.BurnProfile != "" && dur <= 0 {
|
||||
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
||||
}
|
||||
archive, err = runMemoryStressPackCtx(a, ctx, "", dur, j.append)
|
||||
case "sat-stress":
|
||||
archive, err = a.RunSATStressPack("", j.append)
|
||||
dur := t.params.Duration
|
||||
if t.params.BurnProfile != "" && dur <= 0 {
|
||||
dur = resolveBurnPreset(t.params.BurnProfile).DurationSec
|
||||
}
|
||||
archive, err = runSATStressPackCtx(a, ctx, "", dur, j.append)
|
||||
case "audit":
|
||||
result, e := a.RunAuditNow(q.opts.RuntimeMode)
|
||||
if e != nil {
|
||||
@@ -288,7 +408,7 @@ func (q *taskQueue) runTask(t *Task, j *jobState, ctx context.Context) {
|
||||
}
|
||||
}
|
||||
case "install-to-ram":
|
||||
err = a.RunInstallToRAM(j.append)
|
||||
err = a.RunInstallToRAM(ctx, j.append)
|
||||
default:
|
||||
j.append("ERROR: unknown target: " + t.Target)
|
||||
j.finish("unknown target")
|
||||
@@ -355,6 +475,7 @@ func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) {
|
||||
t.Status = TaskCancelled
|
||||
now := time.Now()
|
||||
t.DoneAt = &now
|
||||
globalQueue.persistLocked()
|
||||
writeJSON(w, map[string]string{"status": "cancelled"})
|
||||
case TaskRunning:
|
||||
if t.job != nil {
|
||||
@@ -363,6 +484,7 @@ func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) {
|
||||
t.Status = TaskCancelled
|
||||
now := time.Now()
|
||||
t.DoneAt = &now
|
||||
globalQueue.persistLocked()
|
||||
writeJSON(w, map[string]string{"status": "cancelled"})
|
||||
default:
|
||||
writeError(w, http.StatusConflict, "task is not running or pending")
|
||||
@@ -390,6 +512,7 @@ func (h *handler) handleAPITasksPriority(w http.ResponseWriter, r *http.Request)
|
||||
return
|
||||
}
|
||||
t.Priority += req.Delta
|
||||
globalQueue.persistLocked()
|
||||
writeJSON(w, map[string]int{"priority": t.Priority})
|
||||
}
|
||||
|
||||
@@ -412,6 +535,7 @@ func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request
|
||||
n++
|
||||
}
|
||||
}
|
||||
globalQueue.persistLocked()
|
||||
globalQueue.mu.Unlock()
|
||||
writeJSON(w, map[string]int{"cancelled": n})
|
||||
}
|
||||
@@ -434,3 +558,79 @@ func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
streamJob(w, r, j)
|
||||
}
|
||||
|
||||
func (q *taskQueue) assignTaskLogPathLocked(t *Task) {
|
||||
if t.LogPath != "" || q.logsDir == "" || t.ID == "" {
|
||||
return
|
||||
}
|
||||
t.LogPath = filepath.Join(q.logsDir, t.ID+".log")
|
||||
}
|
||||
|
||||
func (q *taskQueue) loadLocked() {
|
||||
if q.statePath == "" {
|
||||
return
|
||||
}
|
||||
data, err := os.ReadFile(q.statePath)
|
||||
if err != nil || len(data) == 0 {
|
||||
return
|
||||
}
|
||||
var persisted []persistedTask
|
||||
if err := json.Unmarshal(data, &persisted); err != nil {
|
||||
return
|
||||
}
|
||||
for _, pt := range persisted {
|
||||
t := &Task{
|
||||
ID: pt.ID,
|
||||
Name: pt.Name,
|
||||
Target: pt.Target,
|
||||
Priority: pt.Priority,
|
||||
Status: pt.Status,
|
||||
CreatedAt: pt.CreatedAt,
|
||||
StartedAt: pt.StartedAt,
|
||||
DoneAt: pt.DoneAt,
|
||||
ErrMsg: pt.ErrMsg,
|
||||
LogPath: pt.LogPath,
|
||||
params: pt.Params,
|
||||
}
|
||||
q.assignTaskLogPathLocked(t)
|
||||
if t.Status == TaskPending || t.Status == TaskRunning {
|
||||
t.Status = TaskPending
|
||||
t.DoneAt = nil
|
||||
t.ErrMsg = ""
|
||||
}
|
||||
q.tasks = append(q.tasks, t)
|
||||
}
|
||||
q.prune()
|
||||
q.persistLocked()
|
||||
}
|
||||
|
||||
func (q *taskQueue) persistLocked() {
|
||||
if q.statePath == "" {
|
||||
return
|
||||
}
|
||||
state := make([]persistedTask, 0, len(q.tasks))
|
||||
for _, t := range q.tasks {
|
||||
state = append(state, persistedTask{
|
||||
ID: t.ID,
|
||||
Name: t.Name,
|
||||
Target: t.Target,
|
||||
Priority: t.Priority,
|
||||
Status: t.Status,
|
||||
CreatedAt: t.CreatedAt,
|
||||
StartedAt: t.StartedAt,
|
||||
DoneAt: t.DoneAt,
|
||||
ErrMsg: t.ErrMsg,
|
||||
LogPath: t.LogPath,
|
||||
Params: t.params,
|
||||
})
|
||||
}
|
||||
data, err := json.MarshalIndent(state, "", " ")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tmp := q.statePath + ".tmp"
|
||||
if err := os.WriteFile(tmp, data, 0644); err != nil {
|
||||
return
|
||||
}
|
||||
_ = os.Rename(tmp, q.statePath)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user