package webui import ( "os" "strings" "sync" "time" ) // jobState holds the output lines and completion status of an async job. type jobState struct { lines []string done bool err string mu sync.Mutex subs []chan string cancel func() // optional cancel function; nil if job is not cancellable logPath string } // abort cancels the job if it has a cancel function and is not yet done. func (j *jobState) abort() bool { j.mu.Lock() defer j.mu.Unlock() if j.done || j.cancel == nil { return false } j.cancel() return true } func (j *jobState) append(line string) { j.mu.Lock() defer j.mu.Unlock() j.lines = append(j.lines, line) if j.logPath != "" { appendJobLog(j.logPath, line) } for _, ch := range j.subs { select { case ch <- line: default: } } } func (j *jobState) finish(errMsg string) { j.mu.Lock() defer j.mu.Unlock() j.done = true j.err = errMsg for _, ch := range j.subs { close(ch) } j.subs = nil } // subscribe returns a channel that receives all future lines. // Existing lines are returned first, then the channel streams new ones. func (j *jobState) subscribe() ([]string, <-chan string) { j.mu.Lock() defer j.mu.Unlock() existing := make([]string, len(j.lines)) copy(existing, j.lines) if j.done { return existing, nil } ch := make(chan string, 256) j.subs = append(j.subs, ch) return existing, ch } // jobManager manages async jobs identified by string IDs. type jobManager struct { mu sync.Mutex jobs map[string]*jobState } var globalJobs = &jobManager{jobs: make(map[string]*jobState)} func (m *jobManager) create(id string) *jobState { m.mu.Lock() defer m.mu.Unlock() j := &jobState{} m.jobs[id] = j // Schedule cleanup after 30 minutes go func() { time.Sleep(30 * time.Minute) m.mu.Lock() delete(m.jobs, id) m.mu.Unlock() }() return j } // isDone returns true if the job has finished (either successfully or with error). func (j *jobState) isDone() bool { j.mu.Lock() defer j.mu.Unlock() return j.done } func (m *jobManager) get(id string) (*jobState, bool) { m.mu.Lock() defer m.mu.Unlock() j, ok := m.jobs[id] return j, ok } func newTaskJobState(logPath string) *jobState { j := &jobState{logPath: logPath} if logPath == "" { return j } data, err := os.ReadFile(logPath) if err != nil || len(data) == 0 { return j } lines := strings.Split(strings.ReplaceAll(string(data), "\r\n", "\n"), "\n") if len(lines) > 0 && lines[len(lines)-1] == "" { lines = lines[:len(lines)-1] } j.lines = append(j.lines, lines...) return j } func appendJobLog(path, line string) { if path == "" { return } f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { return } defer f.Close() _, _ = f.WriteString(line + "\n") }