package sync

import (
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"sort"
	"strings"
	"time"

	"git.mchus.pro/mchus/quoteforge/internal/appmeta"
	"git.mchus.pro/mchus/quoteforge/internal/db"
	"git.mchus.pro/mchus/quoteforge/internal/localdb"
	"git.mchus.pro/mchus/quoteforge/internal/models"
	"git.mchus.pro/mchus/quoteforge/internal/repository"
	"github.com/google/uuid"
	"gorm.io/gorm"
)

// ErrOffline is returned by the import and listing methods of Service when a
// server (MariaDB) database handle cannot be obtained.
var ErrOffline = errors.New("database is offline")

// Service handles synchronization between MariaDB and local SQLite.
type Service struct {
	// connMgr is set by NewService.
	// NOTE(review): presumably the source of the server DB handle in normal operation — confirm against getDB.
	connMgr *db.ConnectionManager
	// localDB is the local SQLite store that is read and written during sync.
	localDB *localdb.LocalDB
	// directDB is set only by NewServiceWithDB (a direct server DB handle, used in tests).
	directDB *gorm.DB
}

// NewService creates a new sync service backed by a connection manager and the local database.
func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Service {
	return &Service{
		connMgr: connMgr,
		localDB: localDB,
	}
}

// NewServiceWithDB creates sync service that uses a direct DB handle (used in tests).
// The connection manager is left nil.
func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
	return &Service{
		localDB:  localDB,
		directDB: mariaDB,
	}
}

// SyncStatus represents the current sync status.
type SyncStatus struct {
	// LastSyncAt is the last recorded sync time; nil when no sync has happened yet.
	LastSyncAt *time.Time `json:"last_sync_at"`
	// ServerPricelists is the number of active pricelists counted on the server.
	ServerPricelists int `json:"server_pricelists"`
	// LocalPricelists is the number of pricelists stored locally.
	LocalPricelists int `json:"local_pricelists"`
	// NeedsSync reports whether a sync is considered due.
	NeedsSync bool `json:"needs_sync"`
}

// UserSyncStatus describes one database user in the shared sync heartbeat listing.
type UserSyncStatus struct {
	Username string `json:"username"`
	// LastSyncAt is the time of the user's last recorded heartbeat.
	LastSyncAt time.Time `json:"last_sync_at"`
	// AppVersion is the application version stored with the heartbeat; may be empty.
	AppVersion string `json:"app_version,omitempty"`
	// IsOnline is true when the heartbeat is recent enough or the user is currently connected.
	IsOnline bool `json:"is_online"`
}

// ConfigImportResult represents server->local configuration import stats.
type ConfigImportResult struct {
	Imported int `json:"imported"` // configurations newly created locally
	Updated  int `json:"updated"`  // existing local configurations overwritten from the server
	Skipped  int `json:"skipped"`  // local configurations left untouched
}

// ProjectImportResult represents server->local project import stats.
type ProjectImportResult struct {
	Imported int `json:"imported"` // projects newly created locally
	Updated  int `json:"updated"`  // existing local projects overwritten from the server
	Skipped  int `json:"skipped"`  // local projects left untouched
}

// ConfigurationChangePayload is stored in pending_changes.payload for configuration events.
// It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution.
type ConfigurationChangePayload struct {
	EventID        string `json:"event_id"`
	IdempotencyKey string `json:"idempotency_key"`
	// ConfigurationUUID identifies the configuration the event belongs to.
	ConfigurationUUID string  `json:"configuration_uuid"`
	ProjectUUID       *string `json:"project_uuid,omitempty"`
	PricelistID       *uint   `json:"pricelist_id,omitempty"`
	Operation         string  `json:"operation"` // create/update/rollback/deactivate/reactivate/delete
	CurrentVersionID  string  `json:"current_version_id,omitempty"`
	CurrentVersionNo  int     `json:"current_version_no,omitempty"`
	ConflictPolicy    string  `json:"conflict_policy,omitempty"` // currently: last_write_wins
	// Snapshot is the full configuration state captured with the event.
	Snapshot  models.Configuration `json:"snapshot"`
	CreatedAt time.Time            `json:"created_at"`
	CreatedBy *string              `json:"created_by,omitempty"`
}

// ProjectChangePayload is the JSON envelope for project events stored in a pending
// change payload. Older payloads may contain a bare models.Project instead of this
// envelope; the decoder in this file accepts both forms.
type ProjectChangePayload struct {
	EventID        string `json:"event_id"`
	IdempotencyKey string `json:"idempotency_key"`
	// ProjectUUID identifies the project; an empty value marks the payload as legacy when decoding.
	ProjectUUID string `json:"project_uuid"`
	// Operation falls back to the pending change's own operation when empty.
	Operation string `json:"operation"`
	// Snapshot is the full project state captured with the event.
	Snapshot  models.Project `json:"snapshot"`
	CreatedAt time.Time      `json:"created_at"`
}

