package sync

import (
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"git.mchus.pro/mchus/priceforge/internal/appmeta"
	"git.mchus.pro/mchus/priceforge/internal/localdb"
	"git.mchus.pro/mchus/priceforge/internal/models"
	"git.mchus.pro/mchus/priceforge/internal/repository"
	"github.com/google/uuid"
	"gorm.io/gorm"
)

// PushPendingChanges pushes all pending changes to the server.
//
// It first checks sync readiness and purges orphan configuration changes
// (a purge failure is only logged). Each remaining change is pushed
// individually: a failed change has its attempt counter incremented and is
// skipped, a successful one is collected and marked as synced at the end.
//
// It returns the number of changes pushed successfully. An error is returned
// only when readiness cannot be established or the queue cannot be read;
// per-change failures are logged, not returned.
func (s *Service) PushPendingChanges() (int, error) {
	if _, err := s.EnsureReadinessForSync(); err != nil {
		return 0, err
	}

	removed, err := s.localDB.PurgeOrphanConfigurationPendingChanges()
	if err != nil {
		slog.Warn("failed to purge orphan configuration pending changes", "error", err)
	} else if removed > 0 {
		slog.Info("purged orphan configuration pending changes", "removed", removed)
	}

	changes, err := s.localDB.GetPendingChanges()
	if err != nil {
		return 0, fmt.Errorf("getting pending changes: %w", err)
	}

	if len(changes) == 0 {
		slog.Debug("no pending changes to push")
		return 0, nil
	}

	slog.Info("pushing pending changes", "count", len(changes))

	pushed := 0
	syncedIDs := make([]int64, 0, len(changes))
	sortedChanges := prioritizeProjectChanges(changes)

	for _, change := range sortedChanges {
		err := s.pushSingleChange(&change)
		if err != nil {
			slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
			// Increment attempts
			s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error())
			continue
		}

		syncedIDs = append(syncedIDs, change.ID)
		pushed++
	}

	// Mark synced changes as complete by deleting them
	if len(syncedIDs) > 0 {
		if err := s.localDB.MarkChangesSynced(syncedIDs); err != nil {
			slog.Error("failed to mark changes as synced", "error", err)
		}
	}

	slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed)
	return pushed, nil
}

// pushSingleChange pushes a single pending change to the server, dispatching
// on the change's entity type. Unknown entity types yield an error.
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
	switch change.EntityType {
	case "project":
		return s.pushProjectChange(change)
	case "configuration":
		return s.pushConfigurationChange(change)
	default:
		return fmt.Errorf("unknown entity type: %s", change.EntityType)
	}
}

// prioritizeProjectChanges returns the changes reordered so that every
// "project" change precedes all other changes. Relative order inside each
// group is preserved. Inputs with fewer than two elements are returned as-is.
func prioritizeProjectChanges(changes []localdb.PendingChange) []localdb.PendingChange {
	if len(changes) < 2 {
		return changes
	}

	projectChanges := make([]localdb.PendingChange, 0, len(changes))
	otherChanges := make([]localdb.PendingChange, 0, len(changes))
	for _, change := range changes {
		if change.EntityType == "project" {
			projectChanges = append(projectChanges, change)
			continue
		}
		otherChanges = append(otherChanges, change)
	}

	sorted := make([]localdb.PendingChange, 0, len(changes))
	sorted = append(sorted, projectChanges...)
	sorted = append(sorted, otherChanges...)
	return sorted
}

// pushProjectChange upserts the project snapshot carried by the change on the
// server (keyed by UUID) and, when the project still exists locally, marks the
// local record as synced and stores the server ID if one was assigned.
func (s *Service) pushProjectChange(change *localdb.PendingChange) error {
	payload, err := decodeProjectChangePayload(change)
	if err != nil {
		return fmt.Errorf("decode project payload: %w", err)
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}

	projectRepo := repository.NewProjectRepository(mariaDB)
	project := payload.Snapshot
	project.UUID = payload.ProjectUUID
	if err := projectRepo.UpsertByUUID(&project); err != nil {
		return fmt.Errorf("upsert project on server: %w", err)
	}

	localProject, localErr := s.localDB.GetProjectByUUID(project.UUID)
	if localErr == nil {
		if project.ID > 0 {
			serverID := project.ID
			localProject.ServerID = &serverID
		}
		localProject.SyncStatus = "synced"
		now := time.Now()
		localProject.SyncedAt = &now
		// Best-effort: the server upsert already succeeded, so a failure to
		// persist local sync metadata does not fail the push.
		_ = s.localDB.SaveProject(localProject)
	}

	return nil
}

