- identifier-normalization: use strings.EqualFold in h3c/parser.go
- import-export: CSV now uses UTF-8 BOM and semicolon delimiter
- go-code-style: translate all Russian source strings to English (ADL-007)
- go-background-tasks: add Type, Message, Result fields to Job struct
- go-api: wrap list endpoints in {items, total_count, page, per_page, total_pages}
- module-structure: rename helpers.go → context_sleep.go
- build-version-display: htmlError renders version footer on error pages
- go-logging: migrate all log.Printf calls to log/slog with structured attrs
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
291 lines
5.9 KiB
Go
291 lines
5.9 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"maps"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type JobManager struct {
|
|
mu sync.RWMutex
|
|
jobs map[string]*Job
|
|
}
|
|
|
|
func NewJobManager() *JobManager {
|
|
return &JobManager{
|
|
jobs: make(map[string]*Job),
|
|
}
|
|
}
|
|
|
|
func (m *JobManager) CreateJob(req CollectRequest) *Job {
|
|
now := time.Now().UTC()
|
|
job := &Job{
|
|
ID: generateJobID(),
|
|
Type: req.Protocol,
|
|
Status: CollectStatusQueued,
|
|
Progress: 0,
|
|
Message: "Job queued",
|
|
Logs: []string{formatCollectLogLine(now, "Job queued")},
|
|
CreatedAt: now,
|
|
UpdatedAt: now,
|
|
RequestMeta: CollectRequestMeta{
|
|
Host: req.Host,
|
|
Protocol: req.Protocol,
|
|
Port: req.Port,
|
|
Username: req.Username,
|
|
AuthType: req.AuthType,
|
|
TLSMode: req.TLSMode,
|
|
},
|
|
}
|
|
|
|
m.mu.Lock()
|
|
m.jobs[job.ID] = job
|
|
m.mu.Unlock()
|
|
|
|
return cloneJob(job)
|
|
}
|
|
|
|
func (m *JobManager) GetJob(id string) (*Job, bool) {
|
|
m.mu.RLock()
|
|
job, ok := m.jobs[id]
|
|
m.mu.RUnlock()
|
|
if !ok || job == nil {
|
|
return nil, false
|
|
}
|
|
return cloneJob(job), true
|
|
}
|
|
|
|
func (m *JobManager) CancelJob(id string) (*Job, bool) {
|
|
m.mu.Lock()
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil {
|
|
m.mu.Unlock()
|
|
return nil, false
|
|
}
|
|
|
|
if !isTerminalCollectStatus(job.Status) {
|
|
job.Status = CollectStatusCanceled
|
|
job.Error = ""
|
|
job.UpdatedAt = time.Now().UTC()
|
|
job.Logs = append(job.Logs, formatCollectLogLine(job.UpdatedAt, "Collection canceled by user"))
|
|
}
|
|
|
|
cancelFn := job.cancel
|
|
job.cancel = nil
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
|
|
if cancelFn != nil {
|
|
cancelFn()
|
|
}
|
|
|
|
return cloned, true
|
|
}
|
|
|
|
func (m *JobManager) UpdateJobStatus(id, status string, progress int, errMsg string) (*Job, bool) {
|
|
m.mu.Lock()
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil {
|
|
m.mu.Unlock()
|
|
return nil, false
|
|
}
|
|
if isTerminalCollectStatus(job.Status) {
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
return cloned, true
|
|
}
|
|
|
|
job.Status = status
|
|
job.Progress = normalizeProgress(progress)
|
|
job.Error = errMsg
|
|
job.UpdatedAt = time.Now().UTC()
|
|
if isTerminalCollectStatus(status) {
|
|
job.cancel = nil
|
|
}
|
|
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
return cloned, true
|
|
}
|
|
|
|
func (m *JobManager) AppendJobLog(id, message string) (*Job, bool) {
|
|
if message == "" {
|
|
return m.GetJob(id)
|
|
}
|
|
|
|
m.mu.Lock()
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil {
|
|
m.mu.Unlock()
|
|
return nil, false
|
|
}
|
|
|
|
job.Logs = append(job.Logs, message)
|
|
job.UpdatedAt = time.Now().UTC()
|
|
job.Logs[len(job.Logs)-1] = formatCollectLogLine(job.UpdatedAt, message)
|
|
job.Message = message
|
|
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
return cloned, true
|
|
}
|
|
|
|
func (m *JobManager) UpdateJobModules(id string, activeModules, moduleScores []CollectModuleStatus) (*Job, bool) {
|
|
m.mu.Lock()
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil {
|
|
m.mu.Unlock()
|
|
return nil, false
|
|
}
|
|
job.ActiveModules = append([]CollectModuleStatus(nil), activeModules...)
|
|
job.ModuleScores = append([]CollectModuleStatus(nil), moduleScores...)
|
|
job.UpdatedAt = time.Now().UTC()
|
|
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
return cloned, true
|
|
}
|
|
|
|
func (m *JobManager) UpdateJobETA(id, phase string, etaSeconds int) (*Job, bool) {
|
|
m.mu.Lock()
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil {
|
|
m.mu.Unlock()
|
|
return nil, false
|
|
}
|
|
job.CurrentPhase = phase
|
|
job.ETASeconds = etaSeconds
|
|
job.UpdatedAt = time.Now().UTC()
|
|
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
return cloned, true
|
|
}
|
|
|
|
func (m *JobManager) UpdateJobDebugInfo(id string, info *CollectDebugInfo) (*Job, bool) {
|
|
m.mu.Lock()
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil {
|
|
m.mu.Unlock()
|
|
return nil, false
|
|
}
|
|
job.DebugInfo = cloneCollectDebugInfo(info)
|
|
job.UpdatedAt = time.Now().UTC()
|
|
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
return cloned, true
|
|
}
|
|
|
|
func (m *JobManager) AttachJobSkip(id string, skipFn func()) bool {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil || isTerminalCollectStatus(job.Status) {
|
|
return false
|
|
}
|
|
job.skipFn = skipFn
|
|
return true
|
|
}
|
|
|
|
func (m *JobManager) SkipJob(id string) (*Job, bool) {
|
|
m.mu.Lock()
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil {
|
|
m.mu.Unlock()
|
|
return nil, false
|
|
}
|
|
if isTerminalCollectStatus(job.Status) {
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
return cloned, true
|
|
}
|
|
skipFn := job.skipFn
|
|
job.skipFn = nil
|
|
job.UpdatedAt = time.Now().UTC()
|
|
job.Logs = append(job.Logs, formatCollectLogLine(job.UpdatedAt, "Skipping stalled requests on user request"))
|
|
cloned := cloneJob(job)
|
|
m.mu.Unlock()
|
|
|
|
if skipFn != nil {
|
|
skipFn()
|
|
}
|
|
return cloned, true
|
|
}
|
|
|
|
func (m *JobManager) SetJobResult(id string, result map[string]interface{}) bool {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil {
|
|
return false
|
|
}
|
|
job.Result = result
|
|
job.UpdatedAt = time.Now().UTC()
|
|
return true
|
|
}
|
|
|
|
func (m *JobManager) AttachJobCancel(id string, cancelFn context.CancelFunc) bool {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
job, ok := m.jobs[id]
|
|
if !ok || job == nil || isTerminalCollectStatus(job.Status) {
|
|
return false
|
|
}
|
|
job.cancel = cancelFn
|
|
return true
|
|
}
|
|
|
|
func isTerminalCollectStatus(status string) bool {
|
|
switch status {
|
|
case CollectStatusSuccess, CollectStatusFailed, CollectStatusCanceled:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// normalizeProgress clamps progress to the inclusive range 0..100.
func normalizeProgress(progress int) int {
	switch {
	case progress < 0:
		return 0
	case progress > 100:
		return 100
	default:
		return progress
	}
}
|
|
|
|
// formatCollectLogLine renders one log line as "<timestamp> <message>", with
// the timestamp formatted as time.RFC3339Nano.
//
// An empty message is rendered as "-". A zero ts is replaced by the current
// time in UTC.
func formatCollectLogLine(ts time.Time, message string) string {
	stamp := ts
	if stamp.IsZero() {
		stamp = time.Now().UTC()
	}

	text := "-"
	if message != "" {
		text = message
	}

	return fmt.Sprintf("%s %s", stamp.Format(time.RFC3339Nano), text)
}
|
|
|
|
func cloneJob(job *Job) *Job {
|
|
if job == nil {
|
|
return nil
|
|
}
|
|
cloned := *job
|
|
cloned.Logs = append([]string(nil), job.Logs...)
|
|
cloned.ActiveModules = append([]CollectModuleStatus(nil), job.ActiveModules...)
|
|
cloned.ModuleScores = append([]CollectModuleStatus(nil), job.ModuleScores...)
|
|
cloned.DebugInfo = cloneCollectDebugInfo(job.DebugInfo)
|
|
cloned.CurrentPhase = job.CurrentPhase
|
|
cloned.ETASeconds = job.ETASeconds
|
|
if job.Result != nil {
|
|
cloned.Result = maps.Clone(job.Result)
|
|
}
|
|
cloned.cancel = nil
|
|
cloned.skipFn = nil
|
|
return &cloned
|
|
}
|