// ImportConfigurationsToLocal imports configurations from MariaDB into local SQLite.
// Existing local configs with pending local changes are skipped to avoid data loss.
func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) { mariaDB, err := s.getDB() if err != nil { return nil, ErrOffline } configRepo := repository.NewConfigurationRepository(mariaDB) result := &ConfigImportResult{} offset := 0 const limit = 200 for { serverConfigs, _, err := configRepo.ListAll(offset, limit) if err != nil { return nil, fmt.Errorf("listing server configurations: %w", err) } if len(serverConfigs) == 0 { break } for i := range serverConfigs { cfg := serverConfigs[i] existing, err := s.localDB.GetConfigurationByUUID(cfg.UUID) if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return nil, fmt.Errorf("getting local configuration %s: %w", cfg.UUID, err) } if existing != nil && err == nil && existing.SyncStatus == "pending" { result.Skipped++ continue } if existing != nil && err == nil && !existing.IsActive { // Keep local deactivation sticky: do not resurrect hidden entries from server pull. result.Skipped++ continue } localCfg := localdb.ConfigurationToLocal(&cfg) now := time.Now() localCfg.SyncedAt = &now localCfg.SyncStatus = "synced" localCfg.UpdatedAt = now if existing != nil && err == nil { localCfg.ID = existing.ID if localCfg.Line <= 0 && existing.Line > 0 { localCfg.Line = existing.Line } result.Updated++ } else { result.Imported++ } if err := s.localDB.SaveConfiguration(localCfg); err != nil { return nil, fmt.Errorf("saving local configuration %s: %w", cfg.UUID, err) } } offset += len(serverConfigs) } return result, nil } // ImportProjectsToLocal imports projects from MariaDB into local SQLite. // Existing local projects with pending local changes are skipped to avoid data loss. 
// Server projects are read in pages of 200. A project that already exists locally
// and is not pending is overwritten in place with the server values; otherwise a
// new local row is created. Returns ErrOffline when no server DB handle is available.
func (s *Service) ImportProjectsToLocal() (*ProjectImportResult, error) {
	mariaDB, err := s.getDB()
	if err != nil {
		return nil, ErrOffline
	}

	projectRepo := repository.NewProjectRepository(mariaDB)
	result := &ProjectImportResult{}

	offset := 0
	const limit = 200
	for {
		// NOTE(review): the meaning of the third argument (true) is defined by the
		// repository — presumably "include inactive projects"; confirm.
		serverProjects, _, err := projectRepo.List(offset, limit, true)
		if err != nil {
			return nil, fmt.Errorf("listing server projects: %w", err)
		}
		if len(serverProjects) == 0 {
			break
		}

		// One timestamp is shared by every project of the page.
		now := time.Now()
		for i := range serverProjects {
			project := serverProjects[i]

			existing, getErr := s.localDB.GetProjectByUUID(project.UUID)
			if getErr != nil && !errors.Is(getErr, gorm.ErrRecordNotFound) {
				return nil, fmt.Errorf("getting local project %s: %w", project.UUID, getErr)
			}

			if existing != nil && getErr == nil {
				// Keep unsynced local changes intact.
				if existing.SyncStatus == "pending" {
					result.Skipped++
					continue
				}

				// Overwrite the local copy field by field with the server values.
				existing.OwnerUsername = project.OwnerUsername
				existing.Code = project.Code
				existing.Name = project.Name
				existing.TrackerURL = project.TrackerURL
				existing.IsActive = project.IsActive
				existing.IsSystem = project.IsSystem
				existing.CreatedAt = project.CreatedAt
				existing.UpdatedAt = project.UpdatedAt
				serverID := project.ID
				existing.ServerID = &serverID
				existing.SyncStatus = "synced"
				existing.SyncedAt = &now

				if err := s.localDB.SaveProject(existing); err != nil {
					return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err)
				}
				result.Updated++
				continue
			}

			// Not known locally yet: create a local row from the server record.
			localProject := localdb.ProjectToLocal(&project)
			localProject.SyncStatus = "synced"
			localProject.SyncedAt = &now
			if err := s.localDB.SaveProject(localProject); err != nil {
				return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err)
			}
			result.Imported++
		}

		offset += len(serverProjects)
	}

	return result, nil
}

// GetStatus returns the current sync status: the last sync time, the number of
// active server pricelists (0 when not connected or when counting fails), the
// number of local pricelists, and whether NeedSync reports that a sync is due.
// The returned error is always nil in the current implementation.
func (s *Service) GetStatus() (*SyncStatus, error) {
	lastSync := s.localDB.GetLastSyncTime()

	// Count server pricelists (only if already connected, don't reconnect)
	serverCount := 0
	connStatus := s.getConnectionStatus()
	if connStatus.IsConnected {
		if mariaDB, err := s.getDB(); err == nil && mariaDB != nil {
			pricelistRepo := repository.NewPricelistRepository(mariaDB)
			activeCount, err := pricelistRepo.CountActive()
			if err == nil {
				serverCount = int(activeCount)
			}
		}
	}

	// Count local pricelists
	localCount := s.localDB.CountLocalPricelists()

	// The error from NeedSync is intentionally discarded here.
	needsSync, _ := s.NeedSync()

	return &SyncStatus{
		LastSyncAt:       lastSync,
		ServerPricelists: serverCount,
		LocalPricelists:  int(localCount),
		NeedsSync:        needsSync,
	}, nil
}

// NeedSync checks if synchronization is needed.
// Returns true if there are new pricelists on server or last sync was >1 hour ago.
// When the server cannot be checked it reports false. The returned error is
// always nil in the current implementation.
func (s *Service) NeedSync() (bool, error) {
	lastSync := s.localDB.GetLastSyncTime()

	// If never synced, need sync
	if lastSync == nil {
		return true, nil
	}

	// If last sync was more than 1 hour ago, suggest sync
	if time.Since(*lastSync) > time.Hour {
		return true, nil
	}

	// Check if there are new pricelists on server (only if already connected)
	connStatus := s.getConnectionStatus()
	if !connStatus.IsConnected {
		// If offline, can't check server, no need to sync
		return false, nil
	}

	mariaDB, err := s.getDB()
	if err != nil {
		// If offline, can't check server, no need to sync
		return false, nil
	}

	pricelistRepo := repository.NewPricelistRepository(mariaDB)
	sources := []models.PricelistSource{
		models.PricelistSourceEstimate,
		models.PricelistSourceWarehouse,
		models.PricelistSourceCompetitor,
	}
	for _, source := range sources {
		latestServer, err := pricelistRepo.GetLatestActiveBySource(string(source))
		if err != nil {
			// No active pricelist for this source yet.
			// (Any lookup error is treated this way.)
			continue
		}

		latestLocal, err := s.localDB.GetLatestLocalPricelistBySource(string(source))
		if err != nil {
			// No local pricelist for an existing source on server.
			return true, nil
		}

		// If server has newer pricelist for this source, need sync.
		// (Any difference between the two IDs counts as "newer".)
		if latestServer.ID != latestLocal.ServerID {
			return true, nil
		}
	}

	return false, nil
}

// SyncPricelists synchronizes all active pricelists from server to local SQLite.
// It returns the number of pricelists newly created locally. A pricelist that
// cannot be saved is logged and skipped; a failure to sync its items is logged
// but the pricelist still counts as synced. Afterwards stale local pricelists
// are cleaned up, categories are backfilled, and the last sync time and the
// shared heartbeat are updated.
func (s *Service) SyncPricelists() (int, error) {
	slog.Info("starting pricelist sync")
	if _, err := s.EnsureReadinessForSync(); err != nil {
		return 0, err
	}

	// Get database connection
	mariaDB, err := s.getDB()
	if err != nil {
		return 0, fmt.Errorf("database not available: %w", err)
	}

	// Create repository
	pricelistRepo := repository.NewPricelistRepository(mariaDB)

	// Get active pricelists from server (up to 100)
	// NOTE(review): the same capped ID list drives the stale-pricelist cleanup
	// below — confirm the server never holds more than 100 active pricelists.
	serverPricelists, _, err := pricelistRepo.ListActive(0, 100)
	if err != nil {
		return 0, fmt.Errorf("getting active server pricelists: %w", err)
	}
	serverPricelistIDs := make([]uint, 0, len(serverPricelists))
	for i := range serverPricelists {
		serverPricelistIDs = append(serverPricelistIDs, serverPricelists[i].ID)
	}

	synced := 0
	for _, pl := range serverPricelists {
		// Check if pricelist already exists locally
		// (the lookup error is ignored; only a non-nil result counts as "exists").
		existing, _ := s.localDB.GetLocalPricelistByServerID(pl.ID)
		if existing != nil {
			continue
		}

		// Create local pricelist
		localPL := &localdb.LocalPricelist{
			ServerID:  pl.ID,
			Source:    pl.Source,
			Version:   pl.Version,
			Name:      pl.Notification, // Using notification as name
			CreatedAt: pl.CreatedAt,
			SyncedAt:  time.Now(),
			IsUsed:    false,
		}

		if err := s.localDB.SaveLocalPricelist(localPL); err != nil {
			slog.Warn("failed to save local pricelist", "version", pl.Version, "error", err)
			continue
		}

		// Sync items for the newly created pricelist
		itemCount, err := s.SyncPricelistItems(localPL.ID)
		if err != nil {
			slog.Warn("failed to sync pricelist items", "version", pl.Version, "error", err)
			// Continue even if items sync fails - we have the pricelist metadata
		} else {
			slog.Debug("synced pricelist with items", "version", pl.Version, "items", itemCount)
		}

		synced++
	}

	// Cleanup is best-effort: a failure is only logged.
	removed, err := s.localDB.DeleteUnusedLocalPricelistsMissingOnServer(serverPricelistIDs)
	if err != nil {
		slog.Warn("failed to cleanup stale local pricelists", "error", err)
	} else if removed > 0 {
		slog.Info("deleted stale local pricelists", "deleted", removed)
	}

	// Backfill lot_category for used pricelists (older local caches may miss the column values).
	s.backfillUsedPricelistItemCategories(pricelistRepo, serverPricelistIDs)

	// Update last sync time
	s.localDB.SetLastSyncTime(time.Now())
	s.RecordSyncHeartbeat()

	slog.Info("pricelist sync completed", "synced", synced, "total", len(serverPricelists))
	return synced, nil
}

// backfillUsedPricelistItemCategories re-downloads the items of pricelists that
// are referenced by active local configurations, are listed in
// activeServerPricelistIDs, already have local items, and have at least one
// item counted by CountLocalPricelistItemsWithEmptyCategory. It is best-effort:
// every failure is logged and the affected pricelist is skipped.
func (s *Service) backfillUsedPricelistItemCategories(pricelistRepo *repository.PricelistRepository, activeServerPricelistIDs []uint) {
	if s.localDB == nil || pricelistRepo == nil {
		return
	}
	// Set of server IDs for O(1) membership checks.
	activeSet := make(map[uint]struct{}, len(activeServerPricelistIDs))
	for _, id := range activeServerPricelistIDs {
		activeSet[id] = struct{}{}
	}

	type row struct {
		ID uint `gorm:"column:id"`
	}
	// Collect the distinct pricelist IDs referenced by active local configurations
	// through any of the three pricelist columns.
	var usedRows []row
	if err := s.localDB.DB().Raw(` SELECT DISTINCT pricelist_id AS id FROM local_configurations WHERE is_active = 1 AND pricelist_id IS NOT NULL UNION SELECT DISTINCT warehouse_pricelist_id AS id FROM local_configurations WHERE is_active = 1 AND warehouse_pricelist_id IS NOT NULL UNION SELECT DISTINCT competitor_pricelist_id AS id FROM local_configurations WHERE is_active = 1 AND competitor_pricelist_id IS NOT NULL `).Scan(&usedRows).Error; err != nil {
		slog.Warn("pricelist category backfill: failed to list used pricelists", "error", err)
		return
	}

	for _, r := range usedRows {
		// The IDs stored on configurations are treated as server pricelist IDs here.
		serverID := r.ID
		if serverID == 0 {
			continue
		}
		if _, ok := activeSet[serverID]; !ok {
			// Not present on server (or not active) - cannot backfill from remote.
			continue
		}
		localPL, err := s.localDB.GetLocalPricelistByServerID(serverID)
		if err != nil || localPL == nil {
			continue
		}
		// Pricelists without any local items are left to the regular item sync.
		if s.localDB.CountLocalPricelistItems(localPL.ID) == 0 {
			continue
		}

		missing, err := s.localDB.CountLocalPricelistItemsWithEmptyCategory(localPL.ID)
		if err != nil {
			slog.Warn("pricelist category backfill: failed to check local items", "server_id", serverID, "error", err)
			continue
		}
		if missing == 0 {
			continue
		}

		// NOTE(review): arguments are presumably (pricelist ID, offset, limit, filter) — confirm.
		serverItems, _, err := pricelistRepo.GetItems(serverID, 0, 10000, "")
		if err != nil {
			slog.Warn("pricelist category backfill: failed to load server items", "server_id", serverID, "error", err)
			continue
		}
		localItems := make([]localdb.LocalPricelistItem, len(serverItems))
		for i := range serverItems {
			localItems[i] = *localdb.PricelistItemToLocal(&serverItems[i], localPL.ID)
		}

		if err := s.localDB.ReplaceLocalPricelistItems(localPL.ID, localItems); err != nil {
			slog.Warn("pricelist category backfill: failed to replace local items", "server_id", serverID, "error", err)
			continue
		}
		slog.Info("pricelist category backfill: refreshed local items", "server_id", serverID, "items", len(localItems))
	}
}

// RecordSyncHeartbeat updates shared sync heartbeat for current DB user.
// Only users with write rights are expected to be able to update this table.
// It is best-effort: it returns silently when the user name is empty or no
// server DB handle is available, and a failed write is logged at debug level.
func (s *Service) RecordSyncHeartbeat() {
	username := strings.TrimSpace(s.localDB.GetDBUser())
	if username == "" {
		return
	}

	mariaDB, err := s.getDB()
	if err != nil || mariaDB == nil {
		return
	}

	if err := ensureUserSyncStatusTable(mariaDB); err != nil {
		slog.Warn("sync heartbeat: failed to ensure table", "error", err)
		return
	}

	// Upsert keyed on username: insert a row or refresh the existing one.
	now := time.Now().UTC()
	if err := mariaDB.Exec(` INSERT INTO qt_pricelist_sync_status (username, last_sync_at, updated_at, app_version) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE last_sync_at = VALUES(last_sync_at), updated_at = VALUES(updated_at), app_version = VALUES(app_version) `, username, now, now, appmeta.Version()).Error; err != nil {
		slog.Debug("sync heartbeat: skipped", "username", username, "error", err)
	}
}

// ListUserSyncStatuses returns users who have recorded sync heartbeat.
// A user is reported online when the heartbeat is at most onlineThreshold old
// or when the user currently appears among the connected DB users. Connected
// users without a heartbeat row are appended with LastSyncAt set to now and an
// empty AppVersion. The result is sorted online-first, then by most recent
// LastSyncAt, then by case-insensitive username. Returns ErrOffline when no
// server DB handle is available.
func (s *Service) ListUserSyncStatuses(onlineThreshold time.Duration) ([]UserSyncStatus, error) {
	mariaDB, err := s.getDB()
	if err != nil || mariaDB == nil {
		return nil, ErrOffline
	}

	if err := ensureUserSyncStatusTable(mariaDB); err != nil {
		return nil, fmt.Errorf("ensure sync status table: %w", err)
	}

	type row struct {
		Username   string    `gorm:"column:username"`
		LastSyncAt time.Time `gorm:"column:last_sync_at"`
		AppVersion string    `gorm:"column:app_version"`
	}
	var rows []row
	if err := mariaDB.Raw(` SELECT username, last_sync_at, COALESCE(app_version, '') AS app_version FROM qt_pricelist_sync_status ORDER BY last_sync_at DESC, username ASC `).Scan(&rows).Error; err != nil {
		return nil, fmt.Errorf("load sync status rows: %w", err)
	}

	// Connected users are optional information: on failure continue with an empty set.
	activeUsers, err := s.listConnectedDBUsers(mariaDB)
	if err != nil {
		slog.Debug("sync status: failed to load connected DB users", "error", err)
		activeUsers = map[string]struct{}{}
	}

	now := time.Now().UTC()
	result := make([]UserSyncStatus, 0, len(rows)+len(activeUsers))
	for i := range rows {
		r := rows[i]
		username := strings.TrimSpace(r.Username)
		if username == "" {
			continue
		}

		isOnline := now.Sub(r.LastSyncAt) <= onlineThreshold
		if _, connected := activeUsers[username]; connected {
			isOnline = true
			// Remove matched users so only heartbeat-less users remain for the loop below.
			delete(activeUsers, username)
		}

		appVersion := strings.TrimSpace(r.AppVersion)

		result = append(result, UserSyncStatus{
			Username:   username,
			LastSyncAt: r.LastSyncAt,
			AppVersion: appVersion,
			IsOnline:   isOnline,
		})
	}

	// Users that are connected but never recorded a heartbeat.
	for username := range activeUsers {
		result = append(result, UserSyncStatus{
			Username:   username,
			LastSyncAt: now,
			AppVersion: "",
			IsOnline:   true,
		})
	}

	sort.SliceStable(result, func(i, j int) bool {
		if result[i].IsOnline != result[j].IsOnline {
			return result[i].IsOnline
		}
		if result[i].LastSyncAt.Equal(result[j].LastSyncAt) {
			return strings.ToLower(result[i].Username) < strings.ToLower(result[j].Username)
		}
		return result[i].LastSyncAt.After(result[j].LastSyncAt)
	})

	return result, nil
}

// listConnectedDBUsers returns the set of distinct, non-empty user names found in
// information_schema.PROCESSLIST for the current database.
func (s *Service) listConnectedDBUsers(mariaDB *gorm.DB) (map[string]struct{}, error) {
	type processUserRow struct {
		Username string `gorm:"column:username"`
	}
	var rows []processUserRow
	if err := mariaDB.Raw(` SELECT DISTINCT TRIM(USER) AS username FROM information_schema.PROCESSLIST WHERE COALESCE(TRIM(USER), '') <> '' AND DB = DATABASE() `).Scan(&rows).Error; err != nil {
		return nil, err
	}

	users := make(map[string]struct{}, len(rows))
	for i := range rows {
		username := strings.TrimSpace(rows[i].Username)
		if username == "" {
			continue
		}
		users[username] = struct{}{}
	}
	return users, nil
}

// ensureUserSyncStatusTable makes sure the qt_pricelist_sync_status table exists
// and tries to add the app_version column to it. Only a failed CREATE TABLE is
// returned as an error; a failed ALTER TABLE is logged at debug level.
func ensureUserSyncStatusTable(db *gorm.DB) error {
	// Check if table exists instead of trying to create (avoids permission issues)
	if !tableExists(db, "qt_pricelist_sync_status") {
		if err := db.Exec(` CREATE TABLE IF NOT EXISTS qt_pricelist_sync_status ( username VARCHAR(100) NOT NULL, last_sync_at DATETIME NOT NULL, updated_at DATETIME NOT NULL, app_version VARCHAR(64) NULL, PRIMARY KEY (username), INDEX idx_qt_pricelist_sync_status_last_sync (last_sync_at) ) `).Error; err != nil {
			return fmt.Errorf("create qt_pricelist_sync_status table: %w", err)
		}
	}

	// Backward compatibility for environments where table was created without app_version.
	// Only try to add column if table exists.
	if tableExists(db, "qt_pricelist_sync_status") {
		if err := db.Exec(` ALTER TABLE qt_pricelist_sync_status ADD COLUMN IF NOT EXISTS app_version VARCHAR(64) NULL `).Error; err != nil {
			// Log but don't fail if alter fails (column might already exist)
			slog.Debug("failed to add app_version column", "error", err)
		}
	}

	return nil
}

// SyncPricelistItems synchronizes items for a specific pricelist, identified by
// its local ID. When the pricelist already has local items their count is
// returned without contacting the server; otherwise the items are loaded from
// the server, saved locally, and the number of saved items is returned.
func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
	// Get local pricelist
	localPL, err := s.localDB.GetLocalPricelistByID(localPricelistID)
	if err != nil {
		return 0, fmt.Errorf("getting local pricelist: %w", err)
	}

	// Check if items already exist
	existingCount := s.localDB.CountLocalPricelistItems(localPricelistID)
	if existingCount > 0 {
		slog.Debug("pricelist items already synced", "pricelist_id", localPricelistID, "count", existingCount)
		return int(existingCount), nil
	}

	// Get database connection
	mariaDB, err := s.getDB()
	if err != nil {
		return 0, fmt.Errorf("database not available: %w", err)
	}

	// Create repository
	pricelistRepo := repository.NewPricelistRepository(mariaDB)

	// Get items from server
	// NOTE(review): arguments are presumably (pricelist ID, offset, limit, filter) — confirm.
	serverItems, _, err := pricelistRepo.GetItems(localPL.ServerID, 0, 10000, "")
	if err != nil {
		return 0, fmt.Errorf("getting server pricelist items: %w", err)
	}

	// Convert and save locally
	localItems := make([]localdb.LocalPricelistItem, len(serverItems))
	for i, item := range serverItems {
		localItems[i] = *localdb.PricelistItemToLocal(&item, localPricelistID)
	}

	if err := s.localDB.SaveLocalPricelistItems(localItems); err != nil {
		return 0, fmt.Errorf("saving local pricelist items: %w", err)
	}

	slog.Info("synced pricelist items", "pricelist_id", localPricelistID, "items", len(localItems))
	return len(localItems), nil
}

