1548 lines
44 KiB
Go
1548 lines
44 KiB
Go
package webui
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"regexp"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"bee/audit/internal/app"
|
|
"bee/audit/internal/platform"
|
|
)
|
|
|
|
// ansiEscapeRE matches ANSI CSI sequences (colors, cursor movement),
// charset-selection sequences, and a few single-character escapes so they
// can be stripped from subprocess output before it reaches the browser.
var ansiEscapeRE = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]|\x1b[()][A-Z0-9]|\x1b[DABC]`)
|
|
// apiListNvidiaGPUs is a package-level indirection over App.ListNvidiaGPUs
// so tests can substitute a fake GPU inventory. Errors out when no app is
// wired in.
var apiListNvidiaGPUs = func(a *app.App) ([]platform.NvidiaGPU, error) {
	if a == nil {
		return nil, fmt.Errorf("app not configured")
	}
	return a.ListNvidiaGPUs()
}
|
|
var apiListNvidiaGPUStatuses = func(a *app.App) ([]platform.NvidiaGPUStatus, error) {
|
|
if a == nil {
|
|
return nil, fmt.Errorf("app not configured")
|
|
}
|
|
return a.ListNvidiaGPUStatuses()
|
|
}
|
|
|
|
// Task queue priorities. The spread (benchmarks lowest number, installs
// highest) suggests smaller values are scheduled first — confirm against the
// queue's picking logic before relying on this ordering.
const (
	taskPriorityBenchmark      = 10 // nvidia-bench-perf / nvidia-bench-power
	taskPriorityBurn           = 20 // burn / stress workloads
	taskPriorityValidateStress = 30 // validation targets with stress mode on
	taskPriorityValidate       = 40 // plain validation targets
	taskPriorityAudit          = 50 // system audit
	taskPriorityInstallToRAM   = 60 // copy live media into RAM
	taskPriorityInstall        = 70 // install to disk
)
|
|
|
|
// ── Job ID counter ────────────────────────────────────────────────────────────
|
|
|
|
// jobCounter seeds newJobID with a monotonically increasing value.
var jobCounter atomic.Uint64
|
|
|
|
func newJobID(_ string) string {
|
|
start := int((jobCounter.Add(1) - 1) % 1000)
|
|
globalQueue.mu.Lock()
|
|
defer globalQueue.mu.Unlock()
|
|
for offset := 0; offset < 1000; offset++ {
|
|
n := (start + offset) % 1000
|
|
id := fmt.Sprintf("TASK-%03d", n)
|
|
if !taskIDInUseLocked(id) {
|
|
return id
|
|
}
|
|
}
|
|
return fmt.Sprintf("TASK-%03d", start)
|
|
}
|
|
|
|
func taskIDInUseLocked(id string) bool {
|
|
for _, t := range globalQueue.tasks {
|
|
if t != nil && t.ID == id {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// taskRunResponse is the JSON payload returned by task-creating endpoints.
// Task* and Job* fields carry the same values under two key names —
// presumably one set is a legacy alias kept for older frontend code; confirm
// against the frontend before removing either.
type taskRunResponse struct {
	TaskID    string   `json:"task_id,omitempty"`    // first created task ID
	JobID     string   `json:"job_id,omitempty"`     // same value as TaskID
	TaskIDs   []string `json:"task_ids,omitempty"`   // all created task IDs
	JobIDs    []string `json:"job_ids,omitempty"`    // same values as TaskIDs
	TaskCount int      `json:"task_count,omitempty"` // number of IDs reported
}
|
|
|
|
// nvidiaTaskSelection describes one group of GPUs that a split NVIDIA run
// will target with a single task.
type nvidiaTaskSelection struct {
	GPUIndices []int  // GPU indices included in this group
	Label      string // human-readable description used in the task name
}
|
|
|
|
func writeTaskRunResponse(w http.ResponseWriter, tasks []*Task) {
|
|
if len(tasks) == 0 {
|
|
writeJSON(w, taskRunResponse{})
|
|
return
|
|
}
|
|
ids := make([]string, 0, len(tasks))
|
|
for _, t := range tasks {
|
|
if t == nil || strings.TrimSpace(t.ID) == "" {
|
|
continue
|
|
}
|
|
ids = append(ids, t.ID)
|
|
}
|
|
resp := taskRunResponse{TaskCount: len(ids)}
|
|
if len(ids) > 0 {
|
|
resp.TaskID = ids[0]
|
|
resp.JobID = ids[0]
|
|
resp.TaskIDs = ids
|
|
resp.JobIDs = ids
|
|
}
|
|
writeJSON(w, resp)
|
|
}
|
|
|
|
// shouldSplitHomogeneousNvidiaTarget reports whether a target's request may
// be split into one task per homogeneous GPU group. Only the listed NVIDIA
// targets qualify; the target string is trimmed before matching.
func shouldSplitHomogeneousNvidiaTarget(target string) bool {
	splittable := []string{
		"nvidia", "nvidia-targeted-stress", "nvidia-bench-perf", "nvidia-bench-power",
		"nvidia-compute", "nvidia-targeted-power", "nvidia-pulse", "nvidia-interconnect",
		"nvidia-bandwidth", "nvidia-stress",
	}
	trimmed := strings.TrimSpace(target)
	for _, candidate := range splittable {
		if candidate == trimmed {
			return true
		}
	}
	return false
}
|
|
|
|
// defaultTaskPriority maps a task target (and its parameters) to one of the
// taskPriority* constants. Unknown targets get priority 0.
func defaultTaskPriority(target string, params taskParams) int {
	switch strings.TrimSpace(target) {
	case "install":
		return taskPriorityInstall
	case "install-to-ram":
		return taskPriorityInstallToRAM
	case "audit":
		return taskPriorityAudit
	case "nvidia-bench-perf", "nvidia-bench-power":
		return taskPriorityBenchmark
	case "nvidia-stress", "amd-stress", "memory-stress", "sat-stress", "platform-stress", "nvidia-compute":
		return taskPriorityBurn
	case "nvidia", "nvidia-targeted-stress", "nvidia-targeted-power", "nvidia-pulse",
		"nvidia-interconnect", "nvidia-bandwidth", "memory", "storage", "cpu",
		"amd", "amd-mem", "amd-bandwidth":
		// Validation targets escalate to the stress-validation priority when
		// stress mode was requested.
		if params.StressMode {
			return taskPriorityValidateStress
		}
		return taskPriorityValidate
	default:
		return 0
	}
}
|
|
|
|
// expandHomogeneousNvidiaSelections resolves the include/exclude GPU filters
// against the detected GPUs and partitions the survivors by model name:
// each model with two or more GPUs becomes one multi-GPU selection, and
// single-GPU models become individual selections appended after the groups.
// Returns an error when no GPU is detected or none survives filtering.
func expandHomogeneousNvidiaSelections(gpus []platform.NvidiaGPU, include, exclude []int) ([]nvidiaTaskSelection, error) {
	if len(gpus) == 0 {
		return nil, fmt.Errorf("no NVIDIA GPUs detected")
	}
	indexed := make(map[int]platform.NvidiaGPU, len(gpus))
	allIndices := make([]int, 0, len(gpus))
	for _, gpu := range gpus {
		indexed[gpu.Index] = gpu
		allIndices = append(allIndices, gpu.Index)
	}
	sort.Ints(allIndices)

	// Include filter: keep only requested indices that actually exist,
	// dropping duplicates, then restore ascending order.
	selected := allIndices
	if len(include) > 0 {
		selected = make([]int, 0, len(include))
		seen := make(map[int]struct{}, len(include))
		for _, idx := range include {
			if _, ok := indexed[idx]; !ok {
				continue
			}
			if _, dup := seen[idx]; dup {
				continue
			}
			seen[idx] = struct{}{}
			selected = append(selected, idx)
		}
		sort.Ints(selected)
	}
	// Exclude filter: remove listed indices, reusing the backing array.
	if len(exclude) > 0 {
		skip := make(map[int]struct{}, len(exclude))
		for _, idx := range exclude {
			skip[idx] = struct{}{}
		}
		filtered := selected[:0]
		for _, idx := range selected {
			if _, ok := skip[idx]; ok {
				continue
			}
			filtered = append(filtered, idx)
		}
		selected = filtered
	}
	if len(selected) == 0 {
		return nil, fmt.Errorf("no NVIDIA GPUs selected")
	}

	// Group the surviving GPUs by model, remembering first-seen order.
	modelGroups := make(map[string][]platform.NvidiaGPU)
	modelOrder := make([]string, 0)
	for _, idx := range selected {
		gpu := indexed[idx]
		model := strings.TrimSpace(gpu.Name)
		if model == "" {
			// Unnamed GPUs get a synthetic per-index model name so they
			// never merge with each other.
			model = fmt.Sprintf("GPU %d", gpu.Index)
		}
		if _, ok := modelGroups[model]; !ok {
			modelOrder = append(modelOrder, model)
		}
		modelGroups[model] = append(modelGroups[model], gpu)
	}
	// Order models by the first (lowest, since selected is sorted) GPU index
	// each contains; name order is the tie-break for empty groups, which
	// should not occur.
	sort.Slice(modelOrder, func(i, j int) bool {
		left := modelGroups[modelOrder[i]]
		right := modelGroups[modelOrder[j]]
		if len(left) == 0 || len(right) == 0 {
			return modelOrder[i] < modelOrder[j]
		}
		return left[0].Index < right[0].Index
	})

	var groups []nvidiaTaskSelection
	var singles []nvidiaTaskSelection
	for _, model := range modelOrder {
		group := modelGroups[model]
		sort.Slice(group, func(i, j int) bool { return group[i].Index < group[j].Index })
		indices := make([]int, 0, len(group))
		for _, gpu := range group {
			indices = append(indices, gpu.Index)
		}
		if len(indices) >= 2 {
			groups = append(groups, nvidiaTaskSelection{
				GPUIndices: indices,
				Label:      fmt.Sprintf("%s; GPUs %s", model, joinTaskIndices(indices)),
			})
			continue
		}
		gpu := group[0]
		singles = append(singles, nvidiaTaskSelection{
			GPUIndices: []int{gpu.Index},
			Label:      fmt.Sprintf("GPU %d — %s", gpu.Index, model),
		})
	}
	// Multi-GPU groups first, then single-GPU leftovers.
	return append(groups, singles...), nil
}
|
|
|
|
// joinTaskIndices renders GPU indices as a comma-separated list, e.g.
// "0,1,3". Uses strconv.Itoa instead of fmt.Sprintf for the plain integer
// conversion, matching formatGPUIndexList.
func joinTaskIndices(indices []int) string {
	parts := make([]string, len(indices))
	for i, idx := range indices {
		parts[i] = strconv.Itoa(idx)
	}
	return strings.Join(parts, ",")
}
|
|
|
|
// formatGPUIndexList renders GPU indices as a comma-separated string, e.g.
// "0,2,5". An empty or nil slice yields "".
func formatGPUIndexList(indices []int) string {
	var b strings.Builder
	for i, idx := range indices {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(strconv.Itoa(idx))
	}
	return b.String()
}
|
|
|
|
// formatSplitTaskName combines a base task name with a selection label as
// "base (label)". Either side may be blank (after trimming), in which case
// the other is returned on its own.
func formatSplitTaskName(baseName, selectionLabel string) string {
	base := strings.TrimSpace(baseName)
	label := strings.TrimSpace(selectionLabel)
	switch {
	case base == "":
		return label
	case label == "":
		return base
	default:
		return fmt.Sprintf("%s (%s)", base, label)
	}
}
|
|
|
|
// buildNvidiaTaskSet turns one task request into one or more queue tasks.
//
// For splittable NVIDIA targets in non-parallel mode, the request is split
// into one task per homogeneous GPU selection (see
// expandHomogeneousNvidiaSelections). Otherwise a single task covering all
// selected GPUs is created; in parallel mode the include/exclude filters are
// resolved up front so ExcludeGPUIndices is still honored.
func buildNvidiaTaskSet(target string, priority int, createdAt time.Time, params taskParams, baseName string, appRef *app.App, idPrefix string) ([]*Task, error) {
	if !shouldSplitHomogeneousNvidiaTarget(target) || params.ParallelGPUs {
		// Parallel mode (or non-splittable target): one task for all selected GPUs.
		if params.ParallelGPUs && shouldSplitHomogeneousNvidiaTarget(target) {
			// Resolve the selected GPU indices so ExcludeGPUIndices is applied.
			gpus, err := apiListNvidiaGPUs(appRef)
			if err != nil {
				return nil, err
			}
			resolved, err := expandSelectedGPUIndices(gpus, params.GPUIndices, params.ExcludeGPUIndices)
			if err != nil {
				return nil, err
			}
			params.GPUIndices = resolved
			params.ExcludeGPUIndices = nil
		}
		t := &Task{
			ID:        newJobID(idPrefix),
			Name:      baseName,
			Target:    target,
			Priority:  priority,
			Status:    TaskPending,
			CreatedAt: createdAt,
			params:    params,
		}
		return []*Task{t}, nil
	}
	gpus, err := apiListNvidiaGPUs(appRef)
	if err != nil {
		return nil, err
	}
	selections, err := expandHomogeneousNvidiaSelections(gpus, params.GPUIndices, params.ExcludeGPUIndices)
	if err != nil {
		return nil, err
	}
	tasks := make([]*Task, 0, len(selections))
	for _, selection := range selections {
		// Each split task gets its own copy of the parameters, with the
		// concrete GPU index list substituted for the original filters.
		taskParamsCopy := params
		taskParamsCopy.GPUIndices = append([]int(nil), selection.GPUIndices...)
		taskParamsCopy.ExcludeGPUIndices = nil
		displayName := formatSplitTaskName(baseName, selection.Label)
		taskParamsCopy.DisplayName = displayName
		tasks = append(tasks, &Task{
			ID:        newJobID(idPrefix),
			Name:      displayName,
			Target:    target,
			Priority:  priority,
			Status:    TaskPending,
			CreatedAt: createdAt,
			params:    taskParamsCopy,
		})
	}
	return tasks, nil
}
|
|
|
|
// expandSelectedGPUIndices returns the sorted list of selected GPU indices after
|
|
// applying include/exclude filters, without splitting by model.
|
|
func expandSelectedGPUIndices(gpus []platform.NvidiaGPU, include, exclude []int) ([]int, error) {
|
|
indexed := make(map[int]struct{}, len(gpus))
|
|
allIndices := make([]int, 0, len(gpus))
|
|
for _, gpu := range gpus {
|
|
indexed[gpu.Index] = struct{}{}
|
|
allIndices = append(allIndices, gpu.Index)
|
|
}
|
|
sort.Ints(allIndices)
|
|
|
|
selected := allIndices
|
|
if len(include) > 0 {
|
|
selected = make([]int, 0, len(include))
|
|
seen := make(map[int]struct{}, len(include))
|
|
for _, idx := range include {
|
|
if _, ok := indexed[idx]; !ok {
|
|
continue
|
|
}
|
|
if _, dup := seen[idx]; dup {
|
|
continue
|
|
}
|
|
seen[idx] = struct{}{}
|
|
selected = append(selected, idx)
|
|
}
|
|
sort.Ints(selected)
|
|
}
|
|
if len(exclude) > 0 {
|
|
skip := make(map[int]struct{}, len(exclude))
|
|
for _, idx := range exclude {
|
|
skip[idx] = struct{}{}
|
|
}
|
|
filtered := selected[:0]
|
|
for _, idx := range selected {
|
|
if _, ok := skip[idx]; ok {
|
|
continue
|
|
}
|
|
filtered = append(filtered, idx)
|
|
}
|
|
selected = filtered
|
|
}
|
|
if len(selected) == 0 {
|
|
return nil, fmt.Errorf("no NVIDIA GPUs selected")
|
|
}
|
|
return selected, nil
|
|
}
|
|
|
|
// ── SSE helpers ───────────────────────────────────────────────────────────────
|
|
|
|
func sseWrite(w http.ResponseWriter, event, data string) bool {
|
|
f, ok := w.(http.Flusher)
|
|
if !ok {
|
|
return false
|
|
}
|
|
if event != "" {
|
|
fmt.Fprintf(w, "event: %s\n", event)
|
|
}
|
|
fmt.Fprintf(w, "data: %s\n\n", data)
|
|
f.Flush()
|
|
return true
|
|
}
|
|
|
|
func sseStart(w http.ResponseWriter) bool {
|
|
_, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "streaming not supported", http.StatusInternalServerError)
|
|
return false
|
|
}
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
return true
|
|
}
|
|
|
|
// streamJob streams lines from a jobState to a SSE response.
|
|
func streamJob(w http.ResponseWriter, r *http.Request, j *jobState) {
|
|
if !sseStart(w) {
|
|
return
|
|
}
|
|
streamSubscribedJob(w, r, j)
|
|
}
|
|
|
|
// streamSubscribedJob replays a job's buffered output and then follows live
// output until the job finishes or the client disconnects. The caller must
// already have sent the SSE headers (see sseStart).
func streamSubscribedJob(w http.ResponseWriter, r *http.Request, j *jobState) {
	existing, ch := j.subscribe()
	// Replay everything produced so far.
	for _, line := range existing {
		sseWrite(w, "", line)
	}
	if ch == nil {
		// Job already finished
		sseWrite(w, "done", j.err)
		return
	}
	for {
		select {
		case line, ok := <-ch:
			if !ok {
				// Channel closed: the job completed. Report its final error
				// text in the terminating "done" event.
				sseWrite(w, "done", j.err)
				return
			}
			sseWrite(w, "", line)
		case <-r.Context().Done():
			// Client disconnected; stop streaming.
			return
		}
	}
}
|
|
|
|
// streamCmdJob runs an exec.Cmd and streams stdout+stderr lines into j.
// Both output streams are merged through one pipe; each line is split on
// carriage returns and stripped of ANSI escapes before being appended.
// Returns the command's error if it failed, otherwise any scanner error.
func streamCmdJob(j *jobState, cmd *exec.Cmd) error {
	pr, pw := io.Pipe()
	cmd.Stdout = pw
	cmd.Stderr = pw

	if err := cmd.Start(); err != nil {
		_ = pw.Close()
		_ = pr.Close()
		return err
	}
	// Lower the CPU scheduling priority of stress/audit subprocesses to nice+10
	// so the X server and kernel interrupt handling remain responsive under load
	// (prevents KVM/IPMI graphical console from freezing during GPU stress tests).
	if cmd.Process != nil {
		_ = syscall.Setpriority(syscall.PRIO_PROCESS, cmd.Process.Pid, 10)
	}

	scanDone := make(chan error, 1)
	go func() {
		// A panicking scanner must still unblock the receive below.
		defer func() {
			if rec := recover(); rec != nil {
				scanDone <- fmt.Errorf("stream scanner panic: %v", rec)
			}
		}()
		scanner := bufio.NewScanner(pr)
		// Allow lines up to 1 MiB (bufio's default cap is 64 KiB).
		scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
		for scanner.Scan() {
			// Split on \r to handle progress-bar style output (e.g. \r overwrites)
			// and strip ANSI escape codes so logs are readable in the browser.
			parts := strings.Split(scanner.Text(), "\r")
			for _, part := range parts {
				line := ansiEscapeRE.ReplaceAllString(part, "")
				if line != "" {
					j.append(line)
				}
			}
		}
		// ErrClosedPipe is the normal shutdown path once pw is closed.
		if err := scanner.Err(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
			scanDone <- err
			return
		}
		scanDone <- nil
	}()

	// Wait for the process first, then close the write end so the scanner
	// sees EOF and drains fully before the read end is closed.
	err := cmd.Wait()
	_ = pw.Close()
	scanErr := <-scanDone
	_ = pr.Close()
	if err != nil {
		return err
	}
	return scanErr
}
|
|
|
|
// ── Audit ─────────────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPIAuditRun(w http.ResponseWriter, _ *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
t := &Task{
|
|
ID: newJobID("audit"),
|
|
Name: "Audit",
|
|
Target: "audit",
|
|
Priority: defaultTaskPriority("audit", taskParams{}),
|
|
Status: TaskPending,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
globalQueue.enqueue(t)
|
|
writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID})
|
|
}
|
|
|
|
func (h *handler) handleAPIAuditStream(w http.ResponseWriter, r *http.Request) {
|
|
id := r.URL.Query().Get("job_id")
|
|
if id == "" {
|
|
id = r.URL.Query().Get("task_id")
|
|
}
|
|
// Try task queue first, then legacy job manager
|
|
if j, ok := globalQueue.findJob(id); ok {
|
|
streamJob(w, r, j)
|
|
return
|
|
}
|
|
if j, ok := globalJobs.get(id); ok {
|
|
streamJob(w, r, j)
|
|
return
|
|
}
|
|
http.Error(w, "job not found", http.StatusNotFound)
|
|
}
|
|
|
|
// ── SAT ───────────────────────────────────────────────────────────────────────
|
|
|
|
// handleAPISATRun returns a handler that enqueues stress/validation tasks
// for the given target. The JSON body supplies run parameters; splittable
// NVIDIA targets may expand into multiple tasks (see buildNvidiaTaskSet).
func (h *handler) handleAPISATRun(target string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if h.opts.App == nil {
			writeError(w, http.StatusServiceUnavailable, "app not configured")
			return
		}

		var body struct {
			Duration           int      `json:"duration"`
			StressMode         bool     `json:"stress_mode"`
			GPUIndices         []int    `json:"gpu_indices"`
			ExcludeGPUIndices  []int    `json:"exclude_gpu_indices"`
			StaggerGPUStart    bool     `json:"stagger_gpu_start"`
			ParallelGPUs       bool     `json:"parallel_gpus"`
			Loader             string   `json:"loader"`
			Profile            string   `json:"profile"`
			DisplayName        string   `json:"display_name"`
			PlatformComponents []string `json:"platform_components"`
		}
		// An empty body is acceptable (all defaults); only malformed JSON is
		// rejected.
		if r.Body != nil {
			if err := json.NewDecoder(r.Body).Decode(&body); err != nil && !errors.Is(err, io.EOF) {
				writeError(w, http.StatusBadRequest, "invalid request body")
				return
			}
		}

		// A caller-supplied display name overrides the derived one.
		name := taskDisplayName(target, body.Profile, body.Loader)
		if strings.TrimSpace(body.DisplayName) != "" {
			name = body.DisplayName
		}
		params := taskParams{
			Duration:           body.Duration,
			StressMode:         body.StressMode,
			GPUIndices:         body.GPUIndices,
			ExcludeGPUIndices:  body.ExcludeGPUIndices,
			StaggerGPUStart:    body.StaggerGPUStart,
			ParallelGPUs:       body.ParallelGPUs,
			Loader:             body.Loader,
			BurnProfile:        body.Profile,
			DisplayName:        body.DisplayName,
			PlatformComponents: body.PlatformComponents,
		}
		tasks, err := buildNvidiaTaskSet(target, defaultTaskPriority(target, params), time.Now(), params, name, h.opts.App, "sat-"+target)
		if err != nil {
			writeError(w, http.StatusBadRequest, err.Error())
			return
		}
		for _, t := range tasks {
			globalQueue.enqueue(t)
		}
		writeTaskRunResponse(w, tasks)
	}
}
|
|
|
|
// handleAPIBenchmarkNvidiaRunKind returns a handler that enqueues NVIDIA
// benchmark tasks of the given target kind. Three scheduling modes exist:
// sequential (default), parallel, and ramp-up (one task per growing GPU
// prefix). Note the ramp-up branch only triggers when the request lists more
// than one explicit GPU index — presumably intentional, but an empty
// gpu_indices list (meaning "all GPUs") never ramps; confirm with the
// frontend contract.
func (h *handler) handleAPIBenchmarkNvidiaRunKind(target string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if h.opts.App == nil {
			writeError(w, http.StatusServiceUnavailable, "app not configured")
			return
		}

		var body struct {
			Profile           string `json:"profile"`
			SizeMB            int    `json:"size_mb"`
			GPUIndices        []int  `json:"gpu_indices"`
			ExcludeGPUIndices []int  `json:"exclude_gpu_indices"`
			RunNCCL           *bool  `json:"run_nccl"`
			ParallelGPUs      *bool  `json:"parallel_gpus"`
			RampUp            *bool  `json:"ramp_up"`
			DisplayName       string `json:"display_name"`
		}
		if r.Body != nil {
			if err := json.NewDecoder(r.Body).Decode(&body); err != nil && !errors.Is(err, io.EOF) {
				writeError(w, http.StatusBadRequest, "invalid request body")
				return
			}
		}

		// Pointer fields distinguish "absent" from "false": NCCL defaults
		// on, parallel and ramp-up default off.
		runNCCL := true
		if body.RunNCCL != nil {
			runNCCL = *body.RunNCCL
		}
		parallelGPUs := false
		if body.ParallelGPUs != nil {
			parallelGPUs = *body.ParallelGPUs
		}
		rampUp := false
		if body.RampUp != nil {
			rampUp = *body.RampUp
		}
		// Build a descriptive base name that includes profile and mode so the task
		// list is self-explanatory without opening individual task detail pages.
		profile := strings.TrimSpace(body.Profile)
		if profile == "" {
			profile = "standard"
		}
		name := taskDisplayName(target, "", "")
		if strings.TrimSpace(body.DisplayName) != "" {
			name = body.DisplayName
		}
		// Append profile tag.
		name = fmt.Sprintf("%s · %s", name, profile)

		if target == "nvidia-bench-power" && parallelGPUs {
			writeError(w, http.StatusBadRequest, "power / thermal fit benchmark uses sequential or ramp-up modes only")
			return
		}

		if rampUp && len(body.GPUIndices) > 1 {
			// Ramp-up mode: resolve GPU list, then create one task per prefix
			// [gpu0], [gpu0,gpu1], ..., [gpu0,...,gpuN-1], each running in parallel.
			gpus, err := apiListNvidiaGPUs(h.opts.App)
			if err != nil {
				writeError(w, http.StatusBadRequest, err.Error())
				return
			}
			resolved, err := expandSelectedGPUIndices(gpus, body.GPUIndices, body.ExcludeGPUIndices)
			if err != nil {
				writeError(w, http.StatusBadRequest, err.Error())
				return
			}
			if len(resolved) < 2 {
				// Fall through to normal single-task path.
				rampUp = false
			} else {
				now := time.Now()
				// One shared run ID ties all ramp steps together.
				rampRunID := fmt.Sprintf("ramp-%s", now.UTC().Format("20060102-150405"))
				var allTasks []*Task
				for step := 1; step <= len(resolved); step++ {
					subset := resolved[:step]
					stepName := fmt.Sprintf("%s · ramp %d/%d · GPU %s", name, step, len(resolved), formatGPUIndexList(subset))
					t := &Task{
						ID:        newJobID("bee-bench-nvidia"),
						Name:      stepName,
						Target:    target,
						Priority:  defaultTaskPriority(target, taskParams{}),
						Status:    TaskPending,
						CreatedAt: now,
						params: taskParams{
							GPUIndices:       append([]int(nil), subset...),
							SizeMB:           body.SizeMB,
							BenchmarkProfile: body.Profile,
							// NCCL runs only on the final step, which covers
							// every selected GPU.
							RunNCCL:      runNCCL && step == len(resolved),
							ParallelGPUs: true,
							RampStep:     step,
							RampTotal:    len(resolved),
							RampRunID:    rampRunID,
							DisplayName:  stepName,
						},
					}
					allTasks = append(allTasks, t)
				}
				for _, t := range allTasks {
					globalQueue.enqueue(t)
				}
				writeTaskRunResponse(w, allTasks)
				return
			}
		}

		// For non-ramp tasks append mode tag.
		if parallelGPUs {
			name = fmt.Sprintf("%s · parallel", name)
		} else {
			name = fmt.Sprintf("%s · sequential", name)
		}

		params := taskParams{
			GPUIndices:        body.GPUIndices,
			ExcludeGPUIndices: body.ExcludeGPUIndices,
			SizeMB:            body.SizeMB,
			BenchmarkProfile:  body.Profile,
			RunNCCL:           runNCCL,
			ParallelGPUs:      parallelGPUs,
			DisplayName:       body.DisplayName,
		}
		tasks, err := buildNvidiaTaskSet(target, defaultTaskPriority(target, params), time.Now(), params, name, h.opts.App, "bee-bench-nvidia")
		if err != nil {
			writeError(w, http.StatusBadRequest, err.Error())
			return
		}
		for _, t := range tasks {
			globalQueue.enqueue(t)
		}
		writeTaskRunResponse(w, tasks)
	}
}
|
|
|
|
func (h *handler) handleAPIBenchmarkNvidiaRun(w http.ResponseWriter, r *http.Request) {
|
|
h.handleAPIBenchmarkNvidiaRunKind("nvidia-bench-perf").ServeHTTP(w, r)
|
|
}
|
|
|
|
func (h *handler) handleAPISATStream(w http.ResponseWriter, r *http.Request) {
|
|
id := r.URL.Query().Get("job_id")
|
|
if id == "" {
|
|
id = r.URL.Query().Get("task_id")
|
|
}
|
|
if j, ok := globalQueue.findJob(id); ok {
|
|
streamJob(w, r, j)
|
|
return
|
|
}
|
|
if j, ok := globalJobs.get(id); ok {
|
|
streamJob(w, r, j)
|
|
return
|
|
}
|
|
http.Error(w, "job not found", http.StatusNotFound)
|
|
}
|
|
|
|
func (h *handler) handleAPISATAbort(w http.ResponseWriter, r *http.Request) {
|
|
id := r.URL.Query().Get("job_id")
|
|
if id == "" {
|
|
id = r.URL.Query().Get("task_id")
|
|
}
|
|
if t, ok := globalQueue.findByID(id); ok {
|
|
globalQueue.mu.Lock()
|
|
switch t.Status {
|
|
case TaskPending:
|
|
t.Status = TaskCancelled
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
case TaskRunning:
|
|
if t.job != nil {
|
|
t.job.abort()
|
|
}
|
|
t.Status = TaskCancelled
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
}
|
|
globalQueue.mu.Unlock()
|
|
writeJSON(w, map[string]string{"status": "aborted"})
|
|
return
|
|
}
|
|
if j, ok := globalJobs.get(id); ok {
|
|
if j.abort() {
|
|
writeJSON(w, map[string]string{"status": "aborted"})
|
|
} else {
|
|
writeJSON(w, map[string]string{"status": "not_running"})
|
|
}
|
|
return
|
|
}
|
|
http.Error(w, "job not found", http.StatusNotFound)
|
|
}
|
|
|
|
// ── Services ──────────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPIServicesList(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
names, err := h.opts.App.ListBeeServices()
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
type serviceInfo struct {
|
|
Name string `json:"name"`
|
|
State string `json:"state"`
|
|
Body string `json:"body"`
|
|
}
|
|
result := make([]serviceInfo, 0, len(names))
|
|
for _, name := range names {
|
|
state := h.opts.App.ServiceState(name)
|
|
body, _ := h.opts.App.ServiceStatus(name)
|
|
result = append(result, serviceInfo{Name: name, State: state, Body: body})
|
|
}
|
|
writeJSON(w, result)
|
|
}
|
|
|
|
func (h *handler) handleAPIServicesAction(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
var req struct {
|
|
Name string `json:"name"`
|
|
Action string `json:"action"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
var action platform.ServiceAction
|
|
switch req.Action {
|
|
case "start":
|
|
action = platform.ServiceStart
|
|
case "stop":
|
|
action = platform.ServiceStop
|
|
case "restart":
|
|
action = platform.ServiceRestart
|
|
default:
|
|
writeError(w, http.StatusBadRequest, "action must be start|stop|restart")
|
|
return
|
|
}
|
|
result, err := h.opts.App.ServiceActionResult(req.Name, action)
|
|
status := "ok"
|
|
if err != nil {
|
|
status = "error"
|
|
}
|
|
// Always return 200 with output so the frontend can display the actual
|
|
// systemctl error message instead of a generic "exit status 1".
|
|
writeJSON(w, map[string]string{"status": status, "output": result.Body})
|
|
}
|
|
|
|
// ── Network ───────────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPINetworkStatus(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
ifaces, err := h.opts.App.ListInterfaces()
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, map[string]any{
|
|
"interfaces": ifaces,
|
|
"default_route": h.opts.App.DefaultRoute(),
|
|
"pending_change": h.hasPendingNetworkChange(),
|
|
"rollback_in": h.pendingNetworkRollbackIn(),
|
|
})
|
|
}
|
|
|
|
// handleAPINetworkDHCP switches one interface (or all, when the interface
// field is empty or "all") to DHCP through the pending-change mechanism (see
// applyPendingNetworkChange); rollback_in in the response reports the
// rollback window in seconds.
func (h *handler) handleAPINetworkDHCP(w http.ResponseWriter, r *http.Request) {
	if h.opts.App == nil {
		writeError(w, http.StatusServiceUnavailable, "app not configured")
		return
	}
	var req struct {
		Interface string `json:"interface"`
	}
	// Decode errors are deliberately ignored: an absent or empty body simply
	// leaves Interface blank, which means "all interfaces" below.
	_ = json.NewDecoder(r.Body).Decode(&req)

	result, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) {
		if req.Interface == "" || req.Interface == "all" {
			return h.opts.App.DHCPAllResult()
		}
		return h.opts.App.DHCPOneResult(req.Interface)
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, map[string]any{
		"status":      "ok",
		"output":      result.Body,
		"rollback_in": int(netRollbackTimeout.Seconds()),
	})
}
|
|
|
|
func (h *handler) handleAPINetworkStatic(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
var req struct {
|
|
Interface string `json:"interface"`
|
|
Address string `json:"address"`
|
|
Prefix string `json:"prefix"`
|
|
Gateway string `json:"gateway"`
|
|
DNS []string `json:"dns"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
cfg := platform.StaticIPv4Config{
|
|
Interface: req.Interface,
|
|
Address: req.Address,
|
|
Prefix: req.Prefix,
|
|
Gateway: req.Gateway,
|
|
DNS: req.DNS,
|
|
}
|
|
result, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) {
|
|
return h.opts.App.SetStaticIPv4Result(cfg)
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, map[string]any{
|
|
"status": "ok",
|
|
"output": result.Body,
|
|
"rollback_in": int(netRollbackTimeout.Seconds()),
|
|
})
|
|
}
|
|
|
|
// ── Export ────────────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPIExportList(w http.ResponseWriter, r *http.Request) {
|
|
entries, err := listExportFiles(h.opts.ExportDir)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, entries)
|
|
}
|
|
|
|
func (h *handler) handleAPIExportUSBTargets(w http.ResponseWriter, _ *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
targets, err := h.opts.App.ListRemovableTargets()
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
if targets == nil {
|
|
targets = []platform.RemovableTarget{}
|
|
}
|
|
writeJSON(w, targets)
|
|
}
|
|
|
|
func (h *handler) handleAPIExportUSBAudit(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
var target platform.RemovableTarget
|
|
if err := json.NewDecoder(r.Body).Decode(&target); err != nil || target.Device == "" {
|
|
writeError(w, http.StatusBadRequest, "device is required")
|
|
return
|
|
}
|
|
result, err := h.opts.App.ExportLatestAuditResult(target)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, map[string]string{"status": "ok", "message": result.Body})
|
|
}
|
|
|
|
func (h *handler) handleAPIExportUSBBundle(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
var target platform.RemovableTarget
|
|
if err := json.NewDecoder(r.Body).Decode(&target); err != nil || target.Device == "" {
|
|
writeError(w, http.StatusBadRequest, "device is required")
|
|
return
|
|
}
|
|
result, err := h.opts.App.ExportSupportBundleResult(target)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, map[string]string{"status": "ok", "message": result.Body})
|
|
}
|
|
|
|
// ── GPU presence ──────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPIGNVIDIAGPUs(w http.ResponseWriter, _ *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
gpus, err := h.opts.App.ListNvidiaGPUs()
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
if gpus == nil {
|
|
gpus = []platform.NvidiaGPU{}
|
|
}
|
|
writeJSON(w, gpus)
|
|
}
|
|
|
|
func (h *handler) handleAPIGNVIDIAGPUStatuses(w http.ResponseWriter, _ *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
gpus, err := apiListNvidiaGPUStatuses(h.opts.App)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
if gpus == nil {
|
|
gpus = []platform.NvidiaGPUStatus{}
|
|
}
|
|
writeJSON(w, gpus)
|
|
}
|
|
|
|
func (h *handler) handleAPIGNVIDIAReset(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
var req struct {
|
|
Index int `json:"index"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
result, err := h.opts.App.ResetNvidiaGPU(req.Index)
|
|
status := "ok"
|
|
if err != nil {
|
|
status = "error"
|
|
}
|
|
writeJSON(w, map[string]string{"status": status, "output": result.Body})
|
|
}
|
|
|
|
func (h *handler) handleAPIGPUPresence(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
gp := h.opts.App.DetectGPUPresence()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(map[string]bool{
|
|
"nvidia": gp.Nvidia,
|
|
"amd": gp.AMD,
|
|
})
|
|
}
|
|
|
|
// ── GPU tools ─────────────────────────────────────────────────────────────────
|
|
|
|
// handleAPIGPUTools reports which GPU test tools are usable on this host.
// Availability combines device-node presence (/dev/nvidia0 for NVIDIA,
// /dev/kfd for AMD) with the required binaries being on PATH.
func (h *handler) handleAPIGPUTools(w http.ResponseWriter, _ *http.Request) {
	type toolEntry struct {
		ID        string `json:"id"`
		Available bool   `json:"available"`
		Vendor    string `json:"vendor"` // "nvidia" | "amd"
	}
	_, nvidiaErr := os.Stat("/dev/nvidia0")
	_, amdErr := os.Stat("/dev/kfd")
	nvidiaUp := nvidiaErr == nil
	amdUp := amdErr == nil
	_, dcgmErr := exec.LookPath("dcgmi")
	_, ncclStressErr := exec.LookPath("bee-nccl-gpu-stress")
	_, johnErr := exec.LookPath("bee-john-gpu-stress")
	_, beeBurnErr := exec.LookPath("bee-gpu-burn")
	_, nvBandwidthErr := exec.LookPath("nvbandwidth")
	// dcgmproftester is searched under several version-suffixed names.
	profErr := lookPathAny("dcgmproftester", "dcgmproftester13", "dcgmproftester12", "dcgmproftester11")
	writeJSON(w, []toolEntry{
		{ID: "nvidia-compute", Available: nvidiaUp && profErr == nil, Vendor: "nvidia"},
		{ID: "nvidia-targeted-power", Available: nvidiaUp && dcgmErr == nil, Vendor: "nvidia"},
		{ID: "nvidia-pulse", Available: nvidiaUp && dcgmErr == nil, Vendor: "nvidia"},
		{ID: "nvidia-interconnect", Available: nvidiaUp && ncclStressErr == nil, Vendor: "nvidia"},
		{ID: "nvidia-bandwidth", Available: nvidiaUp && dcgmErr == nil && nvBandwidthErr == nil, Vendor: "nvidia"},
		{ID: "bee-gpu-burn", Available: nvidiaUp && beeBurnErr == nil, Vendor: "nvidia"},
		{ID: "john", Available: nvidiaUp && johnErr == nil, Vendor: "nvidia"},
		// The AMD rvs entry only checks for /dev/kfd, not the rvs binary —
		// presumably it ships with the image; confirm before relying on it.
		{ID: "rvs", Available: amdUp, Vendor: "amd"},
	})
}
|
|
|
|
// lookPathAny reports success (nil) when at least one of names resolves on
// PATH, and exec.ErrNotFound when none do (including the zero-name case).
func lookPathAny(names ...string) error {
	for _, candidate := range names {
		_, err := exec.LookPath(candidate)
		if err == nil {
			return nil
		}
	}
	return exec.ErrNotFound
}
|
|
|
|
// ── System ────────────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPIRAMStatus(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
status := h.currentRAMStatus()
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(status)
|
|
}
|
|
|
|
// ramStatusResponse is the payload of the RAM-status endpoint. It embeds the
// platform-level live-media state and layers queue awareness on top: whether
// a conflicting install task is already active and whether a new
// install-to-RAM task may be started now.
//
// NOTE(review): the bool fields carry omitempty, so false values are omitted
// from the JSON entirely — clients must treat a missing key as false.
type ramStatusResponse struct {
	platform.LiveMediaRAMState
	// InstallTaskActive is set when an install-to-disk task is pending/running.
	InstallTaskActive bool `json:"install_task_active,omitempty"`
	// CopyTaskActive is set when an install-to-RAM task is pending/running.
	CopyTaskActive bool `json:"copy_task_active,omitempty"`
	// CanStartTask reports whether a new install-to-RAM task may be enqueued.
	CanStartTask bool `json:"can_start_task,omitempty"`
	// BlockedReason is a human-readable explanation when a task cannot start.
	BlockedReason string `json:"blocked_reason,omitempty"`
}
|
|
|
|
func (h *handler) currentRAMStatus() ramStatusResponse {
|
|
state := h.opts.App.LiveMediaRAMState()
|
|
resp := ramStatusResponse{LiveMediaRAMState: state}
|
|
if globalQueue.hasActiveTarget("install") {
|
|
resp.InstallTaskActive = true
|
|
resp.BlockedReason = "install to disk is already running"
|
|
return resp
|
|
}
|
|
if globalQueue.hasActiveTarget("install-to-ram") {
|
|
resp.CopyTaskActive = true
|
|
resp.BlockedReason = "install to RAM task is already pending or running"
|
|
return resp
|
|
}
|
|
if state.InRAM {
|
|
resp.BlockedReason = "system is already running from RAM"
|
|
return resp
|
|
}
|
|
resp.CanStartTask = state.CanStartCopy
|
|
if !resp.CanStartTask && resp.BlockedReason == "" {
|
|
resp.BlockedReason = state.Message
|
|
}
|
|
return resp
|
|
}
|
|
|
|
func (h *handler) handleAPIInstallToRAM(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
status := h.currentRAMStatus()
|
|
if !status.CanStartTask {
|
|
msg := strings.TrimSpace(status.BlockedReason)
|
|
if msg == "" {
|
|
msg = "install to RAM is not available"
|
|
}
|
|
writeError(w, http.StatusConflict, msg)
|
|
return
|
|
}
|
|
t := &Task{
|
|
ID: newJobID("install-to-ram"),
|
|
Name: "Install to RAM",
|
|
Target: "install-to-ram",
|
|
Priority: defaultTaskPriority("install-to-ram", taskParams{}),
|
|
Status: TaskPending,
|
|
CreatedAt: time.Now(),
|
|
}
|
|
globalQueue.enqueue(t)
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_ = json.NewEncoder(w).Encode(map[string]string{"task_id": t.ID})
|
|
}
|
|
|
|
// ── Tools ─────────────────────────────────────────────────────────────────────
|
|
|
|
// standardTools is the fixed set of external binaries the tools-check
// endpoint probes for (hardware inventory, storage, GPU, stress, and
// flashing utilities). The availability check itself lives in the app layer.
var standardTools = []string{
	"dmidecode", "smartctl", "nvme", "lspci", "ipmitool",
	"nvidia-smi", "dcgmi", "nv-hostengine", "memtester", "stress-ng", "nvtop",
	"mstflint", "qrencode",
}
|
|
|
|
func (h *handler) handleAPIToolsCheck(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
statuses := h.opts.App.CheckTools(standardTools)
|
|
writeJSON(w, statuses)
|
|
}
|
|
|
|
// ── Preflight ─────────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPIPreflight(w http.ResponseWriter, r *http.Request) {
|
|
data, err := loadSnapshot(filepath.Join(h.opts.ExportDir, "runtime-health.json"))
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "runtime health not found")
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
w.Header().Set("Cache-Control", "no-store")
|
|
_, _ = w.Write(data)
|
|
}
|
|
|
|
// ── Install ───────────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPIInstallDisks(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
disks, err := h.opts.App.ListInstallDisks()
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
type diskJSON struct {
|
|
Device string `json:"device"`
|
|
Model string `json:"model"`
|
|
Size string `json:"size"`
|
|
SizeBytes int64 `json:"size_bytes"`
|
|
MountedParts []string `json:"mounted_parts"`
|
|
Warnings []string `json:"warnings"`
|
|
}
|
|
result := make([]diskJSON, 0, len(disks))
|
|
for _, d := range disks {
|
|
result = append(result, diskJSON{
|
|
Device: d.Device,
|
|
Model: d.Model,
|
|
Size: d.Size,
|
|
SizeBytes: d.SizeBytes,
|
|
MountedParts: d.MountedParts,
|
|
Warnings: platform.DiskWarnings(d),
|
|
})
|
|
}
|
|
writeJSON(w, result)
|
|
}
|
|
|
|
func (h *handler) handleAPIInstallRun(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
var req struct {
|
|
Device string `json:"device"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Device == "" {
|
|
writeError(w, http.StatusBadRequest, "device is required")
|
|
return
|
|
}
|
|
|
|
// Whitelist: only allow devices that ListInstallDisks() returns.
|
|
disks, err := h.opts.App.ListInstallDisks()
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
allowed := false
|
|
for _, d := range disks {
|
|
if d.Device == req.Device {
|
|
allowed = true
|
|
break
|
|
}
|
|
}
|
|
if !allowed {
|
|
writeError(w, http.StatusBadRequest, "device not in install candidate list")
|
|
return
|
|
}
|
|
if globalQueue.hasActiveTarget("install-to-ram") {
|
|
writeError(w, http.StatusConflict, "install to RAM task is already pending or running")
|
|
return
|
|
}
|
|
if globalQueue.hasActiveTarget("install") {
|
|
writeError(w, http.StatusConflict, "install task is already pending or running")
|
|
return
|
|
}
|
|
t := &Task{
|
|
ID: newJobID("install"),
|
|
Name: "Install to Disk",
|
|
Target: "install",
|
|
Priority: defaultTaskPriority("install", taskParams{}),
|
|
Status: TaskPending,
|
|
CreatedAt: time.Now(),
|
|
params: taskParams{
|
|
Device: req.Device,
|
|
},
|
|
}
|
|
globalQueue.enqueue(t)
|
|
writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID})
|
|
}
|
|
|
|
// ── Metrics SSE ───────────────────────────────────────────────────────────────
|
|
|
|
func (h *handler) handleAPIMetricsLatest(w http.ResponseWriter, r *http.Request) {
|
|
sample, ok := h.latestMetric()
|
|
if !ok {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_, _ = w.Write([]byte("{}"))
|
|
return
|
|
}
|
|
b, err := json.Marshal(sample)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_, _ = w.Write(b)
|
|
}
|
|
|
|
func (h *handler) handleAPIMetricsStream(w http.ResponseWriter, r *http.Request) {
|
|
if !sseStart(w) {
|
|
return
|
|
}
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-r.Context().Done():
|
|
return
|
|
case <-ticker.C:
|
|
sample, ok := h.latestMetric()
|
|
if !ok {
|
|
continue
|
|
}
|
|
b, err := json.Marshal(sample)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if !sseWrite(w, "metrics", string(b)) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// feedRings pushes one sample into all in-memory ring buffers.
//
// Temperature readings are routed by group into per-name rings; power and
// CPU/memory load go into their dedicated rings; fan and per-GPU rings are
// updated while holding ringsMu. The GPU ring slice grows on demand so new
// or sparse GPU indices are handled.
//
// NOTE(review): the temp/power/load rings are updated outside ringsMu while
// fan/GPU rings are updated inside it — confirm the former have their own
// synchronization or are only ever touched from a single goroutine.
func (h *handler) feedRings(sample platform.LiveMetricSample) {
	for _, t := range sample.Temps {
		switch t.Group {
		case "cpu":
			h.pushNamedMetricRing(&h.cpuTempRings, t.Name, t.Celsius)
		case "ambient":
			h.pushNamedMetricRing(&h.ambientTempRings, t.Name, t.Celsius)
		}
	}
	h.ringPower.push(sample.PowerW)
	h.ringCPULoad.push(sample.CPULoadPct)
	h.ringMemLoad.push(sample.MemLoadPct)

	h.ringsMu.Lock()
	h.pushFanRings(sample.Fans)
	for _, gpu := range sample.GPUs {
		idx := gpu.GPUIndex
		// Grow the ring list to cover this GPU index (120-sample history each).
		for len(h.gpuRings) <= idx {
			h.gpuRings = append(h.gpuRings, &gpuRings{
				Temp:    newMetricsRing(120),
				Util:    newMetricsRing(120),
				MemUtil: newMetricsRing(120),
				Power:   newMetricsRing(120),
			})
		}
		h.gpuRings[idx].Temp.push(gpu.TempC)
		h.gpuRings[idx].Util.push(gpu.UsagePct)
		h.gpuRings[idx].MemUtil.push(gpu.MemUsagePct)
		h.gpuRings[idx].Power.push(gpu.PowerW)
	}
	h.ringsMu.Unlock()
}
|
|
|
|
// pushFanRings records one RPM value per known fan ring, learning new fan
// names as they appear. Fans absent from this sample repeat their last value
// (or 0 if the ring is empty) so every ring advances by exactly one sample.
//
// Caller holds h.ringsMu (see feedRings).
func (h *handler) pushFanRings(fans []platform.FanReading) {
	if len(fans) == 0 && len(h.ringFans) == 0 {
		return
	}
	// Index this sample's readings by fan name; unnamed fans are dropped.
	fanValues := make(map[string]float64, len(fans))
	for _, fan := range fans {
		if fan.Name == "" {
			continue
		}
		fanValues[fan.Name] = fan.RPM
		found := false
		for i, name := range h.fanNames {
			if name == fan.Name {
				found = true
				// NOTE(review): this appends one ring when fanNames is longer
				// than ringFans, but the new ring lands at index
				// len(ringFans), not necessarily at i — verify the two slices
				// cannot drift out of step.
				if i >= len(h.ringFans) {
					h.ringFans = append(h.ringFans, newMetricsRing(120))
				}
				break
			}
		}
		if !found {
			// First sighting of this fan: register its name and a fresh ring.
			h.fanNames = append(h.fanNames, fan.Name)
			h.ringFans = append(h.ringFans, newMetricsRing(120))
		}
	}
	// Advance every ring by one sample, whether or not it got a fresh reading.
	for i, ring := range h.ringFans {
		if ring == nil {
			continue
		}
		name := ""
		if i < len(h.fanNames) {
			name = h.fanNames[i]
		}
		if rpm, ok := fanValues[name]; ok {
			ring.push(rpm)
			continue
		}
		// No reading this sample: hold the last value so the series stays
		// flat instead of dropping to zero.
		if last, ok := ring.latest(); ok {
			ring.push(last)
			continue
		}
		ring.push(0)
	}
}
|
|
|
|
func (h *handler) pushNamedMetricRing(dst *[]*namedMetricsRing, name string, value float64) {
|
|
if name == "" {
|
|
return
|
|
}
|
|
for _, item := range *dst {
|
|
if item != nil && item.Name == name && item.Ring != nil {
|
|
item.Ring.push(value)
|
|
return
|
|
}
|
|
}
|
|
*dst = append(*dst, &namedMetricsRing{
|
|
Name: name,
|
|
Ring: newMetricsRing(120),
|
|
})
|
|
(*dst)[len(*dst)-1].Ring.push(value)
|
|
}
|
|
|
|
// ── Network toggle ────────────────────────────────────────────────────────────
|
|
|
|
// netRollbackTimeout is how long a network change stays pending before it is
// automatically rolled back unless the client confirms it first.
const netRollbackTimeout = 60 * time.Second
|
|
|
|
func (h *handler) handleAPINetworkToggle(w http.ResponseWriter, r *http.Request) {
|
|
if h.opts.App == nil {
|
|
writeError(w, http.StatusServiceUnavailable, "app not configured")
|
|
return
|
|
}
|
|
var req struct {
|
|
Iface string `json:"iface"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Iface == "" {
|
|
writeError(w, http.StatusBadRequest, "iface is required")
|
|
return
|
|
}
|
|
|
|
wasUp, err := h.opts.App.GetInterfaceState(req.Iface)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
|
|
if _, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) {
|
|
err := h.opts.App.SetInterfaceState(req.Iface, !wasUp)
|
|
return app.ActionResult{}, err
|
|
}); err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
|
|
newState := "up"
|
|
if wasUp {
|
|
newState = "down"
|
|
}
|
|
writeJSON(w, map[string]any{
|
|
"iface": req.Iface,
|
|
"new_state": newState,
|
|
"rollback_in": int(netRollbackTimeout.Seconds()),
|
|
})
|
|
}
|
|
|
|
// applyPendingNetworkChange captures a network snapshot, runs apply, and arms
// a one-shot timer that restores the snapshot after netRollbackTimeout unless
// the change is confirmed or rolled back first. Any previously pending change
// is rolled back before the new one is applied, so at most one change is
// pending at a time.
func (h *handler) applyPendingNetworkChange(apply func() (app.ActionResult, error)) (app.ActionResult, error) {
	if h.opts.App == nil {
		return app.ActionResult{}, fmt.Errorf("app not configured")
	}

	// "no pending network change" is the normal case here, not a failure.
	// NOTE(review): matching the error by string is fragile — a package-level
	// sentinel error plus errors.Is would be sturdier.
	if err := h.rollbackPendingNetworkChange(); err != nil && err.Error() != "no pending network change" {
		return app.ActionResult{}, err
	}

	// Capture the snapshot BEFORE applying so the rollback timer can restore
	// the pre-change state.
	snapshot, err := h.opts.App.CaptureNetworkSnapshot()
	if err != nil {
		return app.ActionResult{}, err
	}

	result, err := apply()
	if err != nil {
		return result, err
	}

	pnc := &pendingNetChange{
		snapshot: snapshot,
		deadline: time.Now().Add(netRollbackTimeout),
	}
	pnc.timer = time.AfterFunc(netRollbackTimeout, func() {
		// Auto-rollback fired: restore the snapshot, then clear the pending
		// record only if it is still ours (a newer change may have replaced it).
		_ = h.opts.App.RestoreNetworkSnapshot(snapshot)
		h.pendingNetMu.Lock()
		if h.pendingNet == pnc {
			h.pendingNet = nil
		}
		h.pendingNetMu.Unlock()
	})

	h.pendingNetMu.Lock()
	h.pendingNet = pnc
	h.pendingNetMu.Unlock()

	return result, nil
}
|
|
|
|
func (h *handler) hasPendingNetworkChange() bool {
|
|
h.pendingNetMu.Lock()
|
|
defer h.pendingNetMu.Unlock()
|
|
return h.pendingNet != nil
|
|
}
|
|
|
|
func (h *handler) pendingNetworkRollbackIn() int {
|
|
h.pendingNetMu.Lock()
|
|
defer h.pendingNetMu.Unlock()
|
|
if h.pendingNet == nil {
|
|
return 0
|
|
}
|
|
remaining := int(time.Until(h.pendingNet.deadline).Seconds())
|
|
if remaining < 1 {
|
|
return 1
|
|
}
|
|
return remaining
|
|
}
|
|
|
|
func (h *handler) handleAPINetworkConfirm(w http.ResponseWriter, _ *http.Request) {
|
|
h.pendingNetMu.Lock()
|
|
pnc := h.pendingNet
|
|
h.pendingNet = nil
|
|
h.pendingNetMu.Unlock()
|
|
if pnc != nil {
|
|
pnc.mu.Lock()
|
|
pnc.timer.Stop()
|
|
pnc.mu.Unlock()
|
|
}
|
|
writeJSON(w, map[string]string{"status": "confirmed"})
|
|
}
|
|
|
|
func (h *handler) handleAPINetworkRollback(w http.ResponseWriter, _ *http.Request) {
|
|
if err := h.rollbackPendingNetworkChange(); err != nil {
|
|
if err.Error() == "no pending network change" {
|
|
writeError(w, http.StatusConflict, err.Error())
|
|
return
|
|
}
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
writeJSON(w, map[string]string{"status": "rolled back"})
|
|
}
|
|
|
|
func (h *handler) rollbackPendingNetworkChange() error {
|
|
h.pendingNetMu.Lock()
|
|
pnc := h.pendingNet
|
|
h.pendingNet = nil
|
|
h.pendingNetMu.Unlock()
|
|
if pnc == nil {
|
|
return fmt.Errorf("no pending network change")
|
|
}
|
|
pnc.mu.Lock()
|
|
pnc.timer.Stop()
|
|
pnc.mu.Unlock()
|
|
if h.opts.App != nil {
|
|
return h.opts.App.RestoreNetworkSnapshot(pnc.snapshot)
|
|
}
|
|
return nil
|
|
}
|