Fix Redfish snapshot crawl deadlock and add debug progress
This commit is contained in:
@@ -6,8 +6,10 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
@@ -22,11 +24,23 @@ import (
|
||||
|
||||
// RedfishConnector collects data from a BMC over the Redfish REST API.
type RedfishConnector struct {
	// timeout bounds each HTTP request made to the endpoint.
	timeout time.Duration
	// debug enables verbose request-level logging; set from the
	// LOGPILE_REDFISH_DEBUG environment variable in NewRedfishConnector.
	debug bool
	// debugSnapshot enables snapshot-crawl logging; set from
	// LOGPILE_REDFISH_SNAPSHOT_DEBUG and also implied by debug.
	debugSnapshot bool
}
|
||||
|
||||
func NewRedfishConnector() *RedfishConnector {
|
||||
debug := false
|
||||
if v := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_DEBUG")); v != "" && v != "0" && !strings.EqualFold(v, "false") {
|
||||
debug = true
|
||||
}
|
||||
debugSnapshot := false
|
||||
if v := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_SNAPSHOT_DEBUG")); v != "" && v != "0" && !strings.EqualFold(v, "false") {
|
||||
debugSnapshot = true
|
||||
}
|
||||
return &RedfishConnector{
|
||||
timeout: 10 * time.Second,
|
||||
timeout: 10 * time.Second,
|
||||
debug: debug,
|
||||
debugSnapshot: debugSnapshot || debug,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +48,20 @@ func (c *RedfishConnector) Protocol() string {
|
||||
return "redfish"
|
||||
}
|
||||
|
||||
func (c *RedfishConnector) debugf(format string, args ...interface{}) {
|
||||
if !c.debug {
|
||||
return
|
||||
}
|
||||
log.Printf("redfish-debug: "+format, args...)
|
||||
}
|
||||
|
||||
func (c *RedfishConnector) debugSnapshotf(format string, args ...interface{}) {
|
||||
if !c.debugSnapshot {
|
||||
return
|
||||
}
|
||||
log.Printf("redfish-snapshot-debug: "+format, args...)
|
||||
}
|
||||
|
||||
func (c *RedfishConnector) Collect(ctx context.Context, req Request, emit ProgressFn) (*models.AnalysisResult, error) {
|
||||
baseURL, err := c.baseURL(req)
|
||||
if err != nil {
|
||||
@@ -84,7 +112,9 @@ func (c *RedfishConnector) Collect(ctx context.Context, req Request, emit Progre
|
||||
if emit != nil {
|
||||
emit(Progress{Status: "running", Progress: 90, Message: "Redfish: сбор расширенного snapshot..."})
|
||||
}
|
||||
c.debugSnapshotf("snapshot crawl start host=%s port=%d", req.Host, req.Port)
|
||||
rawTree := c.collectRawRedfishTree(ctx, client, req, baseURL, emit)
|
||||
c.debugSnapshotf("snapshot crawl done docs=%d", len(rawTree))
|
||||
|
||||
result := &models.AnalysisResult{
|
||||
Events: make([]models.Event, 0),
|
||||
@@ -422,14 +452,19 @@ func (c *RedfishConnector) discoverMemberPaths(ctx context.Context, client *http
|
||||
func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *http.Client, req Request, baseURL string, emit ProgressFn) map[string]interface{} {
|
||||
const maxDocuments = 1200
|
||||
const workers = 6
|
||||
const heartbeatInterval = 5 * time.Second
|
||||
|
||||
out := make(map[string]interface{}, maxDocuments)
|
||||
seen := make(map[string]struct{}, maxDocuments)
|
||||
rootCounts := make(map[string]int)
|
||||
var mu sync.Mutex
|
||||
var processed int32
|
||||
var lastPath atomic.Value
|
||||
|
||||
jobs := make(chan string, 256)
|
||||
// Workers enqueue newly discovered links into the same queue they consume.
|
||||
// The queue capacity must be at least the crawl cap to avoid producer/consumer
|
||||
// deadlock when several workers discover many links at once.
|
||||
jobs := make(chan string, maxDocuments)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
enqueue := func(path string) {
|
||||
@@ -453,9 +488,49 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
|
||||
}
|
||||
|
||||
enqueue("/redfish/v1")
|
||||
for i := 0; i < workers; i++ {
|
||||
c.debugSnapshotf("snapshot queue initialized workers=%d max_documents=%d", workers, maxDocuments)
|
||||
stopHeartbeat := make(chan struct{})
|
||||
if emit != nil {
|
||||
go func() {
|
||||
ticker := time.NewTicker(heartbeatInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
n := atomic.LoadInt32(&processed)
|
||||
mu.Lock()
|
||||
countsCopy := make(map[string]int, len(rootCounts))
|
||||
for k, v := range rootCounts {
|
||||
countsCopy[k] = v
|
||||
}
|
||||
seenN := len(seen)
|
||||
outN := len(out)
|
||||
mu.Unlock()
|
||||
roots := topRoots(countsCopy, 2)
|
||||
last := "/redfish/v1"
|
||||
if v := lastPath.Load(); v != nil {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
last = s
|
||||
}
|
||||
}
|
||||
emit(Progress{
|
||||
Status: "running",
|
||||
Progress: 92 + int(minInt32(n/200, 6)),
|
||||
Message: fmt.Sprintf("Redfish snapshot: heartbeat документов=%d (ok=%d, seen=%d), корни=%s, последний=%s", n, outN, seenN, strings.Join(roots, ", "), compactProgressPath(last)),
|
||||
})
|
||||
case <-stopHeartbeat:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
for i := 0; i < workers; i++ {
|
||||
go func(workerID int) {
|
||||
for current := range jobs {
|
||||
lastPath.Store(current)
|
||||
c.debugSnapshotf("worker=%d fetch start path=%s queue_len=%d", workerID, current, len(jobs))
|
||||
doc, err := c.getJSON(ctx, client, req, baseURL, current)
|
||||
if err == nil {
|
||||
mu.Lock()
|
||||
@@ -467,8 +542,17 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
|
||||
enqueue(ref)
|
||||
}
|
||||
}
|
||||
|
||||
n := atomic.AddInt32(&processed, 1)
|
||||
if err != nil {
|
||||
c.debugSnapshotf("worker=%d fetch error path=%s err=%v", workerID, current, err)
|
||||
if emit != nil {
|
||||
emit(Progress{
|
||||
Status: "running",
|
||||
Progress: 92 + int(minInt32(n/200, 6)),
|
||||
Message: fmt.Sprintf("Redfish snapshot: ошибка на %s", compactProgressPath(current)),
|
||||
})
|
||||
}
|
||||
}
|
||||
if emit != nil && n%40 == 0 {
|
||||
mu.Lock()
|
||||
countsCopy := make(map[string]int, len(rootCounts))
|
||||
@@ -477,19 +561,33 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
|
||||
}
|
||||
mu.Unlock()
|
||||
roots := topRoots(countsCopy, 2)
|
||||
last := current
|
||||
if v := lastPath.Load(); v != nil {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
last = s
|
||||
}
|
||||
}
|
||||
emit(Progress{
|
||||
Status: "running",
|
||||
Progress: 92 + int(minInt32(n/200, 6)),
|
||||
Message: fmt.Sprintf("Redfish snapshot: документов=%d, корни=%s", n, strings.Join(roots, ", ")),
|
||||
Message: fmt.Sprintf("Redfish snapshot: документов=%d, корни=%s, последний=%s", n, strings.Join(roots, ", "), compactProgressPath(last)),
|
||||
})
|
||||
}
|
||||
if n%20 == 0 || err != nil {
|
||||
mu.Lock()
|
||||
seenN := len(seen)
|
||||
outN := len(out)
|
||||
mu.Unlock()
|
||||
c.debugSnapshotf("snapshot progress processed=%d stored=%d seen=%d queue_len=%d", n, outN, seenN, len(jobs))
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
}
|
||||
}()
|
||||
}(i + 1)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(stopHeartbeat)
|
||||
close(jobs)
|
||||
|
||||
if emit != nil {
|
||||
@@ -591,6 +689,7 @@ func (c *RedfishConnector) getCollectionMembers(ctx context.Context, client *htt
|
||||
}
|
||||
|
||||
func (c *RedfishConnector) getJSON(ctx context.Context, client *http.Client, req Request, baseURL, requestPath string) (map[string]interface{}, error) {
|
||||
start := time.Now()
|
||||
rel := requestPath
|
||||
if rel == "" {
|
||||
rel = "/"
|
||||
@@ -621,21 +720,26 @@ func (c *RedfishConnector) getJSON(ctx context.Context, client *http.Client, req
|
||||
|
||||
resp, err := client.Do(httpReq)
|
||||
if err != nil {
|
||||
c.debugf("http get path=%s error=%v dur=%s", requestPath, err, time.Since(start).Round(time.Millisecond))
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||||
return nil, fmt.Errorf("status %d from %s: %s", resp.StatusCode, requestPath, strings.TrimSpace(string(body)))
|
||||
err := fmt.Errorf("status %d from %s: %s", resp.StatusCode, requestPath, strings.TrimSpace(string(body)))
|
||||
c.debugf("http get path=%s status=%d dur=%s", requestPath, resp.StatusCode, time.Since(start).Round(time.Millisecond))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var doc map[string]interface{}
|
||||
dec := json.NewDecoder(resp.Body)
|
||||
dec.UseNumber()
|
||||
if err := dec.Decode(&doc); err != nil {
|
||||
c.debugf("http get path=%s decode_error=%v dur=%s", requestPath, err, time.Since(start).Round(time.Millisecond))
|
||||
return nil, err
|
||||
}
|
||||
c.debugf("http get path=%s status=%d dur=%s", requestPath, resp.StatusCode, time.Since(start).Round(time.Millisecond))
|
||||
|
||||
return doc, nil
|
||||
}
|
||||
@@ -1340,6 +1444,14 @@ func topRoots(counts map[string]int, limit int) []string {
|
||||
return out
|
||||
}
|
||||
|
||||
// compactProgressPath shortens a resource path for use in progress
// messages. Paths at or under the limit are returned unchanged; longer
// ones keep only their trailing (most specific) portion behind an
// ellipsis, so the result is exactly the limit length.
func compactProgressPath(p string) string {
	const maxLen = 72
	if len(p) > maxLen {
		// "..." plus the last maxLen-3 bytes totals maxLen characters.
		return "..." + p[len(p)-(maxLen-3):]
	}
	return p
}
|
||||
|
||||
// minInt32 returns the smaller of a and b.
//
// NOTE(review): the extracted source was truncated after "return a"; the
// standard else branch returning b is restored here — it is the only
// completion consistent with the signature and the call sites
// (e.g. minInt32(n/200, 6) used to cap progress increments).
func minInt32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}
Reference in New Issue
Block a user