Files
QuoteForge/internal/services/sync/service.go
2026-02-27 10:11:20 +03:00

1489 lines
45 KiB
Go

package sync
import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"sort"
"strings"
"time"
"git.mchus.pro/mchus/quoteforge/internal/appmeta"
"git.mchus.pro/mchus/quoteforge/internal/db"
"git.mchus.pro/mchus/quoteforge/internal/localdb"
"git.mchus.pro/mchus/quoteforge/internal/models"
"git.mchus.pro/mchus/quoteforge/internal/repository"
"github.com/google/uuid"
"gorm.io/gorm"
)
// ErrOffline is the sentinel returned by import/listing operations in this
// file when no server database handle can be obtained from getDB.
var ErrOffline = errors.New("database is offline")
// Service handles synchronization between MariaDB and local SQLite.
// Construct it with NewService or NewServiceWithDB.
type Service struct {
// connMgr is populated by NewService and left nil by NewServiceWithDB.
connMgr *db.ConnectionManager
// localDB is the local SQLite store that sync reads from and writes to.
localDB *localdb.LocalDB
// directDB is populated by NewServiceWithDB (used in tests) and left nil by NewService.
directDB *gorm.DB
}
// NewService builds a sync Service that is wired to the given connection
// manager and local database. The direct DB handle is left unset.
func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Service {
	svc := new(Service)
	svc.connMgr = connMgr
	svc.localDB = localDB
	return svc
}
// NewServiceWithDB builds a sync Service around an already-open server DB
// handle instead of a connection manager (used in tests).
func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
	svc := new(Service)
	svc.localDB = localDB
	svc.directDB = mariaDB
	return svc
}
// SyncStatus represents the current sync status as reported by GetStatus.
type SyncStatus struct {
// LastSyncAt is the last recorded sync time from the local DB (nil if never synced).
LastSyncAt *time.Time `json:"last_sync_at"`
// ServerPricelists is the active pricelist count on the server; it stays 0
// when the server is not connected or the count query fails.
ServerPricelists int `json:"server_pricelists"`
// LocalPricelists is the number of pricelists stored locally.
LocalPricelists int `json:"local_pricelists"`
// NeedsSync mirrors the result of NeedSync.
NeedsSync bool `json:"needs_sync"`
}
// UserSyncStatus describes one user in the list produced by ListUserSyncStatuses.
type UserSyncStatus struct {
// Username is the trimmed DB user name.
Username string `json:"username"`
// LastSyncAt is the recorded heartbeat time; for users that are connected
// but have no heartbeat row it is set to the time of the query.
LastSyncAt time.Time `json:"last_sync_at"`
// AppVersion is the version stored with the heartbeat (empty when unknown).
AppVersion string `json:"app_version,omitempty"`
// IsOnline is true when the heartbeat is within the online threshold or the
// user currently has a connection to the database.
IsOnline bool `json:"is_online"`
}
// ConfigImportResult represents server->local configuration import stats.
type ConfigImportResult struct {
// Imported counts configurations that had no local counterpart.
Imported int `json:"imported"`
// Updated counts local configurations overwritten with the server copy.
Updated int `json:"updated"`
// Skipped counts local configurations left untouched (pending changes or locally inactive).
Skipped int `json:"skipped"`
}
// ProjectImportResult represents server->local project import stats.
type ProjectImportResult struct {
// Imported counts projects that had no local counterpart.
Imported int `json:"imported"`
// Updated counts local projects overwritten with the server copy.
Updated int `json:"updated"`
// Skipped counts local projects left untouched because they have pending changes.
Skipped int `json:"skipped"`
}
// ConfigurationChangePayload is stored in pending_changes.payload for configuration events.
// It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution.
type ConfigurationChangePayload struct {
// EventID identifies the event; NOTE(review): uniqueness scope is not visible here — confirm with the producer.
EventID string `json:"event_id"`
// IdempotencyKey is included in push log records so retries can be correlated.
IdempotencyKey string `json:"idempotency_key"`
// ConfigurationUUID is the UUID of the configuration the event refers to.
ConfigurationUUID string `json:"configuration_uuid"`
ProjectUUID *string `json:"project_uuid,omitempty"`
PricelistID *uint `json:"pricelist_id,omitempty"`
Operation string `json:"operation"` // create/update/rollback/deactivate/reactivate/delete
// CurrentVersionID and CurrentVersionNo are version metadata logged on push.
CurrentVersionID string `json:"current_version_id,omitempty"`
CurrentVersionNo int `json:"current_version_no,omitempty"`
ConflictPolicy string `json:"conflict_policy,omitempty"` // currently: last_write_wins
// Snapshot is the configuration state carried by the event.
Snapshot models.Configuration `json:"snapshot"`
CreatedAt time.Time `json:"created_at"`
CreatedBy *string `json:"created_by,omitempty"`
}
// ProjectChangePayload is the payload format for project pending changes.
// Legacy payloads that contain a bare project are converted to this shape by
// decodeProjectChangePayload.
type ProjectChangePayload struct {
EventID string `json:"event_id"`
// IdempotencyKey is synthesized as "<uuid>:<operation>:legacy" for legacy payloads.
IdempotencyKey string `json:"idempotency_key"`
// ProjectUUID is authoritative: pushProjectChange copies it onto Snapshot.UUID.
ProjectUUID string `json:"project_uuid"`
// Operation falls back to the pending change's operation when empty.
Operation string `json:"operation"`
// Snapshot is the project state that gets upserted on the server.
Snapshot models.Project `json:"snapshot"`
CreatedAt time.Time `json:"created_at"`
}
// ImportConfigurationsToLocal pulls every configuration from MariaDB, page by
// page, and mirrors it into local SQLite.
//
// A local row is left untouched and counted as Skipped when it still has
// unsynced changes (SyncStatus "pending") or when it is inactive locally.
// Otherwise the server copy overwrites the local row (Updated) or is inserted
// (Imported). ErrOffline is returned when no server handle is available.
func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
	serverDB, err := s.getDB()
	if err != nil {
		return nil, ErrOffline
	}

	const pageSize = 200
	repo := repository.NewConfigurationRepository(serverDB)
	stats := &ConfigImportResult{}

	for offset := 0; ; {
		page, _, listErr := repo.ListAll(offset, pageSize)
		if listErr != nil {
			return nil, fmt.Errorf("listing server configurations: %w", listErr)
		}
		if len(page) == 0 {
			return stats, nil
		}

		for idx := range page {
			remote := page[idx]

			local, lookupErr := s.localDB.GetConfigurationByUUID(remote.UUID)
			if lookupErr != nil && !errors.Is(lookupErr, gorm.ErrRecordNotFound) {
				return nil, fmt.Errorf("getting local configuration %s: %w", remote.UUID, lookupErr)
			}
			hasLocal := lookupErr == nil && local != nil

			// Pending local edits must not be lost, and local deactivation is
			// sticky: a server pull never resurrects a hidden entry.
			if hasLocal && (local.SyncStatus == "pending" || !local.IsActive) {
				stats.Skipped++
				continue
			}

			converted := localdb.ConfigurationToLocal(&remote)
			syncedAt := time.Now()
			converted.SyncedAt = &syncedAt
			converted.SyncStatus = "synced"
			converted.UpdatedAt = syncedAt

			if hasLocal {
				converted.ID = local.ID
				if converted.Line <= 0 && local.Line > 0 {
					converted.Line = local.Line
				}
				// vendor_spec is local-only (BOM tab) and is not stored on the
				// server, so carry it over from the existing row.
				converted.VendorSpec = local.VendorSpec
				stats.Updated++
			} else {
				stats.Imported++
			}

			if saveErr := s.localDB.SaveConfiguration(converted); saveErr != nil {
				return nil, fmt.Errorf("saving local configuration %s: %w", remote.UUID, saveErr)
			}
		}

		offset += len(page)
	}
}
// ImportProjectsToLocal pulls projects from MariaDB, page by page, and mirrors
// them into local SQLite.
//
// Local projects with SyncStatus "pending" are kept as they are and counted as
// Skipped; other existing projects are overwritten field by field (Updated);
// unknown projects are inserted (Imported). ErrOffline is returned when no
// server handle is available.
func (s *Service) ImportProjectsToLocal() (*ProjectImportResult, error) {
	serverDB, err := s.getDB()
	if err != nil {
		return nil, ErrOffline
	}

	const pageSize = 200
	repo := repository.NewProjectRepository(serverDB)
	stats := &ProjectImportResult{}

	for offset := 0; ; {
		page, _, listErr := repo.List(offset, pageSize, true)
		if listErr != nil {
			return nil, fmt.Errorf("listing server projects: %w", listErr)
		}
		if len(page) == 0 {
			return stats, nil
		}

		// One timestamp per page is shared by all rows saved from it.
		pageTime := time.Now()
		for idx := range page {
			remote := page[idx]

			local, lookupErr := s.localDB.GetProjectByUUID(remote.UUID)
			if lookupErr != nil && !errors.Is(lookupErr, gorm.ErrRecordNotFound) {
				return nil, fmt.Errorf("getting local project %s: %w", remote.UUID, lookupErr)
			}

			if lookupErr != nil || local == nil {
				// Not known locally yet: insert a fresh copy.
				fresh := localdb.ProjectToLocal(&remote)
				fresh.SyncStatus = "synced"
				fresh.SyncedAt = &pageTime
				if saveErr := s.localDB.SaveProject(fresh); saveErr != nil {
					return nil, fmt.Errorf("saving local project %s: %w", remote.UUID, saveErr)
				}
				stats.Imported++
				continue
			}

			// Keep unsynced local changes intact.
			if local.SyncStatus == "pending" {
				stats.Skipped++
				continue
			}

			remoteID := remote.ID
			local.OwnerUsername = remote.OwnerUsername
			local.Code = remote.Code
			local.Name = remote.Name
			local.TrackerURL = remote.TrackerURL
			local.IsActive = remote.IsActive
			local.IsSystem = remote.IsSystem
			local.CreatedAt = remote.CreatedAt
			local.UpdatedAt = remote.UpdatedAt
			local.ServerID = &remoteID
			local.SyncStatus = "synced"
			local.SyncedAt = &pageTime
			if saveErr := s.localDB.SaveProject(local); saveErr != nil {
				return nil, fmt.Errorf("saving local project %s: %w", remote.UUID, saveErr)
			}
			stats.Updated++
		}

		offset += len(page)
	}
}
// GetStatus assembles a SyncStatus snapshot: last sync time, server and local
// pricelist counts, and whether a sync is currently advisable.
//
// The server is only queried when a connection is already established (no
// reconnect is attempted); otherwise the server count stays 0. The returned
// error is always nil.
func (s *Service) GetStatus() (*SyncStatus, error) {
	status := &SyncStatus{LastSyncAt: s.localDB.GetLastSyncTime()}

	if s.getConnectionStatus().IsConnected {
		if serverDB, err := s.getDB(); err == nil && serverDB != nil {
			active, countErr := repository.NewPricelistRepository(serverDB).CountActive()
			if countErr == nil {
				status.ServerPricelists = int(active)
			}
		}
	}

	status.LocalPricelists = int(s.localDB.CountLocalPricelists())
	status.NeedsSync, _ = s.NeedSync()
	return status, nil
}
// NeedSync reports whether a pricelist synchronization is advisable.
//
// It returns true when no sync has ever been recorded, when the last sync is
// more than one hour old, or when for any pricelist source the latest active
// server pricelist differs from (or is missing in) the local cache. When the
// server is not reachable the answer is false, because nothing can be
// compared. The returned error is always nil.
func (s *Service) NeedSync() (bool, error) {
	last := s.localDB.GetLastSyncTime()
	if last == nil || time.Since(*last) > time.Hour {
		return true, nil
	}

	// Only consult the server when already connected; never force a reconnect.
	if !s.getConnectionStatus().IsConnected {
		return false, nil
	}
	serverDB, err := s.getDB()
	if err != nil {
		return false, nil
	}

	repo := repository.NewPricelistRepository(serverDB)
	for _, source := range [...]models.PricelistSource{
		models.PricelistSourceEstimate,
		models.PricelistSourceWarehouse,
		models.PricelistSourceCompetitor,
	} {
		name := string(source)

		remote, remoteErr := repo.GetLatestActiveBySource(name)
		if remoteErr != nil {
			// No active pricelist for this source yet.
			continue
		}

		// A missing local pricelist or a different server ID both mean the
		// local cache is behind for this source.
		local, localErr := s.localDB.GetLatestLocalPricelistBySource(name)
		if localErr != nil || remote.ID != local.ServerID {
			return true, nil
		}
	}
	return false, nil
}
// SyncPricelists synchronizes all active pricelists from the server into local
// SQLite and returns the number of pricelists that were newly created locally.
//
// Active server pricelists are fetched page by page. The complete ID list is
// needed because it drives the stale-pricelist cleanup: with a truncated list,
// local pricelists that are still active on the server would be deleted.
//
// For each server pricelist:
//   - if it already exists locally, its items are backfilled when missing;
//   - otherwise the metadata is saved and its items are synced (an item sync
//     failure is logged but does not undo the metadata).
//
// Afterwards unused local pricelists missing on the server are removed, item
// categories are backfilled for used pricelists, and the last-sync time and
// heartbeat are updated. An error is returned when the readiness check fails,
// the server is unavailable, or a page cannot be listed; in the last case
// pricelists from earlier pages may already have been synced, but no cleanup
// is performed.
func (s *Service) SyncPricelists() (int, error) {
	slog.Info("starting pricelist sync")
	if _, err := s.EnsureReadinessForSync(); err != nil {
		return 0, err
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return 0, fmt.Errorf("database not available: %w", err)
	}
	pricelistRepo := repository.NewPricelistRepository(mariaDB)

	const pageSize = 100
	serverPricelistIDs := make([]uint, 0, pageSize)
	seen := make(map[uint]struct{}, pageSize)
	synced := 0

	for offset := 0; ; {
		page, _, err := pricelistRepo.ListActive(offset, pageSize)
		if err != nil {
			return 0, fmt.Errorf("getting active server pricelists: %w", err)
		}

		fresh := 0
		for i := range page {
			pl := page[i]
			if _, dup := seen[pl.ID]; dup {
				continue
			}
			seen[pl.ID] = struct{}{}
			serverPricelistIDs = append(serverPricelistIDs, pl.ID)
			fresh++

			// Check if pricelist already exists locally.
			existing, _ := s.localDB.GetLocalPricelistByServerID(pl.ID)
			if existing != nil {
				// Backfill items for legacy/partial local caches where only pricelist metadata exists.
				if s.localDB.CountLocalPricelistItems(existing.ID) == 0 {
					itemCount, err := s.SyncPricelistItems(existing.ID)
					if err != nil {
						slog.Warn("failed to sync missing pricelist items for existing local pricelist", "version", pl.Version, "error", err)
					} else {
						slog.Info("synced missing pricelist items for existing local pricelist", "version", pl.Version, "items", itemCount)
					}
				}
				continue
			}

			localPL := &localdb.LocalPricelist{
				ServerID:  pl.ID,
				Source:    pl.Source,
				Version:   pl.Version,
				Name:      pl.Notification, // Using notification as name
				CreatedAt: pl.CreatedAt,
				SyncedAt:  time.Now(),
				IsUsed:    false,
			}
			if err := s.localDB.SaveLocalPricelist(localPL); err != nil {
				slog.Warn("failed to save local pricelist", "version", pl.Version, "error", err)
				continue
			}

			itemCount, err := s.SyncPricelistItems(localPL.ID)
			if err != nil {
				// Continue even if items sync fails - we have the pricelist metadata.
				slog.Warn("failed to sync pricelist items", "version", pl.Version, "error", err)
			} else {
				slog.Debug("synced pricelist with items", "version", pl.Version, "items", itemCount)
			}
			synced++
		}

		// A short page is the last one. A page that contributed nothing new
		// means the listing is not advancing, so stop instead of looping forever.
		if len(page) < pageSize || fresh == 0 {
			break
		}
		offset += len(page)
	}

	removed, err := s.localDB.DeleteUnusedLocalPricelistsMissingOnServer(serverPricelistIDs)
	if err != nil {
		slog.Warn("failed to cleanup stale local pricelists", "error", err)
	} else if removed > 0 {
		slog.Info("deleted stale local pricelists", "deleted", removed)
	}

	// Backfill lot_category for used pricelists (older local caches may miss the column values).
	s.backfillUsedPricelistItemCategories(pricelistRepo, serverPricelistIDs)

	s.localDB.SetLastSyncTime(time.Now())
	s.RecordSyncHeartbeat()
	slog.Info("pricelist sync completed", "synced", synced, "total", len(serverPricelistIDs))
	return synced, nil
}
// backfillUsedPricelistItemCategories refreshes local pricelist items whose
// category is empty, for pricelists that are referenced by active local
// configurations and are present in activeServerPricelistIDs.
//
// Pricelists without any local items are not touched here. Failures are
// logged and the affected pricelist is skipped; nothing is returned.
func (s *Service) backfillUsedPricelistItemCategories(pricelistRepo *repository.PricelistRepository, activeServerPricelistIDs []uint) {
	if s.localDB == nil || pricelistRepo == nil {
		return
	}

	const usedPricelistsQuery = `
SELECT DISTINCT pricelist_id AS id
FROM local_configurations
WHERE is_active = 1 AND pricelist_id IS NOT NULL
UNION
SELECT DISTINCT warehouse_pricelist_id AS id
FROM local_configurations
WHERE is_active = 1 AND warehouse_pricelist_id IS NOT NULL
UNION
SELECT DISTINCT competitor_pricelist_id AS id
FROM local_configurations
WHERE is_active = 1 AND competitor_pricelist_id IS NOT NULL
`

	isActive := make(map[uint]struct{}, len(activeServerPricelistIDs))
	for _, activeID := range activeServerPricelistIDs {
		isActive[activeID] = struct{}{}
	}

	type row struct {
		ID uint `gorm:"column:id"`
	}
	var used []row
	if err := s.localDB.DB().Raw(usedPricelistsQuery).Scan(&used).Error; err != nil {
		slog.Warn("pricelist category backfill: failed to list used pricelists", "error", err)
		return
	}

	for idx := range used {
		remoteID := used[idx].ID
		// Zero IDs and IDs not active on the server cannot be backfilled from remote.
		if _, active := isActive[remoteID]; remoteID == 0 || !active {
			continue
		}

		cached, lookupErr := s.localDB.GetLocalPricelistByServerID(remoteID)
		if lookupErr != nil || cached == nil {
			continue
		}
		if s.localDB.CountLocalPricelistItems(cached.ID) == 0 {
			continue
		}

		emptyCount, countErr := s.localDB.CountLocalPricelistItemsWithEmptyCategory(cached.ID)
		if countErr != nil {
			slog.Warn("pricelist category backfill: failed to check local items", "server_id", remoteID, "error", countErr)
			continue
		}
		if emptyCount == 0 {
			continue
		}

		remoteItems, _, loadErr := pricelistRepo.GetItems(remoteID, 0, 10000, "")
		if loadErr != nil {
			slog.Warn("pricelist category backfill: failed to load server items", "server_id", remoteID, "error", loadErr)
			continue
		}

		converted := make([]localdb.LocalPricelistItem, 0, len(remoteItems))
		for j := range remoteItems {
			converted = append(converted, *localdb.PricelistItemToLocal(&remoteItems[j], cached.ID))
		}

		if replaceErr := s.localDB.ReplaceLocalPricelistItems(cached.ID, converted); replaceErr != nil {
			slog.Warn("pricelist category backfill: failed to replace local items", "server_id", remoteID, "error", replaceErr)
			continue
		}
		slog.Info("pricelist category backfill: refreshed local items", "server_id", remoteID, "items", len(converted))
	}
}
// RecordSyncHeartbeat upserts a row for the current DB user in
// qt_pricelist_sync_status with the current UTC time and app version.
//
// It is best-effort: an empty username, an unavailable server, or a failing
// statement only results in a log entry (or nothing at all). Only users with
// write rights are expected to be able to update this table.
func (s *Service) RecordSyncHeartbeat() {
	const upsertHeartbeat = `
INSERT INTO qt_pricelist_sync_status (username, last_sync_at, updated_at, app_version)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
last_sync_at = VALUES(last_sync_at),
updated_at = VALUES(updated_at),
app_version = VALUES(app_version)
`

	user := strings.TrimSpace(s.localDB.GetDBUser())
	if user == "" {
		return
	}

	serverDB, err := s.getDB()
	if err != nil {
		return
	}
	if serverDB == nil {
		return
	}

	if tableErr := ensureUserSyncStatusTable(serverDB); tableErr != nil {
		slog.Warn("sync heartbeat: failed to ensure table", "error", tableErr)
		return
	}

	stamp := time.Now().UTC()
	execErr := serverDB.Exec(upsertHeartbeat, user, stamp, stamp, appmeta.Version()).Error
	if execErr != nil {
		slog.Debug("sync heartbeat: skipped", "username", user, "error", execErr)
	}
}
// ListUserSyncStatuses returns one entry per user that either recorded a sync
// heartbeat or is currently connected to the database.
//
// A user is online when the heartbeat is no older than onlineThreshold or when
// the user appears in the server process list. Connected users without a
// heartbeat row are reported with LastSyncAt set to the query time. The result
// is ordered: online users first, then most recent LastSyncAt, then username
// (case-insensitive). ErrOffline is returned when the server is unavailable.
func (s *Service) ListUserSyncStatuses(onlineThreshold time.Duration) ([]UserSyncStatus, error) {
	serverDB, err := s.getDB()
	if err != nil || serverDB == nil {
		return nil, ErrOffline
	}
	if tableErr := ensureUserSyncStatusTable(serverDB); tableErr != nil {
		return nil, fmt.Errorf("ensure sync status table: %w", tableErr)
	}

	type row struct {
		Username   string    `gorm:"column:username"`
		LastSyncAt time.Time `gorm:"column:last_sync_at"`
		AppVersion string    `gorm:"column:app_version"`
	}
	const selectHeartbeats = `
SELECT username, last_sync_at, COALESCE(app_version, '') AS app_version
FROM qt_pricelist_sync_status
ORDER BY last_sync_at DESC, username ASC
`
	var heartbeats []row
	if queryErr := serverDB.Raw(selectHeartbeats).Scan(&heartbeats).Error; queryErr != nil {
		return nil, fmt.Errorf("load sync status rows: %w", queryErr)
	}

	// The process list is optional; on failure fall back to heartbeat data only.
	connected, connErr := s.listConnectedDBUsers(serverDB)
	if connErr != nil {
		slog.Debug("sync status: failed to load connected DB users", "error", connErr)
		connected = map[string]struct{}{}
	}

	queriedAt := time.Now().UTC()
	statuses := make([]UserSyncStatus, 0, len(heartbeats)+len(connected))
	for _, hb := range heartbeats {
		name := strings.TrimSpace(hb.Username)
		if name == "" {
			continue
		}

		entry := UserSyncStatus{
			Username:   name,
			LastSyncAt: hb.LastSyncAt,
			AppVersion: strings.TrimSpace(hb.AppVersion),
			IsOnline:   queriedAt.Sub(hb.LastSyncAt) <= onlineThreshold,
		}
		if _, isConnected := connected[name]; isConnected {
			entry.IsOnline = true
			// Remove so that only users without a heartbeat remain afterwards.
			delete(connected, name)
		}
		statuses = append(statuses, entry)
	}

	for name := range connected {
		statuses = append(statuses, UserSyncStatus{
			Username:   name,
			LastSyncAt: queriedAt,
			AppVersion: "",
			IsOnline:   true,
		})
	}

	sort.SliceStable(statuses, func(i, j int) bool {
		left, right := statuses[i], statuses[j]
		switch {
		case left.IsOnline != right.IsOnline:
			return left.IsOnline
		case !left.LastSyncAt.Equal(right.LastSyncAt):
			return left.LastSyncAt.After(right.LastSyncAt)
		default:
			return strings.ToLower(left.Username) < strings.ToLower(right.Username)
		}
	})
	return statuses, nil
}
// listConnectedDBUsers returns the set of non-empty, trimmed user names that
// have a session on the current database according to
// information_schema.PROCESSLIST. The query error is returned unchanged.
func (s *Service) listConnectedDBUsers(mariaDB *gorm.DB) (map[string]struct{}, error) {
	const selectSessions = `
SELECT DISTINCT TRIM(USER) AS username
FROM information_schema.PROCESSLIST
WHERE COALESCE(TRIM(USER), '') <> ''
AND DB = DATABASE()
`
	type processUserRow struct {
		Username string `gorm:"column:username"`
	}

	var sessions []processUserRow
	if err := mariaDB.Raw(selectSessions).Scan(&sessions).Error; err != nil {
		return nil, err
	}

	connected := make(map[string]struct{}, len(sessions))
	for _, session := range sessions {
		if name := strings.TrimSpace(session.Username); name != "" {
			connected[name] = struct{}{}
		}
	}
	return connected, nil
}
// ensureUserSyncStatusTable makes sure qt_pricelist_sync_status is usable.
//
// The CREATE statement is only issued when the table is reported missing, so
// accounts without DDL rights are not affected when it already exists; a
// failing CREATE is returned as an error. If the table exists afterwards, the
// app_version column is added for older installations; a failure of that ALTER
// is only logged.
func ensureUserSyncStatusTable(db *gorm.DB) error {
	const tableName = "qt_pricelist_sync_status"
	const createStmt = `
CREATE TABLE IF NOT EXISTS qt_pricelist_sync_status (
username VARCHAR(100) NOT NULL,
last_sync_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
app_version VARCHAR(64) NULL,
PRIMARY KEY (username),
INDEX idx_qt_pricelist_sync_status_last_sync (last_sync_at)
)
`
	const addVersionStmt = `
ALTER TABLE qt_pricelist_sync_status
ADD COLUMN IF NOT EXISTS app_version VARCHAR(64) NULL
`

	if !tableExists(db, tableName) {
		if createErr := db.Exec(createStmt).Error; createErr != nil {
			return fmt.Errorf("create qt_pricelist_sync_status table: %w", createErr)
		}
	}

	// Re-check: the ALTER only makes sense when the table is really there.
	if !tableExists(db, tableName) {
		return nil
	}

	// Backward compatibility for environments where the table was created
	// without app_version. Best-effort: the column might already exist.
	if alterErr := db.Exec(addVersionStmt).Error; alterErr != nil {
		slog.Debug("failed to add app_version column", "error", alterErr)
	}
	return nil
}
// SyncPricelistItems downloads the items of one local pricelist from the
// server and stores them locally.
//
// When the pricelist already has local items nothing is fetched and the
// existing item count is returned. Otherwise the number of stored items is
// returned. Errors are returned when the local pricelist cannot be loaded, the
// server is unavailable, the items cannot be fetched, or saving fails.
func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
	pricelist, err := s.localDB.GetLocalPricelistByID(localPricelistID)
	if err != nil {
		return 0, fmt.Errorf("getting local pricelist: %w", err)
	}

	if have := s.localDB.CountLocalPricelistItems(localPricelistID); have > 0 {
		slog.Debug("pricelist items already synced", "pricelist_id", localPricelistID, "count", have)
		return int(have), nil
	}

	serverDB, err := s.getDB()
	if err != nil {
		return 0, fmt.Errorf("database not available: %w", err)
	}

	remoteItems, _, err := repository.NewPricelistRepository(serverDB).GetItems(pricelist.ServerID, 0, 10000, "")
	if err != nil {
		return 0, fmt.Errorf("getting server pricelist items: %w", err)
	}

	converted := make([]localdb.LocalPricelistItem, len(remoteItems))
	for idx := range remoteItems {
		remote := remoteItems[idx]
		converted[idx] = *localdb.PricelistItemToLocal(&remote, localPricelistID)
	}

	if err := s.localDB.SaveLocalPricelistItems(converted); err != nil {
		return 0, fmt.Errorf("saving local pricelist items: %w", err)
	}

	total := len(converted)
	slog.Info("synced pricelist items", "pricelist_id", localPricelistID, "items", total)
	return total, nil
}
// SyncPricelistItemsByServerID syncs items for the local pricelist that mirrors
// the given server pricelist ID and returns the resulting item count.
//
// The lookup error is wrapped (not replaced) so callers can tell a missing
// record apart from a database failure with errors.Is / errors.As.
func (s *Service) SyncPricelistItemsByServerID(serverPricelistID uint) (int, error) {
	localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
	if err != nil {
		return 0, fmt.Errorf("local pricelist not found for server ID %d: %w", serverPricelistID, err)
	}
	return s.SyncPricelistItems(localPL.ID)
}
// GetLocalPriceForLot looks up the price of lotName in the given local
// pricelist. It delegates to the local database and passes its result and
// error through unchanged.
func (s *Service) GetLocalPriceForLot(localPricelistID uint, lotName string) (float64, error) {
	price, err := s.localDB.GetLocalPriceForLot(localPricelistID, lotName)
	return price, err
}
// GetPricelistForOffline returns the local copy of a server pricelist with its
// items available for offline use.
//
// If the pricelist is not cached locally, a full pricelist sync is run once and
// the lookup is retried. Items are then synced if they are not present yet.
func (s *Service) GetPricelistForOffline(serverPricelistID uint) (*localdb.LocalPricelist, error) {
	cached, lookupErr := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
	if lookupErr != nil {
		// Not cached yet: pull pricelists from the server and look again.
		if _, syncErr := s.SyncPricelists(); syncErr != nil {
			return nil, fmt.Errorf("syncing pricelists: %w", syncErr)
		}
		cached, lookupErr = s.localDB.GetLocalPricelistByServerID(serverPricelistID)
		if lookupErr != nil {
			return nil, fmt.Errorf("pricelist not found on server: %w", lookupErr)
		}
	}

	_, itemsErr := s.SyncPricelistItems(cached.ID)
	if itemsErr != nil {
		return nil, fmt.Errorf("syncing pricelist items: %w", itemsErr)
	}
	return cached, nil
}
// SyncPricelistsIfNeeded runs a pricelist sync only when NeedSync says one is
// advisable. It is meant to be called before creating a new configuration
// while online.
//
// A failure of the check itself is logged and treated as "nothing to do"; only
// a failing sync is returned as an error.
func (s *Service) SyncPricelistsIfNeeded() error {
	required, checkErr := s.NeedSync()
	switch {
	case checkErr != nil:
		slog.Warn("failed to check if sync needed", "error", checkErr)
		return nil // Don't fail on check error
	case !required:
		slog.Debug("pricelists are up to date, no sync needed")
		return nil
	}

	slog.Info("new pricelists detected, syncing...")
	if _, syncErr := s.SyncPricelists(); syncErr != nil {
		return fmt.Errorf("syncing pricelists: %w", syncErr)
	}
	return nil
}
// PushPendingChanges pushes every pending local change to the server and
// returns how many were pushed successfully.
//
// Orphan configuration changes are purged first (best-effort). Project changes
// are pushed before all other changes. A change that fails is logged, gets its
// attempt counter incremented and stays queued; successfully pushed changes
// are marked as synced in one batch at the end. An error is returned only when
// the readiness check or loading the queue fails.
func (s *Service) PushPendingChanges() (int, error) {
	if _, err := s.EnsureReadinessForSync(); err != nil {
		return 0, err
	}

	purged, purgeErr := s.localDB.PurgeOrphanConfigurationPendingChanges()
	switch {
	case purgeErr != nil:
		slog.Warn("failed to purge orphan configuration pending changes", "error", purgeErr)
	case purged > 0:
		slog.Info("purged orphan configuration pending changes", "removed", purged)
	}

	pending, err := s.localDB.GetPendingChanges()
	if err != nil {
		return 0, fmt.Errorf("getting pending changes: %w", err)
	}
	if len(pending) == 0 {
		slog.Debug("no pending changes to push")
		return 0, nil
	}
	slog.Info("pushing pending changes", "count", len(pending))

	var doneIDs []int64
	ordered := prioritizeProjectChanges(pending)
	for idx := range ordered {
		item := ordered[idx]
		if pushErr := s.pushSingleChange(&item); pushErr != nil {
			slog.Warn("failed to push change", "id", item.ID, "type", item.EntityType, "operation", item.Operation, "error", pushErr)
			// Record the failed attempt; the change stays in the queue.
			s.localDB.IncrementPendingChangeAttempts(item.ID, pushErr.Error())
			continue
		}
		doneIDs = append(doneIDs, item.ID)
	}

	pushed := len(doneIDs)
	if pushed > 0 {
		if markErr := s.localDB.MarkChangesSynced(doneIDs); markErr != nil {
			slog.Error("failed to mark changes as synced", "error", markErr)
		}
	}

	slog.Info("pending changes pushed", "pushed", pushed, "failed", len(pending)-pushed)
	return pushed, nil
}
// pushSingleChange routes one pending change to the handler for its entity
// type ("project" or "configuration"). Any other entity type is an error.
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
	if change.EntityType == "project" {
		return s.pushProjectChange(change)
	}
	if change.EntityType == "configuration" {
		return s.pushConfigurationChange(change)
	}
	return fmt.Errorf("unknown entity type: %s", change.EntityType)
}
// prioritizeProjectChanges returns the changes reordered so that all project
// changes come before all other changes, keeping the relative order inside
// each group. Inputs with fewer than two elements are returned as they are.
func prioritizeProjectChanges(changes []localdb.PendingChange) []localdb.PendingChange {
	if len(changes) < 2 {
		return changes
	}

	ordered := make([]localdb.PendingChange, 0, len(changes))
	// First pass: projects.
	for idx := range changes {
		if changes[idx].EntityType == "project" {
			ordered = append(ordered, changes[idx])
		}
	}
	// Second pass: everything else.
	for idx := range changes {
		if changes[idx].EntityType != "project" {
			ordered = append(ordered, changes[idx])
		}
	}
	return ordered
}
// pushProjectChange pushes one pending project change to the server.
//
// The payload snapshot is upserted by UUID; an empty project code is derived
// from the name or, failing that, from the UUID. If the upsert fails and a
// server project with the same (code, variant) exists, the local project is
// linked to that existing server project instead of failing. Otherwise the
// upsert error is returned.
//
// After a successful push the local project is marked as synced. Failing to
// persist that local state is logged but does not fail the push, because the
// server already holds the data.
func (s *Service) pushProjectChange(change *localdb.PendingChange) error {
	payload, err := decodeProjectChangePayload(change)
	if err != nil {
		return fmt.Errorf("decode project payload: %w", err)
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}
	projectRepo := repository.NewProjectRepository(mariaDB)

	project := payload.Snapshot
	project.UUID = payload.ProjectUUID
	if strings.TrimSpace(project.Code) == "" {
		project.Code = strings.TrimSpace(derefString(project.Name))
		if project.Code == "" {
			project.Code = project.UUID
		}
	}

	// Try upsert by UUID first.
	if err := projectRepo.UpsertByUUID(&project); err != nil {
		// The failure may be a duplicate (code, variant) constraint violation.
		// In that case find the existing project and link to it.
		var existing models.Project
		lookupErr := mariaDB.Where("code = ? AND variant = ?", project.Code, project.Variant).First(&existing).Error
		if lookupErr != nil {
			// Not a duplicate issue, return original error.
			return fmt.Errorf("upsert project on server: %w", err)
		}
		slog.Info("project duplicate found, linking to existing",
			"local_uuid", project.UUID,
			"server_uuid", existing.UUID,
			"server_id", existing.ID,
			"code", project.Code,
			"variant", project.Variant)
		project.ID = existing.ID
	}

	// Update local project with server ID.
	localProject, localErr := s.localDB.GetProjectByUUID(project.UUID)
	if localErr != nil {
		return nil
	}
	if project.ID > 0 {
		serverID := project.ID
		localProject.ServerID = &serverID
	}
	localProject.SyncStatus = "synced"
	now := time.Now()
	localProject.SyncedAt = &now
	if saveErr := s.localDB.SaveProject(localProject); saveErr != nil {
		slog.Warn("failed to save local project after push", "uuid", project.UUID, "error", saveErr)
	}
	return nil
}
// derefString returns the string value points to, or "" when value is nil.
func derefString(value *string) string {
	if value != nil {
		return *value
	}
	return ""
}
// ptrString returns a pointer to a fresh copy of value.
func ptrString(value string) *string {
	copied := value
	return &copied
}
// decodeProjectChangePayload parses the payload of a pending project change.
//
// The current envelope format is accepted when it decodes and carries a
// non-empty project UUID; an empty operation is filled from the change itself.
// Anything else is treated as the legacy format (a bare project) and wrapped
// into an envelope with a synthesized "<uuid>:<operation>:legacy" idempotency
// key.
func decodeProjectChangePayload(change *localdb.PendingChange) (ProjectChangePayload, error) {
	var envelope ProjectChangePayload
	envelopeErr := json.Unmarshal([]byte(change.Payload), &envelope)
	if envelopeErr == nil && envelope.ProjectUUID != "" {
		if envelope.Operation == "" {
			envelope.Operation = change.Operation
		}
		return envelope, nil
	}

	// Legacy payloads contain the project itself rather than an envelope.
	var legacy models.Project
	if legacyErr := json.Unmarshal([]byte(change.Payload), &legacy); legacyErr != nil {
		return ProjectChangePayload{}, fmt.Errorf("unmarshal legacy project payload: %w", legacyErr)
	}

	var wrapped ProjectChangePayload
	wrapped.ProjectUUID = legacy.UUID
	wrapped.Operation = change.Operation
	wrapped.IdempotencyKey = fmt.Sprintf("%s:%s:legacy", legacy.UUID, change.Operation)
	wrapped.Snapshot = legacy
	return wrapped, nil
}
// pushConfigurationChange selects the push handler that matches the change's
// operation (create, update, rollback, deactivate, reactivate, delete) and
// runs it. An unrecognized operation is an error.
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
	var handler func(*localdb.PendingChange) error

	switch change.Operation {
	case "create":
		handler = s.pushConfigurationCreate
	case "update":
		handler = s.pushConfigurationUpdate
	case "rollback":
		handler = s.pushConfigurationRollback
	case "deactivate":
		handler = s.pushConfigurationDeactivate
	case "reactivate":
		handler = s.pushConfigurationReactivate
	case "delete":
		handler = s.pushConfigurationDelete
	default:
		return fmt.Errorf("unknown operation: %s", change.Operation)
	}

	return handler(change)
}
// pushConfigurationCreate creates a configuration on the server from a pending
// "create" change.
//
// Stale events (a newer version exists) are skipped and reported as success.
// Owner, project and pricelist references are resolved before the write. If
// the create fails but the configuration already exists on the server, the
// existing row is updated instead (idempotent retry).
//
// After the server write the local configuration receives the server ID and is
// marked as synced. Failing to persist that local state is logged but does not
// fail the push, because the server already holds the data.
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
	payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
	if err != nil {
		return err
	}
	if isStale {
		slog.Debug("skipping stale create event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
		return nil
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}
	configRepo := repository.NewConfigurationRepository(mariaDB)

	if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration owner: %w", err)
	}
	if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration project: %w", err)
	}
	if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration pricelist: %w", err)
	}

	if err := configRepo.Create(&cfg); err != nil {
		// Idempotency fallback: configuration may already be created remotely.
		serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
		if getErr != nil {
			return fmt.Errorf("creating configuration on server: %w", err)
		}
		cfg.ID = serverCfg.ID
		if updateErr := configRepo.Update(&cfg); updateErr != nil {
			return fmt.Errorf("create fallback update on server: %w", updateErr)
		}
	}

	// Update local configuration with server ID.
	if localCfg, localErr := s.localDB.GetConfigurationByUUID(cfg.UUID); localErr == nil {
		serverID := cfg.ID
		localCfg.ServerID = &serverID
		localCfg.SyncStatus = "synced"
		if saveErr := s.localDB.SaveConfiguration(localCfg); saveErr != nil {
			slog.Warn("failed to save local configuration after create push", "uuid", cfg.UUID, "error", saveErr)
		}
	}

	slog.Info("configuration created on server",
		"uuid", cfg.UUID,
		"server_id", cfg.ID,
		"version_no", payload.CurrentVersionNo,
		"version_id", payload.CurrentVersionID,
		"idempotency_key", payload.IdempotencyKey,
	)
	return nil
}
// pushConfigurationUpdate pushes a pending "update" change for a configuration
// to the server.
//
// Stale events (a newer version exists) are skipped and reported as success.
// Owner, project and pricelist references are resolved before the write. When
// the snapshot carries no server ID, it is taken from the local configuration;
// if the local configuration was never linked, the server is searched by UUID
// and the configuration is created there when missing (for example when a
// stale create was skipped). The snapshot is then written with Update.
//
// Local bookkeeping (storing the server ID, marking as synced) is best-effort:
// failures are logged but do not fail the push.
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
	payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
	if err != nil {
		return err
	}
	if isStale {
		slog.Debug("skipping stale update event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
		return nil
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}
	configRepo := repository.NewConfigurationRepository(mariaDB)

	if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration owner: %w", err)
	}
	if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration project: %w", err)
	}
	if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration pricelist: %w", err)
	}

	// Ensure we have a server ID before updating.
	// If the payload doesn't have ID, get it from local configuration.
	if cfg.ID == 0 {
		localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
		if err != nil {
			return fmt.Errorf("getting local configuration: %w", err)
		}

		if localCfg.ServerID == nil {
			// Configuration hasn't been synced yet, try to find it on server by UUID.
			// If not found (e.g. stale create was skipped), create it from current snapshot.
			serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
			if getErr != nil {
				if !errors.Is(getErr, gorm.ErrRecordNotFound) {
					return fmt.Errorf("loading configuration from server: %w", getErr)
				}
				if createErr := configRepo.Create(&cfg); createErr != nil {
					// Idempotency fallback: configuration may have been created concurrently.
					existing, existingErr := configRepo.GetByUUID(cfg.UUID)
					if existingErr != nil {
						return fmt.Errorf("creating missing configuration on server: %w", createErr)
					}
					cfg.ID = existing.ID
				}
				// Create may not have populated the ID; read it back if so.
				if cfg.ID == 0 {
					existing, existingErr := configRepo.GetByUUID(cfg.UUID)
					if existingErr != nil {
						return fmt.Errorf("loading created configuration from server: %w", existingErr)
					}
					cfg.ID = existing.ID
				}
			} else {
				cfg.ID = serverCfg.ID
			}

			// Update local with server ID.
			serverID := cfg.ID
			localCfg.ServerID = &serverID
			if saveErr := s.localDB.SaveConfiguration(localCfg); saveErr != nil {
				slog.Warn("failed to store server ID on local configuration", "uuid", cfg.UUID, "error", saveErr)
			}
		} else {
			cfg.ID = *localCfg.ServerID
		}
	}

	if err := configRepo.Update(&cfg); err != nil {
		return fmt.Errorf("updating configuration on server: %w", err)
	}

	// Update local sync status.
	if localCfg, localErr := s.localDB.GetConfigurationByUUID(cfg.UUID); localErr == nil {
		localCfg.SyncStatus = "synced"
		if saveErr := s.localDB.SaveConfiguration(localCfg); saveErr != nil {
			slog.Warn("failed to save local configuration after update push", "uuid", cfg.UUID, "error", saveErr)
		}
	}

	slog.Info("configuration updated on server",
		"uuid", cfg.UUID,
		"version_no", payload.CurrentVersionNo,
		"version_id", payload.CurrentVersionID,
		"idempotency_key", payload.IdempotencyKey,
		"operation", payload.Operation,
		"conflict_policy", payload.ConflictPolicy,
	)
	return nil
}
// ensureConfigurationOwner normalizes the ownership metadata on cfg before it
// is written to the server.
//
// An empty OwnerUsername is filled from the local DB user. UserID is always
// cleared and AppVersion is stamped with the running application version.
// An error is returned when cfg is nil or when no owner username can be
// determined. The mariaDB argument is not used by this function.
func (s *Service) ensureConfigurationOwner(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return errors.New("configuration is nil")
	}

	if cfg.OwnerUsername == "" {
		cfg.OwnerUsername = s.localDB.GetDBUser()
	}
	if cfg.OwnerUsername == "" {
		return errors.New("owner username is empty")
	}

	// Ownership is expressed through owner_username in local-first mode; the
	// legacy user_id column is kept NULL on every write.
	cfg.UserID = nil
	cfg.AppVersion = appmeta.Version()
	return nil
}
// ensureConfigurationProject makes sure cfg points at a project that exists on
// the server.
//
// Without a project UUID, cfg is attached to the system project named
// "Без проекта" ("without project"); that project is created on the server
// when the lookup finds nothing.
//
// With a project UUID, the project is looked up on the server. If it is
// missing there, the local copy is upserted to the server and, when the
// upsert yields a server ID, the local row is marked as synced. If the
// project is missing locally as well, the server lookup error is returned.
func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	projectRepo := repository.NewProjectRepository(mariaDB)

	if cfg.ProjectUUID == nil || *cfg.ProjectUUID == "" {
		// No explicit project: fall back to the shared system project,
		// preferring rows without an owner and then the oldest one.
		systemProject := &models.Project{}
		lookupErr := mariaDB.
			Where("LOWER(TRIM(COALESCE(name, ''))) = LOWER(?) AND is_system = ?", "Без проекта", true).
			Order("CASE WHEN TRIM(COALESCE(owner_username, '')) = '' THEN 0 ELSE 1 END, created_at ASC, id ASC").
			First(systemProject).Error

		switch {
		case lookupErr == nil:
			// Existing system project is reused as-is.
		case errors.Is(lookupErr, gorm.ErrRecordNotFound):
			systemProject = &models.Project{
				UUID:          uuid.NewString(),
				OwnerUsername: "",
				Code:          "Без проекта",
				Name:          ptrString("Без проекта"),
				IsActive:      true,
				IsSystem:      true,
			}
			if createErr := projectRepo.Create(systemProject); createErr != nil {
				return createErr
			}
		default:
			return lookupErr
		}

		cfg.ProjectUUID = &systemProject.UUID
		return nil
	}

	projectUUID := *cfg.ProjectUUID

	_, serverErr := projectRepo.GetByUUID(projectUUID)
	if serverErr == nil {
		return nil
	}
	if !errors.Is(serverErr, gorm.ErrRecordNotFound) {
		return serverErr
	}

	localProject, localErr := s.localDB.GetProjectByUUID(projectUUID)
	if localErr != nil {
		// Nothing to upload: report the server-side lookup error.
		return serverErr
	}

	modelProject := localdb.LocalToProject(localProject)
	if modelProject.OwnerUsername == "" {
		modelProject.OwnerUsername = cfg.OwnerUsername
	}
	if upsertErr := projectRepo.UpsertByUUID(modelProject); upsertErr != nil {
		return upsertErr
	}

	if modelProject.ID > 0 {
		serverID := modelProject.ID
		syncedAt := time.Now()
		localProject.ServerID = &serverID
		localProject.SyncStatus = "synced"
		localProject.SyncedAt = &syncedAt
		// The server write has already succeeded; a failure to record the
		// sync state locally is intentionally not reported.
		_ = s.localDB.SaveProject(localProject)
	}
	return nil
}
// ensureConfigurationPricelist makes cfg.PricelistID reference a pricelist the
// server can resolve.
//
// A positive PricelistID that the server can load is left untouched.
// Otherwise the ID is replaced with the latest active pricelist, or cleared
// when that lookup fails. Lookup failures are never reported; the only error
// returned is for a nil cfg.
func (s *Service) ensureConfigurationPricelist(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	pricelistRepo := repository.NewPricelistRepository(mariaDB)

	if id := cfg.PricelistID; id != nil && *id > 0 {
		_, lookupErr := pricelistRepo.GetByID(*id)
		if lookupErr == nil {
			return nil
		}
	}

	latest, latestErr := pricelistRepo.GetLatestActive()
	if latestErr == nil {
		cfg.PricelistID = &latest.ID
	} else {
		cfg.PricelistID = nil
	}
	return nil
}
// pushConfigurationRollback handles a rollback change by delegating to
// pushConfigurationUpdate and returning its result unchanged.
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
// Last-write-wins for now: rollback is pushed as an update with rollback metadata.
return s.pushConfigurationUpdate(change)
}
// pushConfigurationDeactivate handles a deactivate change by delegating to
// pushConfigurationUpdate and returning its result unchanged.
func (s *Service) pushConfigurationDeactivate(change *localdb.PendingChange) error {
// Local deactivate is represented as the latest snapshot push.
return s.pushConfigurationUpdate(change)
}
// pushConfigurationReactivate handles a reactivate change by delegating to
// pushConfigurationUpdate and returning its result unchanged.
func (s *Service) pushConfigurationReactivate(change *localdb.PendingChange) error {
// Local reactivate is represented as the latest snapshot push.
return s.pushConfigurationUpdate(change)
}
// resolveConfigurationPayloadForPush decodes the queued change and refreshes
// it against the current local configuration state.
//
// It returns the (possibly updated) payload, the snapshot to push, and a flag
// telling the caller that the event is stale and should be skipped. An event
// is stale when:
//   - the local configuration can no longer be found;
//   - loading local state failed for a "create" operation;
//   - the local version number is newer than the one recorded in the event;
//   - the operation is "create" and the local configuration is inactive.
//
// An error is returned when the payload cannot be decoded or when loading the
// local state fails for any other reason.
func (s *Service) resolveConfigurationPayloadForPush(change *localdb.PendingChange) (ConfigurationChangePayload, models.Configuration, bool, error) {
	payload, err := decodeConfigurationChangePayload(change)
	if err != nil {
		return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("decode configuration payload: %w", err)
	}

	// Remember the version the event was queued with before it is overwritten
	// by the current local state below.
	queuedVersionNo := payload.CurrentVersionNo

	localState, localVersionID, localVersionNo, loadErr := s.loadCurrentConfigurationState(payload.ConfigurationUUID)
	if loadErr != nil {
		// A missing local row (stale queue item after delete/cleanup) and a
		// failed load for a "create" (create->deactivate race) are both no-ops.
		if errors.Is(loadErr, gorm.ErrRecordNotFound) || change.Operation == "create" {
			return payload, payload.Snapshot, true, nil
		}
		return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("load current local configuration state: %w", loadErr)
	}

	if payload.ConflictPolicy == "" {
		payload.ConflictPolicy = "last_write_wins"
	}

	if localState.UUID != "" {
		// Always push the latest local snapshot rather than the queued one.
		payload.Snapshot = localState
		payload.PricelistID = localState.PricelistID
		if localVersionID != "" {
			payload.CurrentVersionID = localVersionID
		}
		if localVersionNo > 0 {
			payload.CurrentVersionNo = localVersionNo
		}
	}

	// Only the latest intent is kept; events for older versions become no-ops.
	stale := queuedVersionNo > 0 && localVersionNo > queuedVersionNo

	if !stale && change.Operation == "create" {
		row, getErr := s.localDB.GetConfigurationByUUID(payload.ConfigurationUUID)
		if getErr == nil && !row.IsActive {
			stale = true
		}
	}

	return payload, payload.Snapshot, stale, nil
}
// decodeConfigurationChangePayload parses change.Payload into a
// ConfigurationChangePayload.
//
// The payload is first decoded in the structured format; it is accepted when
// both ConfigurationUUID and Snapshot.UUID are set, and an empty Operation is
// filled from change.Operation. Otherwise the payload is decoded as a raw
// models.Configuration (legacy queue format) and wrapped in a payload with a
// synthesized idempotency key and the "last_write_wins" conflict policy.
// An error is returned only when the legacy decode fails.
func decodeConfigurationChangePayload(change *localdb.PendingChange) (ConfigurationChangePayload, error) {
	raw := []byte(change.Payload)

	var payload ConfigurationChangePayload
	decodeErr := json.Unmarshal(raw, &payload)
	structured := decodeErr == nil && payload.ConfigurationUUID != "" && payload.Snapshot.UUID != ""
	if structured {
		if payload.Operation == "" {
			payload.Operation = change.Operation
		}
		return payload, nil
	}

	// Backward compatibility: the legacy queue stored raw models.Configuration JSON.
	var legacyCfg models.Configuration
	if err := json.Unmarshal(raw, &legacyCfg); err != nil {
		return ConfigurationChangePayload{}, fmt.Errorf("unmarshal legacy configuration payload: %w", err)
	}

	legacyPayload := ConfigurationChangePayload{
		EventID:           "",
		IdempotencyKey:    fmt.Sprintf("%s:%s:legacy", legacyCfg.UUID, change.Operation),
		ConfigurationUUID: legacyCfg.UUID,
		ProjectUUID:       legacyCfg.ProjectUUID,
		PricelistID:       legacyCfg.PricelistID,
		Operation:         change.Operation,
		ConflictPolicy:    "last_write_wins",
		Snapshot:          legacyCfg,
	}
	return legacyPayload, nil
}
// loadCurrentConfigurationState loads the local configuration with the given
// UUID and resolves its current version.
//
// It returns the configuration, the current version ID and the current
// version number. The version is resolved in this order:
//  1. the version referenced by the configuration's current_version_id;
//  2. the version with the highest version_no for the configuration;
//  3. the same lookup again after repairMissingConfigurationVersion has run.
//
// An error is returned when the configuration cannot be loaded, when the
// repair fails, or when no version with a non-zero number can be found.
func (s *Service) loadCurrentConfigurationState(configurationUUID string) (models.Configuration, string, int, error) {
	localCfg, err := s.localDB.GetConfigurationByUUID(configurationUUID)
	if err != nil {
		return models.Configuration{}, "", 0, fmt.Errorf("get local configuration by uuid: %w", err)
	}
	cfg := *localdb.LocalToConfiguration(localCfg)

	var (
		versionID string
		versionNo int
	)
	if localCfg.CurrentVersionID != nil {
		versionID = *localCfg.CurrentVersionID
	}

	// adoptLatest switches to the highest-numbered stored version, if any.
	// Lookup errors leave the current values untouched.
	adoptLatest := func() {
		var latest localdb.LocalConfigurationVersion
		lookupErr := s.localDB.DB().
			Where("configuration_uuid = ?", configurationUUID).
			Order("version_no DESC").
			First(&latest).Error
		if lookupErr == nil {
			versionNo = latest.VersionNo
			versionID = latest.ID
		}
	}

	if versionID != "" {
		var pinned localdb.LocalConfigurationVersion
		lookupErr := s.localDB.DB().
			Where("id = ? AND configuration_uuid = ?", versionID, configurationUUID).
			First(&pinned).Error
		if lookupErr == nil {
			versionNo = pinned.VersionNo
		}
	}

	if versionNo == 0 {
		adoptLatest()
	}

	if versionNo == 0 {
		if repairErr := s.repairMissingConfigurationVersion(localCfg); repairErr != nil {
			return models.Configuration{}, "", 0, fmt.Errorf("repair missing configuration version: %w", repairErr)
		}
		adoptLatest()
	}

	if versionNo == 0 {
		return models.Configuration{}, "", 0, fmt.Errorf("no local configuration version found for %s", configurationUUID)
	}
	return cfg, versionID, versionNo, nil
}
// repairMissingConfigurationVersion restores the version bookkeeping of a
// local configuration inside a single local-DB transaction.
//
// If at least one version row exists, the function only fills an empty
// current_version_id with the ID of the highest-numbered version. If no
// version exists, it creates version 1 from a snapshot of the configuration,
// points current_version_id at it and logs a warning.
func (s *Service) repairMissingConfigurationVersion(localCfg *localdb.LocalConfiguration) error {
	if localCfg == nil {
		return fmt.Errorf("local configuration is nil")
	}

	return s.localDB.DB().Transaction(func(tx *gorm.DB) error {
		// Reload the row inside the transaction instead of trusting the
		// caller's copy.
		var cfg localdb.LocalConfiguration
		if err := tx.Where("uuid = ?", localCfg.UUID).First(&cfg).Error; err != nil {
			return fmt.Errorf("load local configuration: %w", err)
		}

		setCurrentVersion := func(versionID string) error {
			updateErr := tx.Model(&localdb.LocalConfiguration{}).
				Where("uuid = ?", cfg.UUID).
				Update("current_version_id", versionID).Error
			if updateErr != nil {
				return fmt.Errorf("set current version id: %w", updateErr)
			}
			return nil
		}

		var latest localdb.LocalConfigurationVersion
		latestErr := tx.Where("configuration_uuid = ?", cfg.UUID).
			Order("version_no DESC").
			First(&latest).Error

		switch {
		case latestErr == nil:
			// Versions exist: only make sure current_version_id is set.
			if cfg.CurrentVersionID != nil && *cfg.CurrentVersionID != "" {
				return nil
			}
			return setCurrentVersion(latest.ID)
		case !errors.Is(latestErr, gorm.ErrRecordNotFound):
			return fmt.Errorf("load latest version: %w", latestErr)
		}

		// No versions at all: synthesize version 1 from the current row.
		snapshot, err := localdb.BuildConfigurationSnapshot(&cfg)
		if err != nil {
			return fmt.Errorf("build configuration snapshot: %w", err)
		}

		note := "Auto-repaired missing local version"
		version := localdb.LocalConfigurationVersion{
			ID:                uuid.NewString(),
			ConfigurationUUID: cfg.UUID,
			VersionNo:         1,
			Data:              snapshot,
			ChangeNote:        &note,
			AppVersion:        appmeta.Version(),
			CreatedAt:         time.Now(),
		}
		if err := tx.Create(&version).Error; err != nil {
			return fmt.Errorf("create initial version: %w", err)
		}
		if err := setCurrentVersion(version.ID); err != nil {
			return err
		}

		slog.Warn("repaired missing local configuration version", "uuid", cfg.UUID, "version_no", version.VersionNo)
		return nil
	})
}
// NOTE: prepared for future conflict resolution:
// when server starts storing version metadata, we can compare payload.CurrentVersionNo
// against remote version and branch into custom strategies. For now use last-write-wins.
// pushConfigurationDelete deletes the configuration identified by
// change.EntityUUID from the server.
//
// A configuration that does not exist on the server is treated as already
// deleted and reported as success. Any other lookup failure (for example a
// connection error) is returned to the caller instead of being mistaken for
// "not found", so a delete is never reported as done while the row may still
// exist on the server.
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}

	configRepo := repository.NewConfigurationRepository(mariaDB)

	// Delete works on the server-side ID, so resolve it from the UUID first.
	cfg, err := configRepo.GetByUUID(change.EntityUUID)
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			// Already deleted or never created: nothing left to do.
			slog.Warn("configuration not found on server, considering delete successful", "uuid", change.EntityUUID)
			return nil
		}
		return fmt.Errorf("loading configuration from server: %w", err)
	}

	if err := configRepo.Delete(cfg.ID); err != nil {
		return fmt.Errorf("deleting configuration from server: %w", err)
	}

	slog.Info("configuration deleted on server", "uuid", change.EntityUUID)
	return nil
}
// getDB returns the server database handle used for sync operations.
//
// A direct handle (set by NewServiceWithDB) takes precedence. Otherwise the
// handle comes from the connection manager; when neither is configured,
// ErrOffline is returned.
func (s *Service) getDB() (*gorm.DB, error) {
	switch {
	case s.directDB != nil:
		return s.directDB, nil
	case s.connMgr != nil:
		return s.connMgr.GetDB()
	default:
		return nil, ErrOffline
	}
}
// getConnectionStatus reports the state of the server connection.
//
// With a direct DB handle the service always reports itself as connected.
// Otherwise the status comes from the connection manager; when neither is
// configured, the service reports itself as disconnected.
func (s *Service) getConnectionStatus() db.ConnectionStatus {
	switch {
	case s.directDB != nil:
		return db.ConnectionStatus{IsConnected: true}
	case s.connMgr != nil:
		return s.connMgr.GetStatus()
	default:
		return db.ConnectionStatus{IsConnected: false}
	}
}