Files
core/internal/history/admin_cancel.go

379 lines
12 KiB
Go

package history
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
)
type AdminCancelSourceFilter struct {
SourceTypes []string `json:"source_types"`
DateFrom *time.Time `json:"date_from,omitempty"`
DateTo *time.Time `json:"date_to,omitempty"`
SourceRefContains *string `json:"source_ref_contains,omitempty"`
Reason *string `json:"reason,omitempty"`
MaxEvents int `json:"max_events,omitempty"`
RequestedBy *string `json:"requested_by,omitempty"`
}
type AdminCancelPreview struct {
MatchedEvents int `json:"matched_events"`
ComponentEvents int `json:"component_events"`
AssetEvents int `json:"asset_events"`
AffectedEntities int `json:"affected_entities"`
AffectedComponents int `json:"affected_components"`
AffectedAssets int `json:"affected_assets"`
BySource map[string]int `json:"by_source"`
EntityCountsBySource map[string]map[string]int `json:"entity_counts_by_source"`
Sample []AdminCancelPreviewEvent `json:"sample"`
Truncated bool `json:"truncated"`
MaxEvents int `json:"max_events"`
}
type AdminCancelPreviewEvent struct {
EntityType string `json:"entity_type"`
EntityID string `json:"entity_id"`
EventID string `json:"event_id"`
SourceType string `json:"source_type"`
SourceRef *string `json:"source_ref,omitempty"`
ChangeType string `json:"change_type"`
EffectiveAt time.Time `json:"effective_at"`
CorrelationID *string `json:"correlation_id,omitempty"`
}
type AdminCancelExecuteResult struct {
Preview AdminCancelPreview `json:"preview"`
SoftDeletedEvents int `json:"soft_deleted_events"`
QueuedJobs []JobRecord `json:"queued_jobs"`
QueuedJobsCount int `json:"queued_jobs_count"`
}
type adminCancelMatch struct {
EntityType string
EntityID string
EventID string
SourceType string
SourceRef *string
ChangeType string
EffectiveAt time.Time
CorrelationID *string
}
func (s *Service) PreviewCancelEventsBySource(ctx context.Context, filter AdminCancelSourceFilter) (AdminCancelPreview, error) {
if s == nil || s.db == nil {
return AdminCancelPreview{}, fmt.Errorf("history service unavailable")
}
filter = normalizeAdminCancelFilter(filter)
matches, err := s.findAdminCancelMatches(ctx, nil, filter)
if err != nil {
return AdminCancelPreview{}, err
}
return s.buildAdminCancelPreview(ctx, nil, matches, filter.MaxEvents), nil
}
func (s *Service) QueueCancelEventsBySource(ctx context.Context, filter AdminCancelSourceFilter) (AdminCancelExecuteResult, error) {
if s == nil || s.db == nil {
return AdminCancelExecuteResult{}, fmt.Errorf("history service unavailable")
}
filter = normalizeAdminCancelFilter(filter)
tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return AdminCancelExecuteResult{}, err
}
defer func() { _ = tx.Rollback() }()
matches, err := s.findAdminCancelMatches(ctx, tx, filter)
if err != nil {
return AdminCancelExecuteResult{}, err
}
preview := s.buildAdminCancelPreview(ctx, tx, matches, filter.MaxEvents)
if len(matches) > filter.MaxEvents {
return AdminCancelExecuteResult{}, fmt.Errorf("%w: preview truncated, narrow filters or increase max_events", ErrConflict)
}
if len(matches) == 0 {
if err := tx.Commit(); err != nil {
return AdminCancelExecuteResult{}, err
}
return AdminCancelExecuteResult{Preview: preview}, nil
}
affectedEntities, err := s.collectAffectedEntitiesForAdminCancel(ctx, tx, matches)
if err != nil {
return AdminCancelExecuteResult{}, err
}
for key := range affectedEntities {
entityType, entityID := splitEntityKey(key)
if err := s.ensureNoRunningJob(ctx, tx, entityType, entityID); err != nil {
return AdminCancelExecuteResult{}, err
}
}
softDeleted := 0
for _, m := range matches {
if err := s.softDeleteLogicalEvent(ctx, tx, m.EntityType, m.EntityID, m.EventID, filter.Reason, filter.RequestedBy); err != nil {
return AdminCancelExecuteResult{}, err
}
softDeleted++
}
jobs := make([]JobRecord, 0, len(affectedEntities))
for key := range affectedEntities {
entityType, entityID := splitEntityKey(key)
payload := map[string]any{
"batch": true,
"source_types": filter.SourceTypes,
}
if filter.DateFrom != nil {
payload["date_from"] = filter.DateFrom.UTC().Format(time.RFC3339Nano)
}
if filter.DateTo != nil {
payload["date_to"] = filter.DateTo.UTC().Format(time.RFC3339Nano)
}
if filter.SourceRefContains != nil {
payload["source_ref_contains"] = *filter.SourceRefContains
}
job, err := s.insertJobTx(ctx, tx, "recompute", entityType, entityID, payload, filter.RequestedBy)
if err != nil {
return AdminCancelExecuteResult{}, err
}
jobs = append(jobs, job)
}
if err := tx.Commit(); err != nil {
return AdminCancelExecuteResult{}, err
}
return AdminCancelExecuteResult{
Preview: preview,
SoftDeletedEvents: softDeleted,
QueuedJobs: jobs,
QueuedJobsCount: len(jobs),
}, nil
}
func normalizeAdminCancelFilter(filter AdminCancelSourceFilter) AdminCancelSourceFilter {
cleaned := make([]string, 0, len(filter.SourceTypes))
seen := map[string]struct{}{}
for _, raw := range filter.SourceTypes {
v := strings.TrimSpace(strings.ToLower(raw))
if v == "" {
continue
}
if _, ok := seen[v]; ok {
continue
}
seen[v] = struct{}{}
cleaned = append(cleaned, v)
}
filter.SourceTypes = cleaned
if filter.MaxEvents <= 0 || filter.MaxEvents > 5000 {
filter.MaxEvents = 1000
}
filter.SourceRefContains = normalizeStringPtr(filter.SourceRefContains)
filter.Reason = normalizeStringPtr(filter.Reason)
filter.RequestedBy = normalizeStringPtr(filter.RequestedBy)
return filter
}
func (s *Service) findAdminCancelMatches(ctx context.Context, tx *sql.Tx, filter AdminCancelSourceFilter) ([]adminCancelMatch, error) {
if len(filter.SourceTypes) == 0 {
return nil, fmt.Errorf("%w: source_types required", ErrInvalidPatch)
}
sourceClause, sourceArgs := inClauseStrings(filter.SourceTypes)
if sourceClause == "" {
return nil, fmt.Errorf("%w: source_types required", ErrInvalidPatch)
}
var args []any
query := `
SELECT entity_type, entity_id, event_id, source_type, source_ref, change_type, effective_at, correlation_id
FROM (
SELECT 'component' AS entity_type, c.part_id AS entity_id, c.id AS event_id, c.source_type, c.source_ref, c.change_type, c.effective_at, c.correlation_id
FROM component_change_events c
WHERE c.is_deleted = FALSE AND c.source_type IN (` + sourceClause + `)
UNION ALL
SELECT 'asset' AS entity_type, a.machine_id AS entity_id, a.id AS event_id, a.source_type, a.source_ref, a.change_type, a.effective_at, a.correlation_id
FROM asset_change_events a
WHERE a.is_deleted = FALSE AND a.source_type IN (` + sourceClause + `)
) x
WHERE 1=1`
args = append(args, sourceArgs...)
args = append(args, sourceArgs...)
if filter.DateFrom != nil {
query += ` AND x.effective_at >= ?`
args = append(args, filter.DateFrom.UTC())
}
if filter.DateTo != nil {
query += ` AND x.effective_at <= ?`
args = append(args, filter.DateTo.UTC())
}
if filter.SourceRefContains != nil {
query += ` AND x.source_ref LIKE ?`
args = append(args, "%"+*filter.SourceRefContains+"%")
}
query += ` ORDER BY x.effective_at, x.entity_type, x.entity_id, x.event_id LIMIT ?`
args = append(args, filter.MaxEvents+1)
var (
rows *sql.Rows
err error
)
if tx != nil {
rows, err = tx.QueryContext(ctx, query, args...)
} else {
rows, err = s.db.QueryContext(ctx, query, args...)
}
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]adminCancelMatch, 0, minInt(filter.MaxEvents, 256))
for rows.Next() {
var m adminCancelMatch
var sourceRef sql.NullString
var correlation sql.NullString
if err := rows.Scan(&m.EntityType, &m.EntityID, &m.EventID, &m.SourceType, &sourceRef, &m.ChangeType, &m.EffectiveAt, &correlation); err != nil {
return nil, err
}
m.SourceRef = nullStringPtr(sourceRef)
m.CorrelationID = nullStringPtr(correlation)
out = append(out, m)
}
if err := rows.Err(); err != nil {
return nil, err
}
return out, nil
}
func (s *Service) buildAdminCancelPreview(ctx context.Context, tx *sql.Tx, matches []adminCancelMatch, maxEvents int) AdminCancelPreview {
preview := AdminCancelPreview{
BySource: map[string]int{},
EntityCountsBySource: map[string]map[string]int{},
MaxEvents: maxEvents,
}
entitySet := map[string]struct{}{}
componentSet := map[string]struct{}{}
assetSet := map[string]struct{}{}
for i, m := range matches {
if i >= maxEvents {
preview.Truncated = true
break
}
preview.MatchedEvents++
if m.EntityType == "component" {
preview.ComponentEvents++
componentSet[m.EntityID] = struct{}{}
} else {
preview.AssetEvents++
assetSet[m.EntityID] = struct{}{}
}
entitySet[entityKey(m.EntityType, m.EntityID)] = struct{}{}
preview.BySource[m.SourceType]++
if preview.EntityCountsBySource[m.SourceType] == nil {
preview.EntityCountsBySource[m.SourceType] = map[string]int{}
}
preview.EntityCountsBySource[m.SourceType][m.EntityType]++
if len(preview.Sample) < 25 {
preview.Sample = append(preview.Sample, AdminCancelPreviewEvent{
EntityType: m.EntityType,
EntityID: m.EntityID,
EventID: m.EventID,
SourceType: m.SourceType,
SourceRef: m.SourceRef,
ChangeType: m.ChangeType,
EffectiveAt: m.EffectiveAt,
CorrelationID: m.CorrelationID,
})
}
}
preview.AffectedEntities = len(entitySet)
preview.AffectedComponents = len(componentSet)
preview.AffectedAssets = len(assetSet)
if tx != nil {
if expanded, err := s.collectAffectedEntitiesForAdminCancel(ctx, tx, matches); err == nil {
preview.AffectedEntities = len(expanded)
preview.AffectedComponents = 0
preview.AffectedAssets = 0
for key := range expanded {
if strings.HasPrefix(key, "component:") {
preview.AffectedComponents++
}
if strings.HasPrefix(key, "asset:") {
preview.AffectedAssets++
}
}
}
}
return preview
}
func (s *Service) collectAffectedEntitiesForAdminCancel(ctx context.Context, tx *sql.Tx, matches []adminCancelMatch) (map[string]struct{}, error) {
out := map[string]struct{}{}
for _, m := range matches {
out[entityKey(m.EntityType, m.EntityID)] = struct{}{}
if m.EntityType != "asset" || m.CorrelationID == nil || strings.TrimSpace(*m.CorrelationID) == "" {
continue
}
rows, err := tx.QueryContext(ctx, `SELECT DISTINCT part_id FROM component_change_events WHERE correlation_id = ?`, *m.CorrelationID)
if err != nil {
return nil, err
}
for rows.Next() {
var partID string
if err := rows.Scan(&partID); err != nil {
rows.Close()
return nil, err
}
out[entityKey("component", partID)] = struct{}{}
}
if err := rows.Err(); err != nil {
rows.Close()
return nil, err
}
rows.Close()
}
return out, nil
}
func entityKey(entityType, entityID string) string {
return entityType + ":" + entityID
}
func splitEntityKey(key string) (string, string) {
parts := strings.SplitN(key, ":", 2)
if len(parts) != 2 {
return "", key
}
return parts[0], parts[1]
}
func inClauseStrings(values []string) (string, []any) {
if len(values) == 0 {
return "", nil
}
ph := make([]string, 0, len(values))
args := make([]any, 0, len(values))
for _, v := range values {
ph = append(ph, "?")
args = append(args, v)
}
return strings.Join(ph, ","), args
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
func (s *Service) queueGenericRecomputeTx(ctx context.Context, tx *sql.Tx, entityType, entityID string, payload map[string]any, requestedBy *string) (JobRecord, error) {
return s.insertJobTx(ctx, tx, "recompute", entityType, entityID, payload, requestedBy)
}
func (s *Service) MarshalAdminCancelPreview(preview AdminCancelPreview) ([]byte, error) {
return json.Marshal(preview)
}