collector/redfish: add ETA estimates to snapshot and plan-B progress
This commit is contained in:
@@ -549,6 +549,7 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
|
||||
maxDocuments := redfishSnapshotMaxDocuments()
|
||||
const workers = 6
|
||||
const heartbeatInterval = 5 * time.Second
|
||||
crawlStart := time.Now()
|
||||
|
||||
out := make(map[string]interface{}, maxDocuments)
|
||||
fetchErrors := make(map[string]string)
|
||||
@@ -613,10 +614,11 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
|
||||
last = s
|
||||
}
|
||||
}
|
||||
eta := formatETA(estimateSnapshotETA(crawlStart, int(n), seenN, len(jobs), workers, client.Timeout))
|
||||
emit(Progress{
|
||||
Status: "running",
|
||||
Progress: 92 + int(minInt32(n/200, 6)),
|
||||
Message: fmt.Sprintf("Redfish snapshot: heartbeat документов=%d (ok=%d, seen=%d), корни=%s, последний=%s", n, outN, seenN, strings.Join(roots, ", "), compactProgressPath(last)),
|
||||
Message: fmt.Sprintf("Redfish snapshot: heartbeat документов=%d (ok=%d, seen=%d), ETA≈%s, корни=%s, последний=%s", n, outN, seenN, eta, strings.Join(roots, ", "), compactProgressPath(last)),
|
||||
})
|
||||
case <-stopHeartbeat:
|
||||
return
|
||||
@@ -664,6 +666,7 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
|
||||
for k, v := range rootCounts {
|
||||
countsCopy[k] = v
|
||||
}
|
||||
seenN := len(seen)
|
||||
mu.Unlock()
|
||||
roots := topRoots(countsCopy, 2)
|
||||
last := current
|
||||
@@ -672,10 +675,11 @@ func (c *RedfishConnector) collectRawRedfishTree(ctx context.Context, client *ht
|
||||
last = s
|
||||
}
|
||||
}
|
||||
eta := formatETA(estimateSnapshotETA(crawlStart, int(n), seenN, len(jobs), workers, client.Timeout))
|
||||
emit(Progress{
|
||||
Status: "running",
|
||||
Progress: 92 + int(minInt32(n/200, 6)),
|
||||
Message: fmt.Sprintf("Redfish snapshot: документов=%d, корни=%s, последний=%s", n, strings.Join(roots, ", "), compactProgressPath(last)),
|
||||
Message: fmt.Sprintf("Redfish snapshot: документов=%d, ETA≈%s, корни=%s, последний=%s", n, eta, strings.Join(roots, ", "), compactProgressPath(last)),
|
||||
})
|
||||
}
|
||||
if n%20 == 0 || err != nil {
|
||||
@@ -1376,7 +1380,12 @@ func (c *RedfishConnector) recoverCriticalRedfishDocsPlanB(ctx context.Context,
|
||||
return 0
|
||||
}
|
||||
if emit != nil {
|
||||
emit(Progress{Status: "running", Progress: 97, Message: "Redfish: cooldown перед повторным добором критичных endpoint..."})
|
||||
totalETA := redfishCriticalCooldown() + estimatePlanBETA(len(targets))
|
||||
emit(Progress{
|
||||
Status: "running",
|
||||
Progress: 97,
|
||||
Message: fmt.Sprintf("Redfish: cooldown перед повторным добором критичных endpoint... ETA≈%s", formatETA(totalETA)),
|
||||
})
|
||||
}
|
||||
select {
|
||||
case <-time.After(redfishCriticalCooldown()):
|
||||
@@ -1387,10 +1396,11 @@ func (c *RedfishConnector) recoverCriticalRedfishDocsPlanB(ctx context.Context,
|
||||
recovered := 0
|
||||
for i, p := range targets {
|
||||
if emit != nil {
|
||||
remaining := len(targets) - i
|
||||
emit(Progress{
|
||||
Status: "running",
|
||||
Progress: 97,
|
||||
Message: fmt.Sprintf("Redfish: plan-B (%d/%d) %s", i+1, len(targets), compactProgressPath(p)),
|
||||
Message: fmt.Sprintf("Redfish: plan-B (%d/%d, ETA≈%s) %s", i+1, len(targets), formatETA(estimatePlanBETA(remaining)), compactProgressPath(p)),
|
||||
})
|
||||
}
|
||||
if i > 0 {
|
||||
@@ -2656,3 +2666,81 @@ func minInt32(a, b int32) int32 {
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// maxInt reports the largest of the supplied integers.
// When called with no arguments it returns 0.
func maxInt(values ...int) int {
	if len(values) == 0 {
		return 0
	}
	largest := values[0]
	for i := 1; i < len(values); i++ {
		if candidate := values[i]; candidate > largest {
			largest = candidate
		}
	}
	return largest
}
|
||||
|
||||
func estimateSnapshotETA(start time.Time, processed, seen, queueLen, workers int, requestTimeout time.Duration) time.Duration {
|
||||
remaining := maxInt(seen-processed, queueLen, 0)
|
||||
if remaining == 0 {
|
||||
return 0
|
||||
}
|
||||
if workers <= 0 {
|
||||
workers = 1
|
||||
}
|
||||
if requestTimeout <= 0 {
|
||||
requestTimeout = 10 * time.Second
|
||||
}
|
||||
|
||||
timeoutBased := time.Duration(float64(requestTimeout) * float64(remaining) / float64(workers))
|
||||
if processed <= 0 {
|
||||
return timeoutBased
|
||||
}
|
||||
|
||||
elapsed := time.Since(start)
|
||||
if elapsed <= 0 {
|
||||
return timeoutBased
|
||||
}
|
||||
rateBased := time.Duration(float64(elapsed) * float64(remaining) / float64(processed))
|
||||
if rateBased <= 0 {
|
||||
return timeoutBased
|
||||
}
|
||||
// Blend observed throughput with configured per-request timeout to keep ETA stable
|
||||
// and still bounded by timeout assumptions on slower Redfish branches.
|
||||
return (rateBased + timeoutBased) / 2
|
||||
}
|
||||
|
||||
func estimatePlanBETA(targets int) time.Duration {
|
||||
if targets <= 0 {
|
||||
return 0
|
||||
}
|
||||
attempts := redfishCriticalPlanBAttempts()
|
||||
if attempts < 1 {
|
||||
attempts = 1
|
||||
}
|
||||
timeoutPart := time.Duration(attempts) * redfishCriticalRequestTimeout()
|
||||
backoffPart := time.Duration(attempts-1) * redfishCriticalRetryBackoff()
|
||||
gapPart := redfishCriticalSlowGap()
|
||||
perTarget := timeoutPart + backoffPart + gapPart
|
||||
return time.Duration(targets) * perTarget
|
||||
}
|
||||
|
||||
func formatETA(d time.Duration) string {
|
||||
if d <= 0 {
|
||||
return "<1s"
|
||||
}
|
||||
if d < time.Second {
|
||||
return "<1s"
|
||||
}
|
||||
if d < time.Minute {
|
||||
return fmt.Sprintf("%ds", int(d.Round(time.Second).Seconds()))
|
||||
}
|
||||
totalSec := int(d.Round(time.Second).Seconds())
|
||||
hours := totalSec / 3600
|
||||
minutes := (totalSec % 3600) / 60
|
||||
seconds := totalSec % 60
|
||||
if hours > 0 {
|
||||
return fmt.Sprintf("%dh%02dm%02ds", hours, minutes, seconds)
|
||||
}
|
||||
return fmt.Sprintf("%dm%02ds", minutes, seconds)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user