Files
logpile/internal/server/job_manager.go
Mikhail Chusavitin d650a6ba1c refactor: unified ingest pipeline + modular Redfish profile framework
Implement the full architectural plan: unified ingest.Service entry point
for archive and Redfish payloads, modular redfishprofile package with
composable profiles (generic, ami-family, msi, supermicro, dell,
hgx-topology), score-based profile matching with fallback expansion mode,
and profile-driven acquisition/analysis plans.

Vendor-specific logic moved out of common executors and into profile hooks.
GPU chassis lookup strategies and known storage recovery collections
(IntelVROC/HA-RAID/MRVL) now live in ResolvedAnalysisPlan, populated by
profiles at analysis time. Replay helpers read from the plan; no hardcoded
path lists remain in generic code.

Also splits redfish_replay.go into domain modules (gpu, storage, inventory,
fru, profiles) and adds full fixture/matcher/directive test coverage
including Dell, AMI, unknown-vendor fallback, and deterministic ordering.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 08:48:58 +03:00

234 lines
4.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package server
import (
"context"
"fmt"
"sync"
"time"
)
// JobManager is an in-memory registry of collection jobs keyed by job ID.
//
// Its methods hand out copies produced by cloneJob rather than the stored
// *Job values.
type JobManager struct {
	// mu guards jobs and the fields of every *Job stored in it.
	mu sync.RWMutex
	// jobs maps a job ID to its live, mutable record.
	jobs map[string]*Job
}
// NewJobManager returns a JobManager with an empty, ready-to-use job map.
func NewJobManager() *JobManager {
	manager := new(JobManager)
	manager.jobs = make(map[string]*Job)
	return manager
}
// CreateJob registers a new job in the queued state and returns a copy of it.
//
// The job gets a freshly generated ID, zero progress, a single initial log
// line, and identical creation/update timestamps (UTC). Only the fields of req
// listed in RequestMeta below are carried over onto the job.
func (m *JobManager) CreateJob(req CollectRequest) *Job {
	createdAt := time.Now().UTC()

	meta := CollectRequestMeta{
		Host:     req.Host,
		Protocol: req.Protocol,
		Port:     req.Port,
		Username: req.Username,
		AuthType: req.AuthType,
		TLSMode:  req.TLSMode,
	}

	created := &Job{
		ID:          generateJobID(),
		Status:      CollectStatusQueued,
		Progress:    0,
		Logs:        []string{formatCollectLogLine(createdAt, "Задача поставлена в очередь")},
		CreatedAt:   createdAt,
		UpdatedAt:   createdAt,
		RequestMeta: meta,
	}

	m.mu.Lock()
	m.jobs[created.ID] = created
	m.mu.Unlock()

	return cloneJob(created)
}
// GetJob returns a copy of the job registered under id.
//
// The boolean result is false (and the job nil) when id is unknown or the
// stored entry is nil.
func (m *JobManager) GetJob(id string) (*Job, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	job, ok := m.jobs[id]
	if !ok || job == nil {
		return nil, false
	}
	// The clone must be taken while the read lock is still held: the other
	// methods mutate the job's fields (Logs, ActiveModules, DebugInfo, ...)
	// under m.mu, so copying them after RUnlock would be a data race.
	return cloneJob(job), true
}
// CancelJob marks the job identified by id as canceled and triggers its
// attached cancel function, if any.
//
// A job that is already in a terminal status keeps its status, error and logs
// untouched. In every case the stored cancel function is detached from the job
// and, when non-nil, invoked after the manager lock has been released. The
// returned value is a copy of the job; the boolean is false when the job does
// not exist.
func (m *JobManager) CancelJob(id string) (*Job, bool) {
	m.mu.Lock()

	job, found := m.jobs[id]
	if !found || job == nil {
		m.mu.Unlock()
		return nil, false
	}

	if alreadyDone := isTerminalCollectStatus(job.Status); !alreadyDone {
		canceledAt := time.Now().UTC()
		job.Status = CollectStatusCanceled
		job.Error = ""
		job.UpdatedAt = canceledAt
		job.Logs = append(job.Logs, formatCollectLogLine(canceledAt, "Сбор отменен пользователем"))
	}

	// Detach the hook under the lock so it cannot be picked up again.
	stop := job.cancel
	job.cancel = nil
	snapshot := cloneJob(job)

	m.mu.Unlock()

	// Run the hook outside the critical section.
	if stop != nil {
		stop()
	}
	return snapshot, true
}
// UpdateJobStatus sets the status, progress and error message of the job
// identified by id and returns a copy of the result.
//
// Progress is clamped by normalizeProgress. A job that is already in a
// terminal status is left unchanged and returned as is. When the new status is
// terminal, the job's cancel function is dropped without being called. The
// boolean is false when the job does not exist.
func (m *JobManager) UpdateJobStatus(id, status string, progress int, errMsg string) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, found := m.jobs[id]
	if !found || job == nil {
		return nil, false
	}

	if !isTerminalCollectStatus(job.Status) {
		job.Status = status
		job.Progress = normalizeProgress(progress)
		job.Error = errMsg
		job.UpdatedAt = time.Now().UTC()
		if isTerminalCollectStatus(status) {
			job.cancel = nil
		}
	}

	return cloneJob(job), true
}
// AppendJobLog adds a timestamped log line to the job identified by id and
// returns a copy of the job.
//
// An empty message appends nothing and behaves like GetJob. Otherwise the
// job's UpdatedAt is refreshed and the same instant is used as the timestamp
// of the new line. The boolean is false when the job does not exist.
func (m *JobManager) AppendJobLog(id, message string) (*Job, bool) {
	if message == "" {
		return m.GetJob(id)
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	job, found := m.jobs[id]
	if !found || job == nil {
		return nil, false
	}

	loggedAt := time.Now().UTC()
	job.UpdatedAt = loggedAt
	job.Logs = append(job.Logs, formatCollectLogLine(loggedAt, message))

	return cloneJob(job), true
}
// UpdateJobModules replaces the job's ActiveModules and ModuleScores with
// copies of the supplied slices and returns a copy of the job.
//
// The inputs are copied so later changes to the caller's slices do not reach
// the stored job. The boolean is false when the job does not exist.
func (m *JobManager) UpdateJobModules(id string, activeModules, moduleScores []CollectModuleStatus) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, found := m.jobs[id]
	if !found || job == nil {
		return nil, false
	}

	// Appending onto a nil slice yields a detached copy (nil for empty input).
	detach := func(src []CollectModuleStatus) []CollectModuleStatus {
		return append([]CollectModuleStatus(nil), src...)
	}

	job.ActiveModules = detach(activeModules)
	job.ModuleScores = detach(moduleScores)
	job.UpdatedAt = time.Now().UTC()

	return cloneJob(job), true
}
// UpdateJobETA records the current phase name and the ETA in seconds on the
// job identified by id, refreshes UpdatedAt, and returns a copy of the job.
//
// The values are stored as given. The boolean is false when the job does not
// exist.
func (m *JobManager) UpdateJobETA(id, phase string, etaSeconds int) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, found := m.jobs[id]
	if !found || job == nil {
		return nil, false
	}

	job.CurrentPhase, job.ETASeconds = phase, etaSeconds
	job.UpdatedAt = time.Now().UTC()

	return cloneJob(job), true
}
// UpdateJobDebugInfo stores the result of cloneCollectDebugInfo(info) on the
// job identified by id, refreshes UpdatedAt, and returns a copy of the job.
//
// The boolean is false when the job does not exist.
func (m *JobManager) UpdateJobDebugInfo(id string, info *CollectDebugInfo) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, found := m.jobs[id]
	if !found || job == nil {
		return nil, false
	}

	job.DebugInfo = cloneCollectDebugInfo(info)
	job.UpdatedAt = time.Now().UTC()

	return cloneJob(job), true
}
// AttachJobCancel stores cancelFn on the job identified by id so that
// CancelJob can invoke it later.
//
// It reports false, and stores nothing, when the job does not exist or is
// already in a terminal status.
func (m *JobManager) AttachJobCancel(id string, cancelFn context.CancelFunc) bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, found := m.jobs[id]
	switch {
	case !found, job == nil:
		return false
	case isTerminalCollectStatus(job.Status):
		return false
	}

	job.cancel = cancelFn
	return true
}
// isTerminalCollectStatus reports whether status is one of the final states:
// success, failed, or canceled.
func isTerminalCollectStatus(status string) bool {
	return status == CollectStatusSuccess ||
		status == CollectStatusFailed ||
		status == CollectStatusCanceled
}
// normalizeProgress clamps progress to the inclusive range [0, 100].
func normalizeProgress(progress int) int {
	switch {
	case progress < 0:
		return 0
	case progress > 100:
		return 100
	default:
		return progress
	}
}
// formatCollectLogLine renders a log entry as "<timestamp> <message>".
//
// The timestamp is formatted with time.RFC3339Nano. A zero ts is replaced by
// the current UTC time, and an empty message is rendered as "-".
func formatCollectLogLine(ts time.Time, message string) string {
	stamp := ts
	if stamp.IsZero() {
		stamp = time.Now().UTC()
	}

	text := "-"
	if message != "" {
		text = message
	}

	return fmt.Sprintf("%s %s", stamp.Format(time.RFC3339Nano), text)
}
// cloneJob returns a copy of job that is safe to hand to callers, or nil when
// job is nil.
//
// Every field is copied by value first; the Logs, ActiveModules and
// ModuleScores slices and DebugInfo are then replaced with independent copies
// so the result shares no backing storage with the original for those fields.
// The cancel function is cleared on the copy.
func cloneJob(job *Job) *Job {
	if job == nil {
		return nil
	}

	// The value copy already carries the scalar fields (CurrentPhase,
	// ETASeconds, ...); only the reference-typed fields need re-copying.
	snapshot := new(Job)
	*snapshot = *job

	snapshot.Logs = append([]string(nil), job.Logs...)
	snapshot.ActiveModules = append([]CollectModuleStatus(nil), job.ActiveModules...)
	snapshot.ModuleScores = append([]CollectModuleStatus(nil), job.ModuleScores...)
	snapshot.DebugInfo = cloneCollectDebugInfo(job.DebugInfo)
	snapshot.cancel = nil

	return snapshot
}