RunNvidiaPowerBench already performs a full internal ramp from 1 to N GPUs in Phase 2. Spawning N tasks with growing GPU subsets meant task K repeated all steps 1..K-1 already done by tasks 1..K-1 — O(N²) work instead of O(N). Replace with a single task using all selected GPUs. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1547 lines
44 KiB
Go
1547 lines
44 KiB
Go
package webui
|
||
|
||
import (
|
||
"bufio"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"os"
|
||
"os/exec"
|
||
"path/filepath"
|
||
"regexp"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"sync/atomic"
|
||
"syscall"
|
||
"time"
|
||
|
||
"bee/audit/internal/app"
|
||
"bee/audit/internal/platform"
|
||
)
|
||
|
||
// ansiEscapeRE matches ANSI CSI sequences (ESC [ params letter), charset
// selection sequences (ESC ( or ESC ) plus one char), and bare ESC D/A/B/C
// escapes, so subprocess output can be stripped to plain text before it is
// streamed to the browser. Compiled once at package scope.
var ansiEscapeRE = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]|\x1b[()][A-Z0-9]|\x1b[DABC]`)
|
||
var apiListNvidiaGPUs = func(a *app.App) ([]platform.NvidiaGPU, error) {
|
||
if a == nil {
|
||
return nil, fmt.Errorf("app not configured")
|
||
}
|
||
return a.ListNvidiaGPUs()
|
||
}
|
||
var apiListNvidiaGPUStatuses = func(a *app.App) ([]platform.NvidiaGPUStatus, error) {
|
||
if a == nil {
|
||
return nil, fmt.Errorf("app not configured")
|
||
}
|
||
return a.ListNvidiaGPUStatuses()
|
||
}
|
||
|
||
// Task queue priorities per target class.
// NOTE(review): the values suggest lower numbers run first (benchmarks before
// burns before validation before install) — confirm against the queue's
// scheduling implementation, which is outside this file.
const (
	taskPriorityBenchmark      = 10
	taskPriorityBurn           = 20
	taskPriorityValidateStress = 30
	taskPriorityValidate       = 40
	taskPriorityAudit          = 50
	taskPriorityInstallToRAM   = 60
	taskPriorityInstall        = 70
)
|
||
|
||
// ── Job ID counter ────────────────────────────────────────────────────────────

// jobCounter supplies the starting candidate for TASK-NNN ID allocation; it
// wraps modulo 1000 inside newJobID.
var jobCounter atomic.Uint64
|
||
|
||
// newJobID allocates a short task ID of the form "TASK-NNN" (NNN in 000–999).
// It advances a global counter for the starting candidate and linearly probes
// the task queue for an ID not currently in use. The string parameter is a
// legacy prefix hint and is ignored.
func newJobID(_ string) string {
	start := int((jobCounter.Add(1) - 1) % 1000)
	// Hold the queue lock across the probe so the in-use check is atomic with
	// respect to concurrent enqueues.
	globalQueue.mu.Lock()
	defer globalQueue.mu.Unlock()
	for offset := 0; offset < 1000; offset++ {
		n := (start + offset) % 1000
		id := fmt.Sprintf("TASK-%03d", n)
		if !taskIDInUseLocked(id) {
			return id
		}
	}
	// All 1000 IDs are taken: fall back to the starting candidate even though
	// it collides with an existing task.
	return fmt.Sprintf("TASK-%03d", start)
}
|
||
|
||
func taskIDInUseLocked(id string) bool {
|
||
for _, t := range globalQueue.tasks {
|
||
if t != nil && t.ID == id {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// taskRunResponse is the JSON payload returned after enqueuing tasks. The
// job_* fields mirror the task_* fields for backward compatibility with
// older frontends.
type taskRunResponse struct {
	TaskID    string   `json:"task_id,omitempty"`  // first created task ID
	JobID     string   `json:"job_id,omitempty"`   // legacy alias of TaskID
	TaskIDs   []string `json:"task_ids,omitempty"` // all created task IDs
	JobIDs    []string `json:"job_ids,omitempty"`  // legacy alias of TaskIDs
	TaskCount int      `json:"task_count,omitempty"`
}
|
||
|
||
// nvidiaTaskSelection is one GPU subset produced when a homogeneous NVIDIA
// target is split into per-model tasks.
type nvidiaTaskSelection struct {
	GPUIndices []int  // GPU indices assigned to this task
	Label      string // human-readable selection description for the task name
}
|
||
|
||
func writeTaskRunResponse(w http.ResponseWriter, tasks []*Task) {
|
||
if len(tasks) == 0 {
|
||
writeJSON(w, taskRunResponse{})
|
||
return
|
||
}
|
||
ids := make([]string, 0, len(tasks))
|
||
for _, t := range tasks {
|
||
if t == nil || strings.TrimSpace(t.ID) == "" {
|
||
continue
|
||
}
|
||
ids = append(ids, t.ID)
|
||
}
|
||
resp := taskRunResponse{TaskCount: len(ids)}
|
||
if len(ids) > 0 {
|
||
resp.TaskID = ids[0]
|
||
resp.JobID = ids[0]
|
||
resp.TaskIDs = ids
|
||
resp.JobIDs = ids
|
||
}
|
||
writeJSON(w, resp)
|
||
}
|
||
|
||
// shouldSplitHomogeneousNvidiaTarget reports whether tasks for this target
// may be fanned out into one task per homogeneous GPU model group.
func shouldSplitHomogeneousNvidiaTarget(target string) bool {
	switch strings.TrimSpace(target) {
	case "nvidia",
		"nvidia-targeted-stress",
		"nvidia-bench-perf",
		"nvidia-bench-power",
		"nvidia-compute",
		"nvidia-targeted-power",
		"nvidia-pulse",
		"nvidia-interconnect",
		"nvidia-bandwidth",
		"nvidia-stress":
		return true
	}
	return false
}
|
||
|
||
// defaultTaskPriority maps a target (and its params) to the queue priority
// constant for that task class. Unknown targets return 0.
func defaultTaskPriority(target string, params taskParams) int {
	switch strings.TrimSpace(target) {
	case "install":
		return taskPriorityInstall
	case "install-to-ram":
		return taskPriorityInstallToRAM
	case "audit":
		return taskPriorityAudit
	case "nvidia-bench-perf", "nvidia-bench-power":
		return taskPriorityBenchmark
	case "nvidia-stress", "amd-stress", "memory-stress", "sat-stress", "platform-stress", "nvidia-compute":
		return taskPriorityBurn
	case "nvidia", "nvidia-targeted-stress", "nvidia-targeted-power", "nvidia-pulse",
		"nvidia-interconnect", "nvidia-bandwidth", "memory", "storage", "cpu",
		"amd", "amd-mem", "amd-bandwidth":
		// Validation targets get a distinct (higher-urgency) priority when run
		// in stress mode.
		if params.StressMode {
			return taskPriorityValidateStress
		}
		return taskPriorityValidate
	default:
		return 0
	}
}
|
||
|
||
func expandHomogeneousNvidiaSelections(gpus []platform.NvidiaGPU, include, exclude []int) ([]nvidiaTaskSelection, error) {
|
||
if len(gpus) == 0 {
|
||
return nil, fmt.Errorf("no NVIDIA GPUs detected")
|
||
}
|
||
indexed := make(map[int]platform.NvidiaGPU, len(gpus))
|
||
allIndices := make([]int, 0, len(gpus))
|
||
for _, gpu := range gpus {
|
||
indexed[gpu.Index] = gpu
|
||
allIndices = append(allIndices, gpu.Index)
|
||
}
|
||
sort.Ints(allIndices)
|
||
|
||
selected := allIndices
|
||
if len(include) > 0 {
|
||
selected = make([]int, 0, len(include))
|
||
seen := make(map[int]struct{}, len(include))
|
||
for _, idx := range include {
|
||
if _, ok := indexed[idx]; !ok {
|
||
continue
|
||
}
|
||
if _, dup := seen[idx]; dup {
|
||
continue
|
||
}
|
||
seen[idx] = struct{}{}
|
||
selected = append(selected, idx)
|
||
}
|
||
sort.Ints(selected)
|
||
}
|
||
if len(exclude) > 0 {
|
||
skip := make(map[int]struct{}, len(exclude))
|
||
for _, idx := range exclude {
|
||
skip[idx] = struct{}{}
|
||
}
|
||
filtered := selected[:0]
|
||
for _, idx := range selected {
|
||
if _, ok := skip[idx]; ok {
|
||
continue
|
||
}
|
||
filtered = append(filtered, idx)
|
||
}
|
||
selected = filtered
|
||
}
|
||
if len(selected) == 0 {
|
||
return nil, fmt.Errorf("no NVIDIA GPUs selected")
|
||
}
|
||
|
||
modelGroups := make(map[string][]platform.NvidiaGPU)
|
||
modelOrder := make([]string, 0)
|
||
for _, idx := range selected {
|
||
gpu := indexed[idx]
|
||
model := strings.TrimSpace(gpu.Name)
|
||
if model == "" {
|
||
model = fmt.Sprintf("GPU %d", gpu.Index)
|
||
}
|
||
if _, ok := modelGroups[model]; !ok {
|
||
modelOrder = append(modelOrder, model)
|
||
}
|
||
modelGroups[model] = append(modelGroups[model], gpu)
|
||
}
|
||
sort.Slice(modelOrder, func(i, j int) bool {
|
||
left := modelGroups[modelOrder[i]]
|
||
right := modelGroups[modelOrder[j]]
|
||
if len(left) == 0 || len(right) == 0 {
|
||
return modelOrder[i] < modelOrder[j]
|
||
}
|
||
return left[0].Index < right[0].Index
|
||
})
|
||
|
||
var groups []nvidiaTaskSelection
|
||
var singles []nvidiaTaskSelection
|
||
for _, model := range modelOrder {
|
||
group := modelGroups[model]
|
||
sort.Slice(group, func(i, j int) bool { return group[i].Index < group[j].Index })
|
||
indices := make([]int, 0, len(group))
|
||
for _, gpu := range group {
|
||
indices = append(indices, gpu.Index)
|
||
}
|
||
if len(indices) >= 2 {
|
||
groups = append(groups, nvidiaTaskSelection{
|
||
GPUIndices: indices,
|
||
Label: fmt.Sprintf("%s; GPUs %s", model, joinTaskIndices(indices)),
|
||
})
|
||
continue
|
||
}
|
||
gpu := group[0]
|
||
singles = append(singles, nvidiaTaskSelection{
|
||
GPUIndices: []int{gpu.Index},
|
||
Label: fmt.Sprintf("GPU %d — %s", gpu.Index, model),
|
||
})
|
||
}
|
||
return append(groups, singles...), nil
|
||
}
|
||
|
||
// joinTaskIndices renders GPU indices as a comma-separated list, e.g. "0,1,3".
// Returns "" for an empty slice.
func joinTaskIndices(indices []int) string {
	parts := make([]string, 0, len(indices))
	for _, idx := range indices {
		// strconv.Itoa avoids fmt's reflection-based formatting and matches
		// the style used by formatGPUIndexList.
		parts = append(parts, strconv.Itoa(idx))
	}
	return strings.Join(parts, ",")
}
|
||
|
||
// formatGPUIndexList renders GPU indices as a comma-separated string,
// e.g. "0,1,3". Returns "" for an empty slice.
func formatGPUIndexList(indices []int) string {
	parts := make([]string, 0, len(indices))
	for _, idx := range indices {
		parts = append(parts, strconv.Itoa(idx))
	}
	return strings.Join(parts, ",")
}
|
||
|
||
// formatSplitTaskName combines a base task name with a per-selection label,
// producing e.g. "Stress (GPU 0 — H100)". Either part may be blank; blank
// parts are dropped and both inputs are whitespace-trimmed.
func formatSplitTaskName(baseName, selectionLabel string) string {
	base := strings.TrimSpace(baseName)
	label := strings.TrimSpace(selectionLabel)
	switch {
	case base == "":
		return label
	case label == "":
		return base
	default:
		return fmt.Sprintf("%s (%s)", base, label)
	}
}
|
||
|
||
// buildNvidiaTaskSet creates one or more queue tasks for an NVIDIA target.
//
// Splittable targets (see shouldSplitHomogeneousNvidiaTarget) default to one
// task per homogeneous GPU model group; when params.ParallelGPUs is set, a
// single task covering all selected GPUs is built instead. Non-splittable
// targets always produce a single task. Returns an error when GPU enumeration
// or selection fails.
func buildNvidiaTaskSet(target string, priority int, createdAt time.Time, params taskParams, baseName string, appRef *app.App, idPrefix string) ([]*Task, error) {
	if !shouldSplitHomogeneousNvidiaTarget(target) || params.ParallelGPUs {
		// Parallel mode (or non-splittable target): one task for all selected GPUs.
		if params.ParallelGPUs && shouldSplitHomogeneousNvidiaTarget(target) {
			// Resolve the selected GPU indices so ExcludeGPUIndices is applied.
			gpus, err := apiListNvidiaGPUs(appRef)
			if err != nil {
				return nil, err
			}
			resolved, err := expandSelectedGPUIndices(gpus, params.GPUIndices, params.ExcludeGPUIndices)
			if err != nil {
				return nil, err
			}
			// The exclude list is folded into the resolved include list.
			params.GPUIndices = resolved
			params.ExcludeGPUIndices = nil
		}
		t := &Task{
			ID:        newJobID(idPrefix),
			Name:      baseName,
			Target:    target,
			Priority:  priority,
			Status:    TaskPending,
			CreatedAt: createdAt,
			params:    params,
		}
		return []*Task{t}, nil
	}
	// Split path: one task per model group, plus one per leftover single GPU.
	gpus, err := apiListNvidiaGPUs(appRef)
	if err != nil {
		return nil, err
	}
	selections, err := expandHomogeneousNvidiaSelections(gpus, params.GPUIndices, params.ExcludeGPUIndices)
	if err != nil {
		return nil, err
	}
	tasks := make([]*Task, 0, len(selections))
	for _, selection := range selections {
		// Per-task copy of params; each task owns its own GPU index slice.
		taskParamsCopy := params
		taskParamsCopy.GPUIndices = append([]int(nil), selection.GPUIndices...)
		taskParamsCopy.ExcludeGPUIndices = nil
		displayName := formatSplitTaskName(baseName, selection.Label)
		taskParamsCopy.DisplayName = displayName
		tasks = append(tasks, &Task{
			ID:        newJobID(idPrefix),
			Name:      displayName,
			Target:    target,
			Priority:  priority,
			Status:    TaskPending,
			CreatedAt: createdAt,
			params:    taskParamsCopy,
		})
	}
	return tasks, nil
}
|
||
|
||
// expandSelectedGPUIndices returns the sorted list of selected GPU indices after
|
||
// applying include/exclude filters, without splitting by model.
|
||
func expandSelectedGPUIndices(gpus []platform.NvidiaGPU, include, exclude []int) ([]int, error) {
|
||
indexed := make(map[int]struct{}, len(gpus))
|
||
allIndices := make([]int, 0, len(gpus))
|
||
for _, gpu := range gpus {
|
||
indexed[gpu.Index] = struct{}{}
|
||
allIndices = append(allIndices, gpu.Index)
|
||
}
|
||
sort.Ints(allIndices)
|
||
|
||
selected := allIndices
|
||
if len(include) > 0 {
|
||
selected = make([]int, 0, len(include))
|
||
seen := make(map[int]struct{}, len(include))
|
||
for _, idx := range include {
|
||
if _, ok := indexed[idx]; !ok {
|
||
continue
|
||
}
|
||
if _, dup := seen[idx]; dup {
|
||
continue
|
||
}
|
||
seen[idx] = struct{}{}
|
||
selected = append(selected, idx)
|
||
}
|
||
sort.Ints(selected)
|
||
}
|
||
if len(exclude) > 0 {
|
||
skip := make(map[int]struct{}, len(exclude))
|
||
for _, idx := range exclude {
|
||
skip[idx] = struct{}{}
|
||
}
|
||
filtered := selected[:0]
|
||
for _, idx := range selected {
|
||
if _, ok := skip[idx]; ok {
|
||
continue
|
||
}
|
||
filtered = append(filtered, idx)
|
||
}
|
||
selected = filtered
|
||
}
|
||
if len(selected) == 0 {
|
||
return nil, fmt.Errorf("no NVIDIA GPUs selected")
|
||
}
|
||
return selected, nil
|
||
}
|
||
|
||
// ── SSE helpers ───────────────────────────────────────────────────────────────
|
||
|
||
// sseWrite emits one Server-Sent-Events message (an optional "event:" line
// followed by a "data:" line) and flushes it to the client. Returns false
// when the underlying writer does not support flushing.
func sseWrite(w http.ResponseWriter, event, data string) bool {
	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		return false
	}
	if event != "" {
		fmt.Fprintf(w, "event: %s\n", event)
	}
	fmt.Fprintf(w, "data: %s\n\n", data)
	flusher.Flush()
	return true
}
|
||
|
||
// sseStart verifies the writer supports streaming and emits the standard SSE
// response headers. On failure it writes a 500 and returns false.
func sseStart(w http.ResponseWriter) bool {
	if _, canFlush := w.(http.Flusher); !canFlush {
		http.Error(w, "streaming not supported", http.StatusInternalServerError)
		return false
	}
	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")
	return true
}
|
||
|
||
// streamJob streams lines from a jobState to a SSE response.
func streamJob(w http.ResponseWriter, r *http.Request, j *jobState) {
	// sseStart writes the error response itself when streaming is unsupported.
	if !sseStart(w) {
		return
	}
	streamSubscribedJob(w, r, j)
}
|
||
|
||
// streamSubscribedJob replays a job's buffered output lines, then relays live
// lines until the job finishes or the client disconnects. The caller must
// already have sent SSE headers (see sseStart). A terminal "done" event
// carries the job's error text (empty on success).
func streamSubscribedJob(w http.ResponseWriter, r *http.Request, j *jobState) {
	existing, ch := j.subscribe()
	for _, line := range existing {
		sseWrite(w, "", line)
	}
	if ch == nil {
		// Job already finished
		sseWrite(w, "done", j.err)
		return
	}
	for {
		select {
		case line, ok := <-ch:
			if !ok {
				// Channel closed by the producer: the job has completed.
				sseWrite(w, "done", j.err)
				return
			}
			sseWrite(w, "", line)
		case <-r.Context().Done():
			// Client disconnected; stop relaying.
			return
		}
	}
}
|
||
|
||
// streamCmdJob runs an exec.Cmd and streams stdout+stderr lines into j.
// It returns the command's exit error if any, otherwise any scanner error.
func streamCmdJob(j *jobState, cmd *exec.Cmd) error {
	// Merge stdout and stderr into one pipe so lines are appended in the
	// order the process produced them.
	pr, pw := io.Pipe()
	cmd.Stdout = pw
	cmd.Stderr = pw

	if err := cmd.Start(); err != nil {
		_ = pw.Close()
		_ = pr.Close()
		return err
	}
	// Lower the CPU scheduling priority of stress/audit subprocesses to nice+10
	// so the X server and kernel interrupt handling remain responsive under load
	// (prevents KVM/IPMI graphical console from freezing during GPU stress tests).
	if cmd.Process != nil {
		_ = syscall.Setpriority(syscall.PRIO_PROCESS, cmd.Process.Pid, 10)
	}

	scanDone := make(chan error, 1)
	go func() {
		// Recover scanner panics so a malformed stream cannot kill the server;
		// the panic is surfaced to the caller as an error.
		defer func() {
			if rec := recover(); rec != nil {
				scanDone <- fmt.Errorf("stream scanner panic: %v", rec)
			}
		}()
		scanner := bufio.NewScanner(pr)
		// Allow lines up to 1 MiB (bufio's default cap is 64 KiB).
		scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
		for scanner.Scan() {
			// Split on \r to handle progress-bar style output (e.g. \r overwrites)
			// and strip ANSI escape codes so logs are readable in the browser.
			parts := strings.Split(scanner.Text(), "\r")
			for _, part := range parts {
				line := ansiEscapeRE.ReplaceAllString(part, "")
				if line != "" {
					j.append(line)
				}
			}
		}
		// ErrClosedPipe is the expected result of closing pr below; anything
		// else is a real read failure.
		if err := scanner.Err(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
			scanDone <- err
			return
		}
		scanDone <- nil
	}()

	// Close ordering matters: wait for the process, close the write end so
	// the scanner sees EOF and drains fully, then close the read end.
	err := cmd.Wait()
	_ = pw.Close()
	scanErr := <-scanDone
	_ = pr.Close()
	if err != nil {
		// The process exit error takes precedence over any scanner error.
		return err
	}
	return scanErr
}
|
||
|
||
// ── Audit ─────────────────────────────────────────────────────────────────────
|
||
|
||
func (h *handler) handleAPIAuditRun(w http.ResponseWriter, _ *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
t := &Task{
|
||
ID: newJobID("audit"),
|
||
Name: "Audit",
|
||
Target: "audit",
|
||
Priority: defaultTaskPriority("audit", taskParams{}),
|
||
Status: TaskPending,
|
||
CreatedAt: time.Now(),
|
||
}
|
||
globalQueue.enqueue(t)
|
||
writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID})
|
||
}
|
||
|
||
func (h *handler) handleAPIAuditStream(w http.ResponseWriter, r *http.Request) {
|
||
id := r.URL.Query().Get("job_id")
|
||
if id == "" {
|
||
id = r.URL.Query().Get("task_id")
|
||
}
|
||
// Try task queue first, then legacy job manager
|
||
if j, ok := globalQueue.findJob(id); ok {
|
||
streamJob(w, r, j)
|
||
return
|
||
}
|
||
if j, ok := globalJobs.get(id); ok {
|
||
streamJob(w, r, j)
|
||
return
|
||
}
|
||
http.Error(w, "job not found", http.StatusNotFound)
|
||
}
|
||
|
||
// ── SAT ───────────────────────────────────────────────────────────────────────
|
||
|
||
// handleAPISATRun returns a handler that enqueues stress/validation tasks for
// the given target. The request body carries optional knobs (duration, GPU
// selection, loader, profile, …); an absent or empty body is accepted and
// defaults apply. Splittable NVIDIA targets may fan out into several tasks
// (see buildNvidiaTaskSet).
func (h *handler) handleAPISATRun(target string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if h.opts.App == nil {
			writeError(w, http.StatusServiceUnavailable, "app not configured")
			return
		}

		var body struct {
			Duration           int      `json:"duration"`
			StressMode         bool     `json:"stress_mode"`
			GPUIndices         []int    `json:"gpu_indices"`
			ExcludeGPUIndices  []int    `json:"exclude_gpu_indices"`
			StaggerGPUStart    bool     `json:"stagger_gpu_start"`
			ParallelGPUs       bool     `json:"parallel_gpus"`
			Loader             string   `json:"loader"`
			Profile            string   `json:"profile"`
			DisplayName        string   `json:"display_name"`
			PlatformComponents []string `json:"platform_components"`
		}
		if r.Body != nil {
			// io.EOF means an empty body, which is allowed.
			if err := json.NewDecoder(r.Body).Decode(&body); err != nil && !errors.Is(err, io.EOF) {
				writeError(w, http.StatusBadRequest, "invalid request body")
				return
			}
		}

		// A caller-supplied display name overrides the derived one.
		name := taskDisplayName(target, body.Profile, body.Loader)
		if strings.TrimSpace(body.DisplayName) != "" {
			name = body.DisplayName
		}
		params := taskParams{
			Duration:           body.Duration,
			StressMode:         body.StressMode,
			GPUIndices:         body.GPUIndices,
			ExcludeGPUIndices:  body.ExcludeGPUIndices,
			StaggerGPUStart:    body.StaggerGPUStart,
			ParallelGPUs:       body.ParallelGPUs,
			Loader:             body.Loader,
			BurnProfile:        body.Profile,
			DisplayName:        body.DisplayName,
			PlatformComponents: body.PlatformComponents,
		}
		tasks, err := buildNvidiaTaskSet(target, defaultTaskPriority(target, params), time.Now(), params, name, h.opts.App, "sat-"+target)
		if err != nil {
			writeError(w, http.StatusBadRequest, err.Error())
			return
		}
		for _, t := range tasks {
			globalQueue.enqueue(t)
		}
		writeTaskRunResponse(w, tasks)
	}
}
|
||
|
||
// handleAPIBenchmarkNvidiaRunKind returns a handler that enqueues NVIDIA
// benchmark tasks for the given target ("nvidia-bench-perf" or
// "nvidia-bench-power"). Three modes are supported: sequential (default),
// parallel, and ramp-up; parallel is rejected for the power benchmark.
// Ramp-up enqueues exactly one task covering all selected GPUs, because the
// underlying power bench ramps from 1 to N GPUs internally.
func (h *handler) handleAPIBenchmarkNvidiaRunKind(target string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if h.opts.App == nil {
			writeError(w, http.StatusServiceUnavailable, "app not configured")
			return
		}

		var body struct {
			Profile           string `json:"profile"`
			SizeMB            int    `json:"size_mb"`
			GPUIndices        []int  `json:"gpu_indices"`
			ExcludeGPUIndices []int  `json:"exclude_gpu_indices"`
			RunNCCL           *bool  `json:"run_nccl"`
			ParallelGPUs      *bool  `json:"parallel_gpus"`
			RampUp            *bool  `json:"ramp_up"`
			DisplayName       string `json:"display_name"`
		}
		if r.Body != nil {
			// io.EOF means an empty body, which is allowed.
			if err := json.NewDecoder(r.Body).Decode(&body); err != nil && !errors.Is(err, io.EOF) {
				writeError(w, http.StatusBadRequest, "invalid request body")
				return
			}
		}

		// Pointer fields distinguish "absent" from "false": NCCL defaults on,
		// parallel and ramp-up default off.
		runNCCL := true
		if body.RunNCCL != nil {
			runNCCL = *body.RunNCCL
		}
		parallelGPUs := false
		if body.ParallelGPUs != nil {
			parallelGPUs = *body.ParallelGPUs
		}
		rampUp := false
		if body.RampUp != nil {
			rampUp = *body.RampUp
		}
		// Build a descriptive base name that includes profile and mode so the task
		// list is self-explanatory without opening individual task detail pages.
		profile := strings.TrimSpace(body.Profile)
		if profile == "" {
			profile = "standard"
		}
		name := taskDisplayName(target, "", "")
		if strings.TrimSpace(body.DisplayName) != "" {
			name = body.DisplayName
		}
		// Append profile tag.
		name = fmt.Sprintf("%s · %s", name, profile)

		if target == "nvidia-bench-power" && parallelGPUs {
			writeError(w, http.StatusBadRequest, "power / thermal fit benchmark uses sequential or ramp-up modes only")
			return
		}

		if rampUp && len(body.GPUIndices) > 1 {
			// Ramp-up mode: RunNvidiaPowerBench internally ramps from 1 to N GPUs
			// in Phase 2 (one additional GPU per step). A single task with all
			// selected GPUs is sufficient — spawning N tasks with growing subsets
			// would repeat all earlier steps redundantly.
			gpus, err := apiListNvidiaGPUs(h.opts.App)
			if err != nil {
				writeError(w, http.StatusBadRequest, err.Error())
				return
			}
			resolved, err := expandSelectedGPUIndices(gpus, body.GPUIndices, body.ExcludeGPUIndices)
			if err != nil {
				writeError(w, http.StatusBadRequest, err.Error())
				return
			}
			if len(resolved) < 2 {
				// Fall through to normal single-task path.
				rampUp = false
			} else {
				now := time.Now()
				rampRunID := fmt.Sprintf("ramp-%s", now.UTC().Format("20060102-150405"))
				taskName := fmt.Sprintf("%s · ramp 1–%d · GPU %s", name, len(resolved), formatGPUIndexList(resolved))
				t := &Task{
					ID:        newJobID("bee-bench-nvidia"),
					Name:      taskName,
					Target:    target,
					Priority:  defaultTaskPriority(target, taskParams{}),
					Status:    TaskPending,
					CreatedAt: now,
					params: taskParams{
						GPUIndices:       append([]int(nil), resolved...),
						SizeMB:           body.SizeMB,
						BenchmarkProfile: body.Profile,
						RunNCCL:          runNCCL,
						ParallelGPUs:     true,
						RampTotal:        len(resolved),
						RampRunID:        rampRunID,
						DisplayName:      taskName,
					},
				}
				globalQueue.enqueue(t)
				writeTaskRunResponse(w, []*Task{t})
				return
			}
		}

		// For non-ramp tasks append mode tag.
		if parallelGPUs {
			name = fmt.Sprintf("%s · parallel", name)
		} else {
			name = fmt.Sprintf("%s · sequential", name)
		}

		params := taskParams{
			GPUIndices:        body.GPUIndices,
			ExcludeGPUIndices: body.ExcludeGPUIndices,
			SizeMB:            body.SizeMB,
			BenchmarkProfile:  body.Profile,
			RunNCCL:           runNCCL,
			ParallelGPUs:      parallelGPUs,
			DisplayName:       body.DisplayName,
		}
		tasks, err := buildNvidiaTaskSet(target, defaultTaskPriority(target, params), time.Now(), params, name, h.opts.App, "bee-bench-nvidia")
		if err != nil {
			writeError(w, http.StatusBadRequest, err.Error())
			return
		}
		for _, t := range tasks {
			globalQueue.enqueue(t)
		}
		writeTaskRunResponse(w, tasks)
	}
}
|
||
|
||
// handleAPIBenchmarkNvidiaRun is the legacy entry point; it dispatches to the
// performance-benchmark variant of the NVIDIA benchmark handler.
func (h *handler) handleAPIBenchmarkNvidiaRun(w http.ResponseWriter, r *http.Request) {
	h.handleAPIBenchmarkNvidiaRunKind("nvidia-bench-perf").ServeHTTP(w, r)
}
|
||
|
||
func (h *handler) handleAPISATStream(w http.ResponseWriter, r *http.Request) {
|
||
id := r.URL.Query().Get("job_id")
|
||
if id == "" {
|
||
id = r.URL.Query().Get("task_id")
|
||
}
|
||
if j, ok := globalQueue.findJob(id); ok {
|
||
streamJob(w, r, j)
|
||
return
|
||
}
|
||
if j, ok := globalJobs.get(id); ok {
|
||
streamJob(w, r, j)
|
||
return
|
||
}
|
||
http.Error(w, "job not found", http.StatusNotFound)
|
||
}
|
||
|
||
// handleAPISATAbort cancels a queued or running task, falling back to the
// legacy job manager. The ID is accepted via ?job_id= or ?task_id=.
func (h *handler) handleAPISATAbort(w http.ResponseWriter, r *http.Request) {
	id := r.URL.Query().Get("job_id")
	if id == "" {
		id = r.URL.Query().Get("task_id")
	}
	if t, ok := globalQueue.findByID(id); ok {
		globalQueue.mu.Lock()
		switch t.Status {
		case TaskPending:
			// Not started yet: just mark cancelled.
			t.Status = TaskCancelled
			now := time.Now()
			t.DoneAt = &now
		case TaskRunning:
			// Abort the underlying job, then mark cancelled.
			// NOTE(review): t.job.abort() runs while globalQueue.mu is held —
			// confirm abort() never re-acquires the queue lock.
			if t.job != nil {
				t.job.abort()
			}
			t.Status = TaskCancelled
			now := time.Now()
			t.DoneAt = &now
		}
		globalQueue.mu.Unlock()
		// Terminal states (done/failed) fall through the switch untouched but
		// still report "aborted" to the caller.
		writeJSON(w, map[string]string{"status": "aborted"})
		return
	}
	if j, ok := globalJobs.get(id); ok {
		if j.abort() {
			writeJSON(w, map[string]string{"status": "aborted"})
		} else {
			writeJSON(w, map[string]string{"status": "not_running"})
		}
		return
	}
	http.Error(w, "job not found", http.StatusNotFound)
}
|
||
|
||
// ── Services ──────────────────────────────────────────────────────────────────
|
||
|
||
func (h *handler) handleAPIServicesList(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
names, err := h.opts.App.ListBeeServices()
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
type serviceInfo struct {
|
||
Name string `json:"name"`
|
||
State string `json:"state"`
|
||
Body string `json:"body"`
|
||
}
|
||
result := make([]serviceInfo, 0, len(names))
|
||
for _, name := range names {
|
||
state := h.opts.App.ServiceState(name)
|
||
body, _ := h.opts.App.ServiceStatus(name)
|
||
result = append(result, serviceInfo{Name: name, State: state, Body: body})
|
||
}
|
||
writeJSON(w, result)
|
||
}
|
||
|
||
func (h *handler) handleAPIServicesAction(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
var req struct {
|
||
Name string `json:"name"`
|
||
Action string `json:"action"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||
return
|
||
}
|
||
var action platform.ServiceAction
|
||
switch req.Action {
|
||
case "start":
|
||
action = platform.ServiceStart
|
||
case "stop":
|
||
action = platform.ServiceStop
|
||
case "restart":
|
||
action = platform.ServiceRestart
|
||
default:
|
||
writeError(w, http.StatusBadRequest, "action must be start|stop|restart")
|
||
return
|
||
}
|
||
result, err := h.opts.App.ServiceActionResult(req.Name, action)
|
||
status := "ok"
|
||
if err != nil {
|
||
status = "error"
|
||
}
|
||
// Always return 200 with output so the frontend can display the actual
|
||
// systemctl error message instead of a generic "exit status 1".
|
||
writeJSON(w, map[string]string{"status": status, "output": result.Body})
|
||
}
|
||
|
||
// ── Network ───────────────────────────────────────────────────────────────────
|
||
|
||
// handleAPINetworkStatus reports the interface list, the default route, and
// whether an unconfirmed network change is pending (with its rollback
// countdown).
func (h *handler) handleAPINetworkStatus(w http.ResponseWriter, r *http.Request) {
	if h.opts.App == nil {
		writeError(w, http.StatusServiceUnavailable, "app not configured")
		return
	}
	ifaces, err := h.opts.App.ListInterfaces()
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, map[string]any{
		"interfaces":     ifaces,
		"default_route":  h.opts.App.DefaultRoute(),
		"pending_change": h.hasPendingNetworkChange(),
		"rollback_in":    h.pendingNetworkRollbackIn(),
	})
}
|
||
|
||
// handleAPINetworkDHCP switches one interface — or all, when the interface is
// empty or "all" — to DHCP. The change goes through the pending-change
// mechanism and is rolled back after netRollbackTimeout unless confirmed.
func (h *handler) handleAPINetworkDHCP(w http.ResponseWriter, r *http.Request) {
	if h.opts.App == nil {
		writeError(w, http.StatusServiceUnavailable, "app not configured")
		return
	}
	var req struct {
		Interface string `json:"interface"`
	}
	// Body is optional; decode errors are ignored and mean "all interfaces".
	_ = json.NewDecoder(r.Body).Decode(&req)

	result, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) {
		if req.Interface == "" || req.Interface == "all" {
			return h.opts.App.DHCPAllResult()
		}
		return h.opts.App.DHCPOneResult(req.Interface)
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, map[string]any{
		"status":      "ok",
		"output":      result.Body,
		"rollback_in": int(netRollbackTimeout.Seconds()),
	})
}
|
||
|
||
func (h *handler) handleAPINetworkStatic(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
var req struct {
|
||
Interface string `json:"interface"`
|
||
Address string `json:"address"`
|
||
Prefix string `json:"prefix"`
|
||
Gateway string `json:"gateway"`
|
||
DNS []string `json:"dns"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||
return
|
||
}
|
||
cfg := platform.StaticIPv4Config{
|
||
Interface: req.Interface,
|
||
Address: req.Address,
|
||
Prefix: req.Prefix,
|
||
Gateway: req.Gateway,
|
||
DNS: req.DNS,
|
||
}
|
||
result, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) {
|
||
return h.opts.App.SetStaticIPv4Result(cfg)
|
||
})
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
writeJSON(w, map[string]any{
|
||
"status": "ok",
|
||
"output": result.Body,
|
||
"rollback_in": int(netRollbackTimeout.Seconds()),
|
||
})
|
||
}
|
||
|
||
// ── Export ────────────────────────────────────────────────────────────────────
|
||
|
||
// handleAPIExportList lists the files available in the configured export
// directory.
func (h *handler) handleAPIExportList(w http.ResponseWriter, r *http.Request) {
	entries, err := listExportFiles(h.opts.ExportDir)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, entries)
}
|
||
|
||
func (h *handler) handleAPIExportUSBTargets(w http.ResponseWriter, _ *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
targets, err := h.opts.App.ListRemovableTargets()
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
if targets == nil {
|
||
targets = []platform.RemovableTarget{}
|
||
}
|
||
writeJSON(w, targets)
|
||
}
|
||
|
||
func (h *handler) handleAPIExportUSBAudit(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
var target platform.RemovableTarget
|
||
if err := json.NewDecoder(r.Body).Decode(&target); err != nil || target.Device == "" {
|
||
writeError(w, http.StatusBadRequest, "device is required")
|
||
return
|
||
}
|
||
result, err := h.opts.App.ExportLatestAuditResult(target)
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
writeJSON(w, map[string]string{"status": "ok", "message": result.Body})
|
||
}
|
||
|
||
func (h *handler) handleAPIExportUSBBundle(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
var target platform.RemovableTarget
|
||
if err := json.NewDecoder(r.Body).Decode(&target); err != nil || target.Device == "" {
|
||
writeError(w, http.StatusBadRequest, "device is required")
|
||
return
|
||
}
|
||
result, err := h.opts.App.ExportSupportBundleResult(target)
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
writeJSON(w, map[string]string{"status": "ok", "message": result.Body})
|
||
}
|
||
|
||
// ── GPU presence ──────────────────────────────────────────────────────────────
|
||
|
||
func (h *handler) handleAPIGNVIDIAGPUs(w http.ResponseWriter, _ *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
gpus, err := h.opts.App.ListNvidiaGPUs()
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
if gpus == nil {
|
||
gpus = []platform.NvidiaGPU{}
|
||
}
|
||
writeJSON(w, gpus)
|
||
}
|
||
|
||
// handleAPIGNVIDIAGPUStatuses returns live per-GPU status as JSON, via the
// apiListNvidiaGPUStatuses indirection so tests can stub it.
func (h *handler) handleAPIGNVIDIAGPUStatuses(w http.ResponseWriter, _ *http.Request) {
	if h.opts.App == nil {
		writeError(w, http.StatusServiceUnavailable, "app not configured")
		return
	}
	gpus, err := apiListNvidiaGPUStatuses(h.opts.App)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	if gpus == nil {
		// Encode as [] rather than null for frontend convenience.
		gpus = []platform.NvidiaGPUStatus{}
	}
	writeJSON(w, gpus)
}
|
||
|
||
// handleAPIGNVIDIAReset attempts a reset of the GPU at the requested index.
// It always responds 200; "status" distinguishes success from failure so the
// frontend can show the underlying tool output either way.
func (h *handler) handleAPIGNVIDIAReset(w http.ResponseWriter, r *http.Request) {
	if h.opts.App == nil {
		writeError(w, http.StatusServiceUnavailable, "app not configured")
		return
	}
	var req struct {
		Index int `json:"index"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	result, err := h.opts.App.ResetNvidiaGPU(req.Index)
	status := "ok"
	if err != nil {
		status = "error"
	}
	writeJSON(w, map[string]string{"status": status, "output": result.Body})
}
|
||
|
||
func (h *handler) handleAPIGPUPresence(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
gp := h.opts.App.DetectGPUPresence()
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_ = json.NewEncoder(w).Encode(map[string]bool{
|
||
"nvidia": gp.Nvidia,
|
||
"amd": gp.AMD,
|
||
})
|
||
}
|
||
|
||
// ── GPU tools ─────────────────────────────────────────────────────────────────
|
||
|
||
func (h *handler) handleAPIGPUTools(w http.ResponseWriter, _ *http.Request) {
|
||
type toolEntry struct {
|
||
ID string `json:"id"`
|
||
Available bool `json:"available"`
|
||
Vendor string `json:"vendor"` // "nvidia" | "amd"
|
||
}
|
||
_, nvidiaErr := os.Stat("/dev/nvidia0")
|
||
_, amdErr := os.Stat("/dev/kfd")
|
||
nvidiaUp := nvidiaErr == nil
|
||
amdUp := amdErr == nil
|
||
_, dcgmErr := exec.LookPath("dcgmi")
|
||
_, ncclStressErr := exec.LookPath("bee-nccl-gpu-stress")
|
||
_, johnErr := exec.LookPath("bee-john-gpu-stress")
|
||
_, beeBurnErr := exec.LookPath("bee-gpu-burn")
|
||
_, nvBandwidthErr := exec.LookPath("nvbandwidth")
|
||
profErr := lookPathAny("dcgmproftester", "dcgmproftester13", "dcgmproftester12", "dcgmproftester11")
|
||
writeJSON(w, []toolEntry{
|
||
{ID: "nvidia-compute", Available: nvidiaUp && profErr == nil, Vendor: "nvidia"},
|
||
{ID: "nvidia-targeted-power", Available: nvidiaUp && dcgmErr == nil, Vendor: "nvidia"},
|
||
{ID: "nvidia-pulse", Available: nvidiaUp && dcgmErr == nil, Vendor: "nvidia"},
|
||
{ID: "nvidia-interconnect", Available: nvidiaUp && ncclStressErr == nil, Vendor: "nvidia"},
|
||
{ID: "nvidia-bandwidth", Available: nvidiaUp && dcgmErr == nil && nvBandwidthErr == nil, Vendor: "nvidia"},
|
||
{ID: "bee-gpu-burn", Available: nvidiaUp && beeBurnErr == nil, Vendor: "nvidia"},
|
||
{ID: "john", Available: nvidiaUp && johnErr == nil, Vendor: "nvidia"},
|
||
{ID: "rvs", Available: amdUp, Vendor: "amd"},
|
||
})
|
||
}
|
||
|
||
// lookPathAny reports whether at least one of the given executable names
// resolves on PATH. It returns nil on the first hit and exec.ErrNotFound
// when none of them resolve (including the empty name list).
func lookPathAny(names ...string) error {
	for _, candidate := range names {
		_, err := exec.LookPath(candidate)
		if err == nil {
			return nil
		}
	}
	return exec.ErrNotFound
}
|
||
|
||
// ── System ────────────────────────────────────────────────────────────────────
|
||
|
||
func (h *handler) handleAPIRAMStatus(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
status := h.currentRAMStatus()
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_ = json.NewEncoder(w).Encode(status)
|
||
}
|
||
|
||
// ramStatusResponse is the JSON shape of the RAM-status endpoint: the raw
// platform state plus queue-derived blockers filled in by currentRAMStatus.
type ramStatusResponse struct {
	platform.LiveMediaRAMState
	// InstallTaskActive is set when an install-to-disk task is pending or running.
	InstallTaskActive bool `json:"install_task_active,omitempty"`
	// CopyTaskActive is set when an install-to-RAM task is pending or running.
	CopyTaskActive bool `json:"copy_task_active,omitempty"`
	// CanStartTask mirrors CanStartCopy when no blocker applies.
	CanStartTask bool `json:"can_start_task,omitempty"`
	// BlockedReason is a human-readable reason when a task cannot start.
	BlockedReason string `json:"blocked_reason,omitempty"`
}
|
||
|
||
func (h *handler) currentRAMStatus() ramStatusResponse {
|
||
state := h.opts.App.LiveMediaRAMState()
|
||
resp := ramStatusResponse{LiveMediaRAMState: state}
|
||
if globalQueue.hasActiveTarget("install") {
|
||
resp.InstallTaskActive = true
|
||
resp.BlockedReason = "install to disk is already running"
|
||
return resp
|
||
}
|
||
if globalQueue.hasActiveTarget("install-to-ram") {
|
||
resp.CopyTaskActive = true
|
||
resp.BlockedReason = "install to RAM task is already pending or running"
|
||
return resp
|
||
}
|
||
if state.InRAM {
|
||
resp.BlockedReason = "system is already running from RAM"
|
||
return resp
|
||
}
|
||
resp.CanStartTask = state.CanStartCopy
|
||
if !resp.CanStartTask && resp.BlockedReason == "" {
|
||
resp.BlockedReason = state.Message
|
||
}
|
||
return resp
|
||
}
|
||
|
||
func (h *handler) handleAPIInstallToRAM(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
status := h.currentRAMStatus()
|
||
if !status.CanStartTask {
|
||
msg := strings.TrimSpace(status.BlockedReason)
|
||
if msg == "" {
|
||
msg = "install to RAM is not available"
|
||
}
|
||
writeError(w, http.StatusConflict, msg)
|
||
return
|
||
}
|
||
t := &Task{
|
||
ID: newJobID("install-to-ram"),
|
||
Name: "Install to RAM",
|
||
Target: "install-to-ram",
|
||
Priority: defaultTaskPriority("install-to-ram", taskParams{}),
|
||
Status: TaskPending,
|
||
CreatedAt: time.Now(),
|
||
}
|
||
globalQueue.enqueue(t)
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_ = json.NewEncoder(w).Encode(map[string]string{"task_id": t.ID})
|
||
}
|
||
|
||
// ── Tools ─────────────────────────────────────────────────────────────────────
|
||
|
||
// standardTools is the fixed set of external binaries the tools-check
// endpoint probes for; availability of each is reported to the UI.
var standardTools = []string{
	"dmidecode", "smartctl", "nvme", "lspci", "ipmitool",
	"nvidia-smi", "dcgmi", "nv-hostengine", "memtester", "stress-ng", "nvtop",
	"mstflint", "qrencode",
}
|
||
|
||
func (h *handler) handleAPIToolsCheck(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
statuses := h.opts.App.CheckTools(standardTools)
|
||
writeJSON(w, statuses)
|
||
}
|
||
|
||
// ── Preflight ─────────────────────────────────────────────────────────────────
|
||
|
||
func (h *handler) handleAPIPreflight(w http.ResponseWriter, r *http.Request) {
|
||
data, err := loadSnapshot(filepath.Join(h.opts.ExportDir, "runtime-health.json"))
|
||
if err != nil {
|
||
writeError(w, http.StatusNotFound, "runtime health not found")
|
||
return
|
||
}
|
||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||
w.Header().Set("Cache-Control", "no-store")
|
||
_, _ = w.Write(data)
|
||
}
|
||
|
||
// ── Install ───────────────────────────────────────────────────────────────────
|
||
|
||
func (h *handler) handleAPIInstallDisks(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
disks, err := h.opts.App.ListInstallDisks()
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
type diskJSON struct {
|
||
Device string `json:"device"`
|
||
Model string `json:"model"`
|
||
Size string `json:"size"`
|
||
SizeBytes int64 `json:"size_bytes"`
|
||
MountedParts []string `json:"mounted_parts"`
|
||
Warnings []string `json:"warnings"`
|
||
}
|
||
result := make([]diskJSON, 0, len(disks))
|
||
for _, d := range disks {
|
||
result = append(result, diskJSON{
|
||
Device: d.Device,
|
||
Model: d.Model,
|
||
Size: d.Size,
|
||
SizeBytes: d.SizeBytes,
|
||
MountedParts: d.MountedParts,
|
||
Warnings: platform.DiskWarnings(d),
|
||
})
|
||
}
|
||
writeJSON(w, result)
|
||
}
|
||
|
||
func (h *handler) handleAPIInstallRun(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
var req struct {
|
||
Device string `json:"device"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Device == "" {
|
||
writeError(w, http.StatusBadRequest, "device is required")
|
||
return
|
||
}
|
||
|
||
// Whitelist: only allow devices that ListInstallDisks() returns.
|
||
disks, err := h.opts.App.ListInstallDisks()
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
allowed := false
|
||
for _, d := range disks {
|
||
if d.Device == req.Device {
|
||
allowed = true
|
||
break
|
||
}
|
||
}
|
||
if !allowed {
|
||
writeError(w, http.StatusBadRequest, "device not in install candidate list")
|
||
return
|
||
}
|
||
if globalQueue.hasActiveTarget("install-to-ram") {
|
||
writeError(w, http.StatusConflict, "install to RAM task is already pending or running")
|
||
return
|
||
}
|
||
if globalQueue.hasActiveTarget("install") {
|
||
writeError(w, http.StatusConflict, "install task is already pending or running")
|
||
return
|
||
}
|
||
t := &Task{
|
||
ID: newJobID("install"),
|
||
Name: "Install to Disk",
|
||
Target: "install",
|
||
Priority: defaultTaskPriority("install", taskParams{}),
|
||
Status: TaskPending,
|
||
CreatedAt: time.Now(),
|
||
params: taskParams{
|
||
Device: req.Device,
|
||
},
|
||
}
|
||
globalQueue.enqueue(t)
|
||
writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID})
|
||
}
|
||
|
||
// ── Metrics SSE ───────────────────────────────────────────────────────────────
|
||
|
||
func (h *handler) handleAPIMetricsLatest(w http.ResponseWriter, r *http.Request) {
|
||
sample, ok := h.latestMetric()
|
||
if !ok {
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_, _ = w.Write([]byte("{}"))
|
||
return
|
||
}
|
||
b, err := json.Marshal(sample)
|
||
if err != nil {
|
||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||
return
|
||
}
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_, _ = w.Write(b)
|
||
}
|
||
|
||
func (h *handler) handleAPIMetricsStream(w http.ResponseWriter, r *http.Request) {
|
||
if !sseStart(w) {
|
||
return
|
||
}
|
||
ticker := time.NewTicker(1 * time.Second)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-r.Context().Done():
|
||
return
|
||
case <-ticker.C:
|
||
sample, ok := h.latestMetric()
|
||
if !ok {
|
||
continue
|
||
}
|
||
b, err := json.Marshal(sample)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
if !sseWrite(w, "metrics", string(b)) {
|
||
return
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// feedRings pushes one sample into all in-memory ring buffers.
|
||
func (h *handler) feedRings(sample platform.LiveMetricSample) {
|
||
for _, t := range sample.Temps {
|
||
switch t.Group {
|
||
case "cpu":
|
||
h.pushNamedMetricRing(&h.cpuTempRings, t.Name, t.Celsius)
|
||
case "ambient":
|
||
h.pushNamedMetricRing(&h.ambientTempRings, t.Name, t.Celsius)
|
||
}
|
||
}
|
||
h.ringPower.push(sample.PowerW)
|
||
h.ringCPULoad.push(sample.CPULoadPct)
|
||
h.ringMemLoad.push(sample.MemLoadPct)
|
||
|
||
h.ringsMu.Lock()
|
||
h.pushFanRings(sample.Fans)
|
||
for _, gpu := range sample.GPUs {
|
||
idx := gpu.GPUIndex
|
||
for len(h.gpuRings) <= idx {
|
||
h.gpuRings = append(h.gpuRings, &gpuRings{
|
||
Temp: newMetricsRing(120),
|
||
Util: newMetricsRing(120),
|
||
MemUtil: newMetricsRing(120),
|
||
Power: newMetricsRing(120),
|
||
})
|
||
}
|
||
h.gpuRings[idx].Temp.push(gpu.TempC)
|
||
h.gpuRings[idx].Util.push(gpu.UsagePct)
|
||
h.gpuRings[idx].MemUtil.push(gpu.MemUsagePct)
|
||
h.gpuRings[idx].Power.push(gpu.PowerW)
|
||
}
|
||
h.ringsMu.Unlock()
|
||
}
|
||
|
||
// pushFanRings records one reading per known fan ring. Fans are matched to
// rings positionally via h.fanNames; new fan names grow both slices. Rings
// whose fan is absent from this sample repeat their last value (or 0) so
// all rings stay the same length. Caller must hold h.ringsMu.
func (h *handler) pushFanRings(fans []platform.FanReading) {
	// Fast path: no fans reported and no rings exist yet.
	if len(fans) == 0 && len(h.ringFans) == 0 {
		return
	}
	fanValues := make(map[string]float64, len(fans))
	for _, fan := range fans {
		if fan.Name == "" {
			continue
		}
		fanValues[fan.Name] = fan.RPM
		found := false
		for i, name := range h.fanNames {
			if name == fan.Name {
				found = true
				// Repair: a name without a matching ring slot gets one.
				if i >= len(h.ringFans) {
					h.ringFans = append(h.ringFans, newMetricsRing(120))
				}
				break
			}
		}
		if !found {
			// First sighting of this fan: register name and ring together.
			h.fanNames = append(h.fanNames, fan.Name)
			h.ringFans = append(h.ringFans, newMetricsRing(120))
		}
	}
	for i, ring := range h.ringFans {
		if ring == nil {
			continue
		}
		name := ""
		if i < len(h.fanNames) {
			name = h.fanNames[i]
		}
		if rpm, ok := fanValues[name]; ok {
			ring.push(rpm)
			continue
		}
		// Fan missing from this sample: carry the last value forward so
		// the ring keeps advancing in lockstep with the others.
		if last, ok := ring.latest(); ok {
			ring.push(last)
			continue
		}
		ring.push(0)
	}
}
|
||
|
||
func (h *handler) pushNamedMetricRing(dst *[]*namedMetricsRing, name string, value float64) {
|
||
if name == "" {
|
||
return
|
||
}
|
||
for _, item := range *dst {
|
||
if item != nil && item.Name == name && item.Ring != nil {
|
||
item.Ring.push(value)
|
||
return
|
||
}
|
||
}
|
||
*dst = append(*dst, &namedMetricsRing{
|
||
Name: name,
|
||
Ring: newMetricsRing(120),
|
||
})
|
||
(*dst)[len(*dst)-1].Ring.push(value)
|
||
}
|
||
|
||
// ── Network toggle ────────────────────────────────────────────────────────────
|
||
|
||
// netRollbackTimeout is how long an unconfirmed network change survives
// before the pre-change snapshot is automatically restored.
const netRollbackTimeout = 60 * time.Second
|
||
|
||
func (h *handler) handleAPINetworkToggle(w http.ResponseWriter, r *http.Request) {
|
||
if h.opts.App == nil {
|
||
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
||
return
|
||
}
|
||
var req struct {
|
||
Iface string `json:"iface"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Iface == "" {
|
||
writeError(w, http.StatusBadRequest, "iface is required")
|
||
return
|
||
}
|
||
|
||
wasUp, err := h.opts.App.GetInterfaceState(req.Iface)
|
||
if err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
|
||
if _, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) {
|
||
err := h.opts.App.SetInterfaceState(req.Iface, !wasUp)
|
||
return app.ActionResult{}, err
|
||
}); err != nil {
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
|
||
newState := "up"
|
||
if wasUp {
|
||
newState = "down"
|
||
}
|
||
writeJSON(w, map[string]any{
|
||
"iface": req.Iface,
|
||
"new_state": newState,
|
||
"rollback_in": int(netRollbackTimeout.Seconds()),
|
||
})
|
||
}
|
||
|
||
// applyPendingNetworkChange captures a network snapshot, runs apply, and
// arms a timer that restores the snapshot after netRollbackTimeout unless
// the change is confirmed or rolled back first. At most one change can be
// pending: any earlier pending change is rolled back before applying.
func (h *handler) applyPendingNetworkChange(apply func() (app.ActionResult, error)) (app.ActionResult, error) {
	if h.opts.App == nil {
		return app.ActionResult{}, fmt.Errorf("app not configured")
	}

	// NOTE(review): matching the "nothing pending" case by error string is
	// fragile; a package-level sentinel compared with errors.Is would be
	// sturdier — confirm before changing, both sites must move together.
	if err := h.rollbackPendingNetworkChange(); err != nil && err.Error() != "no pending network change" {
		return app.ActionResult{}, err
	}

	// Snapshot BEFORE applying so the rollback restores the prior state.
	snapshot, err := h.opts.App.CaptureNetworkSnapshot()
	if err != nil {
		return app.ActionResult{}, err
	}

	result, err := apply()
	if err != nil {
		return result, err
	}

	pnc := &pendingNetChange{
		snapshot: snapshot,
		deadline: time.Now().Add(netRollbackTimeout),
	}
	// Auto-rollback: restore the snapshot, then clear the pending record —
	// but only if this change is still the current pending one (a newer
	// change may have replaced it in the meantime).
	pnc.timer = time.AfterFunc(netRollbackTimeout, func() {
		_ = h.opts.App.RestoreNetworkSnapshot(snapshot)
		h.pendingNetMu.Lock()
		if h.pendingNet == pnc {
			h.pendingNet = nil
		}
		h.pendingNetMu.Unlock()
	})

	h.pendingNetMu.Lock()
	h.pendingNet = pnc
	h.pendingNetMu.Unlock()

	return result, nil
}
|
||
|
||
func (h *handler) hasPendingNetworkChange() bool {
|
||
h.pendingNetMu.Lock()
|
||
defer h.pendingNetMu.Unlock()
|
||
return h.pendingNet != nil
|
||
}
|
||
|
||
func (h *handler) pendingNetworkRollbackIn() int {
|
||
h.pendingNetMu.Lock()
|
||
defer h.pendingNetMu.Unlock()
|
||
if h.pendingNet == nil {
|
||
return 0
|
||
}
|
||
remaining := int(time.Until(h.pendingNet.deadline).Seconds())
|
||
if remaining < 1 {
|
||
return 1
|
||
}
|
||
return remaining
|
||
}
|
||
|
||
func (h *handler) handleAPINetworkConfirm(w http.ResponseWriter, _ *http.Request) {
|
||
h.pendingNetMu.Lock()
|
||
pnc := h.pendingNet
|
||
h.pendingNet = nil
|
||
h.pendingNetMu.Unlock()
|
||
if pnc != nil {
|
||
pnc.mu.Lock()
|
||
pnc.timer.Stop()
|
||
pnc.mu.Unlock()
|
||
}
|
||
writeJSON(w, map[string]string{"status": "confirmed"})
|
||
}
|
||
|
||
func (h *handler) handleAPINetworkRollback(w http.ResponseWriter, _ *http.Request) {
|
||
if err := h.rollbackPendingNetworkChange(); err != nil {
|
||
if err.Error() == "no pending network change" {
|
||
writeError(w, http.StatusConflict, err.Error())
|
||
return
|
||
}
|
||
writeError(w, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
writeJSON(w, map[string]string{"status": "rolled back"})
|
||
}
|
||
|
||
func (h *handler) handleAPIBenchmarkResults(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||
fmt.Fprint(w, renderBenchmarkResultsCard(h.opts.ExportDir))
|
||
}
|
||
|
||
func (h *handler) rollbackPendingNetworkChange() error {
|
||
h.pendingNetMu.Lock()
|
||
pnc := h.pendingNet
|
||
h.pendingNet = nil
|
||
h.pendingNetMu.Unlock()
|
||
if pnc == nil {
|
||
return fmt.Errorf("no pending network change")
|
||
}
|
||
pnc.mu.Lock()
|
||
pnc.timer.Stop()
|
||
pnc.mu.Unlock()
|
||
if h.opts.App != nil {
|
||
return h.opts.App.RestoreNetworkSnapshot(pnc.snapshot)
|
||
}
|
||
return nil
|
||
}
|