// SyncPricelistItemsByServerID syncs items for a pricelist by its server ID.
// NOTE(review): the underlying lookup error is not wrapped into the returned
// error, so callers cannot inspect its cause — confirm this is intended.
func (s *Service) SyncPricelistItemsByServerID(serverPricelistID uint) (int, error) {
	localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
	if err != nil {
		return 0, fmt.Errorf("local pricelist not found for server ID %d", serverPricelistID)
	}
	return s.SyncPricelistItems(localPL.ID)
}

// GetLocalPriceForLot returns the price for a lot from a local pricelist.
// It delegates directly to the local database.
func (s *Service) GetLocalPriceForLot(localPricelistID uint, lotName string) (float64, error) {
	return s.localDB.GetLocalPriceForLot(localPricelistID, lotName)
}

// GetPricelistForOffline returns a pricelist suitable for offline use.
// If the pricelist is not known locally it runs a full pricelist sync first;
// if items are not synced, it will sync them before returning.
func (s *Service) GetPricelistForOffline(serverPricelistID uint) (*localdb.LocalPricelist, error) {
	// Ensure pricelist is synced
	localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
	if err != nil {
		// Try to sync pricelists first
		if _, err := s.SyncPricelists(); err != nil {
			return nil, fmt.Errorf("syncing pricelists: %w", err)
		}

		// Try again
		localPL, err = s.localDB.GetLocalPricelistByServerID(serverPricelistID)
		if err != nil {
			return nil, fmt.Errorf("pricelist not found on server: %w", err)
		}
	}

	// Ensure items are synced
	if _, err := s.SyncPricelistItems(localPL.ID); err != nil {
		return nil, fmt.Errorf("syncing pricelist items: %w", err)
	}

	return localPL, nil
}

