Migrate all log.Printf/log.Fatalf calls to log/slog with structured key-value attributes, per the bible go-logging contract.

- Add withRequestID middleware: generates a crypto/rand 8-byte hex ID per request, sets the X-Request-ID response header, and injects the ID into the request context
- withErrorLogging uses slog with request_id from context
- writeError internal log calls migrated to slog.Error/slog.Warn
- All handler log calls in the api, ingest, and history packages use slog
- cmd/reanimator-api configures slog.NewTextHandler(os.Stdout) at startup
- cmd/reanimator-migrate and cmd/reanimator-reset migrated to slog

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
317 lines
8.2 KiB
Go
317 lines
8.2 KiB
Go
package history
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"strconv"
|
|
"time"
|
|
)
|
|
|
|
// Tuning values for the background history worker.
const (
	// workerProcessRetryAttempts is the maximum number of times
	// processJobWithRetry runs a single job.
	workerProcessRetryAttempts = 3

	// workerCompleteRetryAttempts is the maximum number of times
	// completeJobWithRetry tries to persist a job's final state.
	workerCompleteRetryAttempts = 5

	// workerStaleRunningAfter is how long a job may stay in the 'running'
	// state before staleJobRequeuer puts it back in the queue.
	workerStaleRunningAfter = 30 * time.Minute

	// defaultWorkerConcurrency is the number of worker loops StartWorker
	// launches when HISTORY_WORKER_CONCURRENCY is unset or unusable.
	defaultWorkerConcurrency = 4
)
|
|
|
|
func (s *Service) StartWorker(ctx context.Context) {
|
|
if s == nil || s.db == nil {
|
|
return
|
|
}
|
|
if os.Getenv("HISTORY_WORKER_DISABLED") == "1" {
|
|
slog.Info("history worker disabled")
|
|
return
|
|
}
|
|
concurrency := defaultWorkerConcurrency
|
|
if v := os.Getenv("HISTORY_WORKER_CONCURRENCY"); v != "" {
|
|
if n, err := strconv.Atoi(v); err == nil && n > 0 {
|
|
concurrency = n
|
|
}
|
|
}
|
|
slog.Info("history worker starting", "concurrency", concurrency)
|
|
go s.staleJobRequeuer(ctx)
|
|
for i := 0; i < concurrency; i++ {
|
|
go s.workerLoop(ctx)
|
|
}
|
|
}
|
|
|
|
func (s *Service) staleJobRequeuer(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(2 * time.Minute):
|
|
}
|
|
now := time.Now().UTC()
|
|
staleBefore := now.Add(-workerStaleRunningAfter)
|
|
res, err := s.db.ExecContext(ctx, `
|
|
UPDATE history_recompute_jobs
|
|
SET status = 'queued',
|
|
started_at = NULL,
|
|
locked_at = NULL,
|
|
worker_id = NULL,
|
|
error = CONCAT('auto-requeued stale running job at ', ?, '; previous error: ', COALESCE(error, 'none'))
|
|
WHERE status = 'running'
|
|
AND (
|
|
(locked_at IS NOT NULL AND locked_at < ?)
|
|
OR (locked_at IS NULL AND started_at IS NOT NULL AND started_at < ?)
|
|
)`,
|
|
now.Format(time.RFC3339), staleBefore, staleBefore,
|
|
)
|
|
if err != nil {
|
|
slog.Error("history stale requeue failed", "err", err)
|
|
continue
|
|
}
|
|
if n, _ := res.RowsAffected(); n > 0 {
|
|
slog.Info("history stale requeue", "requeued", n)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) workerLoop(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
job, ok, err := s.claimNextJob(ctx)
|
|
if err != nil {
|
|
slog.Error("history worker claim failed", "err", err)
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(300 * time.Millisecond):
|
|
}
|
|
continue
|
|
}
|
|
if !ok {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(150 * time.Millisecond):
|
|
}
|
|
continue
|
|
}
|
|
slog.Info("history worker claimed", "job_id", job.ID, "type", job.JobType, "entity", job.EntityType+":"+job.EntityID)
|
|
started := time.Now()
|
|
result, procErr := s.processJobWithRetry(ctx, job)
|
|
duration := time.Since(started)
|
|
if procErr != nil {
|
|
slog.Error("history worker job failed", "job_id", job.ID, "type", job.JobType, "entity", job.EntityType+":"+job.EntityID, "duration_ms", duration.Milliseconds(), "err", procErr)
|
|
if isTransientDBErr(procErr) {
|
|
if rqErr := s.requeueJobTransient(ctx, job.ID, procErr); rqErr != nil {
|
|
slog.Error("history worker transient requeue failed", "job_id", job.ID, "err", rqErr)
|
|
} else {
|
|
slog.Info("history worker transient requeued", "job_id", job.ID)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(750 * time.Millisecond):
|
|
}
|
|
continue
|
|
}
|
|
} else {
|
|
slog.Info("history worker job completed", "job_id", job.ID, "type", job.JobType, "entity", job.EntityType+":"+job.EntityID, "duration_ms", duration.Milliseconds())
|
|
}
|
|
if completeErr := s.completeJobWithRetry(ctx, job.ID, result, procErr); completeErr != nil {
|
|
slog.Error("history worker complete failed", "job_id", job.ID, "err", completeErr)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) requeueJobTransient(ctx context.Context, jobID string, procErr error) error {
|
|
now := time.Now().UTC()
|
|
_, err := s.db.ExecContext(ctx, `
|
|
UPDATE history_recompute_jobs
|
|
SET status = 'queued',
|
|
started_at = NULL,
|
|
locked_at = NULL,
|
|
worker_id = NULL,
|
|
finished_at = NULL,
|
|
error = ?
|
|
WHERE id = ?`,
|
|
fmt.Sprintf("transient db error at %s: %v", now.Format(time.RFC3339), procErr), jobID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (s *Service) claimNextJob(ctx context.Context) (JobRecord, bool, error) {
|
|
tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
|
|
if err != nil {
|
|
return JobRecord{}, false, err
|
|
}
|
|
defer func() { _ = tx.Rollback() }()
|
|
now := time.Now().UTC()
|
|
|
|
var jobID string
|
|
err = tx.QueryRowContext(ctx, `
|
|
SELECT id
|
|
FROM history_recompute_jobs
|
|
WHERE status = 'queued'
|
|
ORDER BY created_at, id
|
|
LIMIT 1
|
|
FOR UPDATE`).Scan(&jobID)
|
|
if err == sql.ErrNoRows {
|
|
return JobRecord{}, false, nil
|
|
}
|
|
if err != nil {
|
|
return JobRecord{}, false, err
|
|
}
|
|
if _, err := tx.ExecContext(ctx, `
|
|
UPDATE history_recompute_jobs
|
|
SET status = 'running', started_at = ?, locked_at = ?, attempt_count = attempt_count + 1, worker_id = ?
|
|
WHERE id = ?`,
|
|
now, now, "local-worker", jobID,
|
|
); err != nil {
|
|
return JobRecord{}, false, err
|
|
}
|
|
if err := tx.Commit(); err != nil {
|
|
return JobRecord{}, false, err
|
|
}
|
|
job, err := s.GetJob(ctx, jobID)
|
|
if err != nil {
|
|
return JobRecord{}, false, err
|
|
}
|
|
return job, true, nil
|
|
}
|
|
|
|
func (s *Service) completeJob(ctx context.Context, jobID string, result map[string]any, procErr error) error {
|
|
var resultJSON any
|
|
if result != nil {
|
|
b, err := json.Marshal(result)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resultJSON = b
|
|
}
|
|
now := time.Now().UTC()
|
|
if procErr != nil {
|
|
_, err := s.db.ExecContext(ctx, `
|
|
UPDATE history_recompute_jobs
|
|
SET status = 'failed', finished_at = ?, error = ?, result = ?
|
|
WHERE id = ?`,
|
|
now, procErr.Error(), resultJSON, jobID,
|
|
)
|
|
return err
|
|
}
|
|
_, err := s.db.ExecContext(ctx, `
|
|
UPDATE history_recompute_jobs
|
|
SET status = 'completed', finished_at = ?, error = NULL, result = ?
|
|
WHERE id = ?`,
|
|
now, resultJSON, jobID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (s *Service) processJob(ctx context.Context, job JobRecord) (map[string]any, error) {
|
|
switch job.JobType {
|
|
case "recompute":
|
|
return s.processRecomputeJob(ctx, job)
|
|
case "recompute_batch":
|
|
return s.processRecomputeBatchJob(ctx, job)
|
|
case "delete_event_recompute":
|
|
return s.processDeleteEventRecompute(ctx, job)
|
|
case "rollback":
|
|
return s.processRollbackJob(ctx, job)
|
|
case "hard_restore":
|
|
return s.processHardRestoreJob(ctx, job)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported job type %q", job.JobType)
|
|
}
|
|
}
|
|
|
|
func (s *Service) processRecomputeBatchJob(ctx context.Context, job JobRecord) (map[string]any, error) {
|
|
rawKeys, _ := job.Payload["entity_keys"].([]any)
|
|
entityKeys := make([]string, 0, len(rawKeys))
|
|
for _, v := range rawKeys {
|
|
if k, ok := v.(string); ok && k != "" {
|
|
entityKeys = append(entityKeys, k)
|
|
}
|
|
}
|
|
recomputed := 0
|
|
skipped := 0
|
|
failed := 0
|
|
var firstErr error
|
|
for _, key := range entityKeys {
|
|
entityType, entityID := splitEntityKey(key)
|
|
var err error
|
|
switch entityType {
|
|
case "component":
|
|
_, err = s.recomputeComponentEntity(ctx, entityID, nil, nil)
|
|
case "asset":
|
|
_, err = s.recomputeAssetEntity(ctx, entityID, nil, nil)
|
|
if err == nil {
|
|
if _, linkErr := s.recomputeComponentsLinkedToAsset(ctx, entityID); linkErr != nil {
|
|
err = linkErr
|
|
}
|
|
}
|
|
default:
|
|
skipped++
|
|
continue
|
|
}
|
|
if err != nil {
|
|
if errors.Is(err, ErrNotFound) {
|
|
skipped++
|
|
continue
|
|
}
|
|
failed++
|
|
if firstErr == nil {
|
|
firstErr = fmt.Errorf("recompute %s: %w", key, err)
|
|
}
|
|
continue
|
|
}
|
|
recomputed++
|
|
}
|
|
result := map[string]any{
|
|
"total": len(entityKeys),
|
|
"recomputed": recomputed,
|
|
"skipped": skipped,
|
|
"failed": failed,
|
|
}
|
|
if firstErr != nil {
|
|
return result, firstErr
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (s *Service) processJobWithRetry(ctx context.Context, job JobRecord) (map[string]any, error) {
|
|
var (
|
|
result map[string]any
|
|
err error
|
|
)
|
|
for attempt := 0; attempt < workerProcessRetryAttempts; attempt++ {
|
|
result, err = s.processJob(ctx, job)
|
|
if err == nil {
|
|
return result, nil
|
|
}
|
|
if !isTransientDBErr(err) || attempt == workerProcessRetryAttempts-1 {
|
|
return nil, err
|
|
}
|
|
time.Sleep(time.Duration(120*(attempt+1)) * time.Millisecond)
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
func (s *Service) completeJobWithRetry(ctx context.Context, jobID string, result map[string]any, procErr error) error {
|
|
var err error
|
|
for attempt := 0; attempt < workerCompleteRetryAttempts; attempt++ {
|
|
err = s.completeJob(ctx, jobID, result, procErr)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if !isTransientDBErr(err) || attempt == workerCompleteRetryAttempts-1 {
|
|
return err
|
|
}
|
|
time.Sleep(time.Duration(120*(attempt+1)) * time.Millisecond)
|
|
}
|
|
return err
|
|
}
|