Improve Redfish recovery flow and raw export timing diagnostics

This commit is contained in:
2026-02-28 16:55:58 +03:00
parent 9a30705c9a
commit e0146adfff
10 changed files with 1437 additions and 58 deletions

View File

@@ -538,6 +538,10 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
const workers = 6
const heartbeatInterval = 5 * time.Second
crawlStart := time.Now()
memoryClient := c.httpClientWithTimeout(req, redfishSnapshotMemoryRequestTimeout())
memoryGate := make(chan struct{}, redfishSnapshotMemoryConcurrency())
branchLimiter := newRedfishSnapshotBranchLimiter(redfishSnapshotBranchConcurrency())
branchRetryPause := redfishSnapshotBranchRequeueBackoff()
out := make(map[string]interface{}, maxDocuments)
fetchErrors := make(map[string]string)
@@ -619,9 +623,59 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
for i := 0; i < workers; i++ {
go func(workerID int) {
for current := range jobs {
if !branchLimiter.tryAcquire(current) {
select {
case jobs <- current:
c.debugSnapshotf("worker=%d requeue branch-busy path=%s branch=%s queue_len=%d", workerID, current, redfishSnapshotBranchKey(current), len(jobs))
select {
case <-time.After(branchRetryPause):
case <-ctx.Done():
}
continue
default:
}
if !branchLimiter.waitAcquire(ctx, current, branchRetryPause) {
n := atomic.AddInt32(&processed, 1)
mu.Lock()
if _, ok := fetchErrors[current]; !ok && ctx.Err() != nil {
fetchErrors[current] = ctx.Err().Error()
}
mu.Unlock()
if emit != nil && ctx.Err() != nil {
emit(Progress{
Status: "running",
Progress: 92 + int(minInt32(n/200, 6)),
Message: fmt.Sprintf("Redfish snapshot: ошибка на %s", compactProgressPath(current)),
})
}
wg.Done()
continue
}
}
lastPath.Store(current)
c.debugSnapshotf("worker=%d fetch start path=%s queue_len=%d", workerID, current, len(jobs))
doc, err := c.getJSON(ctx, client, req, baseURL, current)
doc, err := func() (map[string]interface{}, error) {
defer branchLimiter.release(current)
if !isRedfishMemoryMemberPath(current) {
return c.getJSON(ctx, client, req, baseURL, current)
}
select {
case memoryGate <- struct{}{}:
case <-ctx.Done():
return nil, ctx.Err()
}
defer func() { <-memoryGate }()
return c.getJSONWithRetry(
ctx,
memoryClient,
req,
baseURL,
current,
redfishSnapshotMemoryRetryAttempts(),
redfishSnapshotMemoryRetryBackoff(),
)
}()
if err == nil {
mu.Lock()
out[current] = doc
@@ -1018,6 +1072,7 @@ func redfishCriticalEndpoints(systemPaths, chassisPaths, managerPaths []string)
add(joinPath(p, "/Oem/Public/ThermalConfig"))
add(joinPath(p, "/ThermalConfig"))
add(joinPath(p, "/Processors"))
add(joinPath(p, "/Memory"))
add(joinPath(p, "/Storage"))
add(joinPath(p, "/SimpleStorage"))
add(joinPath(p, "/PCIeDevices"))
@@ -1122,6 +1177,24 @@ func redfishCriticalPlanBAttempts() int {
return 3
}
// redfishCriticalMemberRetryAttempts returns how many fetch attempts are made
// per critical collection member. Overridable via
// LOGPILE_REDFISH_CRITICAL_MEMBER_RETRIES (accepted range 1..6); defaults to 1.
func redfishCriticalMemberRetryAttempts() int {
	const fallback = 1
	raw := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_CRITICAL_MEMBER_RETRIES"))
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 1 || n > 6 {
		// Out-of-range or unparsable overrides are ignored rather than clamped.
		return fallback
	}
	return n
}
// redfishCriticalMemberRecoveryMax returns the cap on how many missing
// collection members plan-B recovery will attempt to refetch. Overridable via
// LOGPILE_REDFISH_CRITICAL_MEMBER_RECOVERY_MAX (accepted range 1..1024);
// defaults to 48.
func redfishCriticalMemberRecoveryMax() int {
	const fallback = 48
	raw := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_CRITICAL_MEMBER_RECOVERY_MAX"))
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 1 || n > 1024 {
		return fallback
	}
	return n
}
func redfishCriticalRetryBackoff() time.Duration {
if v := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_CRITICAL_BACKOFF")); v != "" {
if d, err := time.ParseDuration(v); err == nil && d >= 0 {
@@ -1149,6 +1222,128 @@ func redfishCriticalSlowGap() time.Duration {
return 1200 * time.Millisecond
}
// redfishSnapshotMemoryRequestTimeout returns the per-request HTTP timeout
// used when fetching Memory member documents during a raw snapshot.
// Overridable via LOGPILE_REDFISH_MEMORY_TIMEOUT (any positive duration);
// defaults to 25s.
func redfishSnapshotMemoryRequestTimeout() time.Duration {
	const fallback = 25 * time.Second
	raw := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_MEMORY_TIMEOUT"))
	if raw == "" {
		return fallback
	}
	d, err := time.ParseDuration(raw)
	if err != nil || d <= 0 {
		// A zero or negative timeout would disable the request deadline.
		return fallback
	}
	return d
}
// redfishSnapshotMemoryRetryAttempts returns how many attempts are made per
// Memory member fetch during the snapshot crawl. Overridable via
// LOGPILE_REDFISH_MEMORY_RETRIES (accepted range 1..8); defaults to 2.
func redfishSnapshotMemoryRetryAttempts() int {
	const fallback = 2
	raw := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_MEMORY_RETRIES"))
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 1 || n > 8 {
		return fallback
	}
	return n
}
// redfishSnapshotMemoryRetryBackoff returns the pause between retry attempts
// for Memory member fetches. Overridable via LOGPILE_REDFISH_MEMORY_BACKOFF
// (any non-negative duration; zero disables the pause); defaults to 800ms.
func redfishSnapshotMemoryRetryBackoff() time.Duration {
	const fallback = 800 * time.Millisecond
	raw := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_MEMORY_BACKOFF"))
	if raw == "" {
		return fallback
	}
	d, err := time.ParseDuration(raw)
	if err != nil || d < 0 {
		return fallback
	}
	return d
}
// redfishSnapshotMemoryConcurrency returns how many Memory member fetches may
// run at the same time (the size of the memory gate channel). Overridable via
// LOGPILE_REDFISH_MEMORY_WORKERS (accepted range 1..8); defaults to 1, i.e.
// fully serialized.
func redfishSnapshotMemoryConcurrency() int {
	const fallback = 1
	raw := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_MEMORY_WORKERS"))
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 1 || n > 8 {
		return fallback
	}
	return n
}
// redfishSnapshotBranchConcurrency returns the per-branch in-flight fetch
// limit enforced by redfishSnapshotBranchLimiter. Overridable via
// LOGPILE_REDFISH_BRANCH_WORKERS (accepted range 1..8); defaults to 1.
func redfishSnapshotBranchConcurrency() int {
	const fallback = 1
	raw := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_BRANCH_WORKERS"))
	if raw == "" {
		return fallback
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n < 1 || n > 8 {
		return fallback
	}
	return n
}
// redfishSnapshotBranchRequeueBackoff returns the pause a worker takes after
// requeueing a job whose branch is busy, and the polling interval used by
// waitAcquire. Overridable via LOGPILE_REDFISH_BRANCH_BACKOFF (any
// non-negative duration); defaults to 35ms.
func redfishSnapshotBranchRequeueBackoff() time.Duration {
	const fallback = 35 * time.Millisecond
	raw := strings.TrimSpace(os.Getenv("LOGPILE_REDFISH_BRANCH_BACKOFF"))
	if raw == "" {
		return fallback
	}
	d, err := time.ParseDuration(raw)
	if err != nil || d < 0 {
		return fallback
	}
	return d
}
// redfishSnapshotBranchLimiter caps how many snapshot fetches may run
// concurrently within a single Redfish branch (branches are derived by
// redfishSnapshotBranchKey, e.g. /redfish/v1/Systems/1/Memory).
type redfishSnapshotBranchLimiter struct {
	limit int // maximum concurrent fetches per branch key; >= 1
	mu sync.Mutex // guards inFlight
	inFlight map[string]int // branch key -> number of fetches currently running
}
// newRedfishSnapshotBranchLimiter builds a limiter allowing at most limit
// concurrent fetches per branch. Values below 1 are raised to 1 so the
// limiter can never deadlock by permitting zero in-flight requests.
func newRedfishSnapshotBranchLimiter(limit int) *redfishSnapshotBranchLimiter {
	effective := limit
	if effective < 1 {
		effective = 1
	}
	return &redfishSnapshotBranchLimiter{
		limit:    effective,
		inFlight: map[string]int{},
	}
}
// tryAcquire attempts to claim a concurrency slot for path's branch without
// blocking. It returns true when the slot was taken (the caller must later
// call release) and false when the branch is already at its limit. Paths with
// an empty branch key are never limited.
func (l *redfishSnapshotBranchLimiter) tryAcquire(path string) bool {
	branch := redfishSnapshotBranchKey(path)
	if branch == "" {
		return true
	}
	l.mu.Lock()
	busy := l.inFlight[branch] >= l.limit
	if !busy {
		l.inFlight[branch]++
	}
	l.mu.Unlock()
	return !busy
}
// waitAcquire blocks until a concurrency slot for path's branch is available,
// polling tryAcquire every backoff interval. It returns true once the slot is
// held (the caller must later call release) and false when ctx is cancelled
// before a slot opens up. Paths with an empty branch key acquire immediately.
// Negative backoff values are treated as zero.
func (l *redfishSnapshotBranchLimiter) waitAcquire(ctx context.Context, path string, backoff time.Duration) bool {
	branch := redfishSnapshotBranchKey(path)
	if branch == "" {
		return true
	}
	if backoff < 0 {
		backoff = 0
	}
	// Reuse a single timer across polling rounds instead of allocating a new
	// one per iteration with time.After; Reset is safe here because the
	// channel has just been drained by the receive in the previous round.
	timer := time.NewTimer(backoff)
	defer timer.Stop()
	for {
		if l.tryAcquire(path) {
			return true
		}
		if ctx.Err() != nil {
			return false
		}
		select {
		case <-timer.C:
			timer.Reset(backoff)
		case <-ctx.Done():
			return false
		}
	}
}
// release returns a previously acquired concurrency slot for path's branch.
// Once the last slot of a branch is released its map entry is removed so the
// inFlight map does not accumulate idle branch keys.
func (l *redfishSnapshotBranchLimiter) release(path string) {
	branch := redfishSnapshotBranchKey(path)
	if branch == "" {
		return
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	remaining := l.inFlight[branch] - 1
	if remaining <= 0 {
		delete(l.inFlight, branch)
		return
	}
	l.inFlight[branch] = remaining
}
func redfishLinkRefs(doc map[string]interface{}, topKey, nestedKey string) []string {
top, ok := doc[topKey].(map[string]interface{})
if !ok {
@@ -1238,6 +1433,54 @@ func shouldCrawlPath(path string) bool {
return true
}
// isRedfishMemoryMemberPath reports whether path refers to a single DIMM
// document under a system's Memory collection, e.g.
// /redfish/v1/Systems/1/Memory/DIMM0. Sub-resources such as MemoryMetrics and
// Assembly, as well as anything deeper than one segment past /Memory/, are
// excluded.
func isRedfishMemoryMemberPath(path string) bool {
	const marker = "/Memory/"
	p := normalizeRedfishPath(path)
	if !strings.Contains(p, "/Systems/") {
		return false
	}
	if strings.Contains(p, "/MemoryMetrics") || strings.Contains(p, "/Assembly") {
		return false
	}
	idx := strings.Index(p, marker)
	if idx < 0 {
		return false
	}
	// The member name is everything after the first "/Memory/"; it must be a
	// single non-empty path segment.
	member := strings.TrimSpace(p[idx+len(marker):])
	if member == "" {
		return false
	}
	return !strings.Contains(member, "/")
}
// redfishCollectionHasExplicitMembers reports whether the collection document
// already advertises at least one member reference, which lets callers skip
// the expensive numeric slow-probe fallback.
func redfishCollectionHasExplicitMembers(doc map[string]interface{}) bool {
	refs := redfishCollectionMemberRefs(doc)
	return len(refs) != 0
}
// redfishSnapshotBranchKey maps a Redfish path to the branch under which its
// fetches are concurrency-limited. The service root (and an empty path) map
// to "" meaning "unlimited"; non-Redfish-shaped paths are their own branch.
// Subsystem branches stay independent, e.g. Systems/1/Memory vs
// Systems/1/PCIeDevices.
func redfishSnapshotBranchKey(path string) string {
	p := normalizeRedfishPath(path)
	if p == "" || p == "/redfish/v1" {
		return ""
	}
	segs := strings.Split(strings.Trim(p, "/"), "/")
	if len(segs) < 3 || segs[0] != "redfish" || segs[1] != "v1" {
		// Not shaped like /redfish/v1/...: use the whole path as its own key.
		return p
	}
	// Default depth: the first component past /redfish/v1, plus one segment
	// when available.
	depth := 3
	if len(segs) >= 4 {
		depth = 4
	}
	switch segs[2] {
	case "Systems", "Chassis", "Managers":
		// Keep e.g. Systems/<id>/<subsystem> as separate branches.
		if len(segs) >= 5 {
			depth = 5
		}
	}
	return "/" + strings.Join(segs[:depth], "/")
}
func (c *RedfishConnector) getLinkedPCIeFunctions(ctx context.Context, client *http.Client, req Request, baseURL string, doc map[string]interface{}) []map[string]interface{} {
// Newer Redfish payloads often keep function references in Links.PCIeFunctions.
if links, ok := doc["Links"].(map[string]interface{}); ok {
@@ -1381,14 +1624,51 @@ func (c *RedfishConnector) getJSONWithRetry(ctx context.Context, client *http.Cl
return nil, lastErr
}
func (c *RedfishConnector) collectCriticalCollectionMembersSequential(ctx context.Context, client *http.Client, req Request, baseURL, collectionPath string, collectionDoc map[string]interface{}) (map[string]interface{}, bool) {
func (c *RedfishConnector) collectCriticalCollectionMembersSequential(
ctx context.Context,
client *http.Client,
req Request,
baseURL string,
collectionDoc map[string]interface{},
rawTree map[string]interface{},
fetchErrs map[string]string,
) (map[string]interface{}, bool) {
memberPaths := redfishCollectionMemberRefs(collectionDoc)
if len(memberPaths) == 0 {
return nil, false
}
out := make(map[string]interface{})
retryableMissing := make([]string, 0, len(memberPaths))
unknownMissing := make([]string, 0, len(memberPaths))
for _, memberPath := range memberPaths {
doc, err := c.getJSONWithRetry(ctx, client, req, baseURL, memberPath, redfishCriticalRetryAttempts(), redfishCriticalRetryBackoff())
memberPath = normalizeRedfishPath(memberPath)
if memberPath == "" {
continue
}
if _, exists := rawTree[memberPath]; exists {
continue
}
if msg, hasErr := fetchErrs[memberPath]; hasErr {
if !isRetryableRedfishFetchError(fmt.Errorf("%s", msg)) {
continue
}
retryableMissing = append(retryableMissing, memberPath)
continue
}
unknownMissing = append(unknownMissing, memberPath)
}
candidates := append(retryableMissing, unknownMissing...)
if len(candidates) == 0 {
return nil, false
}
if maxMembers := redfishCriticalMemberRecoveryMax(); maxMembers > 0 && len(candidates) > maxMembers {
candidates = candidates[:maxMembers]
}
out := make(map[string]interface{}, len(candidates))
for _, memberPath := range candidates {
doc, err := c.getJSONWithRetry(ctx, client, req, baseURL, memberPath, redfishCriticalMemberRetryAttempts(), redfishCriticalRetryBackoff())
if err != nil {
continue
}
@@ -1398,6 +1678,7 @@ func (c *RedfishConnector) collectCriticalCollectionMembersSequential(ctx contex
}
func (c *RedfishConnector) recoverCriticalRedfishDocsPlanB(ctx context.Context, client *http.Client, req Request, baseURL string, criticalPaths []string, rawTree map[string]interface{}, fetchErrs map[string]string, emit ProgressFn) int {
planBStart := time.Now()
var targets []string
seenTargets := make(map[string]struct{})
addTarget := func(path string) {
@@ -1493,15 +1774,20 @@ func (c *RedfishConnector) recoverCriticalRedfishDocsPlanB(ctx context.Context,
rawTree[p] = doc
delete(fetchErrs, p)
recovered++
if members, ok := c.collectCriticalCollectionMembersSequential(ctx, client, req, baseURL, p, doc); ok {
if members, ok := c.collectCriticalCollectionMembersSequential(ctx, client, req, baseURL, doc, rawTree, fetchErrs); ok {
for mp, md := range members {
if _, exists := rawTree[mp]; !exists {
rawTree[mp] = md
recovered++
if _, exists := rawTree[mp]; exists {
continue
}
rawTree[mp] = md
delete(fetchErrs, mp)
recovered++
}
}
if shouldSlowProbeCriticalCollection(p) {
// Numeric slow-probe is expensive; skip it when collection already advertises explicit members.
if shouldSlowProbeCriticalCollection(p) && !redfishCollectionHasExplicitMembers(doc) {
if children := c.probeDirectRedfishCollectionChildrenSlow(ctx, client, req, baseURL, p); len(children) > 0 {
for cp, cd := range children {
if _, exists := rawTree[cp]; exists {
@@ -1514,6 +1800,7 @@ func (c *RedfishConnector) recoverCriticalRedfishDocsPlanB(ctx context.Context,
}
continue
}
fetchErrs[p] = err.Error()
// If collection endpoint times out, still try direct child probing for common numeric paths.
if shouldSlowProbeCriticalCollection(p) {
@@ -1529,6 +1816,13 @@ func (c *RedfishConnector) recoverCriticalRedfishDocsPlanB(ctx context.Context,
}
}
}
if emit != nil {
emit(Progress{
Status: "running",
Progress: 97,
Message: fmt.Sprintf("Redfish: plan-B завершен за %s (targets=%d, recovered=%d)", time.Since(planBStart).Round(time.Second), len(targets), recovered),
})
}
return recovered
}