package webui import ( "bufio" "context" "encoding/json" "fmt" "io" "net/http" "os/exec" "path/filepath" "strings" "sync/atomic" "time" "bee/audit/internal/app" "bee/audit/internal/platform" ) // ── Job ID counter ──────────────────────────────────────────────────────────── var jobCounter atomic.Uint64 func newJobID(prefix string) string { return fmt.Sprintf("%s-%d", prefix, jobCounter.Add(1)) } // ── SSE helpers ─────────────────────────────────────────────────────────────── func sseWrite(w http.ResponseWriter, event, data string) bool { f, ok := w.(http.Flusher) if !ok { return false } if event != "" { fmt.Fprintf(w, "event: %s\n", event) } fmt.Fprintf(w, "data: %s\n\n", data) f.Flush() return true } func sseStart(w http.ResponseWriter) bool { _, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return false } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") return true } // streamJob streams lines from a jobState to a SSE response. func streamJob(w http.ResponseWriter, r *http.Request, j *jobState) { if !sseStart(w) { return } existing, ch := j.subscribe() for _, line := range existing { sseWrite(w, "", line) } if ch == nil { // Job already finished sseWrite(w, "done", j.err) return } for { select { case line, ok := <-ch: if !ok { sseWrite(w, "done", j.err) return } sseWrite(w, "", line) case <-r.Context().Done(): return } } } // runCmdJob runs an exec.Cmd as a background job, streaming stdout+stderr lines. func runCmdJob(j *jobState, cmd *exec.Cmd) { pr, pw := io.Pipe() cmd.Stdout = pw cmd.Stderr = pw if err := cmd.Start(); err != nil { j.finish(err.Error()) return } go func() { scanner := bufio.NewScanner(pr) for scanner.Scan() { j.append(scanner.Text()) } }() err := cmd.Wait() _ = pw.Close() if err != nil { j.finish(err.Error()) } else { j.finish("") } } // ── Audit ───────────────────────────────────────────────────────────────────── func (h *handler) handleAPIAuditRun(w http.ResponseWriter, _ *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } t := &Task{ ID: newJobID("audit"), Name: "Audit", Target: "audit", Status: TaskPending, CreatedAt: time.Now(), } globalQueue.enqueue(t) writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID}) } func (h *handler) handleAPIAuditStream(w http.ResponseWriter, r *http.Request) { id := r.URL.Query().Get("job_id") if id == "" { id = r.URL.Query().Get("task_id") } // Try task queue first, then legacy job manager if j, ok := globalQueue.findJob(id); ok { streamJob(w, r, j) return } if j, ok := globalJobs.get(id); ok { streamJob(w, r, j) return } http.Error(w, "job not found", http.StatusNotFound) } // ── SAT ─────────────────────────────────────────────────────────────────────── func (h *handler) handleAPISATRun(target string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } var body struct { Duration int `json:"duration"` DiagLevel int `json:"diag_level"` GPUIndices []int `json:"gpu_indices"` Profile string `json:"profile"` DisplayName string `json:"display_name"` } if r.ContentLength > 0 { _ = json.NewDecoder(r.Body).Decode(&body) } name := taskNames[target] if name == "" { name = target } t := &Task{ ID: newJobID("sat-" + target), Name: name, Target: target, Status: TaskPending, CreatedAt: time.Now(), params: taskParams{ Duration: body.Duration, DiagLevel: body.DiagLevel, GPUIndices: body.GPUIndices, BurnProfile: body.Profile, DisplayName: body.DisplayName, }, } if strings.TrimSpace(body.DisplayName) != "" { t.Name = body.DisplayName } globalQueue.enqueue(t) writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID}) } } func (h *handler) handleAPISATStream(w http.ResponseWriter, r *http.Request) { id := r.URL.Query().Get("job_id") if id == "" { id = r.URL.Query().Get("task_id") } if j, ok := globalQueue.findJob(id); ok { streamJob(w, r, j) return } if j, ok := globalJobs.get(id); ok { streamJob(w, r, j) return } http.Error(w, "job not found", http.StatusNotFound) } func (h *handler) handleAPISATAbort(w http.ResponseWriter, r *http.Request) { id := r.URL.Query().Get("job_id") if id == "" { id = r.URL.Query().Get("task_id") } if t, ok := globalQueue.findByID(id); ok { globalQueue.mu.Lock() switch t.Status { case TaskPending: t.Status = TaskCancelled now := time.Now() t.DoneAt = &now case TaskRunning: if t.job != nil { t.job.abort() } t.Status = TaskCancelled now := time.Now() t.DoneAt = &now } globalQueue.mu.Unlock() writeJSON(w, map[string]string{"status": "aborted"}) return } if j, ok := globalJobs.get(id); ok { if j.abort() { writeJSON(w, map[string]string{"status": "aborted"}) } else { writeJSON(w, map[string]string{"status": "not_running"}) } return } http.Error(w, "job not found", http.StatusNotFound) } // ── Services ────────────────────────────────────────────────────────────────── func (h *handler) handleAPIServicesList(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } names, err := h.opts.App.ListBeeServices() if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } type serviceInfo struct { Name string `json:"name"` State string `json:"state"` Body string `json:"body"` } result := make([]serviceInfo, 0, len(names)) for _, name := range names { state := h.opts.App.ServiceState(name) body, _ := h.opts.App.ServiceStatus(name) result = append(result, serviceInfo{Name: name, State: state, Body: body}) } writeJSON(w, result) } func (h *handler) handleAPIServicesAction(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } var req struct { Name string `json:"name"` Action string `json:"action"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } var action platform.ServiceAction switch req.Action { case "start": action = platform.ServiceStart case "stop": action = platform.ServiceStop case "restart": action = platform.ServiceRestart default: writeError(w, http.StatusBadRequest, "action must be start|stop|restart") return } result, err := h.opts.App.ServiceActionResult(req.Name, action) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, map[string]string{"status": "ok", "output": result.Body}) } // ── Network ─────────────────────────────────────────────────────────────────── func (h *handler) handleAPINetworkStatus(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } ifaces, err := h.opts.App.ListInterfaces() if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, map[string]any{ "interfaces": ifaces, "default_route": h.opts.App.DefaultRoute(), }) } func (h *handler) handleAPINetworkDHCP(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } var req struct { Interface string `json:"interface"` } _ = json.NewDecoder(r.Body).Decode(&req) result, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) { if req.Interface == "" || req.Interface == "all" { return h.opts.App.DHCPAllResult() } return h.opts.App.DHCPOneResult(req.Interface) }) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, map[string]any{ "status": "ok", "output": result.Body, "rollback_in": int(netRollbackTimeout.Seconds()), }) } func (h *handler) handleAPINetworkStatic(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } var req struct { Interface string `json:"interface"` Address string `json:"address"` Prefix string `json:"prefix"` Gateway string `json:"gateway"` DNS []string `json:"dns"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeError(w, http.StatusBadRequest, "invalid request body") return } cfg := platform.StaticIPv4Config{ Interface: req.Interface, Address: req.Address, Prefix: req.Prefix, Gateway: req.Gateway, DNS: req.DNS, } result, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) { return h.opts.App.SetStaticIPv4Result(cfg) }) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, map[string]any{ "status": "ok", "output": result.Body, "rollback_in": int(netRollbackTimeout.Seconds()), }) } // ── Export ──────────────────────────────────────────────────────────────────── func (h *handler) handleAPIExportList(w http.ResponseWriter, r *http.Request) { entries, err := listExportFiles(h.opts.ExportDir) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, entries) } func (h *handler) handleAPIExportBundle(w http.ResponseWriter, r *http.Request) { archive, err := app.BuildSupportBundle(h.opts.ExportDir) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, map[string]string{ "status": "ok", "path": archive, "url": "/export/support.tar.gz", }) } // ── GPU presence ────────────────────────────────────────────────────────────── func (h *handler) handleAPIGPUPresence(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } gp := h.opts.App.DetectGPUPresence() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]bool{ "nvidia": gp.Nvidia, "amd": gp.AMD, }) } // ── System ──────────────────────────────────────────────────────────────────── func (h *handler) handleAPIRAMStatus(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } inRAM := h.opts.App.IsLiveMediaInRAM() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]bool{"in_ram": inRAM}) } func (h *handler) handleAPIInstallToRAM(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } h.installMu.Lock() installRunning := h.installJob != nil && !h.installJob.isDone() h.installMu.Unlock() if installRunning { writeError(w, http.StatusConflict, "install to disk is already running") return } t := &Task{ ID: newJobID("install-to-ram"), Name: "Install to RAM", Target: "install-to-ram", Priority: 10, Status: TaskPending, CreatedAt: time.Now(), } globalQueue.enqueue(t) w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]string{"task_id": t.ID}) } // ── Tools ───────────────────────────────────────────────────────────────────── var standardTools = []string{ "dmidecode", "smartctl", "nvme", "lspci", "ipmitool", "nvidia-smi", "memtester", "stress-ng", "nvtop", "mstflint", "qrencode", } func (h *handler) handleAPIToolsCheck(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } statuses := h.opts.App.CheckTools(standardTools) writeJSON(w, statuses) } // ── Preflight ───────────────────────────────────────────────────────────────── func (h *handler) handleAPIPreflight(w http.ResponseWriter, r *http.Request) { data, err := loadSnapshot(filepath.Join(h.opts.ExportDir, "runtime-health.json")) if err != nil { writeError(w, http.StatusNotFound, "runtime health not found") return } w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Cache-Control", "no-store") _, _ = w.Write(data) } // ── Install ─────────────────────────────────────────────────────────────────── func (h *handler) handleAPIInstallDisks(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } disks, err := h.opts.App.ListInstallDisks() if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } type diskJSON struct { Device string `json:"device"` Model string `json:"model"` Size string `json:"size"` SizeBytes int64 `json:"size_bytes"` MountedParts []string `json:"mounted_parts"` Warnings []string `json:"warnings"` } result := make([]diskJSON, 0, len(disks)) for _, d := range disks { result = append(result, diskJSON{ Device: d.Device, Model: d.Model, Size: d.Size, SizeBytes: d.SizeBytes, MountedParts: d.MountedParts, Warnings: platform.DiskWarnings(d), }) } writeJSON(w, result) } func (h *handler) handleAPIInstallRun(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } var req struct { Device string `json:"device"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Device == "" { writeError(w, http.StatusBadRequest, "device is required") return } // Whitelist: only allow devices that ListInstallDisks() returns. disks, err := h.opts.App.ListInstallDisks() if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } allowed := false for _, d := range disks { if d.Device == req.Device { allowed = true break } } if !allowed { writeError(w, http.StatusBadRequest, "device not in install candidate list") return } if globalQueue.hasActiveTarget("install-to-ram") { writeError(w, http.StatusConflict, "install to RAM task is already pending or running") return } h.installMu.Lock() if h.installJob != nil && !h.installJob.isDone() { h.installMu.Unlock() writeError(w, http.StatusConflict, "install already running") return } j := &jobState{} h.installJob = j h.installMu.Unlock() logFile := platform.InstallLogPath(req.Device) go runCmdJob(j, exec.CommandContext(context.Background(), "bee-install", req.Device, logFile)) w.WriteHeader(http.StatusNoContent) } func (h *handler) handleAPIInstallStream(w http.ResponseWriter, r *http.Request) { h.installMu.Lock() j := h.installJob h.installMu.Unlock() if j == nil { if !sseStart(w) { return } sseWrite(w, "done", "") return } streamJob(w, r, j) } // ── Metrics SSE ─────────────────────────────────────────────────────────────── func (h *handler) handleAPIMetricsStream(w http.ResponseWriter, r *http.Request) { if !sseStart(w) { return } ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-r.Context().Done(): return case <-ticker.C: sample, ok := h.latestMetric() if !ok { continue } b, err := json.Marshal(sample) if err != nil { continue } if !sseWrite(w, "metrics", string(b)) { return } } } } // feedRings pushes one sample into all in-memory ring buffers. func (h *handler) feedRings(sample platform.LiveMetricSample) { for _, t := range sample.Temps { switch t.Group { case "cpu": h.pushNamedMetricRing(&h.cpuTempRings, t.Name, t.Celsius) case "ambient": h.pushNamedMetricRing(&h.ambientTempRings, t.Name, t.Celsius) } } h.ringPower.push(sample.PowerW) h.ringCPULoad.push(sample.CPULoadPct) h.ringMemLoad.push(sample.MemLoadPct) h.ringsMu.Lock() for i, fan := range sample.Fans { for len(h.ringFans) <= i { h.ringFans = append(h.ringFans, newMetricsRing(120)) h.fanNames = append(h.fanNames, fan.Name) } h.ringFans[i].push(float64(fan.RPM)) } for _, gpu := range sample.GPUs { idx := gpu.GPUIndex for len(h.gpuRings) <= idx { h.gpuRings = append(h.gpuRings, &gpuRings{ Temp: newMetricsRing(120), Util: newMetricsRing(120), MemUtil: newMetricsRing(120), Power: newMetricsRing(120), }) } h.gpuRings[idx].Temp.push(gpu.TempC) h.gpuRings[idx].Util.push(gpu.UsagePct) h.gpuRings[idx].MemUtil.push(gpu.MemUsagePct) h.gpuRings[idx].Power.push(gpu.PowerW) } h.ringsMu.Unlock() } func (h *handler) pushNamedMetricRing(dst *[]*namedMetricsRing, name string, value float64) { if name == "" { return } for _, item := range *dst { if item != nil && item.Name == name && item.Ring != nil { item.Ring.push(value) return } } *dst = append(*dst, &namedMetricsRing{ Name: name, Ring: newMetricsRing(120), }) (*dst)[len(*dst)-1].Ring.push(value) } // ── Network toggle ──────────────────────────────────────────────────────────── const netRollbackTimeout = 60 * time.Second func (h *handler) handleAPINetworkToggle(w http.ResponseWriter, r *http.Request) { if h.opts.App == nil { writeError(w, http.StatusServiceUnavailable, "app not configured") return } var req struct { Iface string `json:"iface"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.Iface == "" { writeError(w, http.StatusBadRequest, "iface is required") return } wasUp, err := h.opts.App.GetInterfaceState(req.Iface) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } if _, err := h.applyPendingNetworkChange(func() (app.ActionResult, error) { err := h.opts.App.SetInterfaceState(req.Iface, !wasUp) return app.ActionResult{}, err }); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } newState := "up" if wasUp { newState = "down" } writeJSON(w, map[string]any{ "iface": req.Iface, "new_state": newState, "rollback_in": int(netRollbackTimeout.Seconds()), }) } func (h *handler) applyPendingNetworkChange(apply func() (app.ActionResult, error)) (app.ActionResult, error) { if h.opts.App == nil { return app.ActionResult{}, fmt.Errorf("app not configured") } if err := h.rollbackPendingNetworkChange(); err != nil && err.Error() != "no pending network change" { return app.ActionResult{}, err } snapshot, err := h.opts.App.CaptureNetworkSnapshot() if err != nil { return app.ActionResult{}, err } result, err := apply() if err != nil { return result, err } pnc := &pendingNetChange{snapshot: snapshot} pnc.timer = time.AfterFunc(netRollbackTimeout, func() { _ = h.opts.App.RestoreNetworkSnapshot(snapshot) h.pendingNetMu.Lock() if h.pendingNet == pnc { h.pendingNet = nil } h.pendingNetMu.Unlock() }) h.pendingNetMu.Lock() h.pendingNet = pnc h.pendingNetMu.Unlock() return result, nil } func (h *handler) handleAPINetworkConfirm(w http.ResponseWriter, _ *http.Request) { h.pendingNetMu.Lock() pnc := h.pendingNet h.pendingNet = nil h.pendingNetMu.Unlock() if pnc != nil { pnc.mu.Lock() pnc.timer.Stop() pnc.mu.Unlock() } writeJSON(w, map[string]string{"status": "confirmed"}) } func (h *handler) handleAPINetworkRollback(w http.ResponseWriter, _ *http.Request) { if err := h.rollbackPendingNetworkChange(); err != nil { if err.Error() == "no pending network change" { writeError(w, http.StatusConflict, err.Error()) return } writeError(w, http.StatusInternalServerError, err.Error()) return } writeJSON(w, map[string]string{"status": "rolled back"}) } func (h *handler) rollbackPendingNetworkChange() error { h.pendingNetMu.Lock() pnc := h.pendingNet h.pendingNet = nil h.pendingNetMu.Unlock() if pnc == nil { return fmt.Errorf("no pending network change") } pnc.mu.Lock() pnc.timer.Stop() pnc.mu.Unlock() if h.opts.App != nil { return h.opts.App.RestoreNetworkSnapshot(pnc.snapshot) } return nil }