Files
core/internal/history/jobs.go

530 lines
16 KiB
Go

package history
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
"reanimator/internal/idgen"
)
// GetJob loads a single recompute job from history_recompute_jobs by id.
//
// It returns ErrNotFound when no row has the given id. A NULL or empty
// payload column yields an empty (non-nil) Payload map, matching ListJobs;
// Result stays nil unless the stored result is non-blank JSON. Error is nil
// when the error column is NULL.
func (s *Service) GetJob(ctx context.Context, jobID string) (JobRecord, error) {
	if s == nil || s.db == nil {
		return JobRecord{}, fmt.Errorf("history service unavailable")
	}
	var (
		job          JobRecord
		payloadBytes []byte
		resultText   sql.NullString
		errText      sql.NullString
	)
	err := s.db.QueryRowContext(ctx, `
SELECT id, job_type, entity_type, entity_id, status, payload, COALESCE(CAST(result AS CHAR), ''), error, created_at, started_at, finished_at
FROM history_recompute_jobs
WHERE id = ?`,
		jobID,
	).Scan(&job.ID, &job.JobType, &job.EntityType, &job.EntityID, &job.Status, &payloadBytes, &resultText, &errText, &job.CreatedAt, &job.StartedAt, &job.FinishedAt)
	if err == sql.ErrNoRows {
		return JobRecord{}, ErrNotFound
	}
	if err != nil {
		return JobRecord{}, err
	}
	// Tolerate a NULL/empty payload the same way ListJobs does; json.Unmarshal
	// rejects empty input with "unexpected end of JSON input".
	job.Payload = map[string]any{}
	if len(payloadBytes) > 0 {
		if err := json.Unmarshal(payloadBytes, &job.Payload); err != nil {
			return JobRecord{}, err
		}
	}
	// result is COALESCEd to '' in SQL, so blank means "no result recorded".
	if strings.TrimSpace(resultText.String) != "" {
		job.Result = map[string]any{}
		if err := json.Unmarshal([]byte(resultText.String), &job.Result); err != nil {
			return JobRecord{}, err
		}
	}
	job.Error = nullStringPtr(errText)
	return job, nil
}
// JobListCursor marks the position after which ListJobs resumes paging.
// ListJobs orders by (created_at DESC, id DESC), so a cursor holds the
// created_at and id of the last job of the previous page.
type JobListCursor struct {
	Time time.Time
	ID   string
}

// EncodeJobListCursor serializes cursor as "<unix-nanoseconds>:<id>", with the
// time taken in UTC.
func EncodeJobListCursor(cursor JobListCursor) string {
	return strconv.FormatInt(cursor.Time.UTC().UnixNano(), 10) + ":" + cursor.ID
}

