Add stability hardening and self-heal recovery
This commit is contained in:
@@ -110,6 +110,11 @@ func streamCmdJob(j *jobState, cmd *exec.Cmd) error {
|
||||
|
||||
scanDone := make(chan error, 1)
|
||||
go func() {
|
||||
defer func() {
|
||||
if rec := recover(); rec != nil {
|
||||
scanDone <- fmt.Errorf("stream scanner panic: %v", rec)
|
||||
}
|
||||
}()
|
||||
scanner := bufio.NewScanner(pr)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
||||
for scanner.Scan() {
|
||||
|
||||
@@ -84,12 +84,12 @@ func (m *jobManager) create(id string) *jobState {
|
||||
j := &jobState{}
|
||||
m.jobs[id] = j
|
||||
// Schedule cleanup after 30 minutes
|
||||
go func() {
|
||||
goRecoverOnce("job cleanup", func() {
|
||||
time.Sleep(30 * time.Minute)
|
||||
m.mu.Lock()
|
||||
delete(m.jobs, id)
|
||||
m.mu.Unlock()
|
||||
}()
|
||||
})
|
||||
return j
|
||||
}
|
||||
|
||||
|
||||
@@ -17,10 +17,10 @@ import (
|
||||
// It supports multiple concurrent SAT tasks: a shared event window is open
|
||||
// while any SAT task is running, and flushed when all tasks complete.
|
||||
type kmsgWatcher struct {
|
||||
mu sync.Mutex
|
||||
activeCount int // number of in-flight SAT tasks
|
||||
window *kmsgWindow
|
||||
statusDB *app.ComponentStatusDB
|
||||
mu sync.Mutex
|
||||
activeCount int // number of in-flight SAT tasks
|
||||
window *kmsgWindow
|
||||
statusDB *app.ComponentStatusDB
|
||||
}
|
||||
|
||||
type kmsgWindow struct {
|
||||
@@ -48,36 +48,39 @@ func newKmsgWatcher(statusDB *app.ComponentStatusDB) *kmsgWatcher {
|
||||
|
||||
// start launches the background kmsg reading goroutine.
|
||||
func (w *kmsgWatcher) start() {
|
||||
go w.run()
|
||||
goRecoverLoop("kmsg watcher", 5*time.Second, w.run)
|
||||
}
|
||||
|
||||
func (w *kmsgWatcher) run() {
|
||||
f, err := os.Open("/dev/kmsg")
|
||||
if err != nil {
|
||||
slog.Warn("kmsg watcher unavailable", "err", err)
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Best-effort seek to end so we only capture events from now forward.
|
||||
_, _ = f.Seek(0, io.SeekEnd)
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 64*1024), 64*1024)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
evt, ok := parseKmsgLine(line)
|
||||
if !ok {
|
||||
for {
|
||||
f, err := os.Open("/dev/kmsg")
|
||||
if err != nil {
|
||||
slog.Warn("kmsg watcher unavailable", "err", err)
|
||||
time.Sleep(30 * time.Second)
|
||||
continue
|
||||
}
|
||||
w.mu.Lock()
|
||||
if w.window != nil {
|
||||
w.recordEvent(evt)
|
||||
// Best-effort seek to end so we only capture events from now forward.
|
||||
_, _ = f.Seek(0, io.SeekEnd)
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 64*1024), 64*1024)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
evt, ok := parseKmsgLine(line)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
w.mu.Lock()
|
||||
if w.window != nil {
|
||||
w.recordEvent(evt)
|
||||
}
|
||||
w.mu.Unlock()
|
||||
}
|
||||
w.mu.Unlock()
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
slog.Warn("kmsg watcher stopped", "err", err)
|
||||
if err := scanner.Err(); err != nil {
|
||||
slog.Warn("kmsg watcher stopped", "err", err)
|
||||
}
|
||||
_ = f.Close()
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,7 +137,7 @@ func (w *kmsgWatcher) NotifyTaskFinished(taskID string) {
|
||||
if window == nil || len(window.events) == 0 {
|
||||
return
|
||||
}
|
||||
go w.flushWindow(window)
|
||||
goRecoverOnce("kmsg watcher flush", func() { w.flushWindow(window) })
|
||||
}
|
||||
|
||||
func (w *kmsgWatcher) flushWindow(window *kmsgWindow) {
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -311,11 +312,11 @@ func NewHandler(opts HandlerOptions) http.Handler {
|
||||
mux.HandleFunc("GET /", h.handlePage)
|
||||
|
||||
h.mux = mux
|
||||
return mux
|
||||
return recoverMiddleware(mux)
|
||||
}
|
||||
|
||||
func (h *handler) startMetricsCollector() {
|
||||
go func() {
|
||||
goRecoverLoop("metrics collector", 2*time.Second, func() {
|
||||
ticker := time.NewTicker(metricsCollectInterval)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
@@ -326,7 +327,7 @@ func (h *handler) startMetricsCollector() {
|
||||
h.feedRings(sample)
|
||||
h.setLatestMetric(sample)
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
func (h *handler) setLatestMetric(sample platform.LiveMetricSample) {
|
||||
@@ -347,7 +348,49 @@ func (h *handler) latestMetric() (platform.LiveMetricSample, bool) {
|
||||
|
||||
// ListenAndServe starts the HTTP server.
|
||||
func ListenAndServe(addr string, opts HandlerOptions) error {
|
||||
return http.ListenAndServe(addr, NewHandler(opts))
|
||||
srv := &http.Server{
|
||||
Addr: addr,
|
||||
Handler: NewHandler(opts),
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
ReadTimeout: 30 * time.Second,
|
||||
IdleTimeout: 2 * time.Minute,
|
||||
}
|
||||
return srv.ListenAndServe()
|
||||
}
|
||||
|
||||
type trackingResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
wroteHeader bool
|
||||
}
|
||||
|
||||
func (w *trackingResponseWriter) WriteHeader(statusCode int) {
|
||||
w.wroteHeader = true
|
||||
w.ResponseWriter.WriteHeader(statusCode)
|
||||
}
|
||||
|
||||
func (w *trackingResponseWriter) Write(p []byte) (int, error) {
|
||||
w.wroteHeader = true
|
||||
return w.ResponseWriter.Write(p)
|
||||
}
|
||||
|
||||
func recoverMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
tw := &trackingResponseWriter{ResponseWriter: w}
|
||||
defer func() {
|
||||
if rec := recover(); rec != nil {
|
||||
slog.Error("http handler panic",
|
||||
"method", r.Method,
|
||||
"path", r.URL.Path,
|
||||
"panic", fmt.Sprint(rec),
|
||||
"stack", string(debug.Stack()),
|
||||
)
|
||||
if !tw.wroteHeader {
|
||||
http.Error(tw, "internal server error", http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}()
|
||||
next.ServeHTTP(tw, r)
|
||||
})
|
||||
}
|
||||
|
||||
// ── Infrastructure handlers ──────────────────────────────────────────────────
|
||||
|
||||
@@ -34,6 +34,23 @@ func TestChartLegendNumber(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecoverMiddlewareReturns500OnPanic(t *testing.T) {
|
||||
handler := recoverMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
panic("boom")
|
||||
}))
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodGet, "/panic", nil)
|
||||
|
||||
handler.ServeHTTP(rec, req)
|
||||
|
||||
if rec.Code != http.StatusInternalServerError {
|
||||
t.Fatalf("status=%d want %d", rec.Code, http.StatusInternalServerError)
|
||||
}
|
||||
if !strings.Contains(rec.Body.String(), "internal server error") {
|
||||
t.Fatalf("body=%q", rec.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestChartDataFromSamplesUsesFullHistory(t *testing.T) {
|
||||
samples := []platform.LiveMetricSample{
|
||||
{
|
||||
|
||||
42
audit/internal/webui/stability.go
Normal file
42
audit/internal/webui/stability.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package webui
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"runtime/debug"
|
||||
"time"
|
||||
)
|
||||
|
||||
func goRecoverLoop(name string, restartDelay time.Duration, fn func()) {
|
||||
go func() {
|
||||
for {
|
||||
if !runRecoverable(name, fn) {
|
||||
return
|
||||
}
|
||||
if restartDelay > 0 {
|
||||
time.Sleep(restartDelay)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func goRecoverOnce(name string, fn func()) {
|
||||
go func() {
|
||||
_ = runRecoverable(name, fn)
|
||||
}()
|
||||
}
|
||||
|
||||
func runRecoverable(name string, fn func()) (panicked bool) {
|
||||
defer func() {
|
||||
if rec := recover(); rec != nil {
|
||||
panicked = true
|
||||
slog.Error("recovered panic",
|
||||
"component", name,
|
||||
"panic", fmt.Sprint(rec),
|
||||
"stack", string(debug.Stack()),
|
||||
)
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
return false
|
||||
}
|
||||
@@ -4,10 +4,12 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -377,7 +379,7 @@ func (q *taskQueue) startWorker(opts *HandlerOptions) {
|
||||
if !q.started {
|
||||
q.loadLocked()
|
||||
q.started = true
|
||||
go q.worker()
|
||||
goRecoverLoop("task worker", 2*time.Second, q.worker)
|
||||
}
|
||||
hasPending := q.nextPending() != nil
|
||||
q.mu.Unlock()
|
||||
@@ -392,75 +394,90 @@ func (q *taskQueue) startWorker(opts *HandlerOptions) {
|
||||
func (q *taskQueue) worker() {
|
||||
for {
|
||||
<-q.trigger
|
||||
setCPUGovernor("performance")
|
||||
func() {
|
||||
setCPUGovernor("performance")
|
||||
defer setCPUGovernor("powersave")
|
||||
|
||||
// Drain all pending tasks and start them in parallel.
|
||||
q.mu.Lock()
|
||||
var batch []*Task
|
||||
for {
|
||||
t := q.nextPending()
|
||||
if t == nil {
|
||||
break
|
||||
// Drain all pending tasks and start them in parallel.
|
||||
q.mu.Lock()
|
||||
var batch []*Task
|
||||
for {
|
||||
t := q.nextPending()
|
||||
if t == nil {
|
||||
break
|
||||
}
|
||||
now := time.Now()
|
||||
t.Status = TaskRunning
|
||||
t.StartedAt = &now
|
||||
t.DoneAt = nil
|
||||
t.ErrMsg = ""
|
||||
j := newTaskJobState(t.LogPath)
|
||||
t.job = j
|
||||
batch = append(batch, t)
|
||||
}
|
||||
now := time.Now()
|
||||
t.Status = TaskRunning
|
||||
t.StartedAt = &now
|
||||
t.DoneAt = nil
|
||||
t.ErrMsg = ""
|
||||
j := newTaskJobState(t.LogPath)
|
||||
t.job = j
|
||||
batch = append(batch, t)
|
||||
}
|
||||
if len(batch) > 0 {
|
||||
q.persistLocked()
|
||||
}
|
||||
q.mu.Unlock()
|
||||
if len(batch) > 0 {
|
||||
q.persistLocked()
|
||||
}
|
||||
q.mu.Unlock()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for _, t := range batch {
|
||||
t := t
|
||||
j := t.job
|
||||
taskCtx, taskCancel := context.WithCancel(context.Background())
|
||||
j.cancel = taskCancel
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var wg sync.WaitGroup
|
||||
for _, t := range batch {
|
||||
t := t
|
||||
j := t.job
|
||||
taskCtx, taskCancel := context.WithCancel(context.Background())
|
||||
j.cancel = taskCancel
|
||||
wg.Add(1)
|
||||
goRecoverOnce("task "+t.Target, func() {
|
||||
defer wg.Done()
|
||||
defer func() {
|
||||
if rec := recover(); rec != nil {
|
||||
msg := fmt.Sprintf("task panic: %v", rec)
|
||||
slog.Error("task panic",
|
||||
"task_id", t.ID,
|
||||
"target", t.Target,
|
||||
"panic", fmt.Sprint(rec),
|
||||
"stack", string(debug.Stack()),
|
||||
)
|
||||
j.append("ERROR: " + msg)
|
||||
j.finish(msg)
|
||||
}
|
||||
}()
|
||||
|
||||
if q.kmsgWatcher != nil && isSATTarget(t.Target) {
|
||||
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
|
||||
}
|
||||
|
||||
q.runTask(t, j, taskCtx)
|
||||
|
||||
if q.kmsgWatcher != nil {
|
||||
q.kmsgWatcher.NotifyTaskFinished(t.ID)
|
||||
}
|
||||
|
||||
q.mu.Lock()
|
||||
now2 := time.Now()
|
||||
t.DoneAt = &now2
|
||||
if t.Status == TaskRunning {
|
||||
if j.err != "" {
|
||||
t.Status = TaskFailed
|
||||
t.ErrMsg = j.err
|
||||
} else {
|
||||
t.Status = TaskDone
|
||||
if q.kmsgWatcher != nil && isSATTarget(t.Target) {
|
||||
q.kmsgWatcher.NotifyTaskStarted(t.ID, t.Target)
|
||||
}
|
||||
}
|
||||
|
||||
q.runTask(t, j, taskCtx)
|
||||
|
||||
if q.kmsgWatcher != nil {
|
||||
q.kmsgWatcher.NotifyTaskFinished(t.ID)
|
||||
}
|
||||
|
||||
q.mu.Lock()
|
||||
now2 := time.Now()
|
||||
t.DoneAt = &now2
|
||||
if t.Status == TaskRunning {
|
||||
if j.err != "" {
|
||||
t.Status = TaskFailed
|
||||
t.ErrMsg = j.err
|
||||
} else {
|
||||
t.Status = TaskDone
|
||||
}
|
||||
}
|
||||
q.persistLocked()
|
||||
q.mu.Unlock()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if len(batch) > 0 {
|
||||
q.mu.Lock()
|
||||
q.prune()
|
||||
q.persistLocked()
|
||||
q.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
}()
|
||||
|
||||
if len(batch) > 0 {
|
||||
q.mu.Lock()
|
||||
q.prune()
|
||||
q.persistLocked()
|
||||
q.mu.Unlock()
|
||||
}
|
||||
|
||||
setCPUGovernor("powersave")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user