// decodeProjectChangePayload decodes the change payload as a
// ProjectChangePayload. When that fails or yields no project UUID, the payload
// is decoded as a raw models.Project (legacy format) and wrapped into a
// ProjectChangePayload with a synthesized idempotency key.
func decodeProjectChangePayload(change *localdb.PendingChange) (ProjectChangePayload, error) {
	var payload ProjectChangePayload
	if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ProjectUUID != "" {
		if payload.Operation == "" {
			payload.Operation = change.Operation
		}
		return payload, nil
	}

	var project models.Project
	if err := json.Unmarshal([]byte(change.Payload), &project); err != nil {
		return ProjectChangePayload{}, fmt.Errorf("unmarshal legacy project payload: %w", err)
	}

	return ProjectChangePayload{
		ProjectUUID:    project.UUID,
		Operation:      change.Operation,
		IdempotencyKey: fmt.Sprintf("%s:%s:legacy", project.UUID, change.Operation),
		Snapshot:       project,
	}, nil
}

// pushConfigurationChange pushes a configuration change to the server,
// dispatching on the change's operation. Unknown operations yield an error.
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
	switch change.Operation {
	case "create":
		return s.pushConfigurationCreate(change)
	case "update":
		return s.pushConfigurationUpdate(change)
	case "rollback":
		return s.pushConfigurationRollback(change)
	case "deactivate":
		return s.pushConfigurationDeactivate(change)
	case "reactivate":
		return s.pushConfigurationReactivate(change)
	case "delete":
		return s.pushConfigurationDelete(change)
	default:
		return fmt.Errorf("unknown operation: %s", change.Operation)
	}
}

// pushConfigurationCreate creates a configuration on the server.
//
// Stale create events are skipped and reported as success. If the server-side
// create fails but a configuration with the same UUID already exists there,
// the existing row is updated instead. On success the local configuration,
// if still present, receives the server ID and is marked as synced.
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
	payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
	if err != nil {
		return err
	}
	if isStale {
		slog.Debug("skipping stale create event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
		return nil
	}

	// Get database connection
	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}

	// Create repository
	configRepo := repository.NewConfigurationRepository(mariaDB)
	if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration owner: %w", err)
	}
	if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration project: %w", err)
	}
	if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration pricelist: %w", err)
	}

	// Create on server
	if err := configRepo.Create(&cfg); err != nil {
		// Idempotency fallback: configuration may already be created remotely.
		serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
		if getErr != nil {
			return fmt.Errorf("creating configuration on server: %w", err)
		}
		cfg.ID = serverCfg.ID
		if updateErr := configRepo.Update(&cfg); updateErr != nil {
			return fmt.Errorf("create fallback update on server: %w", updateErr)
		}
	}

	// Update local configuration with server ID
	localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
	if err == nil {
		serverID := cfg.ID
		localCfg.ServerID = &serverID
		localCfg.SyncStatus = "synced"
		s.localDB.SaveConfiguration(localCfg)
	}

	slog.Info("configuration created on server",
		"uuid", cfg.UUID,
		"server_id", cfg.ID,
		"version_no", payload.CurrentVersionNo,
		"version_id", payload.CurrentVersionID,
		"idempotency_key", payload.IdempotencyKey,
	)

	return nil
}

