Files
core/internal/history/worker.go
Michael Chus 4c284505a8 Async ingest, deferred history, batch delete, vendor normalization, CI identifiers
- history/worker: fix deadlock by moving stale job requeue out of claimNextJob
  into dedicated staleJobRequeuer goroutine (runs every 2 min)
- history/service,tx_apply,cross_entity: add deferred=true mode — write events+
  snapshots but skip projection updates; queue recompute after commit
- ingest/service: IngestHardwareDeferred uses deferred mode; CSV workers up to 8
  (INGEST_CSV_WORKERS env); serial/prefetch lookups use normalize.SerialKey
- api/ingest: JSON /ingest/hardware now async (202 + job_id); new GET
  /ingest/hardware/jobs/{id} endpoint; CSV already async
- history/admin_cancel: replace per-event softDelete loop with batchSoftDeleteEvents
  using IN-clause chunks of 500 to prevent request timeout on large deletes
- normalize: new internal/normalize package with VendorKey, VendorDisplay,
  VendorDisplayPtr, SerialKey, FirmwareKey
- ingest/parser_hardware: vendor fields use normalize.VendorDisplayPtr
- migrations/0021_ci_identifiers: change identifier columns to utf8mb4_unicode_ci
  (case-insensitive) in parts, machines, observations, machine_firmware_states
- bible submodule: update to add identifier-normalization contract

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 22:23:17 +03:00

317 lines
8.2 KiB
Go

