From 5d9e9d73de49f8d1849803efb3b35a35aef32393 Mon Sep 17 00:00:00 2001 From: Mikhail Chusavitin Date: Tue, 24 Feb 2026 16:22:37 +0300 Subject: [PATCH] Fix Redfish snapshot crawl deadlock and add debug progress --- internal/collector/redfish.go | 126 ++++++++++++++++++++++++++++++++-- 1 file changed, 119 insertions(+), 7 deletions(-) diff --git a/internal/collector/redfish.go b/internal/collector/redfish.go index 2bdb4c4..8f41b35 100644 --- a/internal/collector/redfish.go +++ b/internal/collector/redfish.go @@ -6,8 +6,10 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" "net/url" + "os" "path" "sort" "strconv" @@ -22,11 +24,23 @@ import ( type RedfishConnector struct { timeout time.Duration + debug bool + debugSnapshot bool } func NewRedfishConnector() *RedfishConnector { + debug := false + if v := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_DEBUG")); v != "" && v != "0" && !strings.EqualFold(v, "false") { + debug = true + } + debugSnapshot := false + if v := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_SNAPSHOT_DEBUG")); v != "" && v != "0" && !strings.EqualFold(v, "false") { + debugSnapshot = true + } return &RedfishConnector{ - timeout: 10 * time.Second, + timeout: 10 * time.Second, + debug: debug, + debugSnapshot: debugSnapshot || debug, } } @@ -34,6 +48,20 @@ func (c *RedfishConnector) Protocol() string { return "redfish" } +func (c *RedfishConnector) debugf(format string, args ...interface{}) { + if !c.debug { + return + } + log.Printf("redfish-debug: "+format, args...) +} + +func (c *RedfishConnector) debugSnapshotf(format string, args ...interface{}) { + if !c.debugSnapshot { + return + } + log.Printf("redfish-snapshot-debug: "+format, args...) +} + func (c *RedfishConnector) Collect(ctx context.Context, req Request, emit ProgressFn) (*models.AnalysisResult, error) { baseURL, err := c.baseURL(req) if err != nil { @@ -84,7 +112,9 @@ func (c *RedfishConnector) Collect(ctx context.Context, req Request, emit Progre if emit != nil { emit(Progress{Status: "running", Progress: 90, Message: "Redfish: сбор расширенного snapshot..."}) } + c.debugSnapshotf("snapshot crawl start host=%s port=%d", req.Host, req.Port) rawTree := c.collectRawRedfishTree(ctx, client, req, baseURL, emit) + c.debugSnapshotf("snapshot crawl done docs=%d", len(rawTree)) result := &models.AnalysisResult{ Events: make([]models.Event, 0), @@ -422,14 +452,19 @@ func (c *RedfishConnector) discoverMemberPaths(ctx context.Context, client *http func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *http.Client, req Request, baseURL string, emit ProgressFn) map[string]interface{} { const maxDocuments = 1200 const workers = 6 + const heartbeatInterval = 5 * time.Second out := make(map[string]interface{}, maxDocuments) seen := make(map[string]struct{}, maxDocuments) rootCounts := make(map[string]int) var mu sync.Mutex var processed int32 + var lastPath atomic.Value - jobs := make(chan string, 256) + // Workers enqueue newly discovered links into the same queue they consume. + // The queue capacity must be at least the crawl cap to avoid producer/consumer + // deadlock when several workers discover many links at once. + jobs := make(chan string, maxDocuments) var wg sync.WaitGroup enqueue := func(path string) { @@ -453,9 +488,49 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht } enqueue("/redfish/v1") - for i := 0; i < workers; i++ { + c.debugSnapshotf("snapshot queue initialized workers=%d max_documents=%d", workers, maxDocuments) + stopHeartbeat := make(chan struct{}) + if emit != nil { go func() { + ticker := time.NewTicker(heartbeatInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + n := atomic.LoadInt32(&processed) + mu.Lock() + countsCopy := make(map[string]int, len(rootCounts)) + for k, v := range rootCounts { + countsCopy[k] = v + } + seenN := len(seen) + outN := len(out) + mu.Unlock() + roots := topRoots(countsCopy, 2) + last := "/redfish/v1" + if v := lastPath.Load(); v != nil { + if s, ok := v.(string); ok && s != "" { + last = s + } + } + emit(Progress{ + Status: "running", + Progress: 92 + int(minInt32(n/200, 6)), + Message: fmt.Sprintf("Redfish snapshot: heartbeat документов=%d (ok=%d, seen=%d), корни=%s, последний=%s", n, outN, seenN, strings.Join(roots, ", "), compactProgressPath(last)), + }) + case <-stopHeartbeat: + return + case <-ctx.Done(): + return + } + } + }() + } + for i := 0; i < workers; i++ { + go func(workerID int) { for current := range jobs { + lastPath.Store(current) + c.debugSnapshotf("worker=%d fetch start path=%s queue_len=%d", workerID, current, len(jobs)) doc, err := c.getJSON(ctx, client, req, baseURL, current) if err == nil { mu.Lock() @@ -467,8 +542,17 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht enqueue(ref) } } - n := atomic.AddInt32(&processed, 1) + if err != nil { + c.debugSnapshotf("worker=%d fetch error path=%s err=%v", workerID, current, err) + if emit != nil { + emit(Progress{ + Status: "running", + Progress: 92 + int(minInt32(n/200, 6)), + Message: fmt.Sprintf("Redfish snapshot: ошибка на %s", compactProgressPath(current)), + }) + } + } if emit != nil && n%40 == 0 { mu.Lock() countsCopy := make(map[string]int, len(rootCounts)) @@ -477,19 +561,33 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht } mu.Unlock() roots := topRoots(countsCopy, 2) + last := current + if v := lastPath.Load(); v != nil { + if s, ok := v.(string); ok && s != "" { + last = s + } + } emit(Progress{ Status: "running", Progress: 92 + int(minInt32(n/200, 6)), - Message: fmt.Sprintf("Redfish snapshot: документов=%d, корни=%s", n, strings.Join(roots, ", ")), + Message: fmt.Sprintf("Redfish snapshot: документов=%d, корни=%s, последний=%s", n, strings.Join(roots, ", "), compactProgressPath(last)), }) } + if n%20 == 0 || err != nil { + mu.Lock() + seenN := len(seen) + outN := len(out) + mu.Unlock() + c.debugSnapshotf("snapshot progress processed=%d stored=%d seen=%d queue_len=%d", n, outN, seenN, len(jobs)) + } wg.Done() } - }() + }(i + 1) } wg.Wait() + close(stopHeartbeat) close(jobs) if emit != nil { @@ -591,6 +689,7 @@ func (c *RedfishConnector) getCollectionMembers(ctx context.Context, client *htt } func (c *RedfishConnector) getJSON(ctx context.Context, client *http.Client, req Request, baseURL, requestPath string) (map[string]interface{}, error) { + start := time.Now() rel := requestPath if rel == "" { rel = "/" @@ -621,21 +720,26 @@ func (c *RedfishConnector) getJSON(ctx context.Context, client *http.Client, req resp, err := client.Do(httpReq) if err != nil { + c.debugf("http get path=%s error=%v dur=%s", requestPath, err, time.Since(start).Round(time.Millisecond)) return nil, err } defer resp.Body.Close() if resp.StatusCode < 200 || resp.StatusCode >= 300 { body, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) - return nil, fmt.Errorf("status %d from %s: %s", resp.StatusCode, requestPath, strings.TrimSpace(string(body))) + err := fmt.Errorf("status %d from %s: %s", resp.StatusCode, requestPath, strings.TrimSpace(string(body))) + c.debugf("http get path=%s status=%d dur=%s", requestPath, resp.StatusCode, time.Since(start).Round(time.Millisecond)) + return nil, err } var doc map[string]interface{} dec := json.NewDecoder(resp.Body) dec.UseNumber() if err := dec.Decode(&doc); err != nil { + c.debugf("http get path=%s decode_error=%v dur=%s", requestPath, err, time.Since(start).Round(time.Millisecond)) return nil, err } + c.debugf("http get path=%s status=%d dur=%s", requestPath, resp.StatusCode, time.Since(start).Round(time.Millisecond)) return doc, nil } @@ -1340,6 +1444,14 @@ func topRoots(counts map[string]int, limit int) []string { return out } +func compactProgressPath(p string) string { + const maxLen = 72 + if len(p) <= maxLen { + return p + } + return "..." + p[len(p)-maxLen+3:] +} + func minInt32(a, b int32) int32 { if a < b { return a