Files
logpile/internal/server/job_manager.go
T
Mikhail Chusavitin 9df13327aa feat(collect): remove power-on/off, add skip-hung for Redfish collection
Remove power-on and power-off functionality from the Redfish collector;
keep host power-state detection and show a warning in the UI when the
host is powered off before collection starts.

Add a "Пропустить зависшие" (skip hung) button that lets the user abort
stuck Redfish collection phases without losing already-collected data.
Introduces a two-level context model in Collect(): the outer job context
covers the full lifecycle including replay; an inner collectCtx covers
snapshot, prefetch, and plan-B phases only. Closing the skipCh cancels
collectCtx immediately — aborts all in-flight HTTP requests and exits
plan-B loops — then replay runs on whatever rawTree was collected.

Signal path: UI → POST /api/collect/{id}/skip → JobManager.SkipJob()
→ close(skipCh) → goroutine in Collect() → cancelCollect().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 13:12:38 +03:00

272 lines
5.7 KiB
Go

package server
import (
"context"
"fmt"
"sync"
"time"
)
// JobManager is an in-memory registry of collection jobs keyed by job ID.
// Its methods take mu before touching the jobs map or the Job records stored
// in it, and hand callers detached copies produced by cloneJob.
type JobManager struct {
// mu guards jobs and the fields of every *Job reachable through it.
mu sync.RWMutex
// jobs maps a job ID to its live (mutable) record.
jobs map[string]*Job
}
// NewJobManager returns a JobManager whose job map is allocated and empty,
// so jobs can be created on it immediately.
func NewJobManager() *JobManager {
	manager := new(JobManager)
	manager.jobs = map[string]*Job{}
	return manager
}
// CreateJob registers a new job in the queued state for req and returns a
// detached copy of it.
//
// The job gets a freshly generated ID, zero progress, a single "queued" log
// line, identical CreatedAt/UpdatedAt timestamps (UTC), and the connection
// parameters copied from req into RequestMeta.
func (m *JobManager) CreateJob(req CollectRequest) *Job {
	createdAt := time.Now().UTC()

	meta := CollectRequestMeta{
		Host:     req.Host,
		Protocol: req.Protocol,
		Port:     req.Port,
		Username: req.Username,
		AuthType: req.AuthType,
		TLSMode:  req.TLSMode,
	}
	job := &Job{
		ID:          generateJobID(),
		Status:      CollectStatusQueued,
		Progress:    0,
		Logs:        []string{formatCollectLogLine(createdAt, "Задача поставлена в очередь")},
		CreatedAt:   createdAt,
		UpdatedAt:   createdAt,
		RequestMeta: meta,
	}

	// Take the caller's snapshot before the job becomes visible to other
	// goroutines through the map.
	snapshot := cloneJob(job)

	m.mu.Lock()
	m.jobs[job.ID] = job
	m.mu.Unlock()

	return snapshot
}
// GetJob returns a detached copy of the job with the given ID.
// The second result is false (and the job nil) when no such job exists.
//
// The copy is made while the read lock is held: the stored *Job is mutated in
// place by the Update*/Append*/Cancel/Skip methods under the write lock, so
// cloning it after releasing the lock would race with those writers.
func (m *JobManager) GetJob(id string) (*Job, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	job, ok := m.jobs[id]
	if !ok || job == nil {
		return nil, false
	}
	return cloneJob(job), true
}
// CancelJob cancels the job with the given ID and returns a detached copy of
// its resulting state. The second result is false when the job is unknown.
//
// If the job has not yet reached a terminal status it is marked canceled, its
// error text is cleared and a "canceled by user" line is appended to its log.
// A job that is already terminal keeps its status and log untouched.
//
// In both cases any attached cancel function is detached and then invoked
// once, after the manager lock has been released, so the lock is not held
// while the callback runs.
func (m *JobManager) CancelJob(id string) (*Job, bool) {
	m.mu.Lock()
	job, ok := m.jobs[id]
	if !ok || job == nil {
		m.mu.Unlock()
		return nil, false
	}
	if !isTerminalCollectStatus(job.Status) {
		job.Status = CollectStatusCanceled
		job.Error = ""
		job.UpdatedAt = time.Now().UTC()
		job.Logs = append(job.Logs, formatCollectLogLine(job.UpdatedAt, "Сбор отменен пользователем"))
	}
	cancelFn := job.cancel
	job.cancel = nil
	// The job is terminal from here on, so SkipJob will never invoke the skip
	// callback again. Drop it so the stored job does not pin the closure (and
	// whatever it captures) for as long as the job stays in the map.
	job.skipFn = nil
	cloned := cloneJob(job)
	m.mu.Unlock()

	if cancelFn != nil {
		cancelFn()
	}
	return cloned, true
}
// UpdateJobStatus sets the status, progress and error message of the job with
// the given ID and returns a detached copy of the result. The second result
// is false when the job is unknown.
//
// A job that is already in a terminal status is left unchanged (its current
// state is still returned with true). progress is clamped to 0..100 by
// normalizeProgress.
//
// When the new status is terminal, the stored cancel and skip callbacks are
// dropped without being invoked.
// NOTE(review): this assumes the owner of the cancel function releases the
// context itself — confirm against the caller.
func (m *JobManager) UpdateJobStatus(id, status string, progress int, errMsg string) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	job, ok := m.jobs[id]
	if !ok || job == nil {
		return nil, false
	}
	if isTerminalCollectStatus(job.Status) {
		// Terminal jobs are immutable from the status point of view.
		return cloneJob(job), true
	}

	job.Status = status
	job.Progress = normalizeProgress(progress)
	job.Error = errMsg
	job.UpdatedAt = time.Now().UTC()
	if isTerminalCollectStatus(status) {
		job.cancel = nil
		// SkipJob ignores terminal jobs, so the skip callback is dead weight
		// now; release it together with the cancel function.
		job.skipFn = nil
	}
	return cloneJob(job), true
}
// AppendJobLog appends message, prefixed with the current UTC timestamp, to
// the log of the job with the given ID, bumps UpdatedAt to that same
// timestamp, and returns a detached copy of the job. The second result is
// false when the job is unknown.
//
// An empty message appends nothing; the call then behaves like GetJob.
func (m *JobManager) AppendJobLog(id, message string) (*Job, bool) {
	if message == "" {
		return m.GetJob(id)
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	job := m.jobs[id]
	if job == nil {
		return nil, false
	}

	stamp := time.Now().UTC()
	job.UpdatedAt = stamp
	job.Logs = append(job.Logs, formatCollectLogLine(stamp, message))
	return cloneJob(job), true
}
// UpdateJobModules replaces the job's ActiveModules and ModuleScores with
// copies of the given slices, refreshes UpdatedAt, and returns a detached
// copy of the job. The second result is false when the job is unknown.
//
// The inputs are copied, so the caller may keep modifying its own slices.
func (m *JobManager) UpdateJobModules(id string, activeModules, moduleScores []CollectModuleStatus) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	job := m.jobs[id]
	if job == nil {
		return nil, false
	}

	var active, scores []CollectModuleStatus
	active = append(active, activeModules...)
	scores = append(scores, moduleScores...)

	job.ActiveModules = active
	job.ModuleScores = scores
	job.UpdatedAt = time.Now().UTC()
	return cloneJob(job), true
}
// UpdateJobETA records the current phase name and the ETA value (in seconds,
// per the parameter name) on the job with the given ID, refreshes UpdatedAt,
// and returns a detached copy of the job. The second result is false when
// the job is unknown.
func (m *JobManager) UpdateJobETA(id, phase string, etaSeconds int) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	job := m.jobs[id]
	if job == nil {
		return nil, false
	}

	job.CurrentPhase, job.ETASeconds = phase, etaSeconds
	job.UpdatedAt = time.Now().UTC()
	return cloneJob(job), true
}
// UpdateJobDebugInfo stores a copy of info (made with cloneCollectDebugInfo)
// on the job with the given ID, refreshes UpdatedAt, and returns a detached
// copy of the job. The second result is false when the job is unknown.
func (m *JobManager) UpdateJobDebugInfo(id string, info *CollectDebugInfo) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	job := m.jobs[id]
	if job == nil {
		return nil, false
	}

	job.DebugInfo = cloneCollectDebugInfo(info)
	job.UpdatedAt = time.Now().UTC()
	return cloneJob(job), true
}
// AttachJobSkip stores skipFn as the callback that SkipJob will invoke for
// the job with the given ID, replacing any previously attached one.
// It reports false, attaching nothing, when the job is unknown or has
// already reached a terminal status.
func (m *JobManager) AttachJobSkip(id string, skipFn func()) bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	if job := m.jobs[id]; job != nil && !isTerminalCollectStatus(job.Status) {
		job.skipFn = skipFn
		return true
	}
	return false
}
// SkipJob handles a user request to skip hung requests for the job with the
// given ID and returns a detached copy of the job. The second result is
// false when the job is unknown.
//
// A job in a terminal status is returned unchanged. Otherwise a log line
// about the skip is appended, UpdatedAt is refreshed, and the attached skip
// callback (if any) is detached and invoked once, after the manager lock
// has been released.
func (m *JobManager) SkipJob(id string) (*Job, bool) {
	m.mu.Lock()
	job := m.jobs[id]
	if job == nil {
		m.mu.Unlock()
		return nil, false
	}

	var pending func()
	if !isTerminalCollectStatus(job.Status) {
		// Detach the callback so a repeated skip cannot fire it twice.
		pending, job.skipFn = job.skipFn, nil
		job.UpdatedAt = time.Now().UTC()
		job.Logs = append(job.Logs, formatCollectLogLine(job.UpdatedAt, "Пропуск зависших запросов по команде пользователя"))
	}
	snapshot := cloneJob(job)
	m.mu.Unlock()

	// Run the callback without holding the manager lock.
	if pending != nil {
		pending()
	}
	return snapshot, true
}
// AttachJobCancel stores cancelFn as the function that CancelJob will invoke
// for the job with the given ID, replacing any previously attached one.
// It reports false, attaching nothing, when the job is unknown or has
// already reached a terminal status.
func (m *JobManager) AttachJobCancel(id string, cancelFn context.CancelFunc) bool {
	m.mu.Lock()
	defer m.mu.Unlock()

	if job := m.jobs[id]; job != nil && !isTerminalCollectStatus(job.Status) {
		job.cancel = cancelFn
		return true
	}
	return false
}
// isTerminalCollectStatus reports whether status is one of the final job
// states: success, failed, or canceled.
func isTerminalCollectStatus(status string) bool {
	return status == CollectStatusSuccess ||
		status == CollectStatusFailed ||
		status == CollectStatusCanceled
}
// normalizeProgress clamps progress to the inclusive range 0..100.
func normalizeProgress(progress int) int {
	switch {
	case progress > 100:
		return 100
	case progress < 0:
		return 0
	default:
		return progress
	}
}
// formatCollectLogLine renders one job log line as
// "<RFC3339Nano timestamp> <message>".
//
// A zero ts is replaced with the current UTC time, and an empty message is
// rendered as "-".
func formatCollectLogLine(ts time.Time, message string) string {
	when := ts
	if when.IsZero() {
		when = time.Now().UTC()
	}

	text := "-"
	if message != "" {
		text = message
	}

	return fmt.Sprintf("%s %s", when.Format(time.RFC3339Nano), text)
}
// cloneJob returns a copy of job that is safe to hand to callers: the Logs,
// ActiveModules and ModuleScores slices and the DebugInfo value are
// duplicated rather than shared, and the internal cancel and skip callbacks
// are stripped. A nil job yields nil.
func cloneJob(job *Job) *Job {
	if job == nil {
		return nil
	}

	// The struct copy carries over every plain-value field as is; only the
	// reference-typed fields below need their own copies.
	dup := new(Job)
	*dup = *job

	dup.Logs = append([]string(nil), job.Logs...)
	dup.ActiveModules = append([]CollectModuleStatus(nil), job.ActiveModules...)
	dup.ModuleScores = append([]CollectModuleStatus(nil), job.ModuleScores...)
	dup.DebugInfo = cloneCollectDebugInfo(job.DebugInfo)

	// Callbacks stay with the manager's own record only.
	dup.cancel, dup.skipFn = nil, nil
	return dup
}