From f1621efee41690b58959476b7dc65dc28f949983 Mon Sep 17 00:00:00 2001 From: Michael Chus Date: Sun, 5 Apr 2026 18:34:06 +0300 Subject: [PATCH] Mirror task lifecycle to serial console --- audit/internal/webui/jobs.go | 23 +++++++++----- audit/internal/webui/serial_console.go | 41 ++++++++++++++++++++++++ audit/internal/webui/tasks.go | 14 ++++++++- audit/internal/webui/tasks_test.go | 43 ++++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 9 deletions(-) create mode 100644 audit/internal/webui/serial_console.go diff --git a/audit/internal/webui/jobs.go b/audit/internal/webui/jobs.go index 1e08560..5924a5b 100644 --- a/audit/internal/webui/jobs.go +++ b/audit/internal/webui/jobs.go @@ -9,13 +9,14 @@ import ( // jobState holds the output lines and completion status of an async job. type jobState struct { - lines []string - done bool - err string - mu sync.Mutex - subs []chan string - cancel func() // optional cancel function; nil if job is not cancellable - logPath string + lines []string + done bool + err string + mu sync.Mutex + subs []chan string + cancel func() // optional cancel function; nil if job is not cancellable + logPath string + serialPrefix string } // abort cancels the job if it has a cancel function and is not yet done. @@ -36,6 +37,9 @@ func (j *jobState) append(line string) { if j.logPath != "" { appendJobLog(j.logPath, line) } + if j.serialPrefix != "" { + taskSerialWriteLine(j.serialPrefix + line) + } for _, ch := range j.subs { select { case ch <- line: @@ -107,8 +111,11 @@ func (m *jobManager) get(id string) (*jobState, bool) { return j, ok } -func newTaskJobState(logPath string) *jobState { +func newTaskJobState(logPath string, serialPrefix ...string) *jobState { j := &jobState{logPath: logPath} + if len(serialPrefix) > 0 { + j.serialPrefix = serialPrefix[0] + } if logPath == "" { return j } diff --git a/audit/internal/webui/serial_console.go b/audit/internal/webui/serial_console.go new file mode 100644 index 0000000..dfc12e1 --- /dev/null +++ b/audit/internal/webui/serial_console.go @@ -0,0 +1,41 @@ +package webui + +import ( + "fmt" + "os" + "strings" + "time" +) + +var taskSerialWriteLine = writeTaskSerialLine + +func writeTaskSerialLine(line string) { + line = strings.TrimSpace(line) + if line == "" { + return + } + payload := fmt.Sprintf("%s %s\n", time.Now().UTC().Format("2006-01-02 15:04:05Z"), line) + for _, path := range []string{"/dev/ttyS0", "/dev/ttyS1", "/dev/console"} { + f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0) + if err != nil { + continue + } + _, _ = f.WriteString(payload) + _ = f.Close() + return + } +} + +func taskSerialPrefix(t *Task) string { + if t == nil { + return "[task] " + } + return fmt.Sprintf("[task %s %s] ", t.ID, t.Name) +} + +func taskSerialEvent(t *Task, event string) { + if t == nil { + return + } + taskSerialWriteLine(fmt.Sprintf("%s%s", taskSerialPrefix(t), strings.TrimSpace(event))) +} diff --git a/audit/internal/webui/tasks.go b/audit/internal/webui/tasks.go index 9e1753b..19056b3 100644 --- a/audit/internal/webui/tasks.go +++ b/audit/internal/webui/tasks.go @@ -258,6 +258,7 @@ func (q *taskQueue) enqueue(t *Task) { q.prune() q.persistLocked() q.mu.Unlock() + taskSerialEvent(t, "queued") select { case q.trigger <- struct{}{}: default: @@ -435,7 +436,7 @@ func (q *taskQueue) worker() { t.StartedAt = &now t.DoneAt = nil t.ErrMsg = "" - j := newTaskJobState(t.LogPath) + j := newTaskJobState(t.LogPath, taskSerialPrefix(t)) t.job = j batch = append(batch, t) } @@ -520,6 +521,11 @@ func (q *taskQueue) finalizeTaskRun(t *Task, j *jobState) { if err := writeTaskReportArtifacts(t); err != nil { appendJobLog(t.LogPath, "WARN: task report generation failed: "+err.Error()) } + if t.ErrMsg != "" { + taskSerialEvent(t, "finished with status="+t.Status+" error="+t.ErrMsg) + return + } + taskSerialEvent(t, "finished with status="+t.Status) } // setCPUGovernor writes the given governor to all CPU scaling_governor sysfs files. @@ -858,6 +864,7 @@ func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) { now := time.Now() t.DoneAt = &now globalQueue.persistLocked() + taskSerialEvent(t, "finished with status="+t.Status) writeJSON(w, map[string]string{"status": "cancelled"}) case TaskRunning: if t.job != nil { @@ -867,6 +874,7 @@ func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) { now := time.Now() t.DoneAt = &now globalQueue.persistLocked() + taskSerialEvent(t, "finished with status="+t.Status) writeJSON(w, map[string]string{"status": "cancelled"}) default: writeError(w, http.StatusConflict, "task is not running or pending") @@ -907,6 +915,7 @@ func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request case TaskPending: t.Status = TaskCancelled t.DoneAt = &now + taskSerialEvent(t, "finished with status="+t.Status) n++ case TaskRunning: if t.job != nil { @@ -914,6 +923,7 @@ func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request } t.Status = TaskCancelled t.DoneAt = &now + taskSerialEvent(t, "finished with status="+t.Status) n++ } } @@ -932,6 +942,7 @@ func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Reque case TaskPending: t.Status = TaskCancelled t.DoneAt = &now + taskSerialEvent(t, "finished with status="+t.Status) cancelled++ case TaskRunning: if t.job != nil { @@ -939,6 +950,7 @@ func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Reque } t.Status = TaskCancelled t.DoneAt = &now + taskSerialEvent(t, "finished with status="+t.Status) cancelled++ } } diff --git a/audit/internal/webui/tasks_test.go b/audit/internal/webui/tasks_test.go index 4ebf4e0..ad39f87 100644 --- a/audit/internal/webui/tasks_test.go +++ b/audit/internal/webui/tasks_test.go @@ -325,6 +325,49 @@ func TestFinalizeTaskRunCreatesReportFolderAndArtifacts(t *testing.T) { } } +func TestTaskLifecycleMirrorsToSerialConsole(t *testing.T) { + var lines []string + prev := taskSerialWriteLine + taskSerialWriteLine = func(line string) { lines = append(lines, line) } + t.Cleanup(func() { taskSerialWriteLine = prev }) + + dir := t.TempDir() + q := &taskQueue{ + statePath: filepath.Join(dir, "tasks-state.json"), + logsDir: filepath.Join(dir, "tasks"), + trigger: make(chan struct{}, 1), + } + task := &Task{ + ID: "task-serial-1", + Name: "CPU SAT", + Target: "cpu", + Status: TaskPending, + CreatedAt: time.Now().UTC(), + } + + q.enqueue(task) + started := time.Now().UTC() + task.Status = TaskRunning + task.StartedAt = &started + job := newTaskJobState(task.LogPath, taskSerialPrefix(task)) + job.append("Starting CPU SAT...") + job.append("CPU stress duration: 60s") + job.finish("") + q.finalizeTaskRun(task, job) + + joined := strings.Join(lines, "\n") + for _, needle := range []string{ + "queued", + "Starting CPU SAT...", + "CPU stress duration: 60s", + "finished with status=done", + } { + if !strings.Contains(joined, needle) { + t.Fatalf("serial mirror missing %q in %q", needle, joined) + } + } +} + func TestResolveBurnPreset(t *testing.T) { tests := []struct { profile string