// SyncPricelistsIfNeeded checks for new pricelists and syncs if needed.
// This should be called before creating a new configuration when online.
// A failure of the check itself is logged and reported as success.
func (s *Service) SyncPricelistsIfNeeded() error {
	needSync, err := s.NeedSync()
	if err != nil {
		slog.Warn("failed to check if sync needed", "error", err)
		return nil // Don't fail on check error
	}

	if !needSync {
		slog.Debug("pricelists are up to date, no sync needed")
		return nil
	}

	slog.Info("new pricelists detected, syncing...")
	_, err = s.SyncPricelists()
	if err != nil {
		return fmt.Errorf("syncing pricelists: %w", err)
	}

	return nil
}

// PushPendingChanges pushes all pending changes to the server and returns the
// number of changes pushed successfully. Project changes are pushed before all
// others. A change that fails is logged, gets its attempt counter incremented,
// and does not stop the remaining changes from being pushed.
func (s *Service) PushPendingChanges() (int, error) {
	if _, err := s.EnsureReadinessForSync(); err != nil {
		return 0, err
	}

	// Purging orphans is best-effort: a failure is only logged.
	removed, err := s.localDB.PurgeOrphanConfigurationPendingChanges()
	if err != nil {
		slog.Warn("failed to purge orphan configuration pending changes", "error", err)
	} else if removed > 0 {
		slog.Info("purged orphan configuration pending changes", "removed", removed)
	}

	changes, err := s.localDB.GetPendingChanges()
	if err != nil {
		return 0, fmt.Errorf("getting pending changes: %w", err)
	}

	if len(changes) == 0 {
		slog.Debug("no pending changes to push")
		return 0, nil
	}

	slog.Info("pushing pending changes", "count", len(changes))
	pushed := 0
	var syncedIDs []int64
	sortedChanges := prioritizeProjectChanges(changes)

	for _, change := range sortedChanges {
		err := s.pushSingleChange(&change)
		if err != nil {
			slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
			// Increment attempts
			s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error())
			continue
		}

		syncedIDs = append(syncedIDs, change.ID)
		pushed++
	}

	// Mark synced changes as complete by deleting them
	// (a failure here is logged but does not change the returned count).
	if len(syncedIDs) > 0 {
		if err := s.localDB.MarkChangesSynced(syncedIDs); err != nil {
			slog.Error("failed to mark changes as synced", "error", err)
		}
	}

	slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed)
	return pushed, nil
}

