package history import ( "context" "database/sql" "encoding/json" "fmt" "strings" "time" ) type AdminCancelSourceFilter struct { SourceTypes []string `json:"source_types"` DateFrom *time.Time `json:"date_from,omitempty"` DateTo *time.Time `json:"date_to,omitempty"` SourceRefEquals *string `json:"source_ref_equals,omitempty"` SourceRefContains *string `json:"source_ref_contains,omitempty"` AllowExistingJobs bool `json:"allow_existing_jobs,omitempty"` Reason *string `json:"reason,omitempty"` MaxEvents int `json:"max_events,omitempty"` RequestedBy *string `json:"requested_by,omitempty"` } type AdminCancelPreview struct { MatchedEvents int `json:"matched_events"` ComponentEvents int `json:"component_events"` AssetEvents int `json:"asset_events"` AffectedEntities int `json:"affected_entities"` AffectedComponents int `json:"affected_components"` AffectedAssets int `json:"affected_assets"` BySource map[string]int `json:"by_source"` EntityCountsBySource map[string]map[string]int `json:"entity_counts_by_source"` Sample []AdminCancelPreviewEvent `json:"sample"` Truncated bool `json:"truncated"` MaxEvents int `json:"max_events"` } type AdminCancelPreviewEvent struct { EntityType string `json:"entity_type"` EntityID string `json:"entity_id"` EventID string `json:"event_id"` SourceType string `json:"source_type"` SourceRef *string `json:"source_ref,omitempty"` ChangeType string `json:"change_type"` EffectiveAt time.Time `json:"effective_at"` CorrelationID *string `json:"correlation_id,omitempty"` } type AdminCancelExecuteResult struct { Preview AdminCancelPreview `json:"preview"` SoftDeletedEvents int `json:"soft_deleted_events"` QueuedJobs []JobRecord `json:"queued_jobs"` QueuedJobsCount int `json:"queued_jobs_count"` SkippedJobConflicts int `json:"skipped_job_conflicts,omitempty"` } type adminCancelMatch struct { EntityType string EntityID string EventID string SourceType string SourceRef *string ChangeType string EffectiveAt time.Time CorrelationID *string } const adminCancelMaxEventsLimit = 100000 func (s *Service) PreviewCancelEventsBySource(ctx context.Context, filter AdminCancelSourceFilter) (AdminCancelPreview, error) { if s == nil || s.db == nil { return AdminCancelPreview{}, fmt.Errorf("history service unavailable") } filter = normalizeAdminCancelFilter(filter) matches, err := s.findAdminCancelMatches(ctx, nil, filter) if err != nil { return AdminCancelPreview{}, err } return s.buildAdminCancelPreview(ctx, nil, matches, filter.MaxEvents), nil } func (s *Service) QueueCancelEventsBySource(ctx context.Context, filter AdminCancelSourceFilter) (AdminCancelExecuteResult, error) { if s == nil || s.db == nil { return AdminCancelExecuteResult{}, fmt.Errorf("history service unavailable") } filter = normalizeAdminCancelFilter(filter) tx, err := s.db.BeginTx(ctx, &sql.TxOptions{}) if err != nil { return AdminCancelExecuteResult{}, err } defer func() { _ = tx.Rollback() }() matches, err := s.findAdminCancelMatches(ctx, tx, filter) if err != nil { return AdminCancelExecuteResult{}, err } preview := s.buildAdminCancelPreview(ctx, tx, matches, filter.MaxEvents) if len(matches) > filter.MaxEvents { return AdminCancelExecuteResult{}, fmt.Errorf("%w: preview truncated, narrow filters or increase max_events", ErrConflict) } if len(matches) == 0 { if err := tx.Commit(); err != nil { return AdminCancelExecuteResult{}, err } return AdminCancelExecuteResult{Preview: preview}, nil } affectedEntities, err := s.collectAffectedEntitiesForAdminCancel(ctx, tx, matches) if err != nil { return AdminCancelExecuteResult{}, err } for key := range affectedEntities { entityType, entityID := splitEntityKey(key) if err := s.ensureNoRunningJob(ctx, tx, entityType, entityID); err != nil { if filter.AllowExistingJobs && err == ErrConflict { continue } return AdminCancelExecuteResult{}, err } } softDeleted := 0 for _, m := range matches { if err := s.softDeleteLogicalEvent(ctx, tx, m.EntityType, m.EntityID, m.EventID, filter.Reason, filter.RequestedBy); err != nil { return AdminCancelExecuteResult{}, err } softDeleted++ } jobs := make([]JobRecord, 0, len(affectedEntities)) skippedJobConflicts := 0 for key := range affectedEntities { entityType, entityID := splitEntityKey(key) if filter.AllowExistingJobs { if err := s.ensureNoRunningJob(ctx, tx, entityType, entityID); err != nil { if err == ErrConflict { skippedJobConflicts++ continue } return AdminCancelExecuteResult{}, err } } payload := map[string]any{ "batch": true, "source_types": filter.SourceTypes, } if filter.DateFrom != nil { payload["date_from"] = filter.DateFrom.UTC().Format(time.RFC3339Nano) } if filter.DateTo != nil { payload["date_to"] = filter.DateTo.UTC().Format(time.RFC3339Nano) } if filter.SourceRefContains != nil { payload["source_ref_contains"] = *filter.SourceRefContains } if filter.SourceRefEquals != nil { payload["source_ref_equals"] = *filter.SourceRefEquals } job, err := s.insertJobTx(ctx, tx, "recompute", entityType, entityID, payload, filter.RequestedBy) if err != nil { return AdminCancelExecuteResult{}, err } jobs = append(jobs, job) } if err := tx.Commit(); err != nil { return AdminCancelExecuteResult{}, err } return AdminCancelExecuteResult{ Preview: preview, SoftDeletedEvents: softDeleted, QueuedJobs: jobs, QueuedJobsCount: len(jobs), SkippedJobConflicts: skippedJobConflicts, }, nil } func normalizeAdminCancelFilter(filter AdminCancelSourceFilter) AdminCancelSourceFilter { cleaned := make([]string, 0, len(filter.SourceTypes)) seen := map[string]struct{}{} for _, raw := range filter.SourceTypes { v := strings.TrimSpace(strings.ToLower(raw)) if v == "" { continue } if _, ok := seen[v]; ok { continue } seen[v] = struct{}{} cleaned = append(cleaned, v) } filter.SourceTypes = cleaned if filter.MaxEvents <= 0 { filter.MaxEvents = 1000 } else if filter.MaxEvents > adminCancelMaxEventsLimit { filter.MaxEvents = adminCancelMaxEventsLimit } filter.SourceRefEquals = normalizeStringPtr(filter.SourceRefEquals) filter.SourceRefContains = normalizeStringPtr(filter.SourceRefContains) filter.Reason = normalizeStringPtr(filter.Reason) filter.RequestedBy = normalizeStringPtr(filter.RequestedBy) return filter } func (s *Service) findAdminCancelMatches(ctx context.Context, tx *sql.Tx, filter AdminCancelSourceFilter) ([]adminCancelMatch, error) { if len(filter.SourceTypes) == 0 { return nil, fmt.Errorf("%w: source_types required", ErrInvalidPatch) } sourceClause, sourceArgs := inClauseStrings(filter.SourceTypes) if sourceClause == "" { return nil, fmt.Errorf("%w: source_types required", ErrInvalidPatch) } var args []any query := ` SELECT entity_type, entity_id, event_id, source_type, source_ref, change_type, effective_at, correlation_id FROM ( SELECT 'component' AS entity_type, c.part_id AS entity_id, c.id AS event_id, c.source_type, c.source_ref, c.change_type, c.effective_at, c.correlation_id FROM component_change_events c WHERE c.is_deleted = FALSE AND c.source_type IN (` + sourceClause + `) UNION ALL SELECT 'asset' AS entity_type, a.machine_id AS entity_id, a.id AS event_id, a.source_type, a.source_ref, a.change_type, a.effective_at, a.correlation_id FROM asset_change_events a WHERE a.is_deleted = FALSE AND a.source_type IN (` + sourceClause + `) ) x WHERE 1=1` args = append(args, sourceArgs...) args = append(args, sourceArgs...) if filter.DateFrom != nil { query += ` AND x.effective_at >= ?` args = append(args, filter.DateFrom.UTC()) } if filter.DateTo != nil { query += ` AND x.effective_at <= ?` args = append(args, filter.DateTo.UTC()) } if filter.SourceRefEquals != nil { query += ` AND x.source_ref = ?` args = append(args, *filter.SourceRefEquals) } if filter.SourceRefContains != nil { query += ` AND x.source_ref LIKE ?` args = append(args, "%"+*filter.SourceRefContains+"%") } query += ` ORDER BY x.effective_at, x.entity_type, x.entity_id, x.event_id LIMIT ?` args = append(args, filter.MaxEvents+1) var ( rows *sql.Rows err error ) if tx != nil { rows, err = tx.QueryContext(ctx, query, args...) } else { rows, err = s.db.QueryContext(ctx, query, args...) } if err != nil { return nil, err } defer rows.Close() out := make([]adminCancelMatch, 0, minInt(filter.MaxEvents, 256)) for rows.Next() { var m adminCancelMatch var sourceRef sql.NullString var correlation sql.NullString if err := rows.Scan(&m.EntityType, &m.EntityID, &m.EventID, &m.SourceType, &sourceRef, &m.ChangeType, &m.EffectiveAt, &correlation); err != nil { return nil, err } m.SourceRef = nullStringPtr(sourceRef) m.CorrelationID = nullStringPtr(correlation) out = append(out, m) } if err := rows.Err(); err != nil { return nil, err } return out, nil } func (s *Service) buildAdminCancelPreview(ctx context.Context, tx *sql.Tx, matches []adminCancelMatch, maxEvents int) AdminCancelPreview { preview := AdminCancelPreview{ BySource: map[string]int{}, EntityCountsBySource: map[string]map[string]int{}, MaxEvents: maxEvents, } entitySet := map[string]struct{}{} componentSet := map[string]struct{}{} assetSet := map[string]struct{}{} for i, m := range matches { if i >= maxEvents { preview.Truncated = true break } preview.MatchedEvents++ if m.EntityType == "component" { preview.ComponentEvents++ componentSet[m.EntityID] = struct{}{} } else { preview.AssetEvents++ assetSet[m.EntityID] = struct{}{} } entitySet[entityKey(m.EntityType, m.EntityID)] = struct{}{} preview.BySource[m.SourceType]++ if preview.EntityCountsBySource[m.SourceType] == nil { preview.EntityCountsBySource[m.SourceType] = map[string]int{} } preview.EntityCountsBySource[m.SourceType][m.EntityType]++ if len(preview.Sample) < 25 { preview.Sample = append(preview.Sample, AdminCancelPreviewEvent{ EntityType: m.EntityType, EntityID: m.EntityID, EventID: m.EventID, SourceType: m.SourceType, SourceRef: m.SourceRef, ChangeType: m.ChangeType, EffectiveAt: m.EffectiveAt, CorrelationID: m.CorrelationID, }) } } preview.AffectedEntities = len(entitySet) preview.AffectedComponents = len(componentSet) preview.AffectedAssets = len(assetSet) if tx != nil { if expanded, err := s.collectAffectedEntitiesForAdminCancel(ctx, tx, matches); err == nil { preview.AffectedEntities = len(expanded) preview.AffectedComponents = 0 preview.AffectedAssets = 0 for key := range expanded { if strings.HasPrefix(key, "component:") { preview.AffectedComponents++ } if strings.HasPrefix(key, "asset:") { preview.AffectedAssets++ } } } } return preview } func (s *Service) collectAffectedEntitiesForAdminCancel(ctx context.Context, tx *sql.Tx, matches []adminCancelMatch) (map[string]struct{}, error) { out := map[string]struct{}{} for _, m := range matches { out[entityKey(m.EntityType, m.EntityID)] = struct{}{} if m.EntityType != "asset" || m.CorrelationID == nil || strings.TrimSpace(*m.CorrelationID) == "" { continue } rows, err := tx.QueryContext(ctx, `SELECT DISTINCT part_id FROM component_change_events WHERE correlation_id = ?`, *m.CorrelationID) if err != nil { return nil, err } for rows.Next() { var partID string if err := rows.Scan(&partID); err != nil { rows.Close() return nil, err } out[entityKey("component", partID)] = struct{}{} } if err := rows.Err(); err != nil { rows.Close() return nil, err } rows.Close() } return out, nil } func entityKey(entityType, entityID string) string { return entityType + ":" + entityID } func splitEntityKey(key string) (string, string) { parts := strings.SplitN(key, ":", 2) if len(parts) != 2 { return "", key } return parts[0], parts[1] } func inClauseStrings(values []string) (string, []any) { if len(values) == 0 { return "", nil } ph := make([]string, 0, len(values)) args := make([]any, 0, len(values)) for _, v := range values { ph = append(ph, "?") args = append(args, v) } return strings.Join(ph, ","), args } func minInt(a, b int) int { if a < b { return a } return b } func (s *Service) queueGenericRecomputeTx(ctx context.Context, tx *sql.Tx, entityType, entityID string, payload map[string]any, requestedBy *string) (JobRecord, error) { return s.insertJobTx(ctx, tx, "recompute", entityType, entityID, payload, requestedBy) } func (s *Service) MarshalAdminCancelPreview(preview AdminCancelPreview) ([]byte, error) { return json.Marshal(preview) }