Files
core/internal/history/jobs.go

286 lines
8.9 KiB
Go

package history
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"reanimator/internal/idgen"
)
// GetJob loads a single row from history_recompute_jobs by its ID.
//
// It returns ErrNotFound when no row matches jobID. The payload column is
// always JSON-decoded into the record's Payload. The result column is decoded
// into Result only when it holds non-blank text; otherwise Result keeps its
// zero value. Error is derived from the nullable error column via
// nullStringPtr.
func (s *Service) GetJob(ctx context.Context, jobID string) (JobRecord, error) {
	var (
		job        JobRecord
		payloadRaw []byte
		resultText string
		errText    sql.NullString
	)

	// COALESCE maps a NULL result to '', so the column can be scanned into a
	// plain string and tested for blankness below.
	err := s.db.QueryRowContext(ctx, `
SELECT id, job_type, entity_type, entity_id, status, payload, COALESCE(CAST(result AS CHAR), ''), error, created_at, started_at, finished_at
FROM history_recompute_jobs
WHERE id = ?`,
		jobID,
	).Scan(&job.ID, &job.JobType, &job.EntityType, &job.EntityID, &job.Status, &payloadRaw, &resultText, &errText, &job.CreatedAt, &job.StartedAt, &job.FinishedAt)
	// errors.Is also matches a wrapped sql.ErrNoRows.
	if errors.Is(err, sql.ErrNoRows) {
		return JobRecord{}, ErrNotFound
	}
	if err != nil {
		return JobRecord{}, err
	}

	if err := json.Unmarshal(payloadRaw, &job.Payload); err != nil {
		return JobRecord{}, fmt.Errorf("decoding payload of job %q: %w", jobID, err)
	}

	if strings.TrimSpace(resultText) != "" {
		job.Result = map[string]any{}
		if err := json.Unmarshal([]byte(resultText), &job.Result); err != nil {
			return JobRecord{}, fmt.Errorf("decoding result of job %q: %w", jobID, err)
		}
	}

	job.Error = nullStringPtr(errText)
	return job, nil
}
// QueueDeleteEventRecompute soft-deletes one logical event of an entity and
// queues a "delete_event_recompute" job for that entity, all inside a single
// transaction.
//
// The job payload always carries "event_id" and carries "reason" only when
// reason is non-nil. Errors from validateEntityType, ensureNoRunningJob,
// softDeleteLogicalEvent and insertJobTx are returned unchanged together with
// a zero JobRecord.
//
// NOTE(review): this path does not lock the entity row the way QueueRollback
// does via ensureEntityExistsForUpdate, so two concurrent calls might both
// pass the active-job check — confirm whether that is intended.
func (s *Service) QueueDeleteEventRecompute(ctx context.Context, entityType, entityID, eventID string, reason, requestedBy *string) (JobRecord, error) {
	var none JobRecord

	if err := validateEntityType(entityType); err != nil {
		return none, err
	}

	// Building the payload has no side effects, so it is prepared up front.
	jobPayload := map[string]any{"event_id": eventID}
	if reason != nil {
		jobPayload["reason"] = *reason
	}

	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return none, err
	}
	// Rolls back on every early return; the error is deliberately discarded
	// because after a successful Commit the rollback has nothing left to undo.
	defer func() { _ = tx.Rollback() }()

	err = s.ensureNoRunningJob(ctx, tx, entityType, entityID)
	if err != nil {
		return none, err
	}

	err = s.softDeleteLogicalEvent(ctx, tx, entityType, entityID, eventID, reason, requestedBy)
	if err != nil {
		return none, err
	}

	queued, err := s.insertJobTx(ctx, tx, "delete_event_recompute", entityType, entityID, jobPayload, requestedBy)
	if err != nil {
		return none, err
	}

	err = tx.Commit()
	if err != nil {
		return none, err
	}
	return queued, nil
}
// QueueRollback queues a rollback job for an entity inside one transaction.
//
// mode is trimmed and must be "compensating" (job type "rollback") or
// "hard_restore" (job type "hard_restore"); anything else yields an error
// wrapping ErrInvalidPatch. For "hard_restore" an audit row is also written
// through insertHardRestoreAuditTx in the same transaction.
//
// The job payload always contains "mode". It contains "target_version",
// "target_snapshot_id" and "reason" only when the matching argument is set;
// a nil or blank targetSnapshotID counts as not set.
//
// Errors from validateEntityType, ensureEntityExistsForUpdate (ErrNotFound
// for a missing entity), ensureNoRunningJob (ErrConflict for an active job)
// and the insert helpers are returned unchanged with a zero JobRecord.
func (s *Service) QueueRollback(ctx context.Context, entityType, entityID string, targetVersion *int64, targetSnapshotID *string, mode string, reason, requestedBy *string) (JobRecord, error) {
	if err := validateEntityType(entityType); err != nil {
		return JobRecord{}, err
	}
	mode = strings.TrimSpace(mode)
	if mode != "compensating" && mode != "hard_restore" {
		return JobRecord{}, fmt.Errorf("%w: invalid rollback mode", ErrInvalidPatch)
	}

	// Normalise the snapshot ID once so the job payload and the audit row
	// record the same value (nil when absent or blank).
	var snapshotID *string
	if targetSnapshotID != nil {
		if trimmed := strings.TrimSpace(*targetSnapshotID); trimmed != "" {
			snapshotID = &trimmed
		}
	}

	tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return JobRecord{}, err
	}
	// Rolls back on every early return; the error is deliberately discarded
	// because after a successful Commit the rollback has nothing left to undo.
	defer func() { _ = tx.Rollback() }()

	// Lock the entity row BEFORE checking for active jobs. If the check ran
	// first, two concurrent requests could both see no active job, then
	// serialise on the row lock and each insert a job.
	if err := s.ensureEntityExistsForUpdate(ctx, tx, entityType, entityID); err != nil {
		return JobRecord{}, err
	}
	if err := s.ensureNoRunningJob(ctx, tx, entityType, entityID); err != nil {
		return JobRecord{}, err
	}

	payload := map[string]any{
		"mode": mode,
	}
	if targetVersion != nil {
		payload["target_version"] = *targetVersion
	}
	if snapshotID != nil {
		payload["target_snapshot_id"] = *snapshotID
	}
	if reason != nil {
		payload["reason"] = *reason
	}

	hardRestore := mode == "hard_restore"
	jobType := "rollback"
	if hardRestore {
		jobType = "hard_restore"
	}

	job, err := s.insertJobTx(ctx, tx, jobType, entityType, entityID, payload, requestedBy)
	if err != nil {
		return JobRecord{}, err
	}
	if hardRestore {
		if err := s.insertHardRestoreAuditTx(ctx, tx, entityType, entityID, targetVersion, snapshotID, job.ID, requestedBy, payload); err != nil {
			return JobRecord{}, err
		}
	}

	if err := tx.Commit(); err != nil {
		return JobRecord{}, err
	}
	return job, nil
}
// ensureNoRunningJob returns ErrConflict when history_recompute_jobs already
// holds at least one job for the entity whose status is 'queued' or
// 'running'. Query and scan errors are returned unchanged. The lookup runs on
// the supplied transaction.
func (s *Service) ensureNoRunningJob(ctx context.Context, tx *sql.Tx, entityType, entityID string) error {
	const countActiveJobs = `
SELECT COUNT(*) FROM history_recompute_jobs
WHERE entity_type = ? AND entity_id = ? AND status IN ('queued', 'running')`

	var active int
	row := tx.QueryRowContext(ctx, countActiveJobs, entityType, entityID)
	switch err := row.Scan(&active); {
	case err != nil:
		return err
	case active > 0:
		return ErrConflict
	default:
		return nil
	}
}
// softDeleteLogicalEvent flags one change event of an entity as deleted
// within tx and marks the timeline rows that reference it as deleted too.
//
// Steps, in order:
//  1. For assets, read the event's correlation_id.
//  2. Mark the event row deleted (stamping deleted_at, deleted_by and
//     deleted_reason) unless it is already deleted.
//  3. If no row was updated, check that the event belongs to the entity: a
//     missing event yields ErrNotFound, an already-deleted one is accepted so
//     the call is repeatable.
//  4. Mark the matching timeline_events rows deleted.
//  5. For assets with a non-blank correlation_id, also mark every
//     not-yet-deleted component change event sharing that correlation_id,
//     and the timeline rows of those component events, as deleted.
//
// An entityType other than "component" or "asset" yields an error wrapping
// ErrInvalidPatch. reason and requestedBy may be nil and are handed to the
// driver as given.
func (s *Service) softDeleteLogicalEvent(ctx context.Context, tx *sql.Tx, entityType, entityID, eventID string, reason, requestedBy *string) error {
	now := time.Now().UTC()

	// A single switch selects both statements, so the existence check can
	// never run with an empty query.
	var (
		deleteQuery string
		existsQuery string
		correlation sql.NullString
	)
	switch entityType {
	case "component":
		deleteQuery = `UPDATE component_change_events SET is_deleted = TRUE, deleted_at = ?, deleted_by = ?, deleted_reason = ? WHERE id = ? AND part_id = ? AND is_deleted = FALSE`
		existsQuery = `SELECT COUNT(*) FROM component_change_events WHERE id = ? AND part_id = ?`
	case "asset":
		deleteQuery = `UPDATE asset_change_events SET is_deleted = TRUE, deleted_at = ?, deleted_by = ?, deleted_reason = ? WHERE id = ? AND machine_id = ? AND is_deleted = FALSE`
		existsQuery = `SELECT COUNT(*) FROM asset_change_events WHERE id = ? AND machine_id = ?`
		// A missing row is not an error here: the existence check after the
		// UPDATE reports ErrNotFound for it.
		err := tx.QueryRowContext(ctx, `SELECT correlation_id FROM asset_change_events WHERE id = ? AND machine_id = ?`, eventID, entityID).Scan(&correlation)
		if err != nil && !errors.Is(err, sql.ErrNoRows) {
			return err
		}
	default:
		return fmt.Errorf("%w: unsupported entity type", ErrInvalidPatch)
	}

	res, err := tx.ExecContext(ctx, deleteQuery, now, requestedBy, reason, eventID, entityID)
	if err != nil {
		return err
	}
	affected, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if affected == 0 {
		// Nothing updated: either the event was already deleted (accepted)
		// or it does not belong to this entity (ErrNotFound).
		var exists int
		if err := tx.QueryRowContext(ctx, existsQuery, eventID, entityID).Scan(&exists); err != nil {
			return err
		}
		if exists == 0 {
			return ErrNotFound
		}
	}

	if _, err := tx.ExecContext(ctx, `
UPDATE timeline_events
SET is_deleted = TRUE
WHERE logical_entity_type = ? AND logical_event_id = ?`,
		entityType, eventID,
	); err != nil {
		return err
	}

	// Only asset events with a usable correlation ID cascade to components.
	if entityType != "asset" || !correlation.Valid || strings.TrimSpace(correlation.String) == "" {
		return nil
	}

	// COALESCE keeps an existing deleted_reason when no reason is supplied.
	if _, err := tx.ExecContext(ctx, `
UPDATE component_change_events
SET is_deleted = TRUE, deleted_at = ?, deleted_by = ?, deleted_reason = COALESCE(?, deleted_reason)
WHERE correlation_id = ? AND is_deleted = FALSE`,
		now, requestedBy, reason, correlation.String,
	); err != nil {
		return err
	}
	if _, err := tx.ExecContext(ctx, `
UPDATE timeline_events
SET is_deleted = TRUE
WHERE logical_entity_type = 'component'
AND logical_event_id IN (SELECT id FROM component_change_events WHERE correlation_id = ?)`,
		correlation.String,
	); err != nil {
		return err
	}
	return nil
}
// insertJobTx writes a new job row with status 'queued' into
// history_recompute_jobs on the given transaction and returns the matching
// JobRecord.
//
// payload is stored JSON-encoded; a marshalling or insert error yields a zero
// JobRecord. requestedBy is written to the requested_by column but is not part
// of the returned record.
//
// NOTE(review): the job ID is "job-" plus the current UTC time in
// nanoseconds, so two inserts that observe the same nanosecond would produce
// the same ID — confirm whether a primary-key collision is acceptable here.
func (s *Service) insertJobTx(ctx context.Context, tx *sql.Tx, jobType, entityType, entityID string, payload map[string]any, requestedBy *string) (JobRecord, error) {
	createdAt := time.Now().UTC()

	encoded, err := json.Marshal(payload)
	if err != nil {
		return JobRecord{}, err
	}

	newID := fmt.Sprintf("job-%d", createdAt.UnixNano())

	const insertJob = `
INSERT INTO history_recompute_jobs
(id, job_type, entity_type, entity_id, status, requested_by, payload, created_at)
VALUES (?, ?, ?, ?, 'queued', ?, ?, ?)`
	_, err = tx.ExecContext(ctx, insertJob, newID, jobType, entityType, entityID, requestedBy, encoded, createdAt)
	if err != nil {
		return JobRecord{}, err
	}

	record := JobRecord{
		ID:         newID,
		JobType:    jobType,
		EntityType: entityType,
		EntityID:   entityID,
		Status:     "queued",
		Payload:    payload,
		CreatedAt:  createdAt,
	}
	return record, nil
}
// insertHardRestoreAuditTx records a 'hard_restore' operation in
// history_admin_audit on the given transaction.
//
// The row ID comes from s.generateID with the idgen.HistoryAdminAudit kind,
// and details is stored JSON-encoded. targetVersion, targetSnapshotID and
// requestedBy are passed to the driver as given and may be nil. Any error
// from ID generation, marshalling or the insert is returned unchanged.
func (s *Service) insertHardRestoreAuditTx(ctx context.Context, tx *sql.Tx, entityType, entityID string, targetVersion *int64, targetSnapshotID *string, jobID string, requestedBy *string, details map[string]any) error {
	// The ID is generated before encoding, preserving the order of side effects.
	id, err := s.generateID(ctx, tx, idgen.HistoryAdminAudit)
	if err != nil {
		return err
	}

	encodedDetails, err := json.Marshal(details)
	if err != nil {
		return err
	}

	const insertAudit = `
INSERT INTO history_admin_audit
(id, entity_type, entity_id, operation_type, requested_by, target_version, target_snapshot_id, job_id, details)
VALUES (?, ?, ?, 'hard_restore', ?, ?, ?, ?, ?)`
	if _, err := tx.ExecContext(ctx, insertAudit, id, entityType, entityID, requestedBy, targetVersion, targetSnapshotID, jobID, encodedDetails); err != nil {
		return err
	}
	return nil
}
// ensureEntityExistsForUpdate checks that the entity row exists, selecting it
// with FOR UPDATE on the given transaction.
//
// "component" entities are looked up in parts and "asset" entities in
// machines. It returns ErrNotFound when no row matches entityID, an error
// wrapping ErrInvalidPatch for any other entityType, and passes other
// query errors through unchanged.
func (s *Service) ensureEntityExistsForUpdate(ctx context.Context, tx *sql.Tx, entityType, entityID string) error {
	var query string
	switch entityType {
	case "component":
		query = `SELECT 1 FROM parts WHERE id = ? FOR UPDATE`
	case "asset":
		query = `SELECT 1 FROM machines WHERE id = ? FOR UPDATE`
	default:
		return fmt.Errorf("%w: unsupported entity type", ErrInvalidPatch)
	}

	var one int
	err := tx.QueryRowContext(ctx, query, entityID).Scan(&one)
	// errors.Is also matches a wrapped sql.ErrNoRows.
	if errors.Is(err, sql.ErrNoRows) {
		return ErrNotFound
	}
	return err
}
// validateEntityType reports whether entityType, ignoring surrounding
// whitespace, is one of the supported kinds "component" or "asset". Any other
// value yields an error wrapping ErrInvalidPatch.
func validateEntityType(entityType string) error {
	kind := strings.TrimSpace(entityType)
	if kind == "component" || kind == "asset" {
		return nil
	}
	return fmt.Errorf("%w: unsupported entity type", ErrInvalidPatch)
}