Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
58d6da0e4f |
@@ -91,6 +91,7 @@ func (j *jobState) writeLogLineLocked(line string) {
|
|||||||
j.logBuf = bufio.NewWriterSize(f, 64*1024)
|
j.logBuf = bufio.NewWriterSize(f, 64*1024)
|
||||||
}
|
}
|
||||||
_, _ = j.logBuf.WriteString(line + "\n")
|
_, _ = j.logBuf.WriteString(line + "\n")
|
||||||
|
_ = j.logBuf.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeLog flushes and closes the log file. Called after all task output is done.
|
// closeLog flushes and closes the log file. Called after all task output is done.
|
||||||
|
|||||||
@@ -600,6 +600,17 @@ func (q *taskQueue) startRecoveredTaskMonitorLocked(t *Task, j *jobState) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (q *taskQueue) runTaskExternal(t *Task, j *jobState) {
|
func (q *taskQueue) runTaskExternal(t *Task, j *jobState) {
|
||||||
|
startedKmsgWatch := false
|
||||||
|
if q.kmsgWatcher != nil && isSATTarget(t.Target) {
|
||||||
|
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
|
||||||
|
startedKmsgWatch = true
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if startedKmsgWatch && q.kmsgWatcher != nil {
|
||||||
|
q.kmsgWatcher.NotifyTaskFinished(t.ID)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
stopTail := make(chan struct{})
|
stopTail := make(chan struct{})
|
||||||
doneTail := make(chan struct{})
|
doneTail := make(chan struct{})
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|||||||
@@ -126,6 +126,23 @@ func TestNewTaskJobStateLoadsExistingLog(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestJobAppendFlushesTaskLogImmediately(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
path := filepath.Join(dir, "task.log")
|
||||||
|
j := newTaskJobState(path)
|
||||||
|
|
||||||
|
j.append("live-line")
|
||||||
|
|
||||||
|
data, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if string(data) != "live-line\n" {
|
||||||
|
t.Fatalf("log=%q want live-line newline", string(data))
|
||||||
|
}
|
||||||
|
j.closeLog()
|
||||||
|
}
|
||||||
|
|
||||||
func TestTaskQueueSnapshotSortsNewestFirst(t *testing.T) {
|
func TestTaskQueueSnapshotSortsNewestFirst(t *testing.T) {
|
||||||
now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
|
now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
|
||||||
q := &taskQueue{
|
q := &taskQueue{
|
||||||
@@ -849,3 +866,82 @@ func TestExecuteTaskMarksPanicsAsFailedAndClosesKmsgWindow(t *testing.T) {
|
|||||||
t.Fatalf("expected kmsg window to be cleared, got %+v", window)
|
t.Fatalf("expected kmsg window to be cleared, got %+v", window)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRunTaskExternalOpensAndClosesKmsgWindow(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
releasePath := filepath.Join(dir, "release")
|
||||||
|
readyPath := filepath.Join(dir, "ready")
|
||||||
|
q := &taskQueue{
|
||||||
|
opts: &HandlerOptions{ExportDir: dir},
|
||||||
|
logsDir: filepath.Join(dir, "tasks"),
|
||||||
|
kmsgWatcher: newKmsgWatcher(nil),
|
||||||
|
trigger: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(q.logsDir, 0755); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
tk := &Task{
|
||||||
|
ID: "cpu-external-1",
|
||||||
|
Name: "CPU SAT",
|
||||||
|
Target: "cpu",
|
||||||
|
Status: TaskRunning,
|
||||||
|
CreatedAt: time.Now(),
|
||||||
|
}
|
||||||
|
q.assignTaskLogPathLocked(tk)
|
||||||
|
j := newTaskJobState(tk.LogPath)
|
||||||
|
|
||||||
|
orig := externalTaskRunnerCommand
|
||||||
|
externalTaskRunnerCommand = func(exportDir, taskID string) (*exec.Cmd, error) {
|
||||||
|
script := "printf ready > \"$1\"; while [ ! -f \"$2\" ]; do sleep 0.05; done"
|
||||||
|
return exec.Command("sh", "-c", script, "sh", readyPath, releasePath), nil
|
||||||
|
}
|
||||||
|
defer func() { externalTaskRunnerCommand = orig }()
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
q.runTaskExternal(tk, j)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if _, err := os.Stat(readyPath); err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if _, err := os.Stat(readyPath); err != nil {
|
||||||
|
t.Fatalf("external runner did not start: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
q.kmsgWatcher.mu.Lock()
|
||||||
|
activeCount := q.kmsgWatcher.activeCount
|
||||||
|
window := q.kmsgWatcher.window
|
||||||
|
q.kmsgWatcher.mu.Unlock()
|
||||||
|
if activeCount != 1 {
|
||||||
|
t.Fatalf("activeCount while running=%d want 1", activeCount)
|
||||||
|
}
|
||||||
|
if window == nil || len(window.targets) != 1 || window.targets[0] != "cpu" {
|
||||||
|
t.Fatalf("window while running=%+v", window)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.WriteFile(releasePath, []byte("1\n"), 0644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("runTaskExternal did not return")
|
||||||
|
}
|
||||||
|
|
||||||
|
q.kmsgWatcher.mu.Lock()
|
||||||
|
activeCount = q.kmsgWatcher.activeCount
|
||||||
|
window = q.kmsgWatcher.window
|
||||||
|
q.kmsgWatcher.mu.Unlock()
|
||||||
|
if activeCount != 0 {
|
||||||
|
t.Fatalf("activeCount after finish=%d want 0", activeCount)
|
||||||
|
}
|
||||||
|
if window != nil {
|
||||||
|
t.Fatalf("expected kmsg window to be cleared, got %+v", window)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user