Files
core/internal/history/cross_entity.go

283 lines
10 KiB
Go

package history
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"reanimator/internal/idgen"
)
// InstallationChangeResult is the outcome returned by the install, remove and
// move operations in this file.
type InstallationChangeResult struct {
// Changed is true only when the operation ran to completion and applied its
// patches and projection update. It is false when the operation returned
// early as a no-op: the component was already in the requested state, or the
// component patch was reported as a duplicate.
Changed bool
}
// InstallComponentToAssetWithTx records, inside tx, that componentID is now
// installed in assetID.
//
// Order of operations:
//  1. Reject a nil tx or blank IDs with an error wrapping ErrInvalidPatch.
//  2. Load the component and asset state through the *ForUpdate helpers.
//  3. Return Changed=false if the component already points at assetID.
//  4. Apply a COMPONENT_INSTALLED patch to the component; return
//     Changed=false if that patch is reported as a duplicate.
//  5. Reload the asset, add componentID to its installed-component list if
//     missing, and apply the resulting diff as ASSET_COMPONENT_SET_CHANGED
//     (skipped when the diff is empty).
//  6. Update the installations projection table.
//
// installedAt is converted to UTC before use. The actor type is "user" when
// sourceType trims to "user" and "ingest" otherwise. The asset patch uses an
// idempotency key derived from idempotencyKey with the suffix "asset".
//
// NOTE(review): this function does not patch the inventory of an asset the
// component may previously have been installed in; only the projection rows
// for other machines are closed. Confirm callers use the move/remove path
// when that matters.
func (s *Service) InstallComponentToAssetWithTx(ctx context.Context, tx *sql.Tx, componentID, assetID string, installedAt time.Time, sourceType string, sourceRef, correlationID, idempotencyKey *string) (InstallationChangeResult, error) {
	// Zero value (Changed == false) shared by every error and no-op return.
	var unchanged InstallationChangeResult

	switch {
	case tx == nil:
		return unchanged, fmt.Errorf("%w: tx is required", ErrInvalidPatch)
	case strings.TrimSpace(componentID) == "" || strings.TrimSpace(assetID) == "":
		return unchanged, fmt.Errorf("%w: component_id and asset_id required", ErrInvalidPatch)
	}

	component, err := s.loadComponentStateForUpdate(ctx, tx, componentID)
	if err != nil {
		return unchanged, err
	}
	// The asset is loaded before the no-op check so that a load failure is
	// reported even when the component already points at this asset.
	assetBefore, err := s.loadAssetStateForUpdate(ctx, tx, assetID)
	if err != nil {
		return unchanged, err
	}
	if current := component.Installation.CurrentMachineID; current != nil && *current == assetID {
		return unchanged, nil
	}

	effectiveAt := installedAt.UTC()
	actor := "ingest"
	if strings.TrimSpace(sourceType) == "user" {
		actor = "user"
	}

	componentCmd := ApplyPatchCommand{
		EntityID:       componentID,
		ChangeType:     "COMPONENT_INSTALLED",
		EffectiveAt:    &effectiveAt,
		SourceType:     sourceType,
		SourceRef:      sourceRef,
		ActorType:      actor,
		CorrelationID:  correlationID,
		IdempotencyKey: idempotencyKey,
		Patch: []PatchOp{
			{Op: "replace", Path: "/installation/current_machine_id", Value: assetID},
			{Op: "replace", Path: "/installation/installed_at", Value: effectiveAt.Format(time.RFC3339Nano)},
		},
	}
	componentResult, err := s.ApplyComponentPatchWithTx(ctx, tx, componentCmd)
	if err != nil {
		return unchanged, err
	}
	if componentResult.Duplicate {
		return unchanged, nil
	}

	// Work on a fresh copy of the asset state and diff it against the
	// snapshot taken before the component patch.
	assetAfter, err := s.loadAssetStateForUpdate(ctx, tx, assetID)
	if err != nil {
		return unchanged, err
	}
	if !containsString(assetAfter.Inventory.InstalledComponentIDs, componentID) {
		assetAfter.Inventory.InstalledComponentIDs = sortedStrings(append(assetAfter.Inventory.InstalledComponentIDs, componentID))
	}
	assetPatch, err := diffAssetStates(assetBefore, assetAfter)
	if err != nil {
		return unchanged, err
	}
	if len(assetPatch) != 0 {
		assetCmd := ApplyPatchCommand{
			EntityID:       assetID,
			ChangeType:     "ASSET_COMPONENT_SET_CHANGED",
			EffectiveAt:    &effectiveAt,
			SourceType:     sourceType,
			SourceRef:      sourceRef,
			ActorType:      actor,
			CorrelationID:  correlationID,
			IdempotencyKey: deriveChildIdem(idempotencyKey, "asset"),
			Patch:          assetPatch,
		}
		if _, err := s.ApplyAssetPatchWithTx(ctx, tx, assetCmd); err != nil {
			return unchanged, err
		}
	}

	if err := s.upsertInstallationProjectionTx(ctx, tx, componentID, assetID, effectiveAt); err != nil {
		return unchanged, err
	}
	return InstallationChangeResult{Changed: true}, nil
}
// InstallComponentToAssetWithSlotTx installs componentID into assetID via
// InstallComponentToAssetWithTx and then, when a slot name is supplied,
// records it on the component with a COMPONENT_REGISTRY_UPDATED patch to
// /installation/slot_name.
//
// The slot patch is applied only if the install actually changed something:
// when the install fails or reports Changed=false, its result is returned
// as-is and the slot is left untouched. It is also skipped when
// normalizeStringPtr(slotName) yields nil.
//
// The slot patch carries the same event metadata as the install event it
// follows: the effective time is installedAt converted to UTC, and the actor
// type is "user" when sourceType trims to "user" and "ingest" otherwise
// (previously the actor was always "user" and the time was not normalised).
// Its idempotency key is derived from idempotencyKey with the suffix "slot".
//
// On error the zero InstallationChangeResult is returned, matching the other
// operations in this file.
func (s *Service) InstallComponentToAssetWithSlotTx(ctx context.Context, tx *sql.Tx, componentID, assetID string, installedAt time.Time, slotName *string, sourceType string, sourceRef, correlationID, idempotencyKey *string) (InstallationChangeResult, error) {
	res, err := s.InstallComponentToAssetWithTx(ctx, tx, componentID, assetID, installedAt, sourceType, sourceRef, correlationID, idempotencyKey)
	if err != nil || !res.Changed {
		return res, err
	}

	slot := normalizeStringPtr(slotName)
	if slot == nil {
		return res, nil
	}

	// Mirror the metadata rules used for the install event itself.
	when := installedAt.UTC()
	actorType := "ingest"
	if strings.TrimSpace(sourceType) == "user" {
		actorType = "user"
	}

	if _, err := s.ApplyComponentPatchWithTx(ctx, tx, ApplyPatchCommand{
		EntityID:       componentID,
		ChangeType:     "COMPONENT_REGISTRY_UPDATED",
		EffectiveAt:    &when,
		SourceType:     sourceType,
		SourceRef:      sourceRef,
		ActorType:      actorType,
		CorrelationID:  correlationID,
		IdempotencyKey: deriveChildIdem(idempotencyKey, "slot"),
		Patch: []PatchOp{
			// slot is known to be non-nil here.
			{Op: "replace", Path: "/installation/slot_name", Value: *slot},
		},
	}); err != nil {
		return InstallationChangeResult{}, err
	}
	return res, nil
}
// RemoveComponentFromAssetWithTx records, inside tx, that componentID is no
// longer installed in assetID.
//
// Order of operations:
//  1. Reject a nil tx with an error wrapping ErrInvalidPatch.
//  2. Load the component state; return Changed=false unless it currently
//     points at assetID.
//  3. Load the asset state.
//  4. Apply a COMPONENT_REMOVED patch that removes current_machine_id,
//     installed_at and slot_name from the component's installation; return
//     Changed=false if that patch is reported as a duplicate.
//  5. Drop componentID from the asset's installed-component list and apply
//     the resulting diff as ASSET_COMPONENT_SET_CHANGED (skipped when the
//     diff is empty).
//  6. Close the open row in the installations projection table.
//
// removedAt is converted to UTC before use. The actor type is "user" when
// sourceType trims to "user" and "ingest" otherwise. The asset patch uses an
// idempotency key derived from idempotencyKey with the suffix "asset".
//
// NOTE(review): unlike InstallComponentToAssetWithTx, blank componentID or
// assetID values are not rejected up front; they are passed to the load
// helpers as-is. Confirm whether that is intended.
func (s *Service) RemoveComponentFromAssetWithTx(ctx context.Context, tx *sql.Tx, componentID, assetID string, removedAt time.Time, sourceType string, sourceRef, correlationID, idempotencyKey *string) (InstallationChangeResult, error) {
	// Zero value (Changed == false) shared by every error and no-op return.
	var unchanged InstallationChangeResult

	if tx == nil {
		return unchanged, fmt.Errorf("%w: tx is required", ErrInvalidPatch)
	}

	component, err := s.loadComponentStateForUpdate(ctx, tx, componentID)
	if err != nil {
		return unchanged, err
	}
	current := component.Installation.CurrentMachineID
	installedHere := current != nil && *current == assetID
	if !installedHere {
		return unchanged, nil
	}

	assetBefore, err := s.loadAssetStateForUpdate(ctx, tx, assetID)
	if err != nil {
		return unchanged, err
	}

	effectiveAt := removedAt.UTC()
	actor := "ingest"
	if strings.TrimSpace(sourceType) == "user" {
		actor = "user"
	}

	componentCmd := ApplyPatchCommand{
		EntityID:       componentID,
		ChangeType:     "COMPONENT_REMOVED",
		EffectiveAt:    &effectiveAt,
		SourceType:     sourceType,
		SourceRef:      sourceRef,
		ActorType:      actor,
		CorrelationID:  correlationID,
		IdempotencyKey: idempotencyKey,
		Patch: []PatchOp{
			{Op: "remove", Path: "/installation/current_machine_id"},
			{Op: "remove", Path: "/installation/installed_at"},
			{Op: "remove", Path: "/installation/slot_name"},
		},
	}
	componentResult, err := s.ApplyComponentPatchWithTx(ctx, tx, componentCmd)
	if err != nil {
		return unchanged, err
	}
	if componentResult.Duplicate {
		return unchanged, nil
	}

	// removeString returns a new slice, so assetBefore keeps its original
	// list and can serve as the diff baseline.
	assetAfter := assetBefore
	assetAfter.Inventory.InstalledComponentIDs = sortedStrings(removeString(assetBefore.Inventory.InstalledComponentIDs, componentID))
	assetPatch, err := diffAssetStates(assetBefore, assetAfter)
	if err != nil {
		return unchanged, err
	}
	if len(assetPatch) != 0 {
		assetCmd := ApplyPatchCommand{
			EntityID:       assetID,
			ChangeType:     "ASSET_COMPONENT_SET_CHANGED",
			EffectiveAt:    &effectiveAt,
			SourceType:     sourceType,
			SourceRef:      sourceRef,
			ActorType:      actor,
			CorrelationID:  correlationID,
			IdempotencyKey: deriveChildIdem(idempotencyKey, "asset"),
			Patch:          assetPatch,
		}
		if _, err := s.ApplyAssetPatchWithTx(ctx, tx, assetCmd); err != nil {
			return unchanged, err
		}
	}

	if err := s.closeInstallationProjectionTx(ctx, tx, componentID, assetID, effectiveAt); err != nil {
		return unchanged, err
	}
	return InstallationChangeResult{Changed: true}, nil
}
// MoveComponentToAssetWithTx moves componentID from fromAssetID to toAssetID
// inside tx by removing it from the source asset and installing it (with an
// optional slot) into the target asset.
//
// All three IDs must be non-blank, otherwise an error wrapping
// ErrInvalidPatch is returned. When source and target are the same asset the
// call is forwarded unchanged to InstallComponentToAssetWithSlotTx, using
// idempotencyKey as given. Otherwise the removal and the install use keys
// derived from idempotencyKey with the suffixes "move_remove" and
// "move_install".
//
// If the removal reports Changed=false, nothing is installed and
// Changed=false is returned. Otherwise the returned Changed flag is the one
// reported by the install step.
func (s *Service) MoveComponentToAssetWithTx(ctx context.Context, tx *sql.Tx, componentID, fromAssetID, toAssetID string, when time.Time, slotName *string, sourceType string, sourceRef, correlationID, idempotencyKey *string) (InstallationChangeResult, error) {
	for _, id := range []string{componentID, fromAssetID, toAssetID} {
		if strings.TrimSpace(id) == "" {
			return InstallationChangeResult{}, fmt.Errorf("%w: component_id/from_asset_id/to_asset_id required", ErrInvalidPatch)
		}
	}

	if fromAssetID == toAssetID {
		return s.InstallComponentToAssetWithSlotTx(ctx, tx, componentID, toAssetID, when, slotName, sourceType, sourceRef, correlationID, idempotencyKey)
	}

	removeKey := deriveChildIdem(idempotencyKey, "move_remove")
	removal, err := s.RemoveComponentFromAssetWithTx(ctx, tx, componentID, fromAssetID, when, sourceType, sourceRef, correlationID, removeKey)
	switch {
	case err != nil:
		return InstallationChangeResult{}, err
	case !removal.Changed:
		// The component was not on the source asset (or the removal was a
		// duplicate), so there is nothing to move.
		return InstallationChangeResult{}, nil
	}

	installKey := deriveChildIdem(idempotencyKey, "move_install")
	installation, err := s.InstallComponentToAssetWithSlotTx(ctx, tx, componentID, toAssetID, when, slotName, sourceType, sourceRef, correlationID, installKey)
	if err != nil {
		return InstallationChangeResult{}, err
	}
	return InstallationChangeResult{Changed: installation.Changed}, nil
}
// upsertInstallationProjectionTx brings the installations table in line with
// componentID being installed in assetID at installedAt.
//
// It first sets removed_at = installedAt on every open row (removed_at IS
// NULL) for this part on a different machine whose installed_at is not later
// than installedAt. It then counts open rows for this part on assetID: if one
// already exists nothing more is done, otherwise a new row is inserted with a
// freshly generated installation ID.
func (s *Service) upsertInstallationProjectionTx(ctx context.Context, tx *sql.Tx, componentID, assetID string, installedAt time.Time) error {
	const (
		closeElsewhereSQL = `
UPDATE installations
SET removed_at = ?
WHERE part_id = ? AND removed_at IS NULL AND machine_id <> ? AND installed_at <= ?`
		countOpenSQL = `SELECT COUNT(*) FROM installations WHERE part_id = ? AND machine_id = ? AND removed_at IS NULL`
		insertSQL    = `INSERT INTO installations (id, machine_id, part_id, installed_at) VALUES (?, ?, ?, ?)`
	)

	_, err := tx.ExecContext(ctx, closeElsewhereSQL, installedAt, componentID, assetID, installedAt)
	if err != nil {
		return err
	}

	var openRows int
	row := tx.QueryRowContext(ctx, countOpenSQL, componentID, assetID)
	if err = row.Scan(&openRows); err != nil {
		return err
	}
	if openRows > 0 {
		// An open installation row for this pair already exists.
		return nil
	}

	newID, err := s.generateID(ctx, tx, idgen.Installation)
	if err != nil {
		return err
	}
	_, err = tx.ExecContext(ctx, insertSQL, newID, assetID, componentID, installedAt)
	return err
}
// closeInstallationProjectionTx sets removed_at = removedAt on the open rows
// (removed_at IS NULL) of the installations table for componentID on assetID
// whose installed_at is not later than removedAt. Matching no rows is not an
// error.
func (s *Service) closeInstallationProjectionTx(ctx context.Context, tx *sql.Tx, componentID, assetID string, removedAt time.Time) error {
	const closeOpenSQL = `
UPDATE installations
SET removed_at = ?
WHERE machine_id = ? AND part_id = ? AND removed_at IS NULL AND installed_at <= ?`

	if _, err := tx.ExecContext(ctx, closeOpenSQL, removedAt, assetID, componentID, removedAt); err != nil {
		return err
	}
	return nil
}
// deriveChildIdem builds an idempotency key for a sub-operation by appending
// ":" and suffix to the whitespace-trimmed base key. It returns nil when base
// is nil or contains only whitespace, so operations without a parent key get
// no child key either. The result always points to a new string.
func deriveChildIdem(base *string, suffix string) *string {
	if base == nil {
		return nil
	}
	trimmed := strings.TrimSpace(*base)
	if trimmed == "" {
		return nil
	}
	derived := trimmed + ":" + suffix
	return &derived
}
// valueOrNilString converts an optional string into a patch value: the
// pointed-to string when value is set, or an untyped nil when it is not.
func valueOrNilString(value *string) any {
	if value != nil {
		return *value
	}
	return nil
}
// containsString reports whether target occurs in items, using exact string
// equality. A nil or empty slice never contains anything.
func containsString(items []string, target string) bool {
	for i := 0; i < len(items); i++ {
		if items[i] == target {
			return true
		}
	}
	return false
}
// removeString returns a new slice holding every element of items that is not
// equal to target, in the original order. The input slice is never modified,
// and the result is always non-nil, even for a nil input.
func removeString(items []string, target string) []string {
	kept := make([]string, 0, len(items))
	for i := range items {
		if items[i] != target {
			kept = append(kept, items[i])
		}
	}
	return kept
}