Compare commits

..

1 Commits
v9.9 ... main

Author SHA1 Message Date
Mikhail Chusavitin
58d6da0e4f Fix live task logs and SAT windows 2026-04-30 17:26:45 +03:00
3 changed files with 108 additions and 0 deletions

View File

@@ -91,6 +91,7 @@ func (j *jobState) writeLogLineLocked(line string) {
j.logBuf = bufio.NewWriterSize(f, 64*1024)
}
_, _ = j.logBuf.WriteString(line + "\n")
_ = j.logBuf.Flush()
}
// closeLog flushes and closes the log file. Called after all task output is done.

View File

@@ -600,6 +600,17 @@ func (q *taskQueue) startRecoveredTaskMonitorLocked(t *Task, j *jobState) {
}
func (q *taskQueue) runTaskExternal(t *Task, j *jobState) {
startedKmsgWatch := false
if q.kmsgWatcher != nil && isSATTarget(t.Target) {
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
startedKmsgWatch = true
}
defer func() {
if startedKmsgWatch && q.kmsgWatcher != nil {
q.kmsgWatcher.NotifyTaskFinished(t.ID)
}
}()
stopTail := make(chan struct{})
doneTail := make(chan struct{})
defer func() {

View File

@@ -126,6 +126,23 @@ func TestNewTaskJobStateLoadsExistingLog(t *testing.T) {
}
}
func TestJobAppendFlushesTaskLogImmediately(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "task.log")
j := newTaskJobState(path)
j.append("live-line")
data, err := os.ReadFile(path)
if err != nil {
t.Fatal(err)
}
if string(data) != "live-line\n" {
t.Fatalf("log=%q want live-line newline", string(data))
}
j.closeLog()
}
func TestTaskQueueSnapshotSortsNewestFirst(t *testing.T) {
now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
q := &taskQueue{
@@ -849,3 +866,82 @@ func TestExecuteTaskMarksPanicsAsFailedAndClosesKmsgWindow(t *testing.T) {
t.Fatalf("expected kmsg window to be cleared, got %+v", window)
}
}
func TestRunTaskExternalOpensAndClosesKmsgWindow(t *testing.T) {
dir := t.TempDir()
releasePath := filepath.Join(dir, "release")
readyPath := filepath.Join(dir, "ready")
q := &taskQueue{
opts: &HandlerOptions{ExportDir: dir},
logsDir: filepath.Join(dir, "tasks"),
kmsgWatcher: newKmsgWatcher(nil),
trigger: make(chan struct{}, 1),
}
if err := os.MkdirAll(q.logsDir, 0755); err != nil {
t.Fatal(err)
}
tk := &Task{
ID: "cpu-external-1",
Name: "CPU SAT",
Target: "cpu",
Status: TaskRunning,
CreatedAt: time.Now(),
}
q.assignTaskLogPathLocked(tk)
j := newTaskJobState(tk.LogPath)
orig := externalTaskRunnerCommand
externalTaskRunnerCommand = func(exportDir, taskID string) (*exec.Cmd, error) {
script := "printf ready > \"$1\"; while [ ! -f \"$2\" ]; do sleep 0.05; done"
return exec.Command("sh", "-c", script, "sh", readyPath, releasePath), nil
}
defer func() { externalTaskRunnerCommand = orig }()
done := make(chan struct{})
go func() {
q.runTaskExternal(tk, j)
close(done)
}()
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if _, err := os.Stat(readyPath); err == nil {
break
}
time.Sleep(20 * time.Millisecond)
}
if _, err := os.Stat(readyPath); err != nil {
t.Fatalf("external runner did not start: %v", err)
}
q.kmsgWatcher.mu.Lock()
activeCount := q.kmsgWatcher.activeCount
window := q.kmsgWatcher.window
q.kmsgWatcher.mu.Unlock()
if activeCount != 1 {
t.Fatalf("activeCount while running=%d want 1", activeCount)
}
if window == nil || len(window.targets) != 1 || window.targets[0] != "cpu" {
t.Fatalf("window while running=%+v", window)
}
if err := os.WriteFile(releasePath, []byte("1\n"), 0644); err != nil {
t.Fatal(err)
}
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("runTaskExternal did not return")
}
q.kmsgWatcher.mu.Lock()
activeCount = q.kmsgWatcher.activeCount
window = q.kmsgWatcher.window
q.kmsgWatcher.mu.Unlock()
if activeCount != 0 {
t.Fatalf("activeCount after finish=%d want 0", activeCount)
}
if window != nil {
t.Fatalf("expected kmsg window to be cleared, got %+v", window)
}
}