// pushSingleChange pushes a single pending change to the server, dispatching on
// the entity type ("project" or "configuration"). Any other type is an error.
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
	switch change.EntityType {
	case "project":
		return s.pushProjectChange(change)
	case "configuration":
		return s.pushConfigurationChange(change)
	default:
		return fmt.Errorf("unknown entity type: %s", change.EntityType)
	}
}

// prioritizeProjectChanges returns the changes with all project changes first,
// followed by everything else; the relative order inside each group is kept.
// A slice with fewer than two elements is returned as is.
func prioritizeProjectChanges(changes []localdb.PendingChange) []localdb.PendingChange {
	if len(changes) < 2 {
		return changes
	}

	projectChanges := make([]localdb.PendingChange, 0, len(changes))
	otherChanges := make([]localdb.PendingChange, 0, len(changes))
	for _, change := range changes {
		if change.EntityType == "project" {
			projectChanges = append(projectChanges, change)
			continue
		}
		otherChanges = append(otherChanges, change)
	}

	sorted := make([]localdb.PendingChange, 0, len(changes))
	sorted = append(sorted, projectChanges...)
	sorted = append(sorted, otherChanges...)
	return sorted
}

// pushProjectChange upserts the project snapshot carried by a pending change on
// the server. When the upsert fails but a server project with the same
// (code, variant) exists, the local project is linked to that server project
// instead. On success the local project, if found, is marked as synced.
func (s *Service) pushProjectChange(change *localdb.PendingChange) error {
	payload, err := decodeProjectChangePayload(change)
	if err != nil {
		return fmt.Errorf("decode project payload: %w", err)
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}

	projectRepo := repository.NewProjectRepository(mariaDB)
	project := payload.Snapshot
	project.UUID = payload.ProjectUUID
	// An empty code falls back to the trimmed name and, failing that, to the UUID.
	if strings.TrimSpace(project.Code) == "" {
		project.Code = strings.TrimSpace(derefString(project.Name))
		if project.Code == "" {
			project.Code = project.UUID
		}
	}

	// Try upsert by UUID first
	err = projectRepo.UpsertByUUID(&project)
	if err != nil {
		// Check if it's a duplicate (code, variant) constraint violation
		// In this case, find existing project with same (code, variant) and link to it
		var existing models.Project
		lookupErr := mariaDB.Where("code = ? AND variant = ?", project.Code, project.Variant).First(&existing).Error
		if lookupErr == nil {
			// Found duplicate - link local project to existing server project
			slog.Info("project duplicate found, linking to existing",
				"local_uuid", project.UUID,
				"server_uuid", existing.UUID,
				"server_id", existing.ID,
				"code", project.Code,
				"variant", project.Variant)
			project.ID = existing.ID
		} else {
			// Not a duplicate issue, return original error
			return fmt.Errorf("upsert project on server: %w", err)
		}
	}

	// Update local project with server ID
	localProject, localErr := s.localDB.GetProjectByUUID(project.UUID)
	if localErr == nil {
		if project.ID > 0 {
			serverID := project.ID
			localProject.ServerID = &serverID
		}
		localProject.SyncStatus = "synced"
		now := time.Now()
		localProject.SyncedAt = &now
		// Best-effort: the server push already succeeded, so a local save error is ignored.
		_ = s.localDB.SaveProject(localProject)
	}

	return nil
}

