Mirror task lifecycle to serial console
This commit is contained in:
@@ -9,13 +9,14 @@ import (
|
||||
|
||||
// jobState holds the output lines and completion status of an async job.
|
||||
type jobState struct {
|
||||
lines []string
|
||||
done bool
|
||||
err string
|
||||
mu sync.Mutex
|
||||
subs []chan string
|
||||
cancel func() // optional cancel function; nil if job is not cancellable
|
||||
logPath string
|
||||
lines []string
|
||||
done bool
|
||||
err string
|
||||
mu sync.Mutex
|
||||
subs []chan string
|
||||
cancel func() // optional cancel function; nil if job is not cancellable
|
||||
logPath string
|
||||
serialPrefix string
|
||||
}
|
||||
|
||||
// abort cancels the job if it has a cancel function and is not yet done.
|
||||
@@ -36,6 +37,9 @@ func (j *jobState) append(line string) {
|
||||
if j.logPath != "" {
|
||||
appendJobLog(j.logPath, line)
|
||||
}
|
||||
if j.serialPrefix != "" {
|
||||
taskSerialWriteLine(j.serialPrefix + line)
|
||||
}
|
||||
for _, ch := range j.subs {
|
||||
select {
|
||||
case ch <- line:
|
||||
@@ -107,8 +111,11 @@ func (m *jobManager) get(id string) (*jobState, bool) {
|
||||
return j, ok
|
||||
}
|
||||
|
||||
func newTaskJobState(logPath string) *jobState {
|
||||
func newTaskJobState(logPath string, serialPrefix ...string) *jobState {
|
||||
j := &jobState{logPath: logPath}
|
||||
if len(serialPrefix) > 0 {
|
||||
j.serialPrefix = serialPrefix[0]
|
||||
}
|
||||
if logPath == "" {
|
||||
return j
|
||||
}
|
||||
|
||||
41
audit/internal/webui/serial_console.go
Normal file
41
audit/internal/webui/serial_console.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package webui
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
var taskSerialWriteLine = writeTaskSerialLine
|
||||
|
||||
func writeTaskSerialLine(line string) {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
return
|
||||
}
|
||||
payload := fmt.Sprintf("%s %s\n", time.Now().UTC().Format("2006-01-02 15:04:05Z"), line)
|
||||
for _, path := range []string{"/dev/ttyS0", "/dev/ttyS1", "/dev/console"} {
|
||||
f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND, 0)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_, _ = f.WriteString(payload)
|
||||
_ = f.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func taskSerialPrefix(t *Task) string {
|
||||
if t == nil {
|
||||
return "[task] "
|
||||
}
|
||||
return fmt.Sprintf("[task %s %s] ", t.ID, t.Name)
|
||||
}
|
||||
|
||||
func taskSerialEvent(t *Task, event string) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
taskSerialWriteLine(fmt.Sprintf("%s%s", taskSerialPrefix(t), strings.TrimSpace(event)))
|
||||
}
|
||||
@@ -258,6 +258,7 @@ func (q *taskQueue) enqueue(t *Task) {
|
||||
q.prune()
|
||||
q.persistLocked()
|
||||
q.mu.Unlock()
|
||||
taskSerialEvent(t, "queued")
|
||||
select {
|
||||
case q.trigger <- struct{}{}:
|
||||
default:
|
||||
@@ -435,7 +436,7 @@ func (q *taskQueue) worker() {
|
||||
t.StartedAt = &now
|
||||
t.DoneAt = nil
|
||||
t.ErrMsg = ""
|
||||
j := newTaskJobState(t.LogPath)
|
||||
j := newTaskJobState(t.LogPath, taskSerialPrefix(t))
|
||||
t.job = j
|
||||
batch = append(batch, t)
|
||||
}
|
||||
@@ -520,6 +521,11 @@ func (q *taskQueue) finalizeTaskRun(t *Task, j *jobState) {
|
||||
if err := writeTaskReportArtifacts(t); err != nil {
|
||||
appendJobLog(t.LogPath, "WARN: task report generation failed: "+err.Error())
|
||||
}
|
||||
if t.ErrMsg != "" {
|
||||
taskSerialEvent(t, "finished with status="+t.Status+" error="+t.ErrMsg)
|
||||
return
|
||||
}
|
||||
taskSerialEvent(t, "finished with status="+t.Status)
|
||||
}
|
||||
|
||||
// setCPUGovernor writes the given governor to all CPU scaling_governor sysfs files.
|
||||
@@ -858,6 +864,7 @@ func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) {
|
||||
now := time.Now()
|
||||
t.DoneAt = &now
|
||||
globalQueue.persistLocked()
|
||||
taskSerialEvent(t, "finished with status="+t.Status)
|
||||
writeJSON(w, map[string]string{"status": "cancelled"})
|
||||
case TaskRunning:
|
||||
if t.job != nil {
|
||||
@@ -867,6 +874,7 @@ func (h *handler) handleAPITasksCancel(w http.ResponseWriter, r *http.Request) {
|
||||
now := time.Now()
|
||||
t.DoneAt = &now
|
||||
globalQueue.persistLocked()
|
||||
taskSerialEvent(t, "finished with status="+t.Status)
|
||||
writeJSON(w, map[string]string{"status": "cancelled"})
|
||||
default:
|
||||
writeError(w, http.StatusConflict, "task is not running or pending")
|
||||
@@ -907,6 +915,7 @@ func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request
|
||||
case TaskPending:
|
||||
t.Status = TaskCancelled
|
||||
t.DoneAt = &now
|
||||
taskSerialEvent(t, "finished with status="+t.Status)
|
||||
n++
|
||||
case TaskRunning:
|
||||
if t.job != nil {
|
||||
@@ -914,6 +923,7 @@ func (h *handler) handleAPITasksCancelAll(w http.ResponseWriter, _ *http.Request
|
||||
}
|
||||
t.Status = TaskCancelled
|
||||
t.DoneAt = &now
|
||||
taskSerialEvent(t, "finished with status="+t.Status)
|
||||
n++
|
||||
}
|
||||
}
|
||||
@@ -932,6 +942,7 @@ func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Reque
|
||||
case TaskPending:
|
||||
t.Status = TaskCancelled
|
||||
t.DoneAt = &now
|
||||
taskSerialEvent(t, "finished with status="+t.Status)
|
||||
cancelled++
|
||||
case TaskRunning:
|
||||
if t.job != nil {
|
||||
@@ -939,6 +950,7 @@ func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Reque
|
||||
}
|
||||
t.Status = TaskCancelled
|
||||
t.DoneAt = &now
|
||||
taskSerialEvent(t, "finished with status="+t.Status)
|
||||
cancelled++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -325,6 +325,49 @@ func TestFinalizeTaskRunCreatesReportFolderAndArtifacts(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskLifecycleMirrorsToSerialConsole(t *testing.T) {
|
||||
var lines []string
|
||||
prev := taskSerialWriteLine
|
||||
taskSerialWriteLine = func(line string) { lines = append(lines, line) }
|
||||
t.Cleanup(func() { taskSerialWriteLine = prev })
|
||||
|
||||
dir := t.TempDir()
|
||||
q := &taskQueue{
|
||||
statePath: filepath.Join(dir, "tasks-state.json"),
|
||||
logsDir: filepath.Join(dir, "tasks"),
|
||||
trigger: make(chan struct{}, 1),
|
||||
}
|
||||
task := &Task{
|
||||
ID: "task-serial-1",
|
||||
Name: "CPU SAT",
|
||||
Target: "cpu",
|
||||
Status: TaskPending,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
}
|
||||
|
||||
q.enqueue(task)
|
||||
started := time.Now().UTC()
|
||||
task.Status = TaskRunning
|
||||
task.StartedAt = &started
|
||||
job := newTaskJobState(task.LogPath, taskSerialPrefix(task))
|
||||
job.append("Starting CPU SAT...")
|
||||
job.append("CPU stress duration: 60s")
|
||||
job.finish("")
|
||||
q.finalizeTaskRun(task, job)
|
||||
|
||||
joined := strings.Join(lines, "\n")
|
||||
for _, needle := range []string{
|
||||
"queued",
|
||||
"Starting CPU SAT...",
|
||||
"CPU stress duration: 60s",
|
||||
"finished with status=done",
|
||||
} {
|
||||
if !strings.Contains(joined, needle) {
|
||||
t.Fatalf("serial mirror missing %q in %q", needle, joined)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveBurnPreset(t *testing.T) {
|
||||
tests := []struct {
|
||||
profile string
|
||||
|
||||
Reference in New Issue
Block a user