Implement local DB migrations and archived configuration lifecycle
This commit is contained in:
@@ -45,6 +45,21 @@ type ConfigImportResult struct {
|
||||
Skipped int `json:"skipped"`
|
||||
}
|
||||
|
||||
// ConfigurationChangePayload is stored in pending_changes.payload for configuration events.
|
||||
// It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution.
|
||||
type ConfigurationChangePayload struct {
|
||||
EventID string `json:"event_id"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
ConfigurationUUID string `json:"configuration_uuid"`
|
||||
Operation string `json:"operation"` // create/update/rollback/deactivate/reactivate/delete
|
||||
CurrentVersionID string `json:"current_version_id,omitempty"`
|
||||
CurrentVersionNo int `json:"current_version_no,omitempty"`
|
||||
ConflictPolicy string `json:"conflict_policy,omitempty"` // currently: last_write_wins
|
||||
Snapshot models.Configuration `json:"snapshot"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
CreatedBy *string `json:"created_by,omitempty"`
|
||||
}
|
||||
|
||||
// ImportConfigurationsToLocal imports configurations from MariaDB into local SQLite.
|
||||
// Existing local configs with pending local changes are skipped to avoid data loss.
|
||||
func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
|
||||
@@ -78,6 +93,11 @@ func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
|
||||
result.Skipped++
|
||||
continue
|
||||
}
|
||||
if existing != nil && err == nil && !existing.IsActive {
|
||||
// Keep local deactivation sticky: do not resurrect hidden entries from server pull.
|
||||
result.Skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
localCfg := localdb.ConfigurationToLocal(&cfg)
|
||||
now := time.Now()
|
||||
@@ -432,6 +452,12 @@ func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
||||
return s.pushConfigurationCreate(change)
|
||||
case "update":
|
||||
return s.pushConfigurationUpdate(change)
|
||||
case "rollback":
|
||||
return s.pushConfigurationRollback(change)
|
||||
case "deactivate":
|
||||
return s.pushConfigurationDeactivate(change)
|
||||
case "reactivate":
|
||||
return s.pushConfigurationReactivate(change)
|
||||
case "delete":
|
||||
return s.pushConfigurationDelete(change)
|
||||
default:
|
||||
@@ -441,9 +467,13 @@ func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
||||
|
||||
// pushConfigurationCreate creates a configuration on the server
|
||||
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
||||
var cfg models.Configuration
|
||||
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
||||
return fmt.Errorf("unmarshaling configuration: %w", err)
|
||||
payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if isStale {
|
||||
slog.Debug("skipping stale create event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
@@ -457,7 +487,15 @@ func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
||||
|
||||
// Create on server
|
||||
if err := configRepo.Create(&cfg); err != nil {
|
||||
return fmt.Errorf("creating configuration on server: %w", err)
|
||||
// Idempotency fallback: configuration may already be created remotely.
|
||||
serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
|
||||
if getErr != nil {
|
||||
return fmt.Errorf("creating configuration on server: %w", err)
|
||||
}
|
||||
cfg.ID = serverCfg.ID
|
||||
if updateErr := configRepo.Update(&cfg); updateErr != nil {
|
||||
return fmt.Errorf("create fallback update on server: %w", updateErr)
|
||||
}
|
||||
}
|
||||
|
||||
// Update local configuration with server ID
|
||||
@@ -469,15 +507,25 @@ func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
||||
s.localDB.SaveConfiguration(localCfg)
|
||||
}
|
||||
|
||||
slog.Info("configuration created on server", "uuid", cfg.UUID, "server_id", cfg.ID)
|
||||
slog.Info("configuration created on server",
|
||||
"uuid", cfg.UUID,
|
||||
"server_id", cfg.ID,
|
||||
"version_no", payload.CurrentVersionNo,
|
||||
"version_id", payload.CurrentVersionID,
|
||||
"idempotency_key", payload.IdempotencyKey,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// pushConfigurationUpdate updates a configuration on the server
|
||||
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
||||
var cfg models.Configuration
|
||||
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
||||
return fmt.Errorf("unmarshaling configuration: %w", err)
|
||||
payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if isStale {
|
||||
slog.Debug("skipping stale update event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
@@ -526,10 +574,149 @@ func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
||||
s.localDB.SaveConfiguration(localCfg)
|
||||
}
|
||||
|
||||
slog.Info("configuration updated on server", "uuid", cfg.UUID)
|
||||
slog.Info("configuration updated on server",
|
||||
"uuid", cfg.UUID,
|
||||
"version_no", payload.CurrentVersionNo,
|
||||
"version_id", payload.CurrentVersionID,
|
||||
"idempotency_key", payload.IdempotencyKey,
|
||||
"operation", payload.Operation,
|
||||
"conflict_policy", payload.ConflictPolicy,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
|
||||
// Last-write-wins for now: rollback is pushed as an update with rollback metadata.
|
||||
return s.pushConfigurationUpdate(change)
|
||||
}
|
||||
|
||||
func (s *Service) pushConfigurationDeactivate(change *localdb.PendingChange) error {
|
||||
// Local deactivate is represented as the latest snapshot push.
|
||||
return s.pushConfigurationUpdate(change)
|
||||
}
|
||||
|
||||
func (s *Service) pushConfigurationReactivate(change *localdb.PendingChange) error {
|
||||
// Local reactivate is represented as the latest snapshot push.
|
||||
return s.pushConfigurationUpdate(change)
|
||||
}
|
||||
|
||||
func (s *Service) resolveConfigurationPayloadForPush(change *localdb.PendingChange) (ConfigurationChangePayload, models.Configuration, bool, error) {
|
||||
payload, err := decodeConfigurationChangePayload(change)
|
||||
if err != nil {
|
||||
return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("decode configuration payload: %w", err)
|
||||
}
|
||||
eventVersionNo := payload.CurrentVersionNo
|
||||
|
||||
currentCfg, currentVersionID, currentVersionNo, err := s.loadCurrentConfigurationState(payload.ConfigurationUUID)
|
||||
if err != nil {
|
||||
// create->deactivate race: config may no longer be active/visible locally, skip stale create.
|
||||
if change.Operation == "create" {
|
||||
return payload, payload.Snapshot, true, nil
|
||||
}
|
||||
return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("load current local configuration state: %w", err)
|
||||
}
|
||||
|
||||
if payload.ConflictPolicy == "" {
|
||||
payload.ConflictPolicy = "last_write_wins"
|
||||
}
|
||||
|
||||
if currentCfg.UUID != "" {
|
||||
payload.Snapshot = currentCfg
|
||||
if currentVersionID != "" {
|
||||
payload.CurrentVersionID = currentVersionID
|
||||
}
|
||||
if currentVersionNo > 0 {
|
||||
payload.CurrentVersionNo = currentVersionNo
|
||||
}
|
||||
}
|
||||
|
||||
isStale := false
|
||||
if eventVersionNo > 0 && currentVersionNo > eventVersionNo {
|
||||
// Keep only latest intent in queue; older versions become no-op.
|
||||
isStale = true
|
||||
}
|
||||
if !isStale && change.Operation == "create" {
|
||||
localCfg, getErr := s.localDB.GetConfigurationByUUID(payload.ConfigurationUUID)
|
||||
if getErr == nil && !localCfg.IsActive {
|
||||
isStale = true
|
||||
}
|
||||
}
|
||||
|
||||
return payload, payload.Snapshot, isStale, nil
|
||||
}
|
||||
|
||||
func decodeConfigurationChangePayload(change *localdb.PendingChange) (ConfigurationChangePayload, error) {
|
||||
var payload ConfigurationChangePayload
|
||||
if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ConfigurationUUID != "" && payload.Snapshot.UUID != "" {
|
||||
if payload.Operation == "" {
|
||||
payload.Operation = change.Operation
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
// Backward compatibility: legacy queue stored raw models.Configuration JSON.
|
||||
var cfg models.Configuration
|
||||
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
||||
return ConfigurationChangePayload{}, fmt.Errorf("unmarshal legacy configuration payload: %w", err)
|
||||
}
|
||||
|
||||
return ConfigurationChangePayload{
|
||||
EventID: "",
|
||||
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", cfg.UUID, change.Operation),
|
||||
ConfigurationUUID: cfg.UUID,
|
||||
Operation: change.Operation,
|
||||
ConflictPolicy: "last_write_wins",
|
||||
Snapshot: cfg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) loadCurrentConfigurationState(configurationUUID string) (models.Configuration, string, int, error) {
|
||||
localCfg, err := s.localDB.GetConfigurationByUUID(configurationUUID)
|
||||
if err != nil {
|
||||
return models.Configuration{}, "", 0, fmt.Errorf("get local configuration by uuid: %w", err)
|
||||
}
|
||||
|
||||
cfg := *localdb.LocalToConfiguration(localCfg)
|
||||
|
||||
currentVersionID := ""
|
||||
if localCfg.CurrentVersionID != nil {
|
||||
currentVersionID = *localCfg.CurrentVersionID
|
||||
}
|
||||
|
||||
currentVersionNo := 0
|
||||
if currentVersionID != "" {
|
||||
var version localdb.LocalConfigurationVersion
|
||||
err = s.localDB.DB().
|
||||
Where("id = ? AND configuration_uuid = ?", currentVersionID, configurationUUID).
|
||||
First(&version).Error
|
||||
if err == nil {
|
||||
currentVersionNo = version.VersionNo
|
||||
}
|
||||
}
|
||||
|
||||
if currentVersionNo == 0 {
|
||||
var latest localdb.LocalConfigurationVersion
|
||||
err = s.localDB.DB().
|
||||
Where("configuration_uuid = ?", configurationUUID).
|
||||
Order("version_no DESC").
|
||||
First(&latest).Error
|
||||
if err == nil {
|
||||
currentVersionNo = latest.VersionNo
|
||||
currentVersionID = latest.ID
|
||||
}
|
||||
}
|
||||
|
||||
if currentVersionNo == 0 {
|
||||
return models.Configuration{}, "", 0, fmt.Errorf("no local configuration version found for %s", configurationUUID)
|
||||
}
|
||||
|
||||
return cfg, currentVersionID, currentVersionNo, nil
|
||||
}
|
||||
|
||||
// NOTE: prepared for future conflict resolution:
|
||||
// when server starts storing version metadata, we can compare payload.CurrentVersionNo
|
||||
// against remote version and branch into custom strategies. For now use last-write-wins.
|
||||
|
||||
// pushConfigurationDelete deletes a configuration from the server
|
||||
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
||||
// Get database connection
|
||||
@@ -554,6 +741,6 @@ func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
||||
return fmt.Errorf("deleting configuration from server: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("configuration deleted from server", "uuid", change.EntityUUID)
|
||||
slog.Info("configuration deleted on server", "uuid", change.EntityUUID)
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user