package history

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"time"
)

// StartWorker launches the background job worker in its own goroutine and
// returns immediately. The goroutine exits once ctx is cancelled. Calling
// StartWorker on a nil Service, or on one without a database handle, is a
// no-op.
func (s *Service) StartWorker(ctx context.Context) {
	if s == nil || s.db == nil {
		return
	}
	go s.workerLoop(ctx)
}

// workerLoop claims queued jobs one at a time, processes each, and records
// the outcome until ctx is cancelled. It backs off briefly when claiming
// fails and polls at a shorter interval when the queue is empty.
func (s *Service) workerLoop(ctx context.Context) {
	const (
		claimErrorBackoff = 300 * time.Millisecond
		idlePollInterval  = 150 * time.Millisecond
		// finalizeTimeout bounds the detached write used to record a job's
		// outcome after ctx has already been cancelled.
		finalizeTimeout = 5 * time.Second
	)

	// pause blocks for d and reports false if ctx was cancelled first. The
	// timer is stopped explicitly so it does not linger after cancellation.
	pause := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	for ctx.Err() == nil {
		job, ok, err := s.claimNextJob(ctx)
		if err != nil {
			log.Printf("history worker claim failed err=%v", err)
			if !pause(claimErrorBackoff) {
				return
			}
			continue
		}
		if !ok {
			if !pause(idlePollInterval) {
				return
			}
			continue
		}

		log.Printf("history worker claimed job_id=%s type=%s entity=%s:%s", job.ID, job.JobType, job.EntityType, job.EntityID)

		started := time.Now()
		result, procErr := s.processJob(ctx, job)
		elapsed := time.Since(started)
		if procErr != nil {
			log.Printf("history worker job failed job_id=%s type=%s entity=%s:%s duration_ms=%d err=%v", job.ID, job.JobType, job.EntityType, job.EntityID, elapsed.Milliseconds(), procErr)
		} else {
			log.Printf("history worker job completed job_id=%s type=%s entity=%s:%s duration_ms=%d", job.ID, job.JobType, job.EntityType, job.EntityID, elapsed.Milliseconds())
		}

		completeErr := s.completeJob(ctx, job.ID, result, procErr)
		if completeErr != nil && ctx.Err() != nil {
			// ctx was cancelled while the job was in flight, so the write
			// above could not go through. Retry once on a short detached
			// context; otherwise the row would stay in 'running' with no
			// worker left to finish it.
			finalCtx, cancel := context.WithTimeout(context.Background(), finalizeTimeout)
			completeErr = s.completeJob(finalCtx, job.ID, result, procErr)
			cancel()
		}
		if completeErr != nil {
			log.Printf("history worker complete failed job_id=%s err=%v", job.ID, completeErr)
		}
	}
}

// claimNextJob selects the oldest queued job (ordered by created_at, then
// id) under a FOR UPDATE row lock, marks it running, bumps its attempt
// count, and commits. It then loads and returns the full record via GetJob.
//
// The boolean result is false, with a nil error, when no job is queued.
func (s *Service) claimNextJob(ctx context.Context) (JobRecord, bool, error) {
	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return JobRecord{}, false, fmt.Errorf("begin claim transaction: %w", err)
	}
	// After a successful Commit, Rollback only reports sql.ErrTxDone, so its
	// result is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	var jobID string
	err = tx.QueryRowContext(ctx, `
		SELECT id
		FROM history_recompute_jobs
		WHERE status = 'queued'
		ORDER BY created_at, id
		LIMIT 1
		FOR UPDATE`).Scan(&jobID)
	if errors.Is(err, sql.ErrNoRows) {
		return JobRecord{}, false, nil
	}
	if err != nil {
		return JobRecord{}, false, fmt.Errorf("select queued job: %w", err)
	}

	now := time.Now().UTC()
	if _, err := tx.ExecContext(ctx, `
		UPDATE history_recompute_jobs
		SET status = 'running',
		    started_at = ?,
		    locked_at = ?,
		    attempt_count = attempt_count + 1,
		    worker_id = ?
		WHERE id = ?`,
		now, now, "local-worker", jobID,
	); err != nil {
		return JobRecord{}, false, fmt.Errorf("mark job %s running: %w", jobID, err)
	}
	if err := tx.Commit(); err != nil {
		return JobRecord{}, false, fmt.Errorf("commit claim of job %s: %w", jobID, err)
	}

	// NOTE(review): the claim is already committed at this point. If GetJob
	// fails, the row stays in 'running' and nothing in this file re-queues
	// it; confirm whether stale-job recovery exists elsewhere.
	job, err := s.GetJob(ctx, jobID)
	if err != nil {
		return JobRecord{}, false, fmt.Errorf("load claimed job %s: %w", jobID, err)
	}
	return job, true, nil
}

// completeJob records the final outcome of a job. When procErr is non-nil
// the job is marked 'failed' with procErr's message; otherwise it is marked
// 'completed' and its error column is cleared. A non-nil result is stored
// as JSON; a nil result is stored as NULL.
//
// If result cannot be encoded as JSON, the job is still finalised, as
// 'failed' when processing itself succeeded, instead of being left in
// 'running', and the encoding error is returned to the caller.
func (s *Service) completeJob(ctx context.Context, jobID string, result map[string]any, procErr error) error {
	var (
		resultJSON any
		marshalErr error
	)
	if result != nil {
		encoded, err := json.Marshal(result)
		if err != nil {
			marshalErr = fmt.Errorf("marshal result for job %s: %w", jobID, err)
		} else {
			resultJSON = encoded
		}
	}

	// A processing error takes precedence in the stored message; an
	// encoding failure alone also counts as a failed job.
	failure := procErr
	if failure == nil {
		failure = marshalErr
	}

	now := time.Now().UTC()
	if failure != nil {
		if _, err := s.db.ExecContext(ctx, `
			UPDATE history_recompute_jobs
			SET status = 'failed',
			    finished_at = ?,
			    error = ?,
			    result = ?
			WHERE id = ?`,
			now, failure.Error(), resultJSON, jobID,
		); err != nil {
			return fmt.Errorf("mark job %s failed: %w", jobID, err)
		}
		// nil unless the result could not be encoded.
		return marshalErr
	}

	if _, err := s.db.ExecContext(ctx, `
		UPDATE history_recompute_jobs
		SET status = 'completed',
		    finished_at = ?,
		    error = NULL,
		    result = ?
		WHERE id = ?`,
		now, resultJSON, jobID,
	); err != nil {
		return fmt.Errorf("mark job %s completed: %w", jobID, err)
	}
	return nil
}

// processJob dispatches job to the handler matching its JobType and returns
// that handler's result. An unrecognised JobType yields an error and a nil
// result.
func (s *Service) processJob(ctx context.Context, job JobRecord) (map[string]any, error) {
	switch job.JobType {
	case "recompute":
		return s.processRecomputeJob(ctx, job)
	case "delete_event_recompute":
		return s.processDeleteEventRecompute(ctx, job)
	case "rollback":
		return s.processRollbackJob(ctx, job)
	case "hard_restore":
		return s.processHardRestoreJob(ctx, job)
	default:
		return nil, fmt.Errorf("unsupported job type %q", job.JobType)
	}
}