Files
core/internal/history/tx_apply.go
Michael Chus 4c284505a8 Async ingest, deferred history, batch delete, vendor normalization, CI identifiers
- history/worker: fix deadlock by moving stale job requeue out of claimNextJob
  into dedicated staleJobRequeuer goroutine (runs every 2 min)
- history/service,tx_apply,cross_entity: add deferred=true mode — write events+
  snapshots but skip projection updates; queue recompute after commit
- ingest/service: IngestHardwareDeferred uses deferred mode; CSV workers up to 8
  (INGEST_CSV_WORKERS env); serial/prefetch lookups use normalize.SerialKey
- api/ingest: JSON /ingest/hardware now async (202 + job_id); new GET
  /ingest/hardware/jobs/{id} endpoint; CSV already async
- history/admin_cancel: replace per-event softDelete loop with batchSoftDeleteEvents
  using IN-clause chunks of 500 to prevent request timeout on large deletes
- normalize: new internal/normalize package with VendorKey, VendorDisplay,
  VendorDisplayPtr, SerialKey, FirmwareKey
- ingest/parser_hardware: vendor fields use normalize.VendorDisplayPtr
- migrations/0021_ci_identifiers: change identifier columns to utf8mb4_unicode_ci
  (case-insensitive) in parts, machines, observations, machine_firmware_states
- bible submodule: update to add identifier-normalization contract

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 22:23:17 +03:00

79 lines
3.0 KiB
Go

package history
import (
"context"
"database/sql"
"fmt"
"strings"
)
func (s *Service) ApplyComponentPatchWithTx(ctx context.Context, tx *sql.Tx, cmd ApplyPatchCommand) (ApplyPatchResult, error) {
return s.applyEntityPatchWithTx(ctx, tx, "component", cmd, false)
}
func (s *Service) ApplyAssetPatchWithTx(ctx context.Context, tx *sql.Tx, cmd ApplyPatchCommand) (ApplyPatchResult, error) {
return s.applyEntityPatchWithTx(ctx, tx, "asset", cmd, false)
}
// ApplyComponentPatchDeferredWithTx writes the change event and snapshot but skips
// updating the projection and timeline. Call QueueRecompute after the transaction
// commits to rebuild projections in the background.
func (s *Service) ApplyComponentPatchDeferredWithTx(ctx context.Context, tx *sql.Tx, cmd ApplyPatchCommand) (ApplyPatchResult, error) {
return s.applyEntityPatchWithTx(ctx, tx, "component", cmd, true)
}
// ApplyAssetPatchDeferredWithTx writes the change event and snapshot but skips
// updating the projection and timeline. Call QueueRecompute after the transaction
// commits to rebuild projections in the background.
func (s *Service) ApplyAssetPatchDeferredWithTx(ctx context.Context, tx *sql.Tx, cmd ApplyPatchCommand) (ApplyPatchResult, error) {
return s.applyEntityPatchWithTx(ctx, tx, "asset", cmd, true)
}
func (s *Service) applyEntityPatchWithTx(ctx context.Context, tx *sql.Tx, entityType string, cmd ApplyPatchCommand, deferred bool) (ApplyPatchResult, error) {
if tx == nil {
return ApplyPatchResult{}, fmt.Errorf("%w: tx is required", ErrInvalidPatch)
}
if strings.TrimSpace(cmd.SourceType) == "" {
cmd.SourceType = "user"
}
if strings.TrimSpace(cmd.ActorType) == "" {
cmd.ActorType = "user"
}
if strings.TrimSpace(cmd.EntityID) == "" {
return ApplyPatchResult{}, fmt.Errorf("%w: entity id is required", ErrInvalidPatch)
}
if len(cmd.Patch) == 0 {
return ApplyPatchResult{}, fmt.Errorf("%w: patch is required", ErrInvalidPatch)
}
if strings.TrimSpace(cmd.ChangeType) == "" {
return ApplyPatchResult{}, fmt.Errorf("%w: change_type is required", ErrInvalidPatch)
}
if err := validatePatchOps(entityType, cmd.ChangeType, cmd.SourceType, cmd.Patch); err != nil {
return ApplyPatchResult{}, err
}
if idem := normalizeStringPtr(cmd.IdempotencyKey); idem != nil {
if result, ok, err := s.lookupIdempotentResult(ctx, tx, entityType, cmd.EntityID, cmd.SourceType, *idem); err != nil {
return ApplyPatchResult{}, err
} else if ok {
return result, nil
}
cmd.IdempotencyKey = idem
}
switch entityType {
case "component":
state, err := s.loadComponentStateForUpdate(ctx, tx, cmd.EntityID)
if err != nil {
return ApplyPatchResult{}, err
}
return s.applyComponentPatchTx(ctx, tx, state, cmd, deferred)
case "asset":
state, err := s.loadAssetStateForUpdate(ctx, tx, cmd.EntityID)
if err != nil {
return ApplyPatchResult{}, err
}
return s.applyAssetPatchTx(ctx, tx, state, cmd, deferred)
default:
return ApplyPatchResult{}, fmt.Errorf("%w: unsupported entity type", ErrInvalidPatch)
}
}