// pushConfigurationUpdate updates a configuration on the server.
//
// Stale update events are skipped and reported as success. When the snapshot
// carries no server ID, the ID is resolved from the local record, then from
// the server by UUID, and as a last resort the configuration is created on
// the server. After a successful update the local configuration, if still
// present, is marked as synced.
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
	payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
	if err != nil {
		return err
	}
	if isStale {
		slog.Debug("skipping stale update event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
		return nil
	}

	// Get database connection
	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}

	// Create repository
	configRepo := repository.NewConfigurationRepository(mariaDB)
	if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration owner: %w", err)
	}
	if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration project: %w", err)
	}
	if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration pricelist: %w", err)
	}

	// Ensure we have a server ID before updating
	// If the payload doesn't have ID, get it from local configuration
	if cfg.ID == 0 {
		localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
		if err != nil {
			return fmt.Errorf("getting local configuration: %w", err)
		}

		if localCfg.ServerID == nil {
			// Configuration hasn't been synced yet, try to find it on server by UUID.
			// If not found (e.g. stale create was skipped), create it from current snapshot.
			serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
			if getErr != nil {
				if !errors.Is(getErr, gorm.ErrRecordNotFound) {
					return fmt.Errorf("loading configuration from server: %w", getErr)
				}
				if createErr := configRepo.Create(&cfg); createErr != nil {
					// Idempotency fallback: configuration may have been created concurrently.
					existing, existingErr := configRepo.GetByUUID(cfg.UUID)
					if existingErr != nil {
						return fmt.Errorf("creating missing configuration on server: %w", createErr)
					}
					cfg.ID = existing.ID
				}
				if cfg.ID == 0 {
					existing, existingErr := configRepo.GetByUUID(cfg.UUID)
					if existingErr != nil {
						return fmt.Errorf("loading created configuration from server: %w", existingErr)
					}
					cfg.ID = existing.ID
				}
			} else {
				cfg.ID = serverCfg.ID
			}

			// Update local with server ID
			serverID := cfg.ID
			localCfg.ServerID = &serverID
			s.localDB.SaveConfiguration(localCfg)
		} else {
			cfg.ID = *localCfg.ServerID
		}
	}

	// Update on server
	if err := configRepo.Update(&cfg); err != nil {
		return fmt.Errorf("updating configuration on server: %w", err)
	}

	// Update local sync status
	localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
	if err == nil {
		localCfg.SyncStatus = "synced"
		s.localDB.SaveConfiguration(localCfg)
	}

	slog.Info("configuration updated on server",
		"uuid", cfg.UUID,
		"version_no", payload.CurrentVersionNo,
		"version_id", payload.CurrentVersionID,
		"idempotency_key", payload.IdempotencyKey,
		"operation", payload.Operation,
		"conflict_policy", payload.ConflictPolicy,
	)

	return nil
}

// pushConfigurationRollback pushes a rollback by delegating to
// pushConfigurationUpdate.
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
	// Last-write-wins for now: rollback is pushed as an update with rollback metadata.
	return s.pushConfigurationUpdate(change)
}

// pushConfigurationDeactivate pushes a deactivation by delegating to
// pushConfigurationUpdate.
func (s *Service) pushConfigurationDeactivate(change *localdb.PendingChange) error {
	// Local deactivate is represented as the latest snapshot push.
	return s.pushConfigurationUpdate(change)
}

// pushConfigurationReactivate pushes a reactivation by delegating to
// pushConfigurationUpdate.
func (s *Service) pushConfigurationReactivate(change *localdb.PendingChange) error {
	// Local reactivate is represented as the latest snapshot push.
	return s.pushConfigurationUpdate(change)
}

// resolveConfigurationPayloadForPush decodes the change payload and reconciles
// it with the current local configuration state.
//
// It returns the (possibly refreshed) payload, the configuration snapshot to
// push, and a stale flag. The change is reported as stale when:
//   - the local configuration no longer exists,
//   - the operation is "create" and the local state cannot be loaded,
//   - the local version number is newer than the one recorded in the event, or
//   - the operation is "create" and the local configuration is inactive.
//
// When the local configuration is available, the snapshot, version metadata
// and pricelist ID in the payload are replaced with the current local values.
func (s *Service) resolveConfigurationPayloadForPush(change *localdb.PendingChange) (ConfigurationChangePayload, models.Configuration, bool, error) {
	payload, err := decodeConfigurationChangePayload(change)
	if err != nil {
		return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("decode configuration payload: %w", err)
	}

	// Remember the version recorded in the event before it is overwritten below.
	eventVersionNo := payload.CurrentVersionNo
	currentCfg, currentVersionID, currentVersionNo, err := s.loadCurrentConfigurationState(payload.ConfigurationUUID)
	if err != nil {
		// Local config may be gone (e.g. stale queue item after delete/cleanup). Treat as no-op.
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return payload, payload.Snapshot, true, nil
		}
		// create->deactivate race: config may no longer be active/visible locally, skip stale create.
		if change.Operation == "create" {
			return payload, payload.Snapshot, true, nil
		}
		return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("load current local configuration state: %w", err)
	}

	if payload.ConflictPolicy == "" {
		payload.ConflictPolicy = "last_write_wins"
	}

	if currentCfg.UUID != "" {
		payload.Snapshot = currentCfg
		if currentVersionID != "" {
			payload.CurrentVersionID = currentVersionID
		}
		if currentVersionNo > 0 {
			payload.CurrentVersionNo = currentVersionNo
		}
		payload.PricelistID = currentCfg.PricelistID
	}

	isStale := false
	if eventVersionNo > 0 && currentVersionNo > eventVersionNo {
		// Keep only latest intent in queue; older versions become no-op.
		isStale = true
	}
	if !isStale && change.Operation == "create" {
		localCfg, getErr := s.localDB.GetConfigurationByUUID(payload.ConfigurationUUID)
		if getErr == nil && !localCfg.IsActive {
			isStale = true
		}
	}

	return payload, payload.Snapshot, isStale, nil
}

