Files
logpile/internal/server/job_manager.go
Michael Chus 57de3ba6eb chore: align codebase with bible engineering contracts
- identifier-normalization: use strings.EqualFold in h3c/parser.go
- import-export: CSV now uses UTF-8 BOM and semicolon delimiter
- go-code-style: translate all Russian source strings to English (ADL-007)
- go-background-tasks: add Type, Message, Result fields to Job struct
- go-api: wrap list endpoints in {items, total_count, page, per_page, total_pages}
- module-structure: rename helpers.go → context_sleep.go
- build-version-display: htmlError renders version footer on error pages
- go-logging: migrate all log.Printf calls to log/slog with structured attrs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 14:35:39 +03:00

291 lines
5.9 KiB
Go

package server
import (
"context"
"fmt"
"maps"
"sync"
"time"
)
type JobManager struct {
mu sync.RWMutex
jobs map[string]*Job
}
func NewJobManager() *JobManager {
return &JobManager{
jobs: make(map[string]*Job),
}
}
func (m *JobManager) CreateJob(req CollectRequest) *Job {
now := time.Now().UTC()
job := &Job{
ID: generateJobID(),
Type: req.Protocol,
Status: CollectStatusQueued,
Progress: 0,
Message: "Job queued",
Logs: []string{formatCollectLogLine(now, "Job queued")},
CreatedAt: now,
UpdatedAt: now,
RequestMeta: CollectRequestMeta{
Host: req.Host,
Protocol: req.Protocol,
Port: req.Port,
Username: req.Username,
AuthType: req.AuthType,
TLSMode: req.TLSMode,
},
}
m.mu.Lock()
m.jobs[job.ID] = job
m.mu.Unlock()
return cloneJob(job)
}
func (m *JobManager) GetJob(id string) (*Job, bool) {
m.mu.RLock()
job, ok := m.jobs[id]
m.mu.RUnlock()
if !ok || job == nil {
return nil, false
}
return cloneJob(job), true
}
func (m *JobManager) CancelJob(id string) (*Job, bool) {
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
if !isTerminalCollectStatus(job.Status) {
job.Status = CollectStatusCanceled
job.Error = ""
job.UpdatedAt = time.Now().UTC()
job.Logs = append(job.Logs, formatCollectLogLine(job.UpdatedAt, "Collection canceled by user"))
}
cancelFn := job.cancel
job.cancel = nil
cloned := cloneJob(job)
m.mu.Unlock()
if cancelFn != nil {
cancelFn()
}
return cloned, true
}
func (m *JobManager) UpdateJobStatus(id, status string, progress int, errMsg string) (*Job, bool) {
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
if isTerminalCollectStatus(job.Status) {
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
job.Status = status
job.Progress = normalizeProgress(progress)
job.Error = errMsg
job.UpdatedAt = time.Now().UTC()
if isTerminalCollectStatus(status) {
job.cancel = nil
}
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
func (m *JobManager) AppendJobLog(id, message string) (*Job, bool) {
if message == "" {
return m.GetJob(id)
}
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
job.Logs = append(job.Logs, message)
job.UpdatedAt = time.Now().UTC()
job.Logs[len(job.Logs)-1] = formatCollectLogLine(job.UpdatedAt, message)
job.Message = message
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
func (m *JobManager) UpdateJobModules(id string, activeModules, moduleScores []CollectModuleStatus) (*Job, bool) {
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
job.ActiveModules = append([]CollectModuleStatus(nil), activeModules...)
job.ModuleScores = append([]CollectModuleStatus(nil), moduleScores...)
job.UpdatedAt = time.Now().UTC()
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
func (m *JobManager) UpdateJobETA(id, phase string, etaSeconds int) (*Job, bool) {
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
job.CurrentPhase = phase
job.ETASeconds = etaSeconds
job.UpdatedAt = time.Now().UTC()
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
func (m *JobManager) UpdateJobDebugInfo(id string, info *CollectDebugInfo) (*Job, bool) {
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
job.DebugInfo = cloneCollectDebugInfo(info)
job.UpdatedAt = time.Now().UTC()
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
func (m *JobManager) AttachJobSkip(id string, skipFn func()) bool {
m.mu.Lock()
defer m.mu.Unlock()
job, ok := m.jobs[id]
if !ok || job == nil || isTerminalCollectStatus(job.Status) {
return false
}
job.skipFn = skipFn
return true
}
func (m *JobManager) SkipJob(id string) (*Job, bool) {
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
if isTerminalCollectStatus(job.Status) {
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
skipFn := job.skipFn
job.skipFn = nil
job.UpdatedAt = time.Now().UTC()
job.Logs = append(job.Logs, formatCollectLogLine(job.UpdatedAt, "Skipping stalled requests on user request"))
cloned := cloneJob(job)
m.mu.Unlock()
if skipFn != nil {
skipFn()
}
return cloned, true
}
func (m *JobManager) SetJobResult(id string, result map[string]interface{}) bool {
m.mu.Lock()
defer m.mu.Unlock()
job, ok := m.jobs[id]
if !ok || job == nil {
return false
}
job.Result = result
job.UpdatedAt = time.Now().UTC()
return true
}
func (m *JobManager) AttachJobCancel(id string, cancelFn context.CancelFunc) bool {
m.mu.Lock()
defer m.mu.Unlock()
job, ok := m.jobs[id]
if !ok || job == nil || isTerminalCollectStatus(job.Status) {
return false
}
job.cancel = cancelFn
return true
}
func isTerminalCollectStatus(status string) bool {
switch status {
case CollectStatusSuccess, CollectStatusFailed, CollectStatusCanceled:
return true
default:
return false
}
}
func normalizeProgress(progress int) int {
if progress < 0 {
return 0
}
if progress > 100 {
return 100
}
return progress
}
func formatCollectLogLine(ts time.Time, message string) string {
msg := message
if msg == "" {
msg = "-"
}
if ts.IsZero() {
ts = time.Now().UTC()
}
return fmt.Sprintf("%s %s", ts.Format(time.RFC3339Nano), msg)
}
func cloneJob(job *Job) *Job {
if job == nil {
return nil
}
cloned := *job
cloned.Logs = append([]string(nil), job.Logs...)
cloned.ActiveModules = append([]CollectModuleStatus(nil), job.ActiveModules...)
cloned.ModuleScores = append([]CollectModuleStatus(nil), job.ModuleScores...)
cloned.DebugInfo = cloneCollectDebugInfo(job.DebugInfo)
cloned.CurrentPhase = job.CurrentPhase
cloned.ETASeconds = job.ETASeconds
if job.Result != nil {
cloned.Result = maps.Clone(job.Result)
}
cloned.cancel = nil
cloned.skipFn = nil
return &cloned
}