// DecodeJobListCursor parses a value produced by EncodeJobListCursor.
//
// Only the first ':' separates the timestamp from the id, so ids that contain
// ':' themselves round-trip correctly. It returns an "invalid cursor" error
// when the separator is missing, the id is blank, or the timestamp is not a
// base-10 int64. The returned Time is in UTC.
func DecodeJobListCursor(value string) (JobListCursor, error) {
	nanosText, id, found := strings.Cut(value, ":")
	if !found || strings.TrimSpace(id) == "" {
		return JobListCursor{}, fmt.Errorf("invalid cursor")
	}
	nanos, err := strconv.ParseInt(nanosText, 10, 64)
	if err != nil {
		return JobListCursor{}, fmt.Errorf("invalid cursor")
	}
	return JobListCursor{Time: time.Unix(0, nanos).UTC(), ID: id}, nil
}
// JobListResult is one page of ListJobs output.
// Items holds at most the requested page size of jobs, newest first.
// NextCursor is set only when more jobs exist beyond this page; it is the
// EncodeJobListCursor form of the last item (presumably decoded with
// DecodeJobListCursor by the caller before being passed back to ListJobs).
type JobListResult struct {
Items []JobRecord `json:"items"`
NextCursor *string `json:"next_cursor,omitempty"`
}
// ListJobs returns recompute jobs newest-first (created_at DESC, id DESC),
// optionally filtered to the given statuses and paged with a keyset cursor.
//
// A limit outside 1..200 falls back to 50. Items is always a non-nil slice so
// an empty page encodes as a JSON array ([]) rather than null. NextCursor is
// set only when at least one more job exists after the returned page.
func (s *Service) ListJobs(ctx context.Context, statuses []string, limit int, cursor *JobListCursor) (JobListResult, error) {
	if s == nil || s.db == nil {
		return JobListResult{}, fmt.Errorf("history service unavailable")
	}
	if limit <= 0 || limit > 200 {
		limit = 50
	}
	query := `
SELECT id, job_type, entity_type, entity_id, status, payload, COALESCE(CAST(result AS CHAR), ''), error, created_at, started_at, finished_at
FROM history_recompute_jobs`
	var (
		conds []string
		args  []any
	)
	if len(statuses) > 0 {
		clause, vals := inClauseStrings(statuses)
		conds = append(conds, `status IN (`+clause+`)`)
		args = append(args, vals...)
	}
	if cursor != nil {
		// Keyset pagination: rows strictly after the cursor in DESC order, with
		// id breaking ties between jobs created at the same instant.
		at := cursor.Time.UTC()
		conds = append(conds, `(created_at < ? OR (created_at = ? AND id < ?))`)
		args = append(args, at, at, cursor.ID)
	}
	if len(conds) > 0 {
		query += ` WHERE ` + strings.Join(conds, ` AND `)
	}
	// Fetch one extra row to learn whether another page exists.
	query += ` ORDER BY created_at DESC, id DESC LIMIT ?`
	args = append(args, limit+1)

	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return JobListResult{}, err
	}
	defer rows.Close()

	out := make([]JobRecord, 0, limit+1)
	for rows.Next() {
		var (
			job          JobRecord
			payloadBytes []byte
			resultText   sql.NullString
			errText      sql.NullString
		)
		if err := rows.Scan(&job.ID, &job.JobType, &job.EntityType, &job.EntityID, &job.Status, &payloadBytes, &resultText, &errText, &job.CreatedAt, &job.StartedAt, &job.FinishedAt); err != nil {
			return JobListResult{}, err
		}
		// NULL/empty payload becomes an empty map instead of an unmarshal error.
		job.Payload = map[string]any{}
		if len(payloadBytes) > 0 {
			if err := json.Unmarshal(payloadBytes, &job.Payload); err != nil {
				return JobListResult{}, err
			}
		}
		job.Error = nullStringPtr(errText)
		if strings.TrimSpace(resultText.String) != "" {
			job.Result = map[string]any{}
			if err := json.Unmarshal([]byte(resultText.String), &job.Result); err != nil {
				return JobListResult{}, err
			}
		}
		out = append(out, job)
	}
	if err := rows.Err(); err != nil {
		return JobListResult{}, err
	}

	var nextCursor *string
	if len(out) > limit {
		last := out[limit-1]
		c := EncodeJobListCursor(JobListCursor{Time: last.CreatedAt.UTC(), ID: last.ID})
		nextCursor = &c
		out = out[:limit]
	}
	return JobListResult{Items: out, NextCursor: nextCursor}, nil
}
// QueueDeleteEventRecompute soft-deletes one change event of an entity and
// enqueues a "delete_event_recompute" job for that entity, atomically in a
// single transaction.
//
// It fails with ErrConflict if the entity already has a queued or running
// job, ErrNotFound if eventID does not belong to the entity, and an
// ErrInvalidPatch-wrapped error for an unsupported entity type. The job
// payload records event_id and, when given, the untrimmed reason.
//
// NOTE(review): unlike QueueRecompute/QueueRollback this path does not lock
// the entity row, so it is not serialized against concurrent queue calls
// for the same entity — confirm whether that is intended.
func (s *Service) QueueDeleteEventRecompute(ctx context.Context, entityType, entityID, eventID string, reason, requestedBy *string) (JobRecord, error) {
	if err := validateEntityType(entityType); err != nil {
		return JobRecord{}, err
	}

	jobPayload := map[string]any{"event_id": eventID}
	if reason != nil {
		jobPayload["reason"] = *reason
	}

	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return JobRecord{}, err
	}
	// Rolling back after a successful Commit is a harmless no-op.
	defer func() { _ = tx.Rollback() }()

	if err = s.ensureNoRunningJob(ctx, tx, entityType, entityID); err != nil {
		return JobRecord{}, err
	}
	if err = s.softDeleteLogicalEvent(ctx, tx, entityType, entityID, eventID, reason, requestedBy); err != nil {
		return JobRecord{}, err
	}
	queued, err := s.insertJobTx(ctx, tx, "delete_event_recompute", entityType, entityID, jobPayload, requestedBy)
	if err != nil {
		return JobRecord{}, err
	}
	if err = tx.Commit(); err != nil {
		return JobRecord{}, err
	}
	return queued, nil
}
// QueueRollback enqueues a rollback job for one entity.
//
// mode (surrounding whitespace ignored) must be "compensating", which queues
// a "rollback" job, or "hard_restore", which queues a "hard_restore" job and
// writes a history_admin_audit row in the same transaction. The payload
// carries mode, plus target_version, target_snapshot_id (trimmed, omitted when
// blank) and reason when provided. The audit row receives the same
// normalized snapshot id (NULL when blank).
//
// The entity row is locked (SELECT ... FOR UPDATE) before checking for
// queued/running jobs, so concurrent queue calls for the same entity are
// serialized and a second caller sees the first caller's job (ErrConflict).
// Returns ErrNotFound if the entity does not exist.
func (s *Service) QueueRollback(ctx context.Context, entityType, entityID string, targetVersion *int64, targetSnapshotID *string, mode string, reason, requestedBy *string) (JobRecord, error) {
	if err := validateEntityType(entityType); err != nil {
		return JobRecord{}, err
	}
	mode = strings.TrimSpace(mode)
	if mode != "compensating" && mode != "hard_restore" {
		return JobRecord{}, fmt.Errorf("%w: invalid rollback mode", ErrInvalidPatch)
	}
	// Normalize once so the job payload and the audit row agree.
	var snapshotID *string
	if targetSnapshotID != nil {
		if trimmed := strings.TrimSpace(*targetSnapshotID); trimmed != "" {
			snapshotID = &trimmed
		}
	}

	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return JobRecord{}, err
	}
	defer func() { _ = tx.Rollback() }()

	// Lock first, then check: checking before the lock lets two concurrent
	// callers both observe "no running job" and both enqueue.
	if err := s.ensureEntityExistsForUpdate(ctx, tx, entityType, entityID); err != nil {
		return JobRecord{}, err
	}
	if err := s.ensureNoRunningJob(ctx, tx, entityType, entityID); err != nil {
		return JobRecord{}, err
	}

	payload := map[string]any{
		"mode": mode,
	}
	if targetVersion != nil {
		payload["target_version"] = *targetVersion
	}
	if snapshotID != nil {
		payload["target_snapshot_id"] = *snapshotID
	}
	if reason != nil {
		payload["reason"] = *reason
	}
	jobType := "rollback"
	if mode == "hard_restore" {
		jobType = "hard_restore"
	}
	job, err := s.insertJobTx(ctx, tx, jobType, entityType, entityID, payload, requestedBy)
	if err != nil {
		return JobRecord{}, err
	}
	if mode == "hard_restore" {
		if err := s.insertHardRestoreAuditTx(ctx, tx, entityType, entityID, targetVersion, snapshotID, job.ID, requestedBy, payload); err != nil {
			return JobRecord{}, err
		}
	}
	if err := tx.Commit(); err != nil {
		return JobRecord{}, err
	}
	return job, nil
}
// QueueRecompute enqueues a manual "recompute" job for one entity.
//
// The entity row is locked (SELECT ... FOR UPDATE) before checking for
// queued/running jobs, so concurrent callers for the same entity are
// serialized and only the first can enqueue; the others get ErrConflict.
// Returns ErrNotFound if the entity does not exist and an
// ErrInvalidPatch-wrapped error for an unsupported entity type. The payload
// is {"manual": true} plus the trimmed reason when it is non-blank.
func (s *Service) QueueRecompute(ctx context.Context, entityType, entityID string, requestedBy *string, reason *string) (JobRecord, error) {
	if err := validateEntityType(entityType); err != nil {
		return JobRecord{}, err
	}
	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return JobRecord{}, err
	}
	defer func() { _ = tx.Rollback() }()

	// Lock first, then check: checking before the lock lets two concurrent
	// callers both observe "no running job" and both enqueue.
	if err := s.ensureEntityExistsForUpdate(ctx, tx, entityType, entityID); err != nil {
		return JobRecord{}, err
	}
	if err := s.ensureNoRunningJob(ctx, tx, entityType, entityID); err != nil {
		return JobRecord{}, err
	}

	payload := map[string]any{"manual": true}
	if reason != nil && strings.TrimSpace(*reason) != "" {
		payload["reason"] = strings.TrimSpace(*reason)
	}
	job, err := s.insertJobTx(ctx, tx, "recompute", entityType, entityID, payload, requestedBy)
	if err != nil {
		return JobRecord{}, err
	}
	if err := tx.Commit(); err != nil {
		return JobRecord{}, err
	}
	return job, nil
}
// QueueRecomputeMaskResult reports the outcome of QueueRecomputeByMask.
type QueueRecomputeMaskResult struct {
// EntityType is the entity type the mask was applied to.
EntityType string `json:"entity_type"`
// Mask is the trimmed glob mask ('*' = any run, '?' = one character).
Mask string `json:"mask"`
// MatchedEntities is how many entity ids the mask matched (the lookup is capped).
MatchedEntities int `json:"matched_entities"`
// QueuedJobs are the recompute jobs that were enqueued.
QueuedJobs []JobRecord `json:"queued_jobs"`
// QueuedJobsCount equals len(QueuedJobs).
QueuedJobsCount int `json:"queued_jobs_count"`
// SkippedConflicts lists entity ids skipped because they already had a queued/running job.
SkippedConflicts []string `json:"skipped_conflicts,omitempty"`
}
// QueueRecomputeByMask enqueues a manual recompute job for every entity whose
// id matches a glob mask ('*' = any run, '?' = one character), up to 500
// entities.
//
// Each entity is queued in its own transaction via QueueRecompute. Entities
// that already have a queued/running job (ErrConflict) are listed in
// SkippedConflicts instead of failing the call; any other error aborts the
// loop, leaving already-queued jobs in place. It returns ErrNotFound when
// nothing matches, and an ErrInvalidPatch-wrapped error for a blank mask or
// an unsupported entity type.
func (s *Service) QueueRecomputeByMask(ctx context.Context, entityType, mask string, requestedBy *string, reason *string) (QueueRecomputeMaskResult, error) {
	if err := validateEntityType(entityType); err != nil {
		return QueueRecomputeMaskResult{}, err
	}
	trimmedMask := strings.TrimSpace(mask)
	if trimmedMask == "" {
		return QueueRecomputeMaskResult{}, fmt.Errorf("%w: entity id/mask required", ErrInvalidPatch)
	}

	matched, err := s.findEntityIDsByMask(ctx, entityType, trimmedMask, 500)
	switch {
	case err != nil:
		return QueueRecomputeMaskResult{}, err
	case len(matched) == 0:
		return QueueRecomputeMaskResult{}, ErrNotFound
	}

	queued := make([]JobRecord, 0, len(matched))
	var skipped []string
	for _, entityID := range matched {
		job, queueErr := s.QueueRecompute(ctx, entityType, entityID, requestedBy, reason)
		if queueErr == ErrConflict {
			skipped = append(skipped, entityID)
			continue
		}
		if queueErr != nil {
			return QueueRecomputeMaskResult{}, queueErr
		}
		queued = append(queued, job)
	}

	return QueueRecomputeMaskResult{
		EntityType:       entityType,
		Mask:             trimmedMask,
		MatchedEntities:  len(matched),
		QueuedJobs:       queued,
		QueuedJobsCount:  len(queued),
		SkippedConflicts: skipped,
	}, nil
}
// findEntityIDsByMask returns, in ascending order, up to limit ids of parts
// ("component") or machines ("asset") whose id matches the glob mask.
//
// A limit outside 1..2000 falls back to 500. The mask is translated with
// globMaskToSQLLike, whose backslash escapes match the ESCAPE clause below.
// The result is a non-nil (possibly empty) slice on success.
func (s *Service) findEntityIDsByMask(ctx context.Context, entityType, mask string, limit int) ([]string, error) {
	if limit < 1 || limit > 2000 {
		limit = 500
	}
	pattern, err := globMaskToSQLLike(mask)
	if err != nil {
		return nil, err
	}

	var stmt string
	switch entityType {
	case "asset":
		stmt = `SELECT id FROM machines WHERE id LIKE ? ESCAPE '\\' ORDER BY id LIMIT ?`
	case "component":
		stmt = `SELECT id FROM parts WHERE id LIKE ? ESCAPE '\\' ORDER BY id LIMIT ?`
	default:
		return nil, fmt.Errorf("%w: unsupported entity type", ErrInvalidPatch)
	}

	rows, err := s.db.QueryContext(ctx, stmt, pattern, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	ids := []string{}
	for rows.Next() {
		var entityID string
		if scanErr := rows.Scan(&entityID); scanErr != nil {
			return nil, scanErr
		}
		ids = append(ids, entityID)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return ids, nil
}
// globMaskToSQLLike converts a shell-style glob into a SQL LIKE pattern that
// uses '\' as its escape character.
//
// '*' becomes '%', '?' becomes '_', and literal '%', '_' and '\' are
// backslash-escaped so they match themselves. Surrounding whitespace is
// trimmed first; a blank mask yields an ErrInvalidPatch-wrapped error.
func globMaskToSQLLike(mask string) (string, error) {
	trimmed := strings.TrimSpace(mask)
	if len(trimmed) == 0 {
		return "", fmt.Errorf("%w: empty mask", ErrInvalidPatch)
	}
	var out strings.Builder
	out.Grow(len(trimmed))
	for _, ch := range trimmed {
		switch ch {
		case '*':
			out.WriteByte('%')
			continue
		case '?':
			out.WriteByte('_')
			continue
		case '%', '_', '\\':
			// LIKE metacharacter: escape it so it matches literally.
			out.WriteByte('\\')
		}
		out.WriteRune(ch)
	}
	return out.String(), nil
}
// ensureNoRunningJob returns ErrConflict when the entity already has a job in
// status 'queued' or 'running' in history_recompute_jobs, nil otherwise.
// The count is a plain (non-locking) read inside tx.
func (s *Service) ensureNoRunningJob(ctx context.Context, tx *sql.Tx, entityType, entityID string) error {
	var active int
	row := tx.QueryRowContext(ctx, `
SELECT COUNT(*) FROM history_recompute_jobs
WHERE entity_type = ? AND entity_id = ? AND status IN ('queued', 'running')`,
		entityType, entityID,
	)
	if err := row.Scan(&active); err != nil {
		return err
	}
	if active <= 0 {
		return nil
	}
	return ErrConflict
}
// softDeleteLogicalEvent soft-deletes one change event of an entity inside tx
// and flags its timeline_events rows as deleted.
//
// entityType selects the table: "component" uses component_change_events
// (matched on part_id), "asset" uses asset_change_events (matched on
// machine_id); anything else yields an ErrInvalidPatch-wrapped error.
// Deletion is idempotent: an event that is already deleted but belongs to
// entityID is not an error. ErrNotFound is returned when no event with
// eventID belongs to entityID.
//
// For an asset event with a non-blank correlation_id, every component change
// event sharing that correlation_id, and its timeline rows, is soft-deleted
// too (keeping an existing deleted_reason when reason is nil).
func (s *Service) softDeleteLogicalEvent(ctx context.Context, tx *sql.Tx, entityType, entityID, eventID string, reason, requestedBy *string) error {
	var markDeletedSQL, belongsSQL string
	switch entityType {
	case "component":
		markDeletedSQL = `UPDATE component_change_events SET is_deleted = TRUE, deleted_at = ?, deleted_by = ?, deleted_reason = ? WHERE id = ? AND part_id = ? AND is_deleted = FALSE`
		belongsSQL = `SELECT COUNT(*) FROM component_change_events WHERE id = ? AND part_id = ?`
	case "asset":
		markDeletedSQL = `UPDATE asset_change_events SET is_deleted = TRUE, deleted_at = ?, deleted_by = ?, deleted_reason = ? WHERE id = ? AND machine_id = ? AND is_deleted = FALSE`
		belongsSQL = `SELECT COUNT(*) FROM asset_change_events WHERE id = ? AND machine_id = ?`
	default:
		return fmt.Errorf("%w: unsupported entity type", ErrInvalidPatch)
	}
	deletedAt := time.Now().UTC()

	// Capture the correlation id before deleting; it drives the component
	// cascade at the end. A missing row is handled by the ErrNotFound check.
	var correlationID sql.NullString
	if entityType == "asset" {
		lookupErr := tx.QueryRowContext(ctx, `SELECT correlation_id FROM asset_change_events WHERE id = ? AND machine_id = ?`, eventID, entityID).Scan(&correlationID)
		if lookupErr != nil && lookupErr != sql.ErrNoRows {
			return lookupErr
		}
	}

	res, err := tx.ExecContext(ctx, markDeletedSQL, deletedAt, requestedBy, reason, eventID, entityID)
	if err != nil {
		return err
	}
	changed, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if changed == 0 {
		// No row updated: either already deleted (fine) or not this entity's event.
		var matches int
		if err := tx.QueryRowContext(ctx, belongsSQL, eventID, entityID).Scan(&matches); err != nil {
			return err
		}
		if matches == 0 {
			return ErrNotFound
		}
	}

	if _, err := tx.ExecContext(ctx, `
UPDATE timeline_events
SET is_deleted = TRUE
WHERE logical_entity_type = ? AND logical_event_id = ?`,
		entityType, eventID,
	); err != nil {
		return err
	}

	cascade := entityType == "asset" && correlationID.Valid && strings.TrimSpace(correlationID.String) != ""
	if !cascade {
		return nil
	}
	if _, err := tx.ExecContext(ctx, `
UPDATE component_change_events
SET is_deleted = TRUE, deleted_at = ?, deleted_by = ?, deleted_reason = COALESCE(?, deleted_reason)
WHERE correlation_id = ? AND is_deleted = FALSE`,
		deletedAt, requestedBy, reason, correlationID.String,
	); err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx, `
UPDATE timeline_events
SET is_deleted = TRUE
WHERE logical_entity_type = 'component'
AND logical_event_id IN (SELECT id FROM component_change_events WHERE correlation_id = ?)`,
		correlationID.String,
	); err != nil {
		return err
	}
	return nil
}
// insertJobTx inserts a 'queued' row into history_recompute_jobs within tx and
// returns the corresponding JobRecord (CreatedAt is the UTC insert time).
//
// Job ids have the form "job-<unix-nanos>". On an error whose text mentions
// both "duplicate" and "history_recompute_jobs" it retries with a fresh id,
// up to five attempts in total; any other error is returned as-is.
func (s *Service) insertJobTx(ctx context.Context, tx *sql.Tx, jobType, entityType, entityID string, payload map[string]any, requestedBy *string) (JobRecord, error) {
	createdAt := time.Now().UTC()
	encoded, err := json.Marshal(payload)
	if err != nil {
		return JobRecord{}, err
	}

	const maxAttempts = 5
	for attempt := 0; attempt < maxAttempts; attempt++ {
		id := fmt.Sprintf("job-%d", time.Now().UTC().UnixNano()+int64(attempt))
		_, execErr := tx.ExecContext(ctx, `
INSERT INTO history_recompute_jobs
(id, job_type, entity_type, entity_id, status, requested_by, payload, created_at)
VALUES (?, ?, ?, ?, 'queued', ?, ?, ?)`,
			id, jobType, entityType, entityID, requestedBy, encoded, createdAt,
		)
		if execErr == nil {
			return JobRecord{
				ID:         id,
				JobType:    jobType,
				EntityType: entityType,
				EntityID:   entityID,
				Status:     "queued",
				Payload:    payload,
				CreatedAt:  createdAt,
			}, nil
		}
		// Only an id collision on this table is worth another attempt.
		lowered := strings.ToLower(execErr.Error())
		isIDCollision := strings.Contains(lowered, "duplicate") && strings.Contains(lowered, "history_recompute_jobs")
		if !isIDCollision {
			return JobRecord{}, execErr
		}
	}
	return JobRecord{}, fmt.Errorf("failed to allocate unique job id")
}
// insertHardRestoreAuditTx writes a 'hard_restore' row into history_admin_audit
// within tx, linking it to jobID and storing details as JSON. The audit id
// comes from s.generateID with the idgen.HistoryAdminAudit kind.
func (s *Service) insertHardRestoreAuditTx(ctx context.Context, tx *sql.Tx, entityType, entityID string, targetVersion *int64, targetSnapshotID *string, jobID string, requestedBy *string, details map[string]any) error {
	auditRowID, genErr := s.generateID(ctx, tx, idgen.HistoryAdminAudit)
	if genErr != nil {
		return genErr
	}
	encodedDetails, marshalErr := json.Marshal(details)
	if marshalErr != nil {
		return marshalErr
	}
	if _, execErr := tx.ExecContext(ctx, `
INSERT INTO history_admin_audit
(id, entity_type, entity_id, operation_type, requested_by, target_version, target_snapshot_id, job_id, details)
VALUES (?, ?, ?, 'hard_restore', ?, ?, ?, ?, ?)`,
		auditRowID, entityType, entityID, requestedBy, targetVersion, targetSnapshotID, jobID, encodedDetails,
	); execErr != nil {
		return execErr
	}
	return nil
}
// ensureEntityExistsForUpdate row-locks the entity (SELECT ... FOR UPDATE on
// parts for "component", machines for "asset") within tx.
//
// Returns ErrNotFound when the row does not exist and an
// ErrInvalidPatch-wrapped error for any other entity type.
func (s *Service) ensureEntityExistsForUpdate(ctx context.Context, tx *sql.Tx, entityType, entityID string) error {
	var lockSQL string
	switch entityType {
	case "asset":
		lockSQL = `SELECT 1 FROM machines WHERE id = ? FOR UPDATE`
	case "component":
		lockSQL = `SELECT 1 FROM parts WHERE id = ? FOR UPDATE`
	default:
		return fmt.Errorf("%w: unsupported entity type", ErrInvalidPatch)
	}

	var found int
	err := tx.QueryRowContext(ctx, lockSQL, entityID).Scan(&found)
	switch {
	case err == sql.ErrNoRows:
		return ErrNotFound
	case err != nil:
		return err
	default:
		return nil
	}
}
// validateEntityType accepts "component" or "asset" (ignoring surrounding
// whitespace) and returns an ErrInvalidPatch-wrapped error otherwise.
//
// NOTE(review): callers in this file pass the untrimmed value on to exact-match
// switches, so " asset" passes here but is rejected later — confirm intent.
func validateEntityType(entityType string) error {
	if kind := strings.TrimSpace(entityType); kind == "component" || kind == "asset" {
		return nil
	}
	return fmt.Errorf("%w: unsupported entity type", ErrInvalidPatch)
}