// derefString returns the string value points to, or "" when value is nil.
func derefString(value *string) string {
	if value == nil {
		return ""
	}
	return *value
}

// ptrString returns a pointer to a copy of value.
func ptrString(value string) *string {
	return &value
}

// decodeProjectChangePayload decodes the JSON payload of a pending project
// change. It first tries the ProjectChangePayload envelope, which is accepted
// only when it carries a non-empty project UUID; otherwise the payload is
// decoded as a bare models.Project (legacy format) and wrapped into an envelope
// with a synthetic idempotency key.
func decodeProjectChangePayload(change *localdb.PendingChange) (ProjectChangePayload, error) {
	var payload ProjectChangePayload
	if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ProjectUUID != "" {
		if payload.Operation == "" {
			payload.Operation = change.Operation
		}
		return payload, nil
	}

	var project models.Project
	if err := json.Unmarshal([]byte(change.Payload), &project); err != nil {
		return ProjectChangePayload{}, fmt.Errorf("unmarshal legacy project payload: %w", err)
	}
	return ProjectChangePayload{
		ProjectUUID:    project.UUID,
		Operation:      change.Operation,
		IdempotencyKey: fmt.Sprintf("%s:%s:legacy", project.UUID, change.Operation),
		Snapshot:       project,
	}, nil
}

// pushConfigurationChange pushes a configuration change to the server,
// dispatching on the change operation. An unknown operation is an error.
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
	switch change.Operation {
	case "create":
		return s.pushConfigurationCreate(change)
	case "update":
		return s.pushConfigurationUpdate(change)
	case "rollback":
		return s.pushConfigurationRollback(change)
	case "deactivate":
		return s.pushConfigurationDeactivate(change)
	case "reactivate":
		return s.pushConfigurationReactivate(change)
	case "delete":
		return s.pushConfigurationDelete(change)
	default:
		return fmt.Errorf("unknown operation: %s", change.Operation)
	}
}

// pushConfigurationCreate creates a configuration on the server.
// A stale event is skipped and reported as success. When the create fails but
// the configuration already exists on the server under the same UUID, the
// server row is updated instead. Afterwards the local configuration, if found,
// receives the server ID and is marked as synced.
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
	payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
	if err != nil {
		return err
	}
	if isStale {
		slog.Debug("skipping stale create event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
		return nil
	}

	// Get database connection
	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}

	// Create repository
	configRepo := repository.NewConfigurationRepository(mariaDB)
	// Resolve owner, project and pricelist references before writing to the server.
	if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration owner: %w", err)
	}
	if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration project: %w", err)
	}
	if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration pricelist: %w", err)
	}

	// Create on server
	if err := configRepo.Create(&cfg); err != nil {
		// Idempotency fallback: configuration may already be created remotely.
		serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
		if getErr != nil {
			return fmt.Errorf("creating configuration on server: %w", err)
		}
		cfg.ID = serverCfg.ID
		if updateErr := configRepo.Update(&cfg); updateErr != nil {
			return fmt.Errorf("create fallback update on server: %w", updateErr)
		}
	}

	// Update local configuration with server ID
	// NOTE(review): the result of SaveConfiguration is discarded here — confirm best-effort is intended.
	localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
	if err == nil {
		serverID := cfg.ID
		localCfg.ServerID = &serverID
		localCfg.SyncStatus = "synced"
		s.localDB.SaveConfiguration(localCfg)
	}

	slog.Info("configuration created on server",
		"uuid", cfg.UUID,
		"server_id", cfg.ID,
		"version_no", payload.CurrentVersionNo,
		"version_id", payload.CurrentVersionID,
		"idempotency_key", payload.IdempotencyKey,
	)
	return nil
}

// pushConfigurationUpdate updates a configuration on the server.
// A stale event is skipped and reported as success. When the snapshot carries
// no server ID, the ID is taken from the local configuration, looked up on the
// server by UUID, or obtained by creating the configuration there. Afterwards
// the local configuration, if found, is marked as synced.
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
	payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
	if err != nil {
		return err
	}
	if isStale {
		slog.Debug("skipping stale update event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
		return nil
	}

	// Get database connection
	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}

	// Create repository
	configRepo := repository.NewConfigurationRepository(mariaDB)
	// Resolve owner, project and pricelist references before writing to the server.
	if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration owner: %w", err)
	}
	if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration project: %w", err)
	}
	if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration pricelist: %w", err)
	}

	// Ensure we have a server ID before updating
	// If the payload doesn't have ID, get it from local configuration
	if cfg.ID == 0 {
		localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
		if err != nil {
			return fmt.Errorf("getting local configuration: %w", err)
		}

		if localCfg.ServerID == nil {
			// Configuration hasn't been synced yet, try to find it on server by UUID.
			// If not found (e.g. stale create was skipped), create it from current snapshot.
			serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
			if getErr != nil {
				if !errors.Is(getErr, gorm.ErrRecordNotFound) {
					return fmt.Errorf("loading configuration from server: %w", getErr)
				}
				if createErr := configRepo.Create(&cfg); createErr != nil {
					// Idempotency fallback: configuration may have been created concurrently.
					existing, existingErr := configRepo.GetByUUID(cfg.UUID)
					if existingErr != nil {
						return fmt.Errorf("creating missing configuration on server: %w", createErr)
					}
					cfg.ID = existing.ID
				}
				// The ID is still unknown after the create: reload the row to obtain it.
				if cfg.ID == 0 {
					existing, existingErr := configRepo.GetByUUID(cfg.UUID)
					if existingErr != nil {
						return fmt.Errorf("loading created configuration from server: %w", existingErr)
					}
					cfg.ID = existing.ID
				}
			} else {
				cfg.ID = serverCfg.ID
			}

			// Update local with server ID
			// NOTE(review): the result of SaveConfiguration is discarded here — confirm best-effort is intended.
			serverID := cfg.ID
			localCfg.ServerID = &serverID
			s.localDB.SaveConfiguration(localCfg)
		} else {
			cfg.ID = *localCfg.ServerID
		}
	}

	// Update on server
	if err := configRepo.Update(&cfg); err != nil {
		return fmt.Errorf("updating configuration on server: %w", err)
	}

	// Update local sync status
	// NOTE(review): the result of SaveConfiguration is discarded here as well.
	localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
	if err == nil {
		localCfg.SyncStatus = "synced"
		s.localDB.SaveConfiguration(localCfg)
	}

	slog.Info("configuration updated on server",
		"uuid", cfg.UUID,
		"version_no", payload.CurrentVersionNo,
		"version_id", payload.CurrentVersionID,
		"idempotency_key", payload.IdempotencyKey,
		"operation", payload.Operation,
		"conflict_policy", payload.ConflictPolicy,
	)
	return nil
}

