Logs —
-
+
`
diff --git a/audit/internal/webui/server_test.go b/audit/internal/webui/server_test.go
index e563876..6978f82 100644
--- a/audit/internal/webui/server_test.go
+++ b/audit/internal/webui/server_test.go
@@ -359,6 +359,25 @@ func TestAuditPageRendersViewerFrameAndActions(t *testing.T) {
}
}
+func TestTasksPageRendersLogModalAndPaginationControls(t *testing.T) {
+ handler := NewHandler(HandlerOptions{})
+ rec := httptest.NewRecorder()
+ handler.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/tasks", nil))
+ if rec.Code != http.StatusOK {
+ t.Fatalf("status=%d", rec.Code)
+ }
+ body := rec.Body.String()
+ if !strings.Contains(body, `id="task-log-overlay"`) {
+ t.Fatalf("tasks page missing log modal overlay: %s", body)
+ }
+ if !strings.Contains(body, `_taskPageSize = 50`) {
+ t.Fatalf("tasks page missing pagination size config: %s", body)
+ }
+ if !strings.Contains(body, `Previous`) || !strings.Contains(body, `Next`) {
+ t.Fatalf("tasks page missing pagination controls: %s", body)
+ }
+}
+
func TestViewerRendersLatestSnapshot(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "audit.json")
diff --git a/audit/internal/webui/tasks.go b/audit/internal/webui/tasks.go
index 4341740..a11dd4d 100644
--- a/audit/internal/webui/tasks.go
+++ b/audit/internal/webui/tasks.go
@@ -291,6 +291,30 @@ func (q *taskQueue) findJob(id string) (*jobState, bool) {
return t.job, true
}
+type taskStreamSource struct {
+ status string
+ errMsg string
+ logPath string
+ job *jobState
+}
+
+func (q *taskQueue) taskStreamSource(id string) (taskStreamSource, bool) {
+ q.mu.Lock()
+ defer q.mu.Unlock()
+ for _, t := range q.tasks {
+ if t.ID != id {
+ continue
+ }
+ return taskStreamSource{
+ status: t.Status,
+ errMsg: t.ErrMsg,
+ logPath: t.LogPath,
+ job: t.job,
+ }, true
+ }
+ return taskStreamSource{}, false
+}
+
func (q *taskQueue) hasActiveTarget(target string) bool {
q.mu.Lock()
defer q.mu.Unlock()
@@ -305,7 +329,7 @@ func (q *taskQueue) hasActiveTarget(target string) bool {
return false
}
-// snapshot returns a copy of all tasks sorted for display (running first, then pending by priority, then done by doneAt desc).
+// snapshot returns a copy of all tasks sorted for display with newest tasks first.
func (q *taskQueue) snapshot() []Task {
q.mu.Lock()
defer q.mu.Unlock()
@@ -315,6 +339,9 @@ func (q *taskQueue) snapshot() []Task {
out[i].ElapsedSec = taskElapsedSec(&out[i], time.Now())
}
sort.SliceStable(out, func(i, j int) bool {
+ if !out[i].CreatedAt.Equal(out[j].CreatedAt) {
+ return out[i].CreatedAt.After(out[j].CreatedAt)
+ }
si := statusOrder(out[i].Status)
sj := statusOrder(out[j].Status)
if si != sj {
@@ -323,7 +350,7 @@ func (q *taskQueue) snapshot() []Task {
if out[i].Priority != out[j].Priority {
return out[i].Priority > out[j].Priority
}
- return out[i].CreatedAt.Before(out[j].CreatedAt)
+ return out[i].Name < out[j].Name
})
return out
}
@@ -750,21 +777,49 @@ func (h *handler) handleAPITasksKillWorkers(w http.ResponseWriter, _ *http.Reque
func (h *handler) handleAPITasksStream(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
- // Wait up to 5s for the task to get a job (it may be pending)
- deadline := time.Now().Add(5 * time.Second)
- var j *jobState
- for time.Now().Before(deadline) {
- if jj, ok := globalQueue.findJob(id); ok {
- j = jj
- break
- }
- time.Sleep(200 * time.Millisecond)
- }
- if j == nil {
- http.Error(w, "task not found or not yet started", http.StatusNotFound)
+ src, ok := globalQueue.taskStreamSource(id)
+ if !ok {
+ http.Error(w, "task not found", http.StatusNotFound)
return
}
- streamJob(w, r, j)
+ if src.job != nil {
+ streamJob(w, r, src.job)
+ return
+ }
+ if src.status == TaskDone || src.status == TaskFailed || src.status == TaskCancelled {
+ j := newTaskJobState(src.logPath)
+ j.finish(src.errMsg)
+ streamJob(w, r, j)
+ return
+ }
+ if !sseStart(w) {
+ return
+ }
+ sseWrite(w, "", "Task is queued. Waiting for worker...")
+ ticker := time.NewTicker(200 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ src, ok = globalQueue.taskStreamSource(id)
+ if !ok {
+ sseWrite(w, "done", "task not found")
+ return
+ }
+ if src.job != nil {
+ streamSubscribedJob(w, r, src.job)
+ return
+ }
+ if src.status == TaskDone || src.status == TaskFailed || src.status == TaskCancelled {
+ j := newTaskJobState(src.logPath)
+ j.finish(src.errMsg)
+ streamSubscribedJob(w, r, j)
+ return
+ }
+ case <-r.Context().Done():
+ return
+ }
+ }
}
func (q *taskQueue) assignTaskLogPathLocked(t *Task) {
diff --git a/audit/internal/webui/tasks_test.go b/audit/internal/webui/tasks_test.go
index 8ba6788..27a32f6 100644
--- a/audit/internal/webui/tasks_test.go
+++ b/audit/internal/webui/tasks_test.go
@@ -2,6 +2,8 @@ package webui
import (
"context"
+ "net/http"
+ "net/http/httptest"
"os"
"os/exec"
"path/filepath"
@@ -122,6 +124,130 @@ func TestNewTaskJobStateLoadsExistingLog(t *testing.T) {
}
}
+func TestTaskQueueSnapshotSortsNewestFirst(t *testing.T) {
+ now := time.Date(2026, 4, 2, 12, 0, 0, 0, time.UTC)
+ q := &taskQueue{
+ tasks: []*Task{
+ {
+ ID: "old-running",
+ Name: "Old Running",
+ Status: TaskRunning,
+ Priority: 10,
+ CreatedAt: now.Add(-3 * time.Minute),
+ },
+ {
+ ID: "new-done",
+ Name: "New Done",
+ Status: TaskDone,
+ Priority: 0,
+ CreatedAt: now.Add(-1 * time.Minute),
+ },
+ {
+ ID: "mid-pending",
+ Name: "Mid Pending",
+ Status: TaskPending,
+ Priority: 1,
+ CreatedAt: now.Add(-2 * time.Minute),
+ },
+ },
+ }
+
+ got := q.snapshot()
+ if len(got) != 3 {
+ t.Fatalf("snapshot len=%d want 3", len(got))
+ }
+ if got[0].ID != "new-done" || got[1].ID != "mid-pending" || got[2].ID != "old-running" {
+ t.Fatalf("snapshot order=%q,%q,%q", got[0].ID, got[1].ID, got[2].ID)
+ }
+}
+
+func TestHandleAPITasksStreamReplaysPersistedLogWithoutLiveJob(t *testing.T) {
+ dir := t.TempDir()
+ logPath := filepath.Join(dir, "task.log")
+ if err := os.WriteFile(logPath, []byte("line1\nline2\n"), 0644); err != nil {
+ t.Fatal(err)
+ }
+
+ globalQueue.mu.Lock()
+ origTasks := globalQueue.tasks
+ globalQueue.tasks = []*Task{{
+ ID: "done-1",
+ Name: "Done Task",
+ Status: TaskDone,
+ CreatedAt: time.Now(),
+ LogPath: logPath,
+ }}
+ globalQueue.mu.Unlock()
+ t.Cleanup(func() {
+ globalQueue.mu.Lock()
+ globalQueue.tasks = origTasks
+ globalQueue.mu.Unlock()
+ })
+
+ req := httptest.NewRequest(http.MethodGet, "/api/tasks/done-1/stream", nil)
+ req.SetPathValue("id", "done-1")
+ rec := httptest.NewRecorder()
+
+ h := &handler{}
+ h.handleAPITasksStream(rec, req)
+
+ if rec.Code != http.StatusOK {
+ t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+ }
+ body := rec.Body.String()
+ if !strings.Contains(body, "data: line1\n\n") || !strings.Contains(body, "data: line2\n\n") {
+ t.Fatalf("body=%q", body)
+ }
+ if !strings.Contains(body, "event: done\n") {
+ t.Fatalf("missing done event: %q", body)
+ }
+}
+
+func TestHandleAPITasksStreamPendingTaskStartsSSEImmediately(t *testing.T) {
+ globalQueue.mu.Lock()
+ origTasks := globalQueue.tasks
+ globalQueue.tasks = []*Task{{
+ ID: "pending-1",
+ Name: "Pending Task",
+ Status: TaskPending,
+ CreatedAt: time.Now(),
+ }}
+ globalQueue.mu.Unlock()
+ t.Cleanup(func() {
+ globalQueue.mu.Lock()
+ globalQueue.tasks = origTasks
+ globalQueue.mu.Unlock()
+ })
+
+ ctx, cancel := context.WithCancel(context.Background())
+ req := httptest.NewRequest(http.MethodGet, "/api/tasks/pending-1/stream", nil).WithContext(ctx)
+ req.SetPathValue("id", "pending-1")
+ rec := httptest.NewRecorder()
+
+ done := make(chan struct{})
+ go func() {
+ h := &handler{}
+ h.handleAPITasksStream(rec, req)
+ close(done)
+ }()
+
+ deadline := time.Now().Add(2 * time.Second)
+ for time.Now().Before(deadline) {
+ if strings.Contains(rec.Body.String(), "Task is queued. Waiting for worker...") {
+ cancel()
+ <-done
+ if rec.Code != http.StatusOK {
+ t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String())
+ }
+ return
+ }
+ time.Sleep(20 * time.Millisecond)
+ }
+ cancel()
+ <-done
+ t.Fatalf("stream did not emit queued status promptly, body=%q", rec.Body.String())
+}
+
func TestResolveBurnPreset(t *testing.T) {
tests := []struct {
profile string