Fix live task logs and SAT windows
This commit is contained in:
@@ -91,6 +91,7 @@ func (j *jobState) writeLogLineLocked(line string) {
|
||||
j.logBuf = bufio.NewWriterSize(f, 64*1024)
|
||||
}
|
||||
_, _ = j.logBuf.WriteString(line + "\n")
|
||||
_ = j.logBuf.Flush()
|
||||
}
|
||||
|
||||
// closeLog flushes and closes the log file. Called after all task output is done.
|
||||
|
||||
@@ -600,6 +600,17 @@ func (q *taskQueue) startRecoveredTaskMonitorLocked(t *Task, j *jobState) {
|
||||
}
|
||||
|
||||
func (q *taskQueue) runTaskExternal(t *Task, j *jobState) {
|
||||
startedKmsgWatch := false
|
||||
if q.kmsgWatcher != nil && isSATTarget(t.Target) {
|
||||
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
|
||||
startedKmsgWatch = true
|
||||
}
|
||||
defer func() {
|
||||
if startedKmsgWatch && q.kmsgWatcher != nil {
|
||||
q.kmsgWatcher.NotifyTaskFinished(t.ID)
|
||||
}
|
||||
}()
|
||||
|
||||
stopTail := make(chan struct{})
|
||||
doneTail := make(chan struct{})
|
||||
defer func() {
|
||||
|
||||
@@ -126,6 +126,23 @@ func TestNewTaskJobStateLoadsExistingLog(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobAppendFlushesTaskLogImmediately(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
path := filepath.Join(dir, "task.log")
|
||||
j := newTaskJobState(path)
|
||||
|
||||
j.append("live-line")
|
||||
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(data) != "live-line\n" {
|
||||
t.Fatalf("log=%q want live-line newline", string(data))
|
||||
}
|
||||
j.closeLog()
|
||||
}
|
||||
|
||||
func TestTaskQueueSnapshotSortsNewestFirst(t *testing.T) {
|
||||
now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
|
||||
q := &taskQueue{
|
||||
@@ -849,3 +866,82 @@ func TestExecuteTaskMarksPanicsAsFailedAndClosesKmsgWindow(t *testing.T) {
|
||||
t.Fatalf("expected kmsg window to be cleared, got %+v", window)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunTaskExternalOpensAndClosesKmsgWindow(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
releasePath := filepath.Join(dir, "release")
|
||||
readyPath := filepath.Join(dir, "ready")
|
||||
q := &taskQueue{
|
||||
opts: &HandlerOptions{ExportDir: dir},
|
||||
logsDir: filepath.Join(dir, "tasks"),
|
||||
kmsgWatcher: newKmsgWatcher(nil),
|
||||
trigger: make(chan struct{}, 1),
|
||||
}
|
||||
if err := os.MkdirAll(q.logsDir, 0755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tk := &Task{
|
||||
ID: "cpu-external-1",
|
||||
Name: "CPU SAT",
|
||||
Target: "cpu",
|
||||
Status: TaskRunning,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
q.assignTaskLogPathLocked(tk)
|
||||
j := newTaskJobState(tk.LogPath)
|
||||
|
||||
orig := externalTaskRunnerCommand
|
||||
externalTaskRunnerCommand = func(exportDir, taskID string) (*exec.Cmd, error) {
|
||||
script := "printf ready > \"$1\"; while [ ! -f \"$2\" ]; do sleep 0.05; done"
|
||||
return exec.Command("sh", "-c", script, "sh", readyPath, releasePath), nil
|
||||
}
|
||||
defer func() { externalTaskRunnerCommand = orig }()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
q.runTaskExternal(tk, j)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if _, err := os.Stat(readyPath); err == nil {
|
||||
break
|
||||
}
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
if _, err := os.Stat(readyPath); err != nil {
|
||||
t.Fatalf("external runner did not start: %v", err)
|
||||
}
|
||||
|
||||
q.kmsgWatcher.mu.Lock()
|
||||
activeCount := q.kmsgWatcher.activeCount
|
||||
window := q.kmsgWatcher.window
|
||||
q.kmsgWatcher.mu.Unlock()
|
||||
if activeCount != 1 {
|
||||
t.Fatalf("activeCount while running=%d want 1", activeCount)
|
||||
}
|
||||
if window == nil || len(window.targets) != 1 || window.targets[0] != "cpu" {
|
||||
t.Fatalf("window while running=%+v", window)
|
||||
}
|
||||
|
||||
if err := os.WriteFile(releasePath, []byte("1\n"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("runTaskExternal did not return")
|
||||
}
|
||||
|
||||
q.kmsgWatcher.mu.Lock()
|
||||
activeCount = q.kmsgWatcher.activeCount
|
||||
window = q.kmsgWatcher.window
|
||||
q.kmsgWatcher.mu.Unlock()
|
||||
if activeCount != 0 {
|
||||
t.Fatalf("activeCount after finish=%d want 0", activeCount)
|
||||
}
|
||||
if window != nil {
|
||||
t.Fatalf("expected kmsg window to be cleared, got %+v", window)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user