refactor: unified ingest pipeline + modular Redfish profile framework

Implement the full architectural plan: unified ingest.Service entry point
for archive and Redfish payloads, modular redfishprofile package with
composable profiles (generic, ami-family, msi, supermicro, dell,
hgx-topology), score-based profile matching with fallback expansion mode,
and profile-driven acquisition/analysis plans.

Vendor-specific logic moved out of common executors and into profile hooks.
GPU chassis lookup strategies and known storage recovery collections
(IntelVROC/HA-RAID/MRVL) now live in ResolvedAnalysisPlan, populated by
profiles at analysis time. Replay helpers read from the plan; no hardcoded
path lists remain in generic code.

Also splits redfish_replay.go into domain modules (gpu, storage, inventory,
fru, profiles) and adds full fixture/matcher/directive test coverage
including Dell, AMI, unknown-vendor fallback, and deterministic ordering.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Mikhail Chusavitin
2026-03-18 08:48:58 +03:00
parent d8d3d8c524
commit d650a6ba1c
45 changed files with 5231 additions and 1011 deletions

View File

@@ -91,6 +91,21 @@ func TestCollectLifecycleToTerminal(t *testing.T) {
if len(status.Logs) < 4 {
t.Fatalf("expected detailed logs, got %v", status.Logs)
}
if len(status.ActiveModules) == 0 {
t.Fatal("expected active modules in collect status")
}
if status.ActiveModules[0].Name == "" {
t.Fatal("expected active module name")
}
if len(status.ModuleScores) == 0 {
t.Fatal("expected module scores in collect status")
}
if status.DebugInfo == nil {
t.Fatal("expected debug info in collect status")
}
if len(status.DebugInfo.PhaseTelemetry) == 0 {
t.Fatal("expected phase telemetry in collect debug info")
}
}
func TestCollectCancel(t *testing.T) {

View File

@@ -33,6 +33,28 @@ func (c *mockConnector) Probe(ctx context.Context, req collector.Request) (*coll
func (c *mockConnector) Collect(ctx context.Context, req collector.Request, emit collector.ProgressFn) (*models.AnalysisResult, error) {
steps := []collector.Progress{
{
Status: CollectStatusRunning,
Progress: 10,
Message: "Подбор модулей Redfish...",
ActiveModules: []collector.ModuleActivation{
{Name: "supermicro", Score: 80},
{Name: "generic", Score: 10},
},
ModuleScores: []collector.ModuleScore{
{Name: "supermicro", Score: 80, Active: true, Priority: 20},
{Name: "generic", Score: 10, Active: true, Priority: 100},
{Name: "hgx-topology", Score: 0, Active: false, Priority: 30},
},
DebugInfo: &collector.CollectDebugInfo{
AdaptiveThrottled: false,
SnapshotWorkers: 6,
PrefetchWorkers: 4,
PhaseTelemetry: []collector.PhaseTelemetry{
{Phase: "discovery", Requests: 6, Errors: 0, ErrorRate: 0, AvgMS: 120, P95MS: 180},
},
},
},
{Status: CollectStatusRunning, Progress: 20, Message: "Подключение..."},
{Status: CollectStatusRunning, Progress: 50, Message: "Сбор инвентаря..."},
{Status: CollectStatusRunning, Progress: 80, Message: "Нормализация..."},

View File

@@ -39,13 +39,18 @@ type CollectJobResponse struct {
}
type CollectJobStatusResponse struct {
JobID string `json:"job_id"`
Status string `json:"status"`
Progress *int `json:"progress,omitempty"`
Logs []string `json:"logs,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
JobID string `json:"job_id"`
Status string `json:"status"`
Progress *int `json:"progress,omitempty"`
CurrentPhase string `json:"current_phase,omitempty"`
ETASeconds *int `json:"eta_seconds,omitempty"`
Logs []string `json:"logs,omitempty"`
Error string `json:"error,omitempty"`
ActiveModules []CollectModuleStatus `json:"active_modules,omitempty"`
ModuleScores []CollectModuleStatus `json:"module_scores,omitempty"`
DebugInfo *CollectDebugInfo `json:"debug_info,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
}
type CollectRequestMeta struct {
@@ -58,27 +63,64 @@ type CollectRequestMeta struct {
}
type Job struct {
ID string
Status string
Progress int
Logs []string
Error string
CreatedAt time.Time
UpdatedAt time.Time
RequestMeta CollectRequestMeta
cancel func()
ID string
Status string
Progress int
CurrentPhase string
ETASeconds int
Logs []string
Error string
ActiveModules []CollectModuleStatus
ModuleScores []CollectModuleStatus
DebugInfo *CollectDebugInfo
CreatedAt time.Time
UpdatedAt time.Time
RequestMeta CollectRequestMeta
cancel func()
}
type CollectModuleStatus struct {
Name string `json:"name"`
Score int `json:"score"`
Active bool `json:"active,omitempty"`
Priority int `json:"priority,omitempty"`
}
type CollectDebugInfo struct {
AdaptiveThrottled bool `json:"adaptive_throttled"`
SnapshotWorkers int `json:"snapshot_workers,omitempty"`
PrefetchWorkers int `json:"prefetch_workers,omitempty"`
PrefetchEnabled *bool `json:"prefetch_enabled,omitempty"`
PhaseTelemetry []CollectPhaseTelemetry `json:"phase_telemetry,omitempty"`
}
type CollectPhaseTelemetry struct {
Phase string `json:"phase"`
Requests int `json:"requests,omitempty"`
Errors int `json:"errors,omitempty"`
ErrorRate float64 `json:"error_rate,omitempty"`
AvgMS int64 `json:"avg_ms,omitempty"`
P95MS int64 `json:"p95_ms,omitempty"`
}
func (j *Job) toStatusResponse() CollectJobStatusResponse {
progress := j.Progress
resp := CollectJobStatusResponse{
JobID: j.ID,
Status: j.Status,
Progress: &progress,
Logs: append([]string(nil), j.Logs...),
Error: j.Error,
CreatedAt: j.CreatedAt,
UpdatedAt: j.UpdatedAt,
JobID: j.ID,
Status: j.Status,
Progress: &progress,
CurrentPhase: j.CurrentPhase,
Logs: append([]string(nil), j.Logs...),
Error: j.Error,
ActiveModules: append([]CollectModuleStatus(nil), j.ActiveModules...),
ModuleScores: append([]CollectModuleStatus(nil), j.ModuleScores...),
DebugInfo: cloneCollectDebugInfo(j.DebugInfo),
CreatedAt: j.CreatedAt,
UpdatedAt: j.UpdatedAt,
}
if j.ETASeconds > 0 {
eta := j.ETASeconds
resp.ETASeconds = &eta
}
return resp
}
@@ -91,3 +133,16 @@ func (j *Job) toJobResponse(message string) CollectJobResponse {
CreatedAt: j.CreatedAt,
}
}
// cloneCollectDebugInfo returns a deep copy of the given debug info so callers
// can mutate the result without affecting the original. A nil input yields nil.
func cloneCollectDebugInfo(in *CollectDebugInfo) *CollectDebugInfo {
	if in == nil {
		return nil
	}
	clone := &CollectDebugInfo{
		AdaptiveThrottled: in.AdaptiveThrottled,
		SnapshotWorkers:   in.SnapshotWorkers,
		PrefetchWorkers:   in.PrefetchWorkers,
	}
	// Detach the telemetry slice from the source's backing array.
	clone.PhaseTelemetry = append([]CollectPhaseTelemetry(nil), in.PhaseTelemetry...)
	if flag := in.PrefetchEnabled; flag != nil {
		copied := *flag
		clone.PrefetchEnabled = &copied
	}
	return clone
}

View File

@@ -21,6 +21,7 @@ import (
"git.mchus.pro/mchus/logpile/internal/collector"
"git.mchus.pro/mchus/logpile/internal/exporter"
"git.mchus.pro/mchus/logpile/internal/ingest"
"git.mchus.pro/mchus/logpile/internal/models"
"git.mchus.pro/mchus/logpile/internal/parser"
chartviewer "reanimator/chart/viewer"
@@ -219,13 +220,12 @@ func (s *Server) analyzeUploadedFile(filename, mimeType string, payload []byte)
return nil, "", nil, fmt.Errorf("unsupported archive format: %s", strings.ToLower(filepath.Ext(filename)))
}
p := parser.NewBMCParser()
if err := p.ParseFromReader(bytes.NewReader(payload), filename); err != nil {
result, vendor, err := s.ingestService().AnalyzeArchivePayload(filename, payload)
if err != nil {
return nil, "", nil, err
}
result := p.Result()
applyArchiveSourceMetadata(result)
return result, p.DetectedVendor(), newRawExportFromUploadedFile(filename, mimeType, payload, result), nil
return result, vendor, newRawExportFromUploadedFile(filename, mimeType, payload, result), nil
}
func uploadMultipartMaxBytes() int64 {
@@ -297,33 +297,18 @@ func (s *Server) reanalyzeRawExportPackage(pkg *RawExportPackage) (*models.Analy
if !strings.EqualFold(strings.TrimSpace(pkg.Source.Protocol), "redfish") {
return nil, "", fmt.Errorf("unsupported live protocol: %s", pkg.Source.Protocol)
}
result, err := collector.ReplayRedfishFromRawPayloads(pkg.Source.RawPayloads, nil)
result, vendor, err := s.ingestService().AnalyzeRedfishRawPayloads(pkg.Source.RawPayloads, ingest.RedfishSourceMetadata{
TargetHost: pkg.Source.TargetHost,
SourceTimezone: pkg.Source.SourceTimezone,
Filename: pkg.Source.Filename,
})
if err != nil {
return nil, "", err
}
if result != nil {
if strings.TrimSpace(result.Protocol) == "" {
result.Protocol = "redfish"
}
if strings.TrimSpace(result.SourceType) == "" {
result.SourceType = models.SourceTypeAPI
}
if strings.TrimSpace(result.TargetHost) == "" {
result.TargetHost = strings.TrimSpace(pkg.Source.TargetHost)
}
if strings.TrimSpace(result.SourceTimezone) == "" {
result.SourceTimezone = strings.TrimSpace(pkg.Source.SourceTimezone)
}
result.CollectedAt = inferRawExportCollectedAt(result, pkg)
if strings.TrimSpace(result.Filename) == "" {
target := result.TargetHost
if target == "" {
target = "snapshot"
}
result.Filename = "redfish://" + target
}
}
return result, "redfish", nil
return result, vendor, nil
default:
return nil, "", fmt.Errorf("unsupported raw export source kind: %s", pkg.Source.Kind)
}
@@ -342,13 +327,12 @@ func (s *Server) parseUploadedPayload(filename string, payload []byte) (*models.
return snapshotResult, vendor, nil
}
p := parser.NewBMCParser()
if err := p.ParseFromReader(bytes.NewReader(payload), filename); err != nil {
result, vendor, err := s.ingestService().AnalyzeArchivePayload(filename, payload)
if err != nil {
return nil, "", err
}
result := p.Result()
applyArchiveSourceMetadata(result)
return result, p.DetectedVendor(), nil
return result, vendor, nil
}
func (s *Server) handleGetParsers(w http.ResponseWriter, r *http.Request) {
@@ -1706,6 +1690,51 @@ func (s *Server) startCollectionJob(jobID string, req CollectRequest) {
status = CollectStatusRunning
}
s.jobManager.UpdateJobStatus(jobID, status, update.Progress, "")
if update.CurrentPhase != "" || update.ETASeconds > 0 {
s.jobManager.UpdateJobETA(jobID, update.CurrentPhase, update.ETASeconds)
}
if update.DebugInfo != nil {
debugInfo := &CollectDebugInfo{
AdaptiveThrottled: update.DebugInfo.AdaptiveThrottled,
SnapshotWorkers: update.DebugInfo.SnapshotWorkers,
PrefetchWorkers: update.DebugInfo.PrefetchWorkers,
PrefetchEnabled: update.DebugInfo.PrefetchEnabled,
}
if len(update.DebugInfo.PhaseTelemetry) > 0 {
debugInfo.PhaseTelemetry = make([]CollectPhaseTelemetry, 0, len(update.DebugInfo.PhaseTelemetry))
for _, item := range update.DebugInfo.PhaseTelemetry {
debugInfo.PhaseTelemetry = append(debugInfo.PhaseTelemetry, CollectPhaseTelemetry{
Phase: item.Phase,
Requests: item.Requests,
Errors: item.Errors,
ErrorRate: item.ErrorRate,
AvgMS: item.AvgMS,
P95MS: item.P95MS,
})
}
}
s.jobManager.UpdateJobDebugInfo(jobID, debugInfo)
}
if len(update.ActiveModules) > 0 || len(update.ModuleScores) > 0 {
activeModules := make([]CollectModuleStatus, 0, len(update.ActiveModules))
for _, module := range update.ActiveModules {
activeModules = append(activeModules, CollectModuleStatus{
Name: module.Name,
Score: module.Score,
Active: true,
})
}
moduleScores := make([]CollectModuleStatus, 0, len(update.ModuleScores))
for _, module := range update.ModuleScores {
moduleScores = append(moduleScores, CollectModuleStatus{
Name: module.Name,
Score: module.Score,
Active: module.Active,
Priority: module.Priority,
})
}
s.jobManager.UpdateJobModules(jobID, activeModules, moduleScores)
}
if update.Message != "" {
s.jobManager.AppendJobLog(jobID, update.Message)
}

View File

@@ -128,6 +128,53 @@ func (m *JobManager) AppendJobLog(id, message string) (*Job, bool) {
return cloned, true
}
// UpdateJobModules replaces the stored active-module and module-score
// snapshots for the job with the given id and bumps UpdatedAt. It returns a
// detached clone of the updated job, or (nil, false) when the job is unknown.
func (m *JobManager) UpdateJobModules(id string, activeModules, moduleScores []CollectModuleStatus) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	job := m.jobs[id]
	if job == nil {
		return nil, false
	}
	// Copy the incoming slices so later mutations by the caller cannot leak in.
	job.ActiveModules = append([]CollectModuleStatus(nil), activeModules...)
	job.ModuleScores = append([]CollectModuleStatus(nil), moduleScores...)
	job.UpdatedAt = time.Now().UTC()
	return cloneJob(job), true
}
// UpdateJobETA records the job's current phase label and ETA estimate (in
// seconds) and bumps UpdatedAt. It returns a detached clone of the updated
// job, or (nil, false) when the job is unknown.
func (m *JobManager) UpdateJobETA(id, phase string, etaSeconds int) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	job := m.jobs[id]
	if job == nil {
		return nil, false
	}
	job.CurrentPhase = phase
	job.ETASeconds = etaSeconds
	job.UpdatedAt = time.Now().UTC()
	return cloneJob(job), true
}
// UpdateJobDebugInfo stores a deep copy of the collector debug info on the
// job and bumps UpdatedAt. It returns a detached clone of the updated job,
// or (nil, false) when the job is unknown.
func (m *JobManager) UpdateJobDebugInfo(id string, info *CollectDebugInfo) (*Job, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	job := m.jobs[id]
	if job == nil {
		return nil, false
	}
	// Deep-copy so the caller cannot mutate the stored snapshot afterwards.
	job.DebugInfo = cloneCollectDebugInfo(info)
	job.UpdatedAt = time.Now().UTC()
	return cloneJob(job), true
}
func (m *JobManager) AttachJobCancel(id string, cancelFn context.CancelFunc) bool {
m.mu.Lock()
defer m.mu.Unlock()
@@ -176,6 +223,11 @@ func cloneJob(job *Job) *Job {
}
cloned := *job
cloned.Logs = append([]string(nil), job.Logs...)
cloned.ActiveModules = append([]CollectModuleStatus(nil), job.ActiveModules...)
cloned.ModuleScores = append([]CollectModuleStatus(nil), job.ModuleScores...)
cloned.DebugInfo = cloneCollectDebugInfo(job.DebugInfo)
cloned.CurrentPhase = job.CurrentPhase
cloned.ETASeconds = job.ETASeconds
cloned.cancel = nil
return &cloned
}

View File

@@ -0,0 +1,72 @@
package server
import (
"os"
"strings"
"testing"
"git.mchus.pro/mchus/logpile/internal/models"
)
// TestManualInspectInput is a persistent local debugging harness for checking
// how the current server code analyzes a real input file. It is skipped unless
// LOGPILE_MANUAL_INPUT points to a file on disk.
//
// Usage:
//
//	LOGPILE_MANUAL_INPUT=/abs/path/to/file.zip go test ./internal/server -run TestManualInspectInput -v
func TestManualInspectInput(t *testing.T) {
	path := strings.TrimSpace(os.Getenv("LOGPILE_MANUAL_INPUT"))
	if path == "" {
		t.Skip("set LOGPILE_MANUAL_INPUT to inspect a real input file")
	}
	payload, err := os.ReadFile(path)
	if err != nil {
		t.Fatalf("read input: %v", err)
	}
	srv := &Server{}
	// Raw-export bundles get replayed through the server's reanalysis path;
	// anything else goes through the regular upload parser.
	rawPkg, isBundle, err := parseRawExportBundle(payload)
	if err != nil {
		t.Fatalf("parseRawExportBundle: %v", err)
	}
	if isBundle {
		result, vendor, reErr := srv.reanalyzeRawExportPackage(rawPkg)
		if reErr != nil {
			t.Fatalf("reanalyzeRawExportPackage: %v", reErr)
		}
		logManualAnalysisResult(t, "raw_export_bundle", vendor, result)
		return
	}
	result, vendor, err := srv.parseUploadedPayload(path, payload)
	if err != nil {
		t.Fatalf("parseUploadedPayload: %v", err)
	}
	logManualAnalysisResult(t, "uploaded_payload", vendor, result)
}
// logManualAnalysisResult dumps a human-readable summary of an analysis
// result (source identity, component counts, and per-GPU / per-PCIe detail
// lines) to the test log. It fails the test when the hardware section is
// missing entirely.
func logManualAnalysisResult(t *testing.T, mode, vendor string, result *models.AnalysisResult) {
	t.Helper()
	if result == nil || result.Hardware == nil {
		t.Fatalf("missing hardware result")
	}
	hw := result.Hardware
	t.Logf("mode=%s vendor=%s source_type=%s protocol=%s target=%s", mode, vendor, result.SourceType, result.Protocol, result.TargetHost)
	t.Logf("counts: gpus=%d pcie=%d cpus=%d memory=%d storage=%d nics=%d psus=%d",
		len(hw.GPUs),
		len(hw.PCIeDevices),
		len(hw.CPUs),
		len(hw.Memory),
		len(hw.Storage),
		len(hw.NetworkAdapters),
		len(hw.PowerSupply),
	)
	for idx := range hw.GPUs {
		gpu := hw.GPUs[idx]
		t.Logf("gpu[%d]: slot=%s model=%s bdf=%s serial=%s status=%s", idx, gpu.Slot, gpu.Model, gpu.BDF, gpu.SerialNumber, gpu.Status)
	}
	for idx := range hw.PCIeDevices {
		dev := hw.PCIeDevices[idx]
		t.Logf("pcie[%d]: slot=%s class=%s model=%s bdf=%s serial=%s vendor=%s", idx, dev.Slot, dev.DeviceClass, dev.PartNumber, dev.BDF, dev.SerialNumber, dev.Manufacturer)
	}
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"git.mchus.pro/mchus/logpile/internal/collector"
"git.mchus.pro/mchus/logpile/internal/ingest"
"git.mchus.pro/mchus/logpile/internal/models"
chartviewer "reanimator/chart/viewer"
)
@@ -38,6 +39,7 @@ type Server struct {
jobManager *JobManager
collectors *collector.Registry
ingest *ingest.Service
}
type ConvertArtifact struct {
@@ -51,6 +53,7 @@ func New(cfg Config) *Server {
mux: http.NewServeMux(),
jobManager: NewJobManager(),
collectors: collector.NewDefaultRegistry(),
ingest: ingest.NewService(),
convertJobs: make(map[string]struct{}),
convertOutput: make(map[string]ConvertArtifact),
}
@@ -160,6 +163,17 @@ func (s *Server) ClientVersionString() string {
return fmt.Sprintf("LOGPile %s (commit: %s)", v, c)
}
// ingestService returns the server's shared ingest service, lazily creating
// and caching one when the server was constructed without it (e.g. a bare
// &Server{} in tests). The original check-and-set read and wrote s.ingest
// without synchronization, which is a data race when HTTP handlers call this
// concurrently; the lazy init is now guarded by s.mu (the same mutex used by
// SetDetectedVendor).
// NOTE(review): assumes no caller invokes this while already holding s.mu —
// confirm, otherwise this would deadlock.
func (s *Server) ingestService() *ingest.Service {
	if s == nil {
		// No server to cache on: hand back a throwaway service.
		return ingest.NewService()
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.ingest == nil {
		s.ingest = ingest.NewService()
	}
	return s.ingest
}
// SetDetectedVendor sets the detected vendor name
func (s *Server) SetDetectedVendor(vendor string) {
s.mu.Lock()