// ensureConfigurationOwner fills in cfg.OwnerUsername from the local DB user when
// it is empty, fails when no owner can be determined, clears the legacy UserID
// and stamps the current application version. The mariaDB argument is not used.
func (s *Service) ensureConfigurationOwner(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	ownerUsername := cfg.OwnerUsername
	if ownerUsername == "" {
		ownerUsername = s.localDB.GetDBUser()
		cfg.OwnerUsername = ownerUsername
	}
	if ownerUsername == "" {
		return fmt.Errorf("owner username is empty")
	}

	// user_id is legacy and no longer used for ownership in local-first mode.
	// Keep it NULL on writes; ownership is represented by owner_username.
	cfg.UserID = nil
	cfg.AppVersion = appmeta.Version()
	return nil
}

// ensureConfigurationProject makes sure the project referenced by cfg exists on
// the server. A referenced project that is missing on the server is upserted
// from its local copy. A configuration without a project UUID is attached to
// the system project named "Без проекта", which is created on the server when
// it does not exist yet.
func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	projectRepo := repository.NewProjectRepository(mariaDB)

	if cfg.ProjectUUID != nil && *cfg.ProjectUUID != "" {
		_, err := projectRepo.GetByUUID(*cfg.ProjectUUID)
		if err == nil {
			return nil
		}
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}

		localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID)
		if localErr != nil {
			// No local copy either: report the server-side "not found" error.
			return err
		}
		modelProject := localdb.LocalToProject(localProject)
		if modelProject.OwnerUsername == "" {
			modelProject.OwnerUsername = cfg.OwnerUsername
		}
		if createErr := projectRepo.UpsertByUUID(modelProject); createErr != nil {
			return createErr
		}
		if modelProject.ID > 0 {
			serverID := modelProject.ID
			localProject.ServerID = &serverID
			localProject.SyncStatus = "synced"
			now := time.Now()
			localProject.SyncedAt = &now
			// Best-effort: a local save error is ignored.
			_ = s.localDB.SaveProject(localProject)
		}
		return nil
	}

	// No project on the configuration: look up the system project by name
	// ("Без проекта" is Russian for "No project"), preferring one without an
	// owner, then the oldest one.
	systemProject := &models.Project{}
	err := mariaDB.
		Where("LOWER(TRIM(COALESCE(name, ''))) = LOWER(?) AND is_system = ?", "Без проекта", true).
		Order("CASE WHEN TRIM(COALESCE(owner_username, '')) = '' THEN 0 ELSE 1 END, created_at ASC, id ASC").
		First(systemProject).Error
	if err != nil {
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}
		systemProject = &models.Project{
			UUID:          uuid.NewString(),
			OwnerUsername: "",
			Code:          "Без проекта",
			Name:          ptrString("Без проекта"),
			IsActive:      true,
			IsSystem:      true,
		}
		if createErr := projectRepo.Create(systemProject); createErr != nil {
			return createErr
		}
	}

	cfg.ProjectUUID = &systemProject.UUID
	return nil
}

// ensureConfigurationPricelist keeps cfg.PricelistID when that pricelist can be
// loaded from the server. Otherwise it switches to the latest active pricelist,
// or clears the ID when none can be loaded. For a non-nil cfg it always returns nil.
func (s *Service) ensureConfigurationPricelist(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	pricelistRepo := repository.NewPricelistRepository(mariaDB)

	if cfg.PricelistID != nil && *cfg.PricelistID > 0 {
		if _, err := pricelistRepo.GetByID(*cfg.PricelistID); err == nil {
			return nil
		}
	}

	latest, err := pricelistRepo.GetLatestActive()
	if err != nil {
		cfg.PricelistID = nil
		return nil
	}

	cfg.PricelistID = &latest.ID
	return nil
}

// pushConfigurationRollback pushes a rollback event to the server.
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
	// Last-write-wins for now: rollback is pushed as an update with rollback metadata.
	return s.pushConfigurationUpdate(change)
}

// pushConfigurationDeactivate pushes a deactivate event to the server.
func (s *Service) pushConfigurationDeactivate(change *localdb.PendingChange) error {
	// Local deactivate is represented as the latest snapshot push.
	return s.pushConfigurationUpdate(change)
}