// decodeConfigurationChangePayload decodes the change payload as a
// ConfigurationChangePayload. When that fails or yields no configuration or
// snapshot UUID, the payload is decoded as a raw models.Configuration (legacy
// format) and wrapped with a synthesized idempotency key and the
// "last_write_wins" conflict policy.
func decodeConfigurationChangePayload(change *localdb.PendingChange) (ConfigurationChangePayload, error) {
	var payload ConfigurationChangePayload
	if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ConfigurationUUID != "" && payload.Snapshot.UUID != "" {
		if payload.Operation == "" {
			payload.Operation = change.Operation
		}
		return payload, nil
	}

	// Backward compatibility: legacy queue stored raw models.Configuration JSON.
	var cfg models.Configuration
	if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
		return ConfigurationChangePayload{}, fmt.Errorf("unmarshal legacy configuration payload: %w", err)
	}

	return ConfigurationChangePayload{
		EventID:           "",
		IdempotencyKey:    fmt.Sprintf("%s:%s:legacy", cfg.UUID, change.Operation),
		ConfigurationUUID: cfg.UUID,
		ProjectUUID:       cfg.ProjectUUID,
		PricelistID:       cfg.PricelistID,
		Operation:         change.Operation,
		ConflictPolicy:    "last_write_wins",
		Snapshot:          cfg,
	}, nil
}

// loadCurrentConfigurationState loads the local configuration with the given
// UUID and returns it together with its current version ID and version number.
//
// The version referenced by the configuration's CurrentVersionID is used when
// it can be loaded; otherwise the version with the highest version_no for the
// configuration is used. An error is returned when the configuration does not
// exist locally or no version can be resolved.
func (s *Service) loadCurrentConfigurationState(configurationUUID string) (models.Configuration, string, int, error) {
	localCfg, err := s.localDB.GetConfigurationByUUID(configurationUUID)
	if err != nil {
		return models.Configuration{}, "", 0, fmt.Errorf("get local configuration by uuid: %w", err)
	}

	cfg := *localdb.LocalToConfiguration(localCfg)

	currentVersionID := ""
	if localCfg.CurrentVersionID != nil {
		currentVersionID = *localCfg.CurrentVersionID
	}

	currentVersionNo := 0
	if currentVersionID != "" {
		var version localdb.LocalConfigurationVersion
		err = s.localDB.DB().
			Where("id = ? AND configuration_uuid = ?", currentVersionID, configurationUUID).
			First(&version).Error
		if err == nil {
			currentVersionNo = version.VersionNo
		}
	}

	// Fall back to the newest stored version when the referenced one is
	// missing or could not be loaded.
	if currentVersionNo == 0 {
		var latest localdb.LocalConfigurationVersion
		err = s.localDB.DB().
			Where("configuration_uuid = ?", configurationUUID).
			Order("version_no DESC").
			First(&latest).Error
		if err == nil {
			currentVersionNo = latest.VersionNo
			currentVersionID = latest.ID
		}
	}

	if currentVersionNo == 0 {
		return models.Configuration{}, "", 0, fmt.Errorf("no local configuration version found for %s", configurationUUID)
	}

	return cfg, currentVersionID, currentVersionNo, nil
}

// NOTE: prepared for future conflict resolution:
// when server starts storing version metadata, we can compare payload.CurrentVersionNo
// against remote version and branch into custom strategies. For now use last-write-wins.

