Files
QuoteForge/internal/services/sync/service.go
2026-02-06 13:01:40 +03:00

1020 lines
31 KiB
Go

package sync
import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"time"
"git.mchus.pro/mchus/quoteforge/internal/appmeta"
"git.mchus.pro/mchus/quoteforge/internal/db"
"git.mchus.pro/mchus/quoteforge/internal/localdb"
"git.mchus.pro/mchus/quoteforge/internal/models"
"git.mchus.pro/mchus/quoteforge/internal/repository"
"github.com/google/uuid"
"gorm.io/gorm"
)
// ErrOffline is returned when the sync service has no MariaDB connection to work with.
var ErrOffline = errors.New("database is offline")
// Service handles synchronization between MariaDB and local SQLite
type Service struct {
// connMgr supplies the MariaDB handle and connection status when directDB is nil.
connMgr *db.ConnectionManager
// localDB is the local store that sync reads from and writes to.
localDB *localdb.LocalDB
// directDB, when non-nil, is used instead of connMgr and is always reported as connected.
directDB *gorm.DB
}
// NewService builds a sync Service that obtains its MariaDB handle from connMgr
// and keeps local state in localDB.
func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Service {
	svc := new(Service)
	svc.connMgr = connMgr
	svc.localDB = localDB
	return svc
}
// NewServiceWithDB builds a sync Service bound to an already opened MariaDB
// handle, bypassing the connection manager (used in tests).
func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
	svc := new(Service)
	svc.localDB = localDB
	svc.directDB = mariaDB
	return svc
}
// SyncStatus represents the current sync status
type SyncStatus struct {
// LastSyncAt is the last recorded sync time; nil when no sync has been recorded.
LastSyncAt *time.Time `json:"last_sync_at"`
// ServerPricelists is the number of active pricelists on the server; it stays 0
// when the server is not connected or the count fails.
ServerPricelists int `json:"server_pricelists"`
// LocalPricelists is the number of pricelists stored locally.
LocalPricelists int `json:"local_pricelists"`
// NeedsSync mirrors the result of Service.NeedSync.
NeedsSync bool `json:"needs_sync"`
}
// ConfigImportResult represents server->local configuration import stats.
type ConfigImportResult struct {
// Imported counts server configurations that had no local copy and were saved as new rows.
Imported int `json:"imported"`
// Updated counts server configurations that overwrote an existing local copy.
Updated int `json:"updated"`
// Skipped counts server configurations left untouched because the local copy
// is pending sync or inactive.
Skipped int `json:"skipped"`
}
// ConfigurationChangePayload is stored in pending_changes.payload for configuration events.
// It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution.
//
// Payloads written in the older format (a raw models.Configuration JSON) are
// converted to this shape when the queue item is decoded.
type ConfigurationChangePayload struct {
// EventID is left empty for payloads converted from the legacy format.
EventID string `json:"event_id"`
// IdempotencyKey is logged with every push; legacy payloads get "<uuid>:<operation>:legacy".
IdempotencyKey string `json:"idempotency_key"`
// ConfigurationUUID identifies the configuration; a payload without it is treated as legacy.
ConfigurationUUID string `json:"configuration_uuid"`
ProjectUUID *string `json:"project_uuid,omitempty"`
// PricelistID is overwritten with the current local configuration's pricelist before a push.
PricelistID *uint `json:"pricelist_id,omitempty"`
// Operation falls back to the pending change's operation when empty.
Operation string `json:"operation"` // create/update/rollback/deactivate/reactivate/delete
// CurrentVersionID and CurrentVersionNo describe the version the event was
// recorded for; an event whose version number is older than the current local
// version is treated as stale on push.
CurrentVersionID string `json:"current_version_id,omitempty"`
CurrentVersionNo int `json:"current_version_no,omitempty"`
// ConflictPolicy defaults to "last_write_wins" when empty.
ConflictPolicy string `json:"conflict_policy,omitempty"` // currently: last_write_wins
// Snapshot is the configuration to push; it is replaced by the current local
// state before a push when that state can be loaded.
Snapshot models.Configuration `json:"snapshot"`
CreatedAt time.Time `json:"created_at"`
CreatedBy *string `json:"created_by,omitempty"`
}
// ProjectChangePayload is the payload format of pending changes whose entity
// type is "project". Payloads that hold a raw models.Project JSON instead are
// converted to this shape when the queue item is decoded.
type ProjectChangePayload struct {
EventID string `json:"event_id"`
// IdempotencyKey is "<uuid>:<operation>:legacy" for payloads converted from the legacy format.
IdempotencyKey string `json:"idempotency_key"`
// ProjectUUID identifies the project; on push it overrides Snapshot.UUID.
ProjectUUID string `json:"project_uuid"`
// Operation falls back to the pending change's operation when empty.
Operation string `json:"operation"`
// Snapshot is the project state that gets created or updated on the server.
Snapshot models.Project `json:"snapshot"`
CreatedAt time.Time `json:"created_at"`
}
// ImportConfigurationsToLocal copies configurations from MariaDB into the local
// store, reading the server in pages of 200 until an empty page is returned.
//
// A server configuration is skipped when its local copy has sync status
// "pending" (unsent local edits would be lost) or is inactive (a local
// deactivation must not be undone by a pull). Everything else is saved locally
// with sync status "synced", reusing the local row ID when a copy already exists.
//
// It returns ErrOffline when no server connection is available, and stops at
// the first listing, lookup or save error.
func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
	mariaDB, err := s.getDB()
	if err != nil {
		return nil, ErrOffline
	}

	const pageSize = 200
	repo := repository.NewConfigurationRepository(mariaDB)
	stats := &ConfigImportResult{}
	next := 0

	for {
		page, _, err := repo.ListAll(next, pageSize)
		if err != nil {
			return nil, fmt.Errorf("listing server configurations: %w", err)
		}
		if len(page) == 0 {
			return stats, nil
		}

		for idx := range page {
			serverCfg := page[idx]

			local, lookupErr := s.localDB.GetConfigurationByUUID(serverCfg.UUID)
			if lookupErr != nil && !errors.Is(lookupErr, gorm.ErrRecordNotFound) {
				return nil, fmt.Errorf("getting local configuration %s: %w", serverCfg.UUID, lookupErr)
			}
			hasLocal := lookupErr == nil && local != nil

			// Pending local edits and local deactivations both win over the server copy.
			if hasLocal && (local.SyncStatus == "pending" || !local.IsActive) {
				stats.Skipped++
				continue
			}

			converted := localdb.ConfigurationToLocal(&serverCfg)
			stamp := time.Now()
			converted.SyncedAt = &stamp
			converted.SyncStatus = "synced"
			converted.UpdatedAt = stamp

			if hasLocal {
				// Reuse the existing row so the save overwrites instead of inserting.
				converted.ID = local.ID
				stats.Updated++
			} else {
				stats.Imported++
			}

			if err := s.localDB.SaveConfiguration(converted); err != nil {
				return nil, fmt.Errorf("saving local configuration %s: %w", serverCfg.UUID, err)
			}
		}

		next += len(page)
	}
}
// GetStatus reports the last sync time, the server and local pricelist counts,
// and whether a sync is advisable.
//
// The server count is only queried when the connection is already reported as
// up; otherwise, or when the count fails, it stays 0. The returned error is
// always nil.
func (s *Service) GetStatus() (*SyncStatus, error) {
	status := &SyncStatus{LastSyncAt: s.localDB.GetLastSyncTime()}

	if s.getConnectionStatus().IsConnected {
		mariaDB, err := s.getDB()
		if err == nil && mariaDB != nil {
			pricelistRepo := repository.NewPricelistRepository(mariaDB)
			if active, countErr := pricelistRepo.CountActive(); countErr == nil {
				status.ServerPricelists = int(active)
			}
		}
	}

	status.LocalPricelists = int(s.localDB.CountLocalPricelists())
	status.NeedsSync, _ = s.NeedSync()
	return status, nil
}
// NeedSync reports whether a pricelist sync is advisable.
//
// It answers true when no sync has ever been recorded, when the last sync is
// more than one hour old, when there is no local pricelist although the server
// has an active one, or when the latest active server pricelist is not the
// latest local one. It answers false when the server cannot be consulted or
// has no latest active pricelist. The returned error is always nil.
func (s *Service) NeedSync() (bool, error) {
	lastSync := s.localDB.GetLastSyncTime()
	if lastSync == nil || time.Since(*lastSync) > time.Hour {
		return true, nil
	}

	// Only consult the server over an already established connection.
	if !s.getConnectionStatus().IsConnected {
		return false, nil
	}
	mariaDB, err := s.getDB()
	if err != nil {
		return false, nil
	}

	pricelistRepo := repository.NewPricelistRepository(mariaDB)
	serverLatest, err := pricelistRepo.GetLatestActive()
	if err != nil {
		// Nothing usable on the server, so there is nothing to pull.
		return false, nil
	}

	localLatest, err := s.localDB.GetLatestLocalPricelist()
	if err != nil {
		// The server has a pricelist and the local store has none.
		return true, nil
	}

	return serverLatest.ID != localLatest.ServerID, nil
}
// SyncPricelists copies active server pricelists that are not yet stored
// locally, together with their items, and returns how many were added.
//
// Only the first 100 active pricelists are requested. A pricelist that fails to
// save is logged and skipped; a failed item sync is logged but the pricelist
// still counts as added. Afterwards component prices are refreshed from the
// local pricelist with the highest server ID seen in this run, and the last
// sync time is set to now.
//
// NOTE(review): the error of the local lookup is ignored, so a failing lookup
// is handled exactly like "not stored yet" — confirm this is intended.
func (s *Service) SyncPricelists() (int, error) {
	slog.Info("starting pricelist sync")

	mariaDB, err := s.getDB()
	if err != nil {
		return 0, fmt.Errorf("database not available: %w", err)
	}

	pricelistRepo := repository.NewPricelistRepository(mariaDB)
	serverPricelists, _, err := pricelistRepo.ListActive(0, 100)
	if err != nil {
		return 0, fmt.Errorf("getting active server pricelists: %w", err)
	}

	var (
		added          int
		newestServerID uint
		newestLocalID  uint
	)
	// remember keeps the local ID that belongs to the highest server ID seen so far.
	remember := func(serverID, localID uint) {
		if serverID > newestServerID {
			newestServerID, newestLocalID = serverID, localID
		}
	}

	for _, pl := range serverPricelists {
		if known, _ := s.localDB.GetLocalPricelistByServerID(pl.ID); known != nil {
			remember(pl.ID, known.ID)
			continue
		}

		fresh := &localdb.LocalPricelist{
			ServerID:  pl.ID,
			Version:   pl.Version,
			Name:      pl.Notification, // the server notification doubles as the local name
			CreatedAt: pl.CreatedAt,
			SyncedAt:  time.Now(),
			IsUsed:    false,
		}
		if err := s.localDB.SaveLocalPricelist(fresh); err != nil {
			slog.Warn("failed to save local pricelist", "version", pl.Version, "error", err)
			continue
		}

		// Item failures are tolerated: the pricelist metadata is kept regardless.
		if itemCount, err := s.SyncPricelistItems(fresh.ID); err != nil {
			slog.Warn("failed to sync pricelist items", "version", pl.Version, "error", err)
		} else {
			slog.Debug("synced pricelist with items", "version", pl.Version, "items", itemCount)
		}

		remember(pl.ID, fresh.ID)
		added++
	}

	if newestLocalID > 0 {
		if updated, err := s.localDB.UpdateComponentPricesFromPricelist(newestLocalID); err != nil {
			slog.Warn("failed to update component prices from pricelist", "error", err)
		} else {
			slog.Info("updated component prices from latest pricelist", "updated", updated)
		}
	}

	s.localDB.SetLastSyncTime(time.Now())
	slog.Info("pricelist sync completed", "synced", added, "total", len(serverPricelists))
	return added, nil
}
// SyncPricelistItems downloads the items of one local pricelist from the server
// and stores them locally, returning the number of items now available.
//
// When the local pricelist already has items, nothing is fetched and the
// existing count is returned.
//
// NOTE(review): a single request with limit 10000 is made; whether larger
// pricelists can occur (and would be cut off) should be confirmed.
func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
	localPL, err := s.localDB.GetLocalPricelistByID(localPricelistID)
	if err != nil {
		return 0, fmt.Errorf("getting local pricelist: %w", err)
	}

	if have := s.localDB.CountLocalPricelistItems(localPricelistID); have > 0 {
		slog.Debug("pricelist items already synced", "pricelist_id", localPricelistID, "count", have)
		return int(have), nil
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return 0, fmt.Errorf("database not available: %w", err)
	}

	pricelistRepo := repository.NewPricelistRepository(mariaDB)
	serverItems, _, err := pricelistRepo.GetItems(localPL.ServerID, 0, 10000, "")
	if err != nil {
		return 0, fmt.Errorf("getting server pricelist items: %w", err)
	}

	// Only lot name and price are carried over to the local rows.
	localItems := make([]localdb.LocalPricelistItem, 0, len(serverItems))
	for _, item := range serverItems {
		localItems = append(localItems, localdb.LocalPricelistItem{
			PricelistID: localPricelistID,
			LotName:     item.LotName,
			Price:       item.Price,
		})
	}

	if err := s.localDB.SaveLocalPricelistItems(localItems); err != nil {
		return 0, fmt.Errorf("saving local pricelist items: %w", err)
	}

	stored := len(localItems)
	slog.Info("synced pricelist items", "pricelist_id", localPricelistID, "items", stored)
	return stored, nil
}
// SyncPricelistItemsByServerID syncs the items of the local pricelist that
// mirrors the given server pricelist ID and returns the number of items
// available locally afterwards.
//
// It fails when no local pricelist exists for that server ID; the lookup error
// is wrapped so callers can still inspect the cause with errors.Is/errors.As.
func (s *Service) SyncPricelistItemsByServerID(serverPricelistID uint) (int, error) {
	localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
	if err != nil {
		return 0, fmt.Errorf("local pricelist not found for server ID %d: %w", serverPricelistID, err)
	}
	return s.SyncPricelistItems(localPL.ID)
}
// GetLocalPriceForLot looks up the price of lotName in the given local
// pricelist; the lookup is delegated to the local database unchanged.
func (s *Service) GetLocalPriceForLot(localPricelistID uint, lotName string) (float64, error) {
	price, err := s.localDB.GetLocalPriceForLot(localPricelistID, lotName)
	return price, err
}
// GetPricelistForOffline returns the local copy of a server pricelist with its
// items present, ready for offline use.
//
// When the pricelist is not stored locally yet, a full pricelist sync is run
// once and the lookup is repeated. Items are then synced if they are missing.
func (s *Service) GetPricelistForOffline(serverPricelistID uint) (*localdb.LocalPricelist, error) {
	localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
	if err != nil {
		// Not known locally: pull pricelists from the server and look again.
		if _, syncErr := s.SyncPricelists(); syncErr != nil {
			return nil, fmt.Errorf("syncing pricelists: %w", syncErr)
		}
		if localPL, err = s.localDB.GetLocalPricelistByServerID(serverPricelistID); err != nil {
			return nil, fmt.Errorf("pricelist not found on server: %w", err)
		}
	}

	if _, itemsErr := s.SyncPricelistItems(localPL.ID); itemsErr != nil {
		return nil, fmt.Errorf("syncing pricelist items: %w", itemsErr)
	}
	return localPL, nil
}
// SyncPricelistsIfNeeded runs a pricelist sync only when NeedSync asks for one.
// It is meant to be called before creating a new configuration while online.
//
// A failing NeedSync check is logged and swallowed; only a failing sync is
// returned as an error.
func (s *Service) SyncPricelistsIfNeeded() error {
	needSync, err := s.NeedSync()
	switch {
	case err != nil:
		// The check is advisory, so its failure must not block the caller.
		slog.Warn("failed to check if sync needed", "error", err)
		return nil
	case !needSync:
		slog.Debug("pricelists are up to date, no sync needed")
		return nil
	}

	slog.Info("new pricelists detected, syncing...")
	if _, syncErr := s.SyncPricelists(); syncErr != nil {
		return fmt.Errorf("syncing pricelists: %w", syncErr)
	}
	return nil
}
// PushPendingChanges sends every queued local change to the server and returns
// the number of changes that were pushed successfully.
//
// Orphan configuration changes are purged first. Project changes are pushed
// before all other changes. A change that fails is logged, gets its attempt
// counter incremented with the error text, and stays in the queue; successful
// changes are marked as synced in one batch at the end. Only a failure to read
// the queue is returned as an error.
func (s *Service) PushPendingChanges() (int, error) {
	if removed, purgeErr := s.localDB.PurgeOrphanConfigurationPendingChanges(); purgeErr != nil {
		slog.Warn("failed to purge orphan configuration pending changes", "error", purgeErr)
	} else if removed > 0 {
		slog.Info("purged orphan configuration pending changes", "removed", removed)
	}

	changes, err := s.localDB.GetPendingChanges()
	if err != nil {
		return 0, fmt.Errorf("getting pending changes: %w", err)
	}
	if len(changes) == 0 {
		slog.Debug("no pending changes to push")
		return 0, nil
	}
	slog.Info("pushing pending changes", "count", len(changes))

	var syncedIDs []int64
	ordered := prioritizeProjectChanges(changes)
	for i := range ordered {
		change := &ordered[i]
		if pushErr := s.pushSingleChange(change); pushErr != nil {
			slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", pushErr)
			s.localDB.IncrementPendingChangeAttempts(change.ID, pushErr.Error())
			continue
		}
		syncedIDs = append(syncedIDs, change.ID)
	}
	pushed := len(syncedIDs)

	if pushed > 0 {
		if markErr := s.localDB.MarkChangesSynced(syncedIDs); markErr != nil {
			slog.Error("failed to mark changes as synced", "error", markErr)
		}
	}

	slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed)
	return pushed, nil
}
// pushSingleChange routes one pending change to the push routine for its
// entity type ("project" or "configuration"); any other type is an error.
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
	if change.EntityType == "project" {
		return s.pushProjectChange(change)
	}
	if change.EntityType == "configuration" {
		return s.pushConfigurationChange(change)
	}
	return fmt.Errorf("unknown entity type: %s", change.EntityType)
}
// prioritizeProjectChanges returns the changes with all "project" entries moved
// to the front; the relative order inside each group is preserved. Inputs with
// fewer than two entries are returned as they are, otherwise a new slice is
// built.
func prioritizeProjectChanges(changes []localdb.PendingChange) []localdb.PendingChange {
	if len(changes) < 2 {
		return changes
	}

	ordered := make([]localdb.PendingChange, 0, len(changes))
	// First pass collects the project changes, second pass everything else.
	for _, c := range changes {
		if c.EntityType == "project" {
			ordered = append(ordered, c)
		}
	}
	for _, c := range changes {
		if c.EntityType != "project" {
			ordered = append(ordered, c)
		}
	}
	return ordered
}
// pushProjectChange pushes one queued project change to the server.
//
// The project snapshot from the payload is created on the server when no
// project with that UUID exists there, and updated otherwise. On success the
// local project, if it can be loaded, is marked as synced and gets the server
// ID recorded. Marking the local row is best effort: a failure there is logged
// but does not fail the push, because the server already holds the change.
func (s *Service) pushProjectChange(change *localdb.PendingChange) error {
	payload, err := decodeProjectChangePayload(change)
	if err != nil {
		return fmt.Errorf("decode project payload: %w", err)
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}
	projectRepo := repository.NewProjectRepository(mariaDB)

	project := payload.Snapshot
	// The payload UUID is authoritative, even if the snapshot carries a different one.
	project.UUID = payload.ProjectUUID

	serverProject, err := projectRepo.GetByUUID(project.UUID)
	switch {
	case err == nil:
		project.ID = serverProject.ID
		if updateErr := projectRepo.Update(&project); updateErr != nil {
			return fmt.Errorf("update project on server: %w", updateErr)
		}
	case errors.Is(err, gorm.ErrRecordNotFound):
		if createErr := projectRepo.Create(&project); createErr != nil {
			return fmt.Errorf("create project on server: %w", createErr)
		}
	default:
		return fmt.Errorf("get project on server: %w", err)
	}

	localProject, localErr := s.localDB.GetProjectByUUID(project.UUID)
	if localErr != nil {
		// No local row to mark; the server side is already up to date.
		return nil
	}
	if project.ID > 0 {
		serverID := project.ID
		localProject.ServerID = &serverID
	}
	localProject.SyncStatus = "synced"
	now := time.Now()
	localProject.SyncedAt = &now
	if saveErr := s.localDB.SaveProject(localProject); saveErr != nil {
		slog.Warn("failed to mark local project as synced", "uuid", project.UUID, "error", saveErr)
	}
	return nil
}
// decodeProjectChangePayload parses the payload of a project pending change.
//
// The current format is a ProjectChangePayload with a non-empty project UUID;
// its Operation falls back to the pending change's operation when empty.
// Anything else is parsed as a raw models.Project (legacy format) and wrapped
// into a payload with a synthetic "<uuid>:<operation>:legacy" idempotency key.
func decodeProjectChangePayload(change *localdb.PendingChange) (ProjectChangePayload, error) {
	raw := []byte(change.Payload)

	var wrapped ProjectChangePayload
	if err := json.Unmarshal(raw, &wrapped); err == nil && wrapped.ProjectUUID != "" {
		if wrapped.Operation == "" {
			wrapped.Operation = change.Operation
		}
		return wrapped, nil
	}

	var legacy models.Project
	if err := json.Unmarshal(raw, &legacy); err != nil {
		return ProjectChangePayload{}, fmt.Errorf("unmarshal legacy project payload: %w", err)
	}

	result := ProjectChangePayload{
		IdempotencyKey: fmt.Sprintf("%s:%s:legacy", legacy.UUID, change.Operation),
		ProjectUUID:    legacy.UUID,
		Operation:      change.Operation,
		Snapshot:       legacy,
	}
	return result, nil
}
// pushConfigurationChange routes a configuration pending change to the push
// routine for its operation; an operation without a routine is an error.
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
	handlers := map[string]func(*localdb.PendingChange) error{
		"create":     s.pushConfigurationCreate,
		"update":     s.pushConfigurationUpdate,
		"rollback":   s.pushConfigurationRollback,
		"deactivate": s.pushConfigurationDeactivate,
		"reactivate": s.pushConfigurationReactivate,
		"delete":     s.pushConfigurationDelete,
	}

	handle, known := handlers[change.Operation]
	if !known {
		return fmt.Errorf("unknown operation: %s", change.Operation)
	}
	return handle(change)
}
// pushConfigurationCreate pushes a queued "create" of a configuration to the server.
//
// The payload is first resolved against the current local state; a stale event
// is skipped and reported as success. Owner, project and pricelist references
// are resolved on the server before the write. If the create fails but a
// configuration with the same UUID already exists on the server, the create is
// treated as already applied and the server row is updated instead.
//
// After a successful push the local configuration, if it can be loaded, gets
// the server ID and sync status "synced". A failure to save that local update
// is logged rather than ignored, but does not fail the push because the server
// already holds the data.
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
	payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
	if err != nil {
		return err
	}
	if isStale {
		slog.Debug("skipping stale create event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
		return nil
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}
	configRepo := repository.NewConfigurationRepository(mariaDB)

	if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration owner: %w", err)
	}
	if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration project: %w", err)
	}
	if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration pricelist: %w", err)
	}

	if err := configRepo.Create(&cfg); err != nil {
		// Idempotency fallback: configuration may already be created remotely.
		serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
		if getErr != nil {
			// Report the original create failure, not the lookup failure.
			return fmt.Errorf("creating configuration on server: %w", err)
		}
		cfg.ID = serverCfg.ID
		if updateErr := configRepo.Update(&cfg); updateErr != nil {
			return fmt.Errorf("create fallback update on server: %w", updateErr)
		}
	}

	// Record the server ID on the local row so later updates can address it.
	localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
	if err == nil {
		serverID := cfg.ID
		localCfg.ServerID = &serverID
		localCfg.SyncStatus = "synced"
		if saveErr := s.localDB.SaveConfiguration(localCfg); saveErr != nil {
			slog.Warn("failed to save local configuration after server create", "uuid", cfg.UUID, "server_id", cfg.ID, "error", saveErr)
		}
	}

	slog.Info("configuration created on server",
		"uuid", cfg.UUID,
		"server_id", cfg.ID,
		"version_no", payload.CurrentVersionNo,
		"version_id", payload.CurrentVersionID,
		"idempotency_key", payload.IdempotencyKey,
	)
	return nil
}
// pushConfigurationUpdate pushes the latest local snapshot of a configuration
// to the server as an update.
//
// The payload is first resolved against the current local state; a stale event
// is skipped and reported as success. Owner, project and pricelist references
// are resolved on the server before the write. When the snapshot carries no
// server ID, it is taken from the local configuration, or — if that has none
// either — from the server row with the same UUID.
//
// Local bookkeeping (storing the discovered server ID, marking the row as
// synced) is best effort: failures to save are logged rather than ignored, but
// do not fail the push.
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
	payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
	if err != nil {
		return err
	}
	if isStale {
		slog.Debug("skipping stale update event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
		return nil
	}

	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}
	configRepo := repository.NewConfigurationRepository(mariaDB)

	if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration owner: %w", err)
	}
	if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration project: %w", err)
	}
	if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
		return fmt.Errorf("resolve configuration pricelist: %w", err)
	}

	// An update needs the server row ID; resolve it when the snapshot lacks one.
	if cfg.ID == 0 {
		localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
		if err != nil {
			return fmt.Errorf("getting local configuration: %w", err)
		}
		if localCfg.ServerID != nil {
			cfg.ID = *localCfg.ServerID
		} else {
			// Never linked locally: look the configuration up on the server by UUID.
			serverCfg, err := configRepo.GetByUUID(cfg.UUID)
			if err != nil {
				return fmt.Errorf("configuration not yet synced to server: %w", err)
			}
			cfg.ID = serverCfg.ID

			serverID := serverCfg.ID
			localCfg.ServerID = &serverID
			if saveErr := s.localDB.SaveConfiguration(localCfg); saveErr != nil {
				slog.Warn("failed to store server ID on local configuration", "uuid", cfg.UUID, "server_id", serverID, "error", saveErr)
			}
		}
	}

	if err := configRepo.Update(&cfg); err != nil {
		return fmt.Errorf("updating configuration on server: %w", err)
	}

	localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
	if err == nil {
		localCfg.SyncStatus = "synced"
		if saveErr := s.localDB.SaveConfiguration(localCfg); saveErr != nil {
			slog.Warn("failed to mark local configuration as synced", "uuid", cfg.UUID, "error", saveErr)
		}
	}

	slog.Info("configuration updated on server",
		"uuid", cfg.UUID,
		"version_no", payload.CurrentVersionNo,
		"version_id", payload.CurrentVersionID,
		"idempotency_key", payload.IdempotencyKey,
		"operation", payload.Operation,
		"conflict_policy", payload.ConflictPolicy,
	)
	return nil
}
// ensureConfigurationOwner prepares the ownership fields of cfg for a server write.
//
// An empty owner username is filled with the local database user; if it is
// still empty afterwards, an error is returned. UserID is cleared and
// AppVersion is stamped with the running application version. The mariaDB
// argument is not used.
func (s *Service) ensureConfigurationOwner(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}

	if cfg.OwnerUsername == "" {
		cfg.OwnerUsername = s.localDB.GetDBUser()
	}
	if cfg.OwnerUsername == "" {
		return fmt.Errorf("owner username is empty")
	}

	// user_id is legacy: ownership is expressed by owner_username in local-first
	// mode, so the column is written as NULL.
	cfg.UserID = nil
	cfg.AppVersion = appmeta.Version()
	return nil
}
// ensureConfigurationProject makes sure cfg references a project that exists on
// the server.
//
// With a project UUID set: if the project exists on the server nothing changes.
// If it is missing there, the local project with that UUID is created on the
// server (its owner defaults to the configuration owner) and the local row is
// marked as synced; when the local project cannot be loaded either, the
// server's not-found error is returned. Marking the local row is best effort —
// a failed save is logged and does not fail the call.
//
// Without a project UUID: cfg is attached to the system project named
// "Без проекта", preferring one without an owner and then the oldest; the
// project is created on the server when none exists.
func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}
	projectRepo := repository.NewProjectRepository(mariaDB)

	if cfg.ProjectUUID != nil && *cfg.ProjectUUID != "" {
		_, err := projectRepo.GetByUUID(*cfg.ProjectUUID)
		if err == nil {
			return nil
		}
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}

		localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID)
		if localErr != nil {
			// Unknown on both sides: surface the server's not-found error.
			return err
		}

		modelProject := localdb.LocalToProject(localProject)
		if modelProject.OwnerUsername == "" {
			modelProject.OwnerUsername = cfg.OwnerUsername
		}
		if createErr := projectRepo.Create(modelProject); createErr != nil {
			return createErr
		}

		if modelProject.ID > 0 {
			serverID := modelProject.ID
			localProject.ServerID = &serverID
			localProject.SyncStatus = "synced"
			now := time.Now()
			localProject.SyncedAt = &now
			if saveErr := s.localDB.SaveProject(localProject); saveErr != nil {
				slog.Warn("failed to mark local project as synced", "uuid", *cfg.ProjectUUID, "server_id", serverID, "error", saveErr)
			}
		}
		return nil
	}

	// No project set: fall back to the shared system project.
	systemProject := &models.Project{}
	err := mariaDB.
		Where("LOWER(TRIM(COALESCE(name, ''))) = LOWER(?) AND is_system = ?", "Без проекта", true).
		Order("CASE WHEN TRIM(COALESCE(owner_username, '')) = '' THEN 0 ELSE 1 END, created_at ASC, id ASC").
		First(systemProject).Error
	if err != nil {
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			return err
		}
		systemProject = &models.Project{
			UUID:          uuid.NewString(),
			OwnerUsername: "",
			Name:          "Без проекта",
			IsActive:      true,
			IsSystem:      true,
		}
		if createErr := projectRepo.Create(systemProject); createErr != nil {
			return createErr
		}
	}

	cfg.ProjectUUID = &systemProject.UUID
	return nil
}
// ensureConfigurationPricelist makes sure cfg references a pricelist the server
// can resolve.
//
// A positive pricelist ID that the server can load is kept. Otherwise the
// latest active server pricelist is used, and when that cannot be loaded
// either the reference is cleared. Only a nil cfg produces an error.
func (s *Service) ensureConfigurationPricelist(mariaDB *gorm.DB, cfg *models.Configuration) error {
	if cfg == nil {
		return fmt.Errorf("configuration is nil")
	}
	pricelistRepo := repository.NewPricelistRepository(mariaDB)

	if id := cfg.PricelistID; id != nil && *id > 0 {
		if _, lookupErr := pricelistRepo.GetByID(*id); lookupErr == nil {
			return nil
		}
	}

	latest, err := pricelistRepo.GetLatestActive()
	if err == nil {
		cfg.PricelistID = &latest.ID
	} else {
		cfg.PricelistID = nil
	}
	return nil
}
// pushConfigurationRollback pushes a queued rollback. Under the current
// last-write-wins policy a rollback is sent as a regular update.
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
	return s.pushConfigurationUpdate(change)
}
// pushConfigurationDeactivate pushes a queued deactivation by sending the
// latest snapshot as a regular update.
func (s *Service) pushConfigurationDeactivate(change *localdb.PendingChange) error {
	return s.pushConfigurationUpdate(change)
}
// pushConfigurationReactivate pushes a queued reactivation by sending the
// latest snapshot as a regular update.
func (s *Service) pushConfigurationReactivate(change *localdb.PendingChange) error {
	return s.pushConfigurationUpdate(change)
}
// resolveConfigurationPayloadForPush decodes a configuration pending change and
// refreshes it with the current local state so the newest snapshot is pushed.
//
// It returns the (possibly refreshed) payload, the configuration to push, and a
// stale flag. The event is stale when
//   - the local configuration no longer exists,
//   - the local state cannot be loaded and the operation is "create",
//   - the event's version number is older than the current local version, or
//   - the operation is "create" and the local configuration is inactive.
//
// Any other failure to load the local state is returned as an error.
func (s *Service) resolveConfigurationPayloadForPush(change *localdb.PendingChange) (ConfigurationChangePayload, models.Configuration, bool, error) {
	payload, err := decodeConfigurationChangePayload(change)
	if err != nil {
		return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("decode configuration payload: %w", err)
	}

	// Remember the version the event was recorded for before it gets refreshed.
	recordedVersionNo := payload.CurrentVersionNo

	localCfg, localVersionID, localVersionNo, err := s.loadCurrentConfigurationState(payload.ConfigurationUUID)
	if err != nil {
		// A vanished local config (stale queue item after delete/cleanup) and a
		// create that can no longer be resolved locally are both no-ops.
		if errors.Is(err, gorm.ErrRecordNotFound) || change.Operation == "create" {
			return payload, payload.Snapshot, true, nil
		}
		return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("load current local configuration state: %w", err)
	}

	if payload.ConflictPolicy == "" {
		payload.ConflictPolicy = "last_write_wins"
	}

	if localCfg.UUID != "" {
		payload.Snapshot = localCfg
		payload.PricelistID = localCfg.PricelistID
		if localVersionID != "" {
			payload.CurrentVersionID = localVersionID
		}
		if localVersionNo > 0 {
			payload.CurrentVersionNo = localVersionNo
		}
	}

	// Only the latest intent matters; events for older versions become no-ops.
	stale := recordedVersionNo > 0 && localVersionNo > recordedVersionNo
	if !stale && change.Operation == "create" {
		if row, rowErr := s.localDB.GetConfigurationByUUID(payload.ConfigurationUUID); rowErr == nil && !row.IsActive {
			stale = true
		}
	}

	return payload, payload.Snapshot, stale, nil
}
// decodeConfigurationChangePayload parses the payload of a configuration
// pending change.
//
// The current format is a ConfigurationChangePayload with both a configuration
// UUID and a snapshot UUID; its Operation falls back to the pending change's
// operation when empty. Anything else is parsed as a raw models.Configuration
// (legacy queue format) and wrapped into a payload with the
// "last_write_wins" policy and a "<uuid>:<operation>:legacy" idempotency key.
func decodeConfigurationChangePayload(change *localdb.PendingChange) (ConfigurationChangePayload, error) {
	raw := []byte(change.Payload)

	var wrapped ConfigurationChangePayload
	err := json.Unmarshal(raw, &wrapped)
	if err == nil && wrapped.ConfigurationUUID != "" && wrapped.Snapshot.UUID != "" {
		if wrapped.Operation == "" {
			wrapped.Operation = change.Operation
		}
		return wrapped, nil
	}

	var legacy models.Configuration
	if err := json.Unmarshal(raw, &legacy); err != nil {
		return ConfigurationChangePayload{}, fmt.Errorf("unmarshal legacy configuration payload: %w", err)
	}

	result := ConfigurationChangePayload{
		IdempotencyKey:    fmt.Sprintf("%s:%s:legacy", legacy.UUID, change.Operation),
		ConfigurationUUID: legacy.UUID,
		ProjectUUID:       legacy.ProjectUUID,
		PricelistID:       legacy.PricelistID,
		Operation:         change.Operation,
		ConflictPolicy:    "last_write_wins",
		Snapshot:          legacy,
	}
	return result, nil
}
// loadCurrentConfigurationState loads the local configuration with the given
// UUID and returns it together with the ID and number of its current version.
//
// The version is taken from the configuration's current-version pointer when
// that resolves to a version with a non-zero number; otherwise the version
// with the highest number for that configuration is used. An error is returned
// when the configuration cannot be loaded or no version number can be
// determined.
func (s *Service) loadCurrentConfigurationState(configurationUUID string) (models.Configuration, string, int, error) {
	local, err := s.localDB.GetConfigurationByUUID(configurationUUID)
	if err != nil {
		return models.Configuration{}, "", 0, fmt.Errorf("get local configuration by uuid: %w", err)
	}
	cfg := *localdb.LocalToConfiguration(local)

	var versionID string
	if local.CurrentVersionID != nil {
		versionID = *local.CurrentVersionID
	}

	versionNo := 0
	if versionID != "" {
		var pinned localdb.LocalConfigurationVersion
		lookupErr := s.localDB.DB().
			Where("id = ? AND configuration_uuid = ?", versionID, configurationUUID).
			First(&pinned).Error
		if lookupErr == nil {
			versionNo = pinned.VersionNo
		}
	}

	// Fall back to the newest stored version when the pointer did not resolve.
	if versionNo == 0 {
		var newest localdb.LocalConfigurationVersion
		lookupErr := s.localDB.DB().
			Where("configuration_uuid = ?", configurationUUID).
			Order("version_no DESC").
			First(&newest).Error
		if lookupErr == nil {
			versionNo = newest.VersionNo
			versionID = newest.ID
		}
	}

	if versionNo == 0 {
		return models.Configuration{}, "", 0, fmt.Errorf("no local configuration version found for %s", configurationUUID)
	}
	return cfg, versionID, versionNo, nil
}
// NOTE: prepared for future conflict resolution:
// when server starts storing version metadata, we can compare payload.CurrentVersionNo
// against remote version and branch into custom strategies. For now use last-write-wins.
// pushConfigurationDelete pushes a queued delete of a configuration to the server.
//
// The configuration is looked up on the server by the change's entity UUID.
// If it does not exist there, the delete is considered done. Any other lookup
// failure (for example a dropped connection) is returned as an error so the
// change stays queued and is retried, instead of being dropped as if the
// delete had succeeded.
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
	mariaDB, err := s.getDB()
	if err != nil {
		return fmt.Errorf("database not available: %w", err)
	}
	configRepo := repository.NewConfigurationRepository(mariaDB)

	// The server row ID is needed for the delete; resolve it by UUID.
	cfg, err := configRepo.GetByUUID(change.EntityUUID)
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			// Already deleted or never pushed: nothing left to do.
			slog.Warn("configuration not found on server, considering delete successful", "uuid", change.EntityUUID)
			return nil
		}
		return fmt.Errorf("looking up configuration on server: %w", err)
	}

	if err := configRepo.Delete(cfg.ID); err != nil {
		return fmt.Errorf("deleting configuration from server: %w", err)
	}
	slog.Info("configuration deleted on server", "uuid", change.EntityUUID)
	return nil
}
// getDB returns the MariaDB handle to use: the direct handle when one was
// injected, otherwise whatever the connection manager provides. Without either
// source it returns ErrOffline.
func (s *Service) getDB() (*gorm.DB, error) {
	switch {
	case s.directDB != nil:
		return s.directDB, nil
	case s.connMgr == nil:
		return nil, ErrOffline
	default:
		return s.connMgr.GetDB()
	}
}
// getConnectionStatus reports the server connection state: always connected
// when a direct handle was injected, disconnected when there is no connection
// manager, and the manager's own status otherwise.
func (s *Service) getConnectionStatus() db.ConnectionStatus {
	switch {
	case s.directDB != nil:
		return db.ConnectionStatus{IsConnected: true}
	case s.connMgr == nil:
		return db.ConnectionStatus{IsConnected: false}
	default:
		return s.connMgr.GetStatus()
	}
}