169 lines
3.1 KiB
Go
169 lines
3.1 KiB
Go
package server
|
||
|
||
import (
|
||
"context"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
type JobManager struct {
|
||
mu sync.RWMutex
|
||
jobs map[string]*Job
|
||
}
|
||
|
||
func NewJobManager() *JobManager {
|
||
return &JobManager{
|
||
jobs: make(map[string]*Job),
|
||
}
|
||
}
|
||
|
||
func (m *JobManager) CreateJob(req CollectRequest) *Job {
|
||
now := time.Now().UTC()
|
||
job := &Job{
|
||
ID: generateJobID(),
|
||
Status: CollectStatusQueued,
|
||
Progress: 0,
|
||
Logs: []string{"Задача поставлена в очередь"},
|
||
CreatedAt: now,
|
||
UpdatedAt: now,
|
||
RequestMeta: CollectRequestMeta{
|
||
Host: req.Host,
|
||
Protocol: req.Protocol,
|
||
Port: req.Port,
|
||
Username: req.Username,
|
||
AuthType: req.AuthType,
|
||
TLSMode: req.TLSMode,
|
||
},
|
||
}
|
||
|
||
m.mu.Lock()
|
||
m.jobs[job.ID] = job
|
||
m.mu.Unlock()
|
||
|
||
return cloneJob(job)
|
||
}
|
||
|
||
func (m *JobManager) GetJob(id string) (*Job, bool) {
|
||
m.mu.RLock()
|
||
job, ok := m.jobs[id]
|
||
m.mu.RUnlock()
|
||
if !ok || job == nil {
|
||
return nil, false
|
||
}
|
||
return cloneJob(job), true
|
||
}
|
||
|
||
func (m *JobManager) CancelJob(id string) (*Job, bool) {
|
||
m.mu.Lock()
|
||
job, ok := m.jobs[id]
|
||
if !ok || job == nil {
|
||
m.mu.Unlock()
|
||
return nil, false
|
||
}
|
||
|
||
if !isTerminalCollectStatus(job.Status) {
|
||
job.Status = CollectStatusCanceled
|
||
job.Error = ""
|
||
job.UpdatedAt = time.Now().UTC()
|
||
job.Logs = append(job.Logs, "Сбор отменен пользователем")
|
||
}
|
||
|
||
cancelFn := job.cancel
|
||
job.cancel = nil
|
||
cloned := cloneJob(job)
|
||
m.mu.Unlock()
|
||
|
||
if cancelFn != nil {
|
||
cancelFn()
|
||
}
|
||
|
||
return cloned, true
|
||
}
|
||
|
||
func (m *JobManager) UpdateJobStatus(id, status string, progress int, errMsg string) (*Job, bool) {
|
||
m.mu.Lock()
|
||
job, ok := m.jobs[id]
|
||
if !ok || job == nil {
|
||
m.mu.Unlock()
|
||
return nil, false
|
||
}
|
||
if isTerminalCollectStatus(job.Status) {
|
||
cloned := cloneJob(job)
|
||
m.mu.Unlock()
|
||
return cloned, true
|
||
}
|
||
|
||
job.Status = status
|
||
job.Progress = normalizeProgress(progress)
|
||
job.Error = errMsg
|
||
job.UpdatedAt = time.Now().UTC()
|
||
if isTerminalCollectStatus(status) {
|
||
job.cancel = nil
|
||
}
|
||
|
||
cloned := cloneJob(job)
|
||
m.mu.Unlock()
|
||
return cloned, true
|
||
}
|
||
|
||
func (m *JobManager) AppendJobLog(id, message string) (*Job, bool) {
|
||
if message == "" {
|
||
return m.GetJob(id)
|
||
}
|
||
|
||
m.mu.Lock()
|
||
job, ok := m.jobs[id]
|
||
if !ok || job == nil {
|
||
m.mu.Unlock()
|
||
return nil, false
|
||
}
|
||
|
||
job.Logs = append(job.Logs, message)
|
||
job.UpdatedAt = time.Now().UTC()
|
||
|
||
cloned := cloneJob(job)
|
||
m.mu.Unlock()
|
||
return cloned, true
|
||
}
|
||
|
||
func (m *JobManager) AttachJobCancel(id string, cancelFn context.CancelFunc) bool {
|
||
m.mu.Lock()
|
||
defer m.mu.Unlock()
|
||
|
||
job, ok := m.jobs[id]
|
||
if !ok || job == nil || isTerminalCollectStatus(job.Status) {
|
||
return false
|
||
}
|
||
job.cancel = cancelFn
|
||
return true
|
||
}
|
||
|
||
func isTerminalCollectStatus(status string) bool {
|
||
switch status {
|
||
case CollectStatusSuccess, CollectStatusFailed, CollectStatusCanceled:
|
||
return true
|
||
default:
|
||
return false
|
||
}
|
||
}
|
||
|
||
func normalizeProgress(progress int) int {
|
||
if progress < 0 {
|
||
return 0
|
||
}
|
||
if progress > 100 {
|
||
return 100
|
||
}
|
||
return progress
|
||
}
|
||
|
||
func cloneJob(job *Job) *Job {
|
||
if job == nil {
|
||
return nil
|
||
}
|
||
cloned := *job
|
||
cloned.Logs = append([]string(nil), job.Logs...)
|
||
cloned.cancel = nil
|
||
return &cloned
|
||
}
|