func (s *Service) pushConfigurationReactivate(change *localdb.PendingChange) error {
	// Local reactivate is represented as the latest snapshot push.
return s.pushConfigurationUpdate(change) } func (s *Service) resolveConfigurationPayloadForPush(change *localdb.PendingChange) (ConfigurationChangePayload, models.Configuration, bool, error) { payload, err := decodeConfigurationChangePayload(change) if err != nil { return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("decode configuration payload: %w", err) } eventVersionNo := payload.CurrentVersionNo currentCfg, currentVersionID, currentVersionNo, err := s.loadCurrentConfigurationState(payload.ConfigurationUUID) if err != nil { // Local config may be gone (e.g. stale queue item after delete/cleanup). Treat as no-op. if errors.Is(err, gorm.ErrRecordNotFound) { return payload, payload.Snapshot, true, nil } // create->deactivate race: config may no longer be active/visible locally, skip stale create. if change.Operation == "create" { return payload, payload.Snapshot, true, nil } return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("load current local configuration state: %w", err) } if payload.ConflictPolicy == "" { payload.ConflictPolicy = "last_write_wins" } if currentCfg.UUID != "" { payload.Snapshot = currentCfg if currentVersionID != "" { payload.CurrentVersionID = currentVersionID } if currentVersionNo > 0 { payload.CurrentVersionNo = currentVersionNo } payload.PricelistID = currentCfg.PricelistID } isStale := false if eventVersionNo > 0 && currentVersionNo > eventVersionNo { // Keep only latest intent in queue; older versions become no-op. 
isStale = true } if !isStale && change.Operation == "create" { localCfg, getErr := s.localDB.GetConfigurationByUUID(payload.ConfigurationUUID) if getErr == nil && !localCfg.IsActive { isStale = true } } return payload, payload.Snapshot, isStale, nil } func decodeConfigurationChangePayload(change *localdb.PendingChange) (ConfigurationChangePayload, error) { var payload ConfigurationChangePayload if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ConfigurationUUID != "" && payload.Snapshot.UUID != "" { if payload.Operation == "" { payload.Operation = change.Operation } return payload, nil } // Backward compatibility: legacy queue stored raw models.Configuration JSON. var cfg models.Configuration if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil { return ConfigurationChangePayload{}, fmt.Errorf("unmarshal legacy configuration payload: %w", err) } return ConfigurationChangePayload{ EventID: "", IdempotencyKey: fmt.Sprintf("%s:%s:legacy", cfg.UUID, change.Operation), ConfigurationUUID: cfg.UUID, ProjectUUID: cfg.ProjectUUID, PricelistID: cfg.PricelistID, Operation: change.Operation, ConflictPolicy: "last_write_wins", Snapshot: cfg, }, nil } func (s *Service) loadCurrentConfigurationState(configurationUUID string) (models.Configuration, string, int, error) { localCfg, err := s.localDB.GetConfigurationByUUID(configurationUUID) if err != nil { return models.Configuration{}, "", 0, fmt.Errorf("get local configuration by uuid: %w", err) } cfg := *localdb.LocalToConfiguration(localCfg) currentVersionID := "" if localCfg.CurrentVersionID != nil { currentVersionID = *localCfg.CurrentVersionID } currentVersionNo := 0 if currentVersionID != "" { var version localdb.LocalConfigurationVersion err = s.localDB.DB(). Where("id = ? AND configuration_uuid = ?", currentVersionID, configurationUUID). 
First(&version).Error if err == nil { currentVersionNo = version.VersionNo } } if currentVersionNo == 0 { var latest localdb.LocalConfigurationVersion err = s.localDB.DB(). Where("configuration_uuid = ?", configurationUUID). Order("version_no DESC"). First(&latest).Error if err == nil { currentVersionNo = latest.VersionNo currentVersionID = latest.ID } } if currentVersionNo == 0 { if err := s.repairMissingConfigurationVersion(localCfg); err != nil { return models.Configuration{}, "", 0, fmt.Errorf("repair missing configuration version: %w", err) } var latest localdb.LocalConfigurationVersion err = s.localDB.DB(). Where("configuration_uuid = ?", configurationUUID). Order("version_no DESC"). First(&latest).Error if err == nil { currentVersionNo = latest.VersionNo currentVersionID = latest.ID } } if currentVersionNo == 0 { return models.Configuration{}, "", 0, fmt.Errorf("no local configuration version found for %s", configurationUUID) } return cfg, currentVersionID, currentVersionNo, nil } func (s *Service) repairMissingConfigurationVersion(localCfg *localdb.LocalConfiguration) error { if localCfg == nil { return fmt.Errorf("local configuration is nil") } return s.localDB.DB().Transaction(func(tx *gorm.DB) error { var cfg localdb.LocalConfiguration if err := tx.Where("uuid = ?", localCfg.UUID).First(&cfg).Error; err != nil { return fmt.Errorf("load local configuration: %w", err) } // If versions exist, just make sure current_version_id is set. var latest localdb.LocalConfigurationVersion if err := tx.Where("configuration_uuid = ?", cfg.UUID). Order("version_no DESC"). First(&latest).Error; err == nil { if cfg.CurrentVersionID == nil || *cfg.CurrentVersionID == "" { if err := tx.Model(&localdb.LocalConfiguration{}). Where("uuid = ?", cfg.UUID). 
Update("current_version_id", latest.ID).Error; err != nil { return fmt.Errorf("set current version id: %w", err) } } return nil } else if !errors.Is(err, gorm.ErrRecordNotFound) { return fmt.Errorf("load latest version: %w", err) } snapshot, err := localdb.BuildConfigurationSnapshot(&cfg) if err != nil { return fmt.Errorf("build configuration snapshot: %w", err) } note := "Auto-repaired missing local version" version := localdb.LocalConfigurationVersion{ ID: uuid.NewString(), ConfigurationUUID: cfg.UUID, VersionNo: 1, Data: snapshot, ChangeNote: ¬e, AppVersion: appmeta.Version(), CreatedAt: time.Now(), } if err := tx.Create(&version).Error; err != nil { return fmt.Errorf("create initial version: %w", err) } if err := tx.Model(&localdb.LocalConfiguration{}). Where("uuid = ?", cfg.UUID). Update("current_version_id", version.ID).Error; err != nil { return fmt.Errorf("set current version id: %w", err) } slog.Warn("repaired missing local configuration version", "uuid", cfg.UUID, "version_no", version.VersionNo) return nil }) } // NOTE: prepared for future conflict resolution: // when server starts storing version metadata, we can compare payload.CurrentVersionNo // against remote version and branch into custom strategies. For now use last-write-wins. 
// pushConfigurationDelete deletes a configuration from the server func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error { // Get database connection mariaDB, err := s.getDB() if err != nil { return fmt.Errorf("database not available: %w", err) } // Create repository configRepo := repository.NewConfigurationRepository(mariaDB) // Get the configuration from server by UUID to get the ID cfg, err := configRepo.GetByUUID(change.EntityUUID) if err != nil { // Already deleted or not found, consider it successful slog.Warn("configuration not found on server, considering delete successful", "uuid", change.EntityUUID) return nil } // Delete from server if err := configRepo.Delete(cfg.ID); err != nil { return fmt.Errorf("deleting configuration from server: %w", err) } slog.Info("configuration deleted on server", "uuid", change.EntityUUID) return nil } func (s *Service) getDB() (*gorm.DB, error) { if s.directDB != nil { return s.directDB, nil } if s.connMgr == nil { return nil, ErrOffline } return s.connMgr.GetDB() } func (s *Service) getConnectionStatus() db.ConnectionStatus { if s.directDB != nil { return db.ConnectionStatus{IsConnected: true} } if s.connMgr == nil { return db.ConnectionStatus{IsConnected: false} } return s.connMgr.GetStatus() }