collector/redfish: make prefetch/post-probe adaptive with metrics

This commit is contained in:
2026-02-28 19:05:34 +03:00
parent fe5da1dbd7
commit 2fa4a1235a
2 changed files with 319 additions and 38 deletions

View File

@@ -28,6 +28,27 @@ type RedfishConnector struct {
debugSnapshot bool
}
// redfishPrefetchMetrics summarizes one run of the adaptive critical-endpoint
// prefetch. It is returned by prefetchCriticalRedfishDocs and written to the
// "redfish-prefetch-metrics" log line.
type redfishPrefetchMetrics struct {
	// Enabled holds the value of redfishPrefetchEnabled() for this run.
	Enabled bool

	// Funnel counters, widest first:
	//   Candidates - paths produced by redfishPrefetchTargets
	//   Targets    - candidates kept by redfishAdaptivePrefetchTargets
	//   Docs       - documents the prefetch returned
	//   Added      - returned documents the caller merged into the raw tree
	//                because their path was not present there yet
	Candidates, Targets, Docs, Added int

	// Duration is the time spent fetching; it stays zero when the prefetch
	// returned before any fetch was started.
	Duration time.Duration

	// SkipReason is empty for a normal run. Otherwise it is one of
	// "disabled-or-empty", "no-candidates", "not-needed" or "ctx-cancelled".
	SkipReason string
}
// redfishPostProbeMetrics summarizes the post-crawl probing done by
// collectRawRedfishTree. It is written to the "redfish-postprobe-metrics"
// log line.
type redfishPostProbeMetrics struct {
	// NVMe bay probe over collections whose path ends in "/Drives":
	//   NVMECandidates - "/Drives" documents found in the crawled tree
	//   NVMESelected   - candidates accepted by shouldAdaptiveNVMeProbe
	//   NVMEAdded      - probed bay documents stored under a path that was
	//                    not in the tree before
	NVMECandidates, NVMESelected, NVMEAdded int

	// Numeric-child probe over collections accepted by
	// shouldPostProbeCollectionPath:
	//   CollectionCandidates - collections considered
	//   CollectionSelected   - candidates accepted by
	//                          shouldAdaptivePostProbeCollectionPath
	//   SkippedExplicit      - rejected candidates for which
	//                          redfishCollectionHasExplicitMembers was true
	//   Added                - documents added by this probe
	CollectionCandidates, CollectionSelected, SkippedExplicit, Added int

	// Duration spans both probes: it is measured from just before the NVMe
	// candidates are gathered until the collection probe has finished.
	Duration time.Duration
}
func NewRedfishConnector() *RedfishConnector {
debug := false
if v := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_DEBUG")); v != "" && v != "0" && !strings.EqualFold(v, "false") {
@@ -87,7 +108,6 @@ func (c *RedfishConnector) Collect(ctx context.Context, req Request, emit Progre
if emit != nil {
emit(Progress{Status: "running", Progress: 30, Message: "Redfish: чтение структуры Redfish..."})
}
prefetchedCritical := c.prefetchCriticalRedfishDocs(ctx, prefetchClient, req, baseURL, criticalPaths, emit)
if emit != nil {
emit(Progress{Status: "running", Progress: 55, Message: "Redfish: подготовка snapshot..."})
@@ -95,29 +115,31 @@ func (c *RedfishConnector) Collect(ctx context.Context, req Request, emit Progre
emit(Progress{Status: "running", Progress: 90, Message: "Redfish: сбор расширенного snapshot..."})
}
c.debugSnapshotf("snapshot crawl start host=%s port=%d", req.Host, req.Port)
rawTree, fetchErrors := c.collectRawRedfishTree(ctx, snapshotClient, req, baseURL, redfishSnapshotPrioritySeeds(systemPaths, chassisPaths, managerPaths), emit)
if len(prefetchedCritical) > 0 {
reused := 0
for p, doc := range prefetchedCritical {
if _, exists := rawTree[p]; exists {
continue
}
rawTree[p] = doc
reused++
}
if emit != nil && reused > 0 {
emit(Progress{
Status: "running",
Progress: 96,
Message: fmt.Sprintf("Redfish: prefetch использован для %d документов", reused),
})
}
}
rawTree, fetchErrors, postProbeMetrics := c.collectRawRedfishTree(ctx, snapshotClient, req, baseURL, redfishSnapshotPrioritySeeds(systemPaths, chassisPaths, managerPaths), emit)
c.debugSnapshotf("snapshot crawl done docs=%d", len(rawTree))
fetchErrMap := redfishFetchErrorListToMap(fetchErrors)
prefetchedCritical, prefetchMetrics := c.prefetchCriticalRedfishDocs(ctx, prefetchClient, req, baseURL, criticalPaths, rawTree, fetchErrMap, emit)
for p, doc := range prefetchedCritical {
if _, exists := rawTree[p]; exists {
continue
}
rawTree[p] = doc
prefetchMetrics.Added++
}
for p := range prefetchedCritical {
delete(fetchErrMap, p)
}
log.Printf(
"redfish-prefetch-metrics: enabled=%t candidates=%d targets=%d docs=%d added=%d dur=%s skip=%s",
prefetchMetrics.Enabled,
prefetchMetrics.Candidates,
prefetchMetrics.Targets,
prefetchMetrics.Docs,
prefetchMetrics.Added,
prefetchMetrics.Duration.Round(time.Millisecond),
firstNonEmpty(prefetchMetrics.SkipReason, "-"),
)
if recoveredN := c.recoverCriticalRedfishDocsPlanB(ctx, criticalClient, req, baseURL, criticalPaths, rawTree, fetchErrMap, emit); recoveredN > 0 {
c.debugSnapshotf("critical plan-b recovered docs=%d", recoveredN)
}
@@ -142,6 +164,17 @@ func (c *RedfishConnector) Collect(ctx context.Context, req Request, emit Progre
return nil, err
}
totalElapsed := time.Since(collectStart).Round(time.Second)
log.Printf(
"redfish-postprobe-metrics: nvme_candidates=%d nvme_selected=%d nvme_added=%d candidates=%d selected=%d skipped_explicit=%d added=%d dur=%s",
postProbeMetrics.NVMECandidates,
postProbeMetrics.NVMESelected,
postProbeMetrics.NVMEAdded,
postProbeMetrics.CollectionCandidates,
postProbeMetrics.CollectionSelected,
postProbeMetrics.SkippedExplicit,
postProbeMetrics.Added,
postProbeMetrics.Duration.Round(time.Millisecond),
)
log.Printf("redfish-collect: completed in %s (docs=%d, fetch_errors=%d)", totalElapsed, len(rawTree), len(fetchErrMap))
if emit != nil {
emit(Progress{
@@ -159,24 +192,46 @@ func (c *RedfishConnector) prefetchCriticalRedfishDocs(
req Request,
baseURL string,
criticalPaths []string,
rawTree map[string]interface{},
fetchErrMap map[string]string,
emit ProgressFn,
) map[string]interface{} {
if !redfishPrefetchEnabled() || len(criticalPaths) == 0 {
return nil
) (map[string]interface{}, redfishPrefetchMetrics) {
metrics := redfishPrefetchMetrics{
Enabled: redfishPrefetchEnabled(),
}
if !metrics.Enabled || len(criticalPaths) == 0 {
metrics.SkipReason = "disabled-or-empty"
return nil, metrics
}
targets := redfishPrefetchTargets(criticalPaths)
candidates := redfishPrefetchTargets(criticalPaths)
metrics.Candidates = len(candidates)
if len(candidates) == 0 {
metrics.SkipReason = "no-candidates"
return nil, metrics
}
targets := redfishAdaptivePrefetchTargets(candidates, rawTree, fetchErrMap)
metrics.Targets = len(targets)
if len(targets) == 0 {
return nil
metrics.SkipReason = "not-needed"
if emit != nil {
emit(Progress{
Status: "running",
Progress: 96,
Message: fmt.Sprintf("Redfish: prefetch пропущен (адаптивно, кандидатов=%d)", metrics.Candidates),
})
}
return nil, metrics
}
if emit != nil {
emit(Progress{
Status: "running",
Progress: 35,
Message: fmt.Sprintf("Redfish: prefetch критичных endpoint (%d)...", len(targets)),
Progress: 96,
Message: fmt.Sprintf("Redfish: prefetch критичных endpoint (адаптивно %d/%d)...", len(targets), len(candidates)),
})
}
start := time.Now()
out := make(map[string]interface{}, len(targets))
seen := make(map[string]struct{}, len(targets))
var mu sync.Mutex
@@ -241,22 +296,78 @@ func (c *RedfishConnector) prefetchCriticalRedfishDocs(
case <-ctx.Done():
close(jobs)
wg.Wait()
return out
metrics.Docs = len(out)
metrics.Duration = time.Since(start)
metrics.SkipReason = "ctx-cancelled"
return out, metrics
}
}
close(jobs)
wg.Wait()
metrics.Docs = len(out)
metrics.Duration = time.Since(start)
if emit != nil {
emit(Progress{
Status: "running",
Progress: 40,
Message: fmt.Sprintf("Redfish: prefetch завершен (targets=%d, docs=%d)", len(targets), len(out)),
Progress: 96,
Message: fmt.Sprintf("Redfish: prefetch завершен (адаптивно targets=%d, docs=%d)", len(targets), len(out)),
})
}
return out, metrics
}
// redfishAdaptivePrefetchTargets narrows candidates down to the paths that
// still need a prefetch, given what the crawl already produced.
//
// A candidate is kept when either
//   - it is absent from rawTree and fetchErrs holds no non-retryable error
//     for it, or
//   - it is present in rawTree as a JSON object and
//     redfishCollectionNeedsMemberRecovery reports that one of its members
//     still needs recovery.
//
// Candidates present in rawTree as anything other than a JSON object are
// dropped. The result holds normalized paths in candidate order, each path at
// most once, and is never nil.
func redfishAdaptivePrefetchTargets(candidates []string, rawTree map[string]interface{}, fetchErrs map[string]string) []string {
	selected := make([]string, 0, len(candidates))
	picked := make(map[string]struct{}, len(candidates))

	for _, candidate := range candidates {
		key := normalizeRedfishPath(candidate)
		if key == "" {
			continue
		}
		if _, dup := picked[key]; dup {
			continue
		}

		cached, present := rawTree[key]
		if !present {
			// Never fetched successfully: retry unless the recorded failure
			// is known to be permanent.
			errText, failed := fetchErrs[key]
			if failed && !isRetryableRedfishFetchError(fmt.Errorf("%s", errText)) {
				continue
			}
		} else {
			// Already fetched: only worth another look when it is an object
			// whose members are still missing.
			collection, isObject := cached.(map[string]interface{})
			if !isObject || !redfishCollectionNeedsMemberRecovery(collection, rawTree, fetchErrs) {
				continue
			}
		}

		picked[key] = struct{}{}
		selected = append(selected, key)
	}
	return selected
}
// redfishCollectionNeedsMemberRecovery reports whether collectionDoc
// references at least one member that is worth fetching again.
//
// A member qualifies when its normalized path is non-empty, is not a key of
// rawTree, and fetchErrs either has no entry for it or has an entry that
// isRetryableRedfishFetchError accepts. A collection without member
// references never needs recovery.
func redfishCollectionNeedsMemberRecovery(collectionDoc map[string]interface{}, rawTree map[string]interface{}, fetchErrs map[string]string) bool {
	for _, ref := range redfishCollectionMemberRefs(collectionDoc) {
		member := normalizeRedfishPath(ref)
		if member == "" {
			continue
		}
		if _, have := rawTree[member]; have {
			continue
		}
		errText, failed := fetchErrs[member]
		if !failed || isRetryableRedfishFetchError(fmt.Errorf("%s", errText)) {
			return true
		}
	}
	return false
}
// httpClient returns the HTTP client for req, obtained from
// httpClientWithTimeout with the connector's own timeout (c.timeout).
func (c *RedfishConnector) httpClient(req Request) *http.Client {
	timeout := c.timeout
	return c.httpClientWithTimeout(req, timeout)
}
@@ -656,7 +767,7 @@ func (c *RedfishConnector) discoverMemberPaths(ctx context.Context, client *http
return nil
}
func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *http.Client, req Request, baseURL string, seedPaths []string, emit ProgressFn) (map[string]interface{}, []map[string]interface{}) {
func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *http.Client, req Request, baseURL string, seedPaths []string, emit ProgressFn) (map[string]interface{}, []map[string]interface{}, redfishPostProbeMetrics) {
maxDocuments := redfishSnapshotMaxDocuments()
workers := redfishSnapshotWorkers()
const heartbeatInterval = 5 * time.Second
@@ -667,6 +778,7 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
branchRetryPause := redfishSnapshotBranchRequeueBackoff()
timings := newRedfishPathTimingCollector(4)
postProbeMetrics := redfishPostProbeMetrics{}
out := make(map[string]interface{}, maxDocuments)
fetchErrors := make(map[string]string)
seen := make(map[string]struct{}, maxDocuments)
@@ -869,13 +981,22 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
// Some Supermicro BMCs expose NVMe disks at direct Disk.Bay endpoints even when the
// Drives collection returns Members: []. Probe those paths so raw export can be replayed.
postProbeTotalStart := time.Now()
driveCollections := make([]string, 0)
for path := range out {
if strings.HasSuffix(normalizeRedfishPath(path), "/Drives") {
driveCollections = append(driveCollections, normalizeRedfishPath(path))
for path, docAny := range out {
normalized := normalizeRedfishPath(path)
if !strings.HasSuffix(normalized, "/Drives") {
continue
}
postProbeMetrics.NVMECandidates++
doc, _ := docAny.(map[string]interface{})
if !shouldAdaptiveNVMeProbe(doc) {
continue
}
driveCollections = append(driveCollections, normalized)
}
sort.Strings(driveCollections)
postProbeMetrics.NVMESelected = len(driveCollections)
nvmeProbeStart := time.Now()
for i, path := range driveCollections {
if emit != nil && len(driveCollections) > 0 && (i == 0 || i%4 == 0 || i == len(driveCollections)-1) {
@@ -893,19 +1014,35 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
if !looksLikeDrive(doc) {
continue
}
out[normalizeRedfishPath(bayPath)] = doc
normalizedBayPath := normalizeRedfishPath(bayPath)
if _, exists := out[normalizedBayPath]; exists {
continue
}
out[normalizedBayPath] = doc
postProbeMetrics.NVMEAdded++
c.debugSnapshotf("snapshot nvme bay probe hit path=%s", bayPath)
}
}
// Some BMCs under-report collection Members for sensors/PSU subresources but still serve
// direct numeric child endpoints. Probe common collections to maximize raw snapshot fidelity.
postProbeCollections := make([]string, 0)
for path := range out {
if shouldPostProbeCollectionPath(path) {
postProbeCollections = append(postProbeCollections, normalizeRedfishPath(path))
for path, docAny := range out {
normalized := normalizeRedfishPath(path)
if !shouldPostProbeCollectionPath(normalized) {
continue
}
postProbeMetrics.CollectionCandidates++
doc, _ := docAny.(map[string]interface{})
if shouldAdaptivePostProbeCollectionPath(normalized, doc) {
postProbeCollections = append(postProbeCollections, normalized)
continue
}
if redfishCollectionHasExplicitMembers(doc) {
postProbeMetrics.SkippedExplicit++
}
}
sort.Strings(postProbeCollections)
postProbeMetrics.CollectionSelected = len(postProbeCollections)
postProbeStart := time.Now()
addedPostProbe := 0
for i, path := range postProbeCollections {
@@ -924,6 +1061,8 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
addedPostProbe++
}
}
postProbeMetrics.Added = addedPostProbe
postProbeMetrics.Duration = time.Since(postProbeTotalStart)
if emit != nil && addedPostProbe > 0 {
emit(Progress{
Status: "running",
@@ -931,6 +1070,13 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
Message: fmt.Sprintf("Redfish snapshot: post-probe добавлено %d документов", addedPostProbe),
})
}
if emit != nil {
emit(Progress{
Status: "running",
Progress: 98,
Message: fmt.Sprintf("Redfish snapshot: post-probe метрики candidates=%d selected=%d skipped_explicit=%d added=%d", postProbeMetrics.CollectionCandidates, postProbeMetrics.CollectionSelected, postProbeMetrics.SkippedExplicit, postProbeMetrics.Added),
})
}
if emit != nil {
emit(Progress{
@@ -963,7 +1109,7 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
}
}
return out, errorList
return out, errorList, postProbeMetrics
}
func (c *RedfishConnector) probeSupermicroNVMeDiskBays(ctx context.Context, client *http.Client, req Request, baseURL, backplanePath string) []map[string]interface{} {
@@ -1142,6 +1288,58 @@ func shouldPostProbeCollectionPath(path string) bool {
}
}
func shouldAdaptivePostProbeCollectionPath(path string, collectionDoc map[string]interface{}) bool {
path = normalizeRedfishPath(path)
if !shouldPostProbeCollectionPath(path) {
return false
}
if len(collectionDoc) == 0 {
return true
}
memberRefs := redfishCollectionMemberRefs(collectionDoc)
if len(memberRefs) == 0 {
return true
}
return redfishCollectionHasNumericMemberRefs(memberRefs)
}
// shouldAdaptiveNVMeProbe reports whether a collection should get the direct
// NVMe bay probe: true when collectionDoc is nil or empty, or when
// redfishCollectionHasExplicitMembers reports false for it.
func shouldAdaptiveNVMeProbe(collectionDoc map[string]interface{}) bool {
	return len(collectionDoc) == 0 || !redfishCollectionHasExplicitMembers(collectionDoc)
}
// redfishCollectionHasNumericMemberRefs reports whether any path in
// memberRefs satisfies redfishPathTailIsNumeric. It returns false for a nil
// or empty slice.
func redfishCollectionHasNumericMemberRefs(memberRefs []string) bool {
	for i := 0; i < len(memberRefs); i++ {
		if redfishPathTailIsNumeric(memberRefs[i]) {
			return true
		}
	}
	return false
}
// redfishPathTailIsNumeric reports whether the last "/"-separated segment of
// the normalized path consists only of the ASCII digits 0-9. Surrounding
// whitespace of that segment is ignored. An empty path or an empty last
// segment yields false.
func redfishPathTailIsNumeric(path string) bool {
	trimmed := strings.Trim(normalizeRedfishPath(path), "/")

	// LastIndex returns -1 when there is no "/", so the slice below then
	// starts at 0 and covers the whole string.
	tail := strings.TrimSpace(trimmed[strings.LastIndex(trimmed, "/")+1:])
	if tail == "" {
		return false
	}

	notDigit := func(r rune) bool { return r < '0' || r > '9' }
	return strings.IndexFunc(tail, notDigit) == -1
}
func looksLikeRedfishResource(doc map[string]interface{}) bool {
if len(doc) == 0 {
return false