refactor(webui): queue install and bundle tasks - v3.18

This commit is contained in:
Mikhail Chusavitin
2026-04-01 08:46:46 +03:00
parent 3472afea32
commit c394845b34
9 changed files with 410 additions and 86 deletions

View File

@@ -2,7 +2,6 @@ package webui
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
@@ -87,15 +86,16 @@ func streamJob(w http.ResponseWriter, r *http.Request, j *jobState) {
}
}
// runCmdJob runs an exec.Cmd as a background job, streaming stdout+stderr lines.
func runCmdJob(j *jobState, cmd *exec.Cmd) {
// streamCmdJob runs an exec.Cmd and streams stdout+stderr lines into j.
func streamCmdJob(j *jobState, cmd *exec.Cmd) error {
pr, pw := io.Pipe()
cmd.Stdout = pw
cmd.Stderr = pw
if err := cmd.Start(); err != nil {
j.finish(err.Error())
return
_ = pw.Close()
_ = pr.Close()
return err
}
// Lower the CPU scheduling priority of stress/audit subprocesses to nice+10
// so the X server and kernel interrupt handling remain responsive under load
@@ -104,8 +104,10 @@ func runCmdJob(j *jobState, cmd *exec.Cmd) {
_ = syscall.Setpriority(syscall.PRIO_PROCESS, cmd.Process.Pid, 10)
}
scanDone := make(chan error, 1)
go func() {
scanner := bufio.NewScanner(pr)
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
// Split on \r to handle progress-bar style output (e.g. \r overwrites)
// and strip ANSI escape codes so logs are readable in the browser.
@@ -117,15 +119,21 @@ func runCmdJob(j *jobState, cmd *exec.Cmd) {
}
}
}
if err := scanner.Err(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
scanDone <- err
return
}
scanDone <- nil
}()
err := cmd.Wait()
_ = pw.Close()
scanErr := <-scanDone
_ = pr.Close()
if err != nil {
j.finish(err.Error())
} else {
j.finish("")
return err
}
return scanErr
}
// ── Audit ─────────────────────────────────────────────────────────────────────
@@ -417,15 +425,23 @@ func (h *handler) handleAPIExportList(w http.ResponseWriter, r *http.Request) {
}
func (h *handler) handleAPIExportBundle(w http.ResponseWriter, r *http.Request) {
archive, err := app.BuildSupportBundle(h.opts.ExportDir)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
if globalQueue.hasActiveTarget("support-bundle") {
writeError(w, http.StatusConflict, "support bundle task is already pending or running")
return
}
t := &Task{
ID: newJobID("support-bundle"),
Name: "Support Bundle",
Target: "support-bundle",
Status: TaskPending,
CreatedAt: time.Now(),
}
globalQueue.enqueue(t)
writeJSON(w, map[string]string{
"status": "ok",
"path": archive,
"url": "/export/support.tar.gz",
"status": "queued",
"task_id": t.ID,
"job_id": t.ID,
"url": "/export/support.tar.gz",
})
}
@@ -513,10 +529,7 @@ func (h *handler) handleAPIInstallToRAM(w http.ResponseWriter, r *http.Request)
writeError(w, http.StatusServiceUnavailable, "app not configured")
return
}
h.installMu.Lock()
installRunning := h.installJob != nil && !h.installJob.isDone()
h.installMu.Unlock()
if installRunning {
if globalQueue.hasActiveTarget("install") {
writeError(w, http.StatusConflict, "install to disk is already running")
return
}
@@ -631,35 +644,23 @@ func (h *handler) handleAPIInstallRun(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusConflict, "install to RAM task is already pending or running")
return
}
h.installMu.Lock()
if h.installJob != nil && !h.installJob.isDone() {
h.installMu.Unlock()
writeError(w, http.StatusConflict, "install already running")
if globalQueue.hasActiveTarget("install") {
writeError(w, http.StatusConflict, "install task is already pending or running")
return
}
j := &jobState{}
h.installJob = j
h.installMu.Unlock()
logFile := platform.InstallLogPath(req.Device)
go runCmdJob(j, exec.CommandContext(context.Background(), "bee-install", req.Device, logFile))
w.WriteHeader(http.StatusNoContent)
}
func (h *handler) handleAPIInstallStream(w http.ResponseWriter, r *http.Request) {
h.installMu.Lock()
j := h.installJob
h.installMu.Unlock()
if j == nil {
if !sseStart(w) {
return
}
sseWrite(w, "done", "")
return
t := &Task{
ID: newJobID("install"),
Name: "Install to Disk",
Target: "install",
Priority: 20,
Status: TaskPending,
CreatedAt: time.Now(),
params: taskParams{
Device: req.Device,
},
}
streamJob(w, r, j)
globalQueue.enqueue(t)
writeJSON(w, map[string]string{"task_id": t.ID, "job_id": t.ID})
}
// ── Metrics SSE ───────────────────────────────────────────────────────────────