package history
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"strconv"
"time"
)
// Tuning constants for the history recompute worker.
const (
	// defaultWorkerConcurrency is the number of worker loops StartWorker
	// launches when HISTORY_WORKER_CONCURRENCY is unset or not a positive
	// integer.
	defaultWorkerConcurrency = 4

	// workerProcessRetryAttempts bounds how many times processJobWithRetry
	// runs a single claimed job.
	workerProcessRetryAttempts = 3

	// workerCompleteRetryAttempts bounds how many times completeJobWithRetry
	// tries to persist a job's final state.
	workerCompleteRetryAttempts = 5

	// workerStaleRunningAfter is how long a job may sit in the 'running'
	// state before the stale requeuer puts it back in the queue.
	workerStaleRunningAfter = 30 * time.Minute
)
// StartWorker launches the background goroutines that drain the history
// recompute queue: one stale-job requeuer plus a pool of worker loops.
//
// It is a no-op when the service or its database handle is nil, or when the
// HISTORY_WORKER_DISABLED environment variable is exactly "1". The pool size
// comes from HISTORY_WORKER_CONCURRENCY when that holds a positive integer and
// falls back to defaultWorkerConcurrency otherwise.
//
// StartWorker returns as soon as the goroutines are started; it does not wait
// for them.
func (s *Service) StartWorker(ctx context.Context) {
	if s == nil {
		return
	}
	if s.db == nil {
		return
	}
	if disabled := os.Getenv("HISTORY_WORKER_DISABLED"); disabled == "1" {
		log.Printf("history worker disabled via HISTORY_WORKER_DISABLED=1")
		return
	}

	// An empty or malformed value fails to parse and leaves the default.
	workers := defaultWorkerConcurrency
	if parsed, err := strconv.Atoi(os.Getenv("HISTORY_WORKER_CONCURRENCY")); err == nil && parsed > 0 {
		workers = parsed
	}
	log.Printf("history worker starting concurrency=%d", workers)

	go s.staleJobRequeuer(ctx)
	for remaining := workers; remaining > 0; remaining-- {
		go s.workerLoop(ctx)
	}
}
// staleJobRequeuer periodically returns abandoned jobs to the queue.
//
// Every two minutes it flips jobs that have been 'running' for longer than
// workerStaleRunningAfter back to 'queued', clearing their lock fields and
// prefixing the error column with an audit note. Staleness is judged by
// locked_at, or by started_at when locked_at is NULL.
//
// The loop runs until ctx is cancelled.
func (s *Service) staleJobRequeuer(ctx context.Context) {
	const requeueInterval = 2 * time.Minute

	// A single ticker replaces a fresh time.After timer per iteration.
	ticker := time.NewTicker(requeueInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}

		now := time.Now().UTC()
		staleBefore := now.Add(-workerStaleRunningAfter)
		res, err := s.db.ExecContext(ctx, `
UPDATE history_recompute_jobs
SET status = 'queued',
started_at = NULL,
locked_at = NULL,
worker_id = NULL,
error = CONCAT('auto-requeued stale running job at ', ?, '; previous error: ', COALESCE(error, 'none'))
WHERE status = 'running'
AND (
(locked_at IS NOT NULL AND locked_at < ?)
OR (locked_at IS NULL AND started_at IS NOT NULL AND started_at < ?)
)`,
			now.Format(time.RFC3339), staleBefore, staleBefore,
		)
		if err != nil {
			// Shutdown in the middle of the UPDATE is not a requeue failure.
			if ctx.Err() != nil {
				return
			}
			log.Printf("history stale requeue failed err=%v", err)
			continue
		}

		n, err := res.RowsAffected()
		if err != nil {
			// The UPDATE itself succeeded; only the count is unavailable.
			log.Printf("history stale requeue rows affected unavailable err=%v", err)
			continue
		}
		if n > 0 {
			log.Printf("history stale requeue requeued=%d", n)
		}
	}
}
// workerLoop is the body of one queue worker. Until ctx is cancelled it
// repeatedly claims the next queued job, processes it, and records the
// outcome.
//
// Back-off between iterations:
//   - claim error: 300ms
//   - empty queue: 150ms
//   - transient processing failure (job is requeued instead of failed): 750ms
//
// Non-transient failures and successes are both persisted through
// completeJobWithRetry.
func (s *Service) workerLoop(ctx context.Context) {
	// pause blocks for d and reports whether the loop should keep going; it
	// returns false as soon as ctx is cancelled.
	pause := func(d time.Duration) bool {
		select {
		case <-ctx.Done():
			return false
		case <-time.After(d):
			return true
		}
	}

	for ctx.Err() == nil {
		job, claimed, err := s.claimNextJob(ctx)
		switch {
		case err != nil:
			log.Printf("history worker claim failed err=%v", err)
			if !pause(300 * time.Millisecond) {
				return
			}
			continue
		case !claimed:
			if !pause(150 * time.Millisecond) {
				return
			}
			continue
		}

		log.Printf("history worker claimed job_id=%s type=%s entity=%s:%s", job.ID, job.JobType, job.EntityType, job.EntityID)
		began := time.Now()
		result, procErr := s.processJobWithRetry(ctx, job)
		elapsed := time.Since(began)

		if procErr == nil {
			log.Printf("history worker job completed job_id=%s type=%s entity=%s:%s duration_ms=%d", job.ID, job.JobType, job.EntityType, job.EntityID, elapsed.Milliseconds())
		} else {
			log.Printf("history worker job failed job_id=%s type=%s entity=%s:%s duration_ms=%d err=%v", job.ID, job.JobType, job.EntityType, job.EntityID, elapsed.Milliseconds(), procErr)
			if isTransientDBErr(procErr) {
				// Transient failures go back to the queue rather than being
				// recorded as failed.
				if rqErr := s.requeueJobTransient(ctx, job.ID, procErr); rqErr == nil {
					log.Printf("history worker transient requeued job_id=%s", job.ID)
				} else {
					log.Printf("history worker transient requeue failed job_id=%s err=%v", job.ID, rqErr)
				}
				if !pause(750 * time.Millisecond) {
					return
				}
				continue
			}
		}

		if completeErr := s.completeJobWithRetry(ctx, job.ID, result, procErr); completeErr != nil {
			log.Printf("history worker complete failed job_id=%s err=%v", job.ID, completeErr)
		}
	}
}
// requeueJobTransient puts the job identified by jobID back into the 'queued'
// state after a transient database error. It clears the start, lock, worker
// and finish fields and stores a timestamped description of procErr in the
// error column. It returns the error from the UPDATE, if any.
func (s *Service) requeueJobTransient(ctx context.Context, jobID string, procErr error) error {
	stamp := time.Now().UTC().Format(time.RFC3339)
	note := fmt.Sprintf("transient db error at %s: %v", stamp, procErr)

	_, execErr := s.db.ExecContext(ctx, `
UPDATE history_recompute_jobs
SET status = 'queued',
started_at = NULL,
locked_at = NULL,
worker_id = NULL,
finished_at = NULL,
error = ?
WHERE id = ?`,
		note, jobID,
	)
	return execErr
}
// claimNextJob atomically claims the oldest queued job.
//
// Inside one transaction it selects the first 'queued' row (ordered by
// created_at, then id) with FOR UPDATE, marks it 'running', stamps started_at
// and locked_at, increments attempt_count and sets worker_id. After the commit
// it loads the full record through GetJob.
//
// Returns:
//   - (job, true, nil) when a job was claimed;
//   - (zero, false, nil) when the queue is empty;
//   - (zero, false, err) on any database error.
//
// If GetJob fails after the commit, the row is left in 'running' with no
// worker attached; it stays that way until something resets it, such as the
// periodic stale-job requeue.
func (s *Service) claimNextJob(ctx context.Context) (JobRecord, bool, error) {
	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return JobRecord{}, false, err
	}
	// Rollback after a successful Commit is a harmless no-op, so the result
	// is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	now := time.Now().UTC()
	var jobID string
	err = tx.QueryRowContext(ctx, `
SELECT id
FROM history_recompute_jobs
WHERE status = 'queued'
ORDER BY created_at, id
LIMIT 1
FOR UPDATE`).Scan(&jobID)
	// errors.Is also recognises ErrNoRows when a layer below wrapped it.
	if errors.Is(err, sql.ErrNoRows) {
		return JobRecord{}, false, nil
	}
	if err != nil {
		return JobRecord{}, false, err
	}

	if _, err := tx.ExecContext(ctx, `
UPDATE history_recompute_jobs
SET status = 'running', started_at = ?, locked_at = ?, attempt_count = attempt_count + 1, worker_id = ?
WHERE id = ?`,
		now, now, "local-worker", jobID,
	); err != nil {
		return JobRecord{}, false, err
	}
	if err := tx.Commit(); err != nil {
		return JobRecord{}, false, err
	}

	job, err := s.GetJob(ctx, jobID)
	if err != nil {
		return JobRecord{}, false, err
	}
	return job, true, nil
}
// completeJob records the final state of the job identified by jobID.
//
// When procErr is nil the job is marked 'completed' and its error column is
// cleared; otherwise it is marked 'failed' with procErr's message. In both
// cases finished_at is set to the current UTC time and result, if non-nil, is
// stored as JSON. A nil result is passed to the driver as nil.
//
// It returns a JSON marshalling error or the error from the UPDATE.
func (s *Service) completeJob(ctx context.Context, jobID string, result map[string]any, procErr error) error {
	// payload stays an untyped nil unless there is a result to encode.
	var payload any
	if result != nil {
		encoded, marshalErr := json.Marshal(result)
		if marshalErr != nil {
			return marshalErr
		}
		payload = encoded
	}
	finishedAt := time.Now().UTC()

	if procErr == nil {
		_, execErr := s.db.ExecContext(ctx, `
UPDATE history_recompute_jobs
SET status = 'completed', finished_at = ?, error = NULL, result = ?
WHERE id = ?`,
			finishedAt, payload, jobID,
		)
		return execErr
	}

	_, execErr := s.db.ExecContext(ctx, `
UPDATE history_recompute_jobs
SET status = 'failed', finished_at = ?, error = ?, result = ?
WHERE id = ?`,
		finishedAt, procErr.Error(), payload, jobID,
	)
	return execErr
}
// processJob dispatches job to the handler that matches its JobType and
// returns that handler's result and error unchanged. An unrecognised job type
// yields a nil result and an "unsupported job type" error.
func (s *Service) processJob(ctx context.Context, job JobRecord) (map[string]any, error) {
	var (
		out map[string]any
		err error
	)
	switch kind := job.JobType; kind {
	case "recompute":
		out, err = s.processRecomputeJob(ctx, job)
	case "recompute_batch":
		out, err = s.processRecomputeBatchJob(ctx, job)
	case "delete_event_recompute":
		out, err = s.processDeleteEventRecompute(ctx, job)
	case "rollback":
		out, err = s.processRollbackJob(ctx, job)
	case "hard_restore":
		out, err = s.processHardRestoreJob(ctx, job)
	default:
		err = fmt.Errorf("unsupported job type %q", kind)
	}
	return out, err
}
// processRecomputeBatchJob recomputes every entity listed under the
// "entity_keys" payload field of job.
//
// Non-string and empty entries are dropped before processing. Each remaining
// key is split with splitEntityKey: "component" entities are recomputed
// directly; "asset" entities are recomputed and then their linked components
// are recomputed too. Keys of any other type, and entities that report
// ErrNotFound, are counted as skipped. Other errors are counted as failed and
// do not stop the batch.
//
// If ctx is cancelled part-way through, the loop stops and every key not yet
// processed is counted as failed, rather than issuing a recompute call for
// each remaining key.
//
// The returned map always carries "total", "recomputed", "skipped" and
// "failed" counters, including when an error is returned. The error is the
// first failure encountered, wrapped with its entity key, or nil.
func (s *Service) processRecomputeBatchJob(ctx context.Context, job JobRecord) (map[string]any, error) {
	rawKeys, _ := job.Payload["entity_keys"].([]any)
	entityKeys := make([]string, 0, len(rawKeys))
	for _, v := range rawKeys {
		if k, ok := v.(string); ok && k != "" {
			entityKeys = append(entityKeys, k)
		}
	}

	recomputed := 0
	skipped := 0
	failed := 0
	var firstErr error
	for i, key := range entityKeys {
		if ctxErr := ctx.Err(); ctxErr != nil {
			// Account for this key and everything after it as failed so the
			// counters still add up to total.
			failed += len(entityKeys) - i
			if firstErr == nil {
				firstErr = fmt.Errorf("recompute %s: %w", key, ctxErr)
			}
			break
		}

		entityType, entityID := splitEntityKey(key)
		var err error
		switch entityType {
		case "component":
			_, err = s.recomputeComponentEntity(ctx, entityID, nil, nil)
		case "asset":
			_, err = s.recomputeAssetEntity(ctx, entityID, nil, nil)
			if err == nil {
				if _, linkErr := s.recomputeComponentsLinkedToAsset(ctx, entityID); linkErr != nil {
					err = linkErr
				}
			}
		default:
			skipped++
			continue
		}
		if err != nil {
			if errors.Is(err, ErrNotFound) {
				skipped++
				continue
			}
			failed++
			if firstErr == nil {
				firstErr = fmt.Errorf("recompute %s: %w", key, err)
			}
			continue
		}
		recomputed++
	}

	result := map[string]any{
		"total":      len(entityKeys),
		"recomputed": recomputed,
		"skipped":    skipped,
		"failed":     failed,
	}
	return result, firstErr
}
// processJobWithRetry runs processJob up to workerProcessRetryAttempts times,
// retrying only when isTransientDBErr reports the failure as transient. The
// delay between attempts grows linearly (120ms, 240ms, ...) and is cut short
// when ctx is cancelled.
//
// On success it returns the handler's result and a nil error. On failure it
// returns the last error together with whatever result the last attempt
// produced, so partial results a handler returns alongside its error are kept
// and can be persisted with the failed job.
func (s *Service) processJobWithRetry(ctx context.Context, job JobRecord) (map[string]any, error) {
	var (
		result map[string]any
		err    error
	)
	for attempt := 1; attempt <= workerProcessRetryAttempts; attempt++ {
		result, err = s.processJob(ctx, job)
		if err == nil {
			return result, nil
		}
		if !isTransientDBErr(err) || attempt == workerProcessRetryAttempts {
			break
		}
		// Back off before retrying, but do not hold up shutdown.
		select {
		case <-ctx.Done():
			return result, err
		case <-time.After(time.Duration(120*attempt) * time.Millisecond):
		}
	}
	return result, err
}
// completeJobWithRetry persists a job's final state through completeJob,
// retrying up to workerCompleteRetryAttempts times when isTransientDBErr
// reports the failure as transient. The delay between attempts grows linearly
// (120ms, 240ms, ...) and is cut short when ctx is cancelled.
//
// It returns nil once an attempt succeeds, otherwise the last error seen.
func (s *Service) completeJobWithRetry(ctx context.Context, jobID string, result map[string]any, procErr error) error {
	var err error
	for attempt := 1; attempt <= workerCompleteRetryAttempts; attempt++ {
		err = s.completeJob(ctx, jobID, result, procErr)
		if err == nil {
			return nil
		}
		if !isTransientDBErr(err) || attempt == workerCompleteRetryAttempts {
			return err
		}
		// Back off before retrying, but do not hold up shutdown.
		select {
		case <-ctx.Done():
			return err
		case <-time.After(time.Duration(120*attempt) * time.Millisecond):
		}
	}
	return err
}