// pushConfigurationDelete deletes a configuration from the server.
//
// A configuration that does not exist on the server is treated as already
// deleted and reported as success. Any other lookup failure is returned so
// the pending change stays queued and is retried instead of being dropped.
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
	// Get database connection
	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}

	// Create repository
	configRepo := repository.NewConfigurationRepository(mariaDB)

	// Get the configuration from server by UUID to get the ID
	cfg, err := configRepo.GetByUUID(change.EntityUUID)
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			// Already deleted or not found, consider it successful
			slog.Warn("configuration not found on server, considering delete successful", "uuid", change.EntityUUID)
			return nil
		}
		// Transient or unexpected failure: do not pretend the delete happened.
		return fmt.Errorf("loading configuration from server: %w", err)
	}

	// Delete from server
	if err := configRepo.Delete(cfg.ID); err != nil {
		return fmt.Errorf("deleting configuration from server: %w", err)
	}

	slog.Info("configuration deleted on server", "uuid", change.EntityUUID)

	return nil
}

// ensureConfigurationOwner makes sure cfg has an owner username, falling back
// to the local database user when the field is empty. It also clears the
// legacy UserID and stamps the current application version on cfg.
// The mariaDB argument is currently unused.
func (s *Service) ensureConfigurationOwner(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	ownerUsername := cfg.OwnerUsername
	if ownerUsername == "" {
		ownerUsername = s.localDB.GetDBUser()
		cfg.OwnerUsername = ownerUsername
	}
	if ownerUsername == "" {
		return fmt.Errorf("owner username is empty")
	}

	// user_id is legacy and no longer used for ownership in local-first mode.
	// Keep it NULL on writes; ownership is represented by owner_username.
	cfg.UserID = nil
	cfg.AppVersion = appmeta.Version()
	return nil
}

// ensureConfigurationProject makes sure the project referenced by cfg exists
// on the server.
//
// When cfg references a project that is missing on the server, the local
// project is upserted there and the local record is marked as synced. When
// cfg references no project, it is attached to the system project (looked up
// by name and the is_system flag), which is created on the server if absent.
func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Configuration) error {
	// Name of the system project used for configurations without a project.
	const systemProjectName = "Без проекта"

	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	projectRepo := repository.NewProjectRepository(mariaDB)

	if cfg.ProjectUUID != nil && *cfg.ProjectUUID != "" {
		_, err := projectRepo.GetByUUID(*cfg.ProjectUUID)
		if err == nil {
			return nil
		}
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}

		localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID)
		if localErr != nil {
			// The project is unknown locally as well; report the server-side
			// "not found" error.
			return err
		}
		modelProject := localdb.LocalToProject(localProject)
		if modelProject.OwnerUsername == "" {
			modelProject.OwnerUsername = cfg.OwnerUsername
		}
		if createErr := projectRepo.UpsertByUUID(modelProject); createErr != nil {
			return createErr
		}
		if modelProject.ID > 0 {
			serverID := modelProject.ID
			localProject.ServerID = &serverID
			localProject.SyncStatus = "synced"
			now := time.Now()
			localProject.SyncedAt = &now
			// Best-effort: the server upsert already succeeded, so a failure
			// to persist local sync metadata does not fail the push.
			_ = s.localDB.SaveProject(localProject)
		}
		return nil
	}

	systemProject := &models.Project{}
	err := mariaDB.
		Where("LOWER(TRIM(COALESCE(name, ''))) = LOWER(?) AND is_system = ?", systemProjectName, true).
		Order("CASE WHEN TRIM(COALESCE(owner_username, '')) = '' THEN 0 ELSE 1 END, created_at ASC, id ASC").
		First(systemProject).Error
	if err != nil {
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}
		systemProject = &models.Project{
			UUID:          uuid.NewString(),
			OwnerUsername: "",
			Name:          systemProjectName,
			IsActive:      true,
			IsSystem:      true,
		}
		if createErr := projectRepo.Create(systemProject); createErr != nil {
			return createErr
		}
	}

	cfg.ProjectUUID = &systemProject.UUID
	return nil
}

// ensureConfigurationPricelist makes sure cfg references a pricelist that can
// be loaded from the server. When the referenced pricelist is unset or cannot
// be loaded, cfg is pointed at the latest active pricelist; when that cannot
// be loaded either, the pricelist reference is cleared. Lookup failures are
// never returned as errors.
func (s *Service) ensureConfigurationPricelist(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	pricelistRepo := repository.NewPricelistRepository(mariaDB)

	if cfg.PricelistID != nil && *cfg.PricelistID > 0 {
		if _, err := pricelistRepo.GetByID(*cfg.PricelistID); err == nil {
			return nil
		}
	}

	latest, err := pricelistRepo.GetLatestActive()
	if err != nil {
		cfg.PricelistID = nil
		return nil
	}

	cfg.PricelistID = &latest.ID
	return nil
}