Root cause: Projects with duplicate (code, variant) pairs fail to sync due to unique constraint on server. Example: multiple "OPS-1934" projects with variant="Dell" where one already exists on server. Fixes: 1. Sync service now detects duplicate (code, variant) on server and links local project to existing server project instead of failing 2. Local repair checks for duplicate (code, variant) pairs and deduplicates by appending UUID suffix to variant 3. Modal now scrollable with fixed header/footer (max-h-90vh) This allows users to sync projects that were created offline with conflicting codes/variants without losing data. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1474 lines
44 KiB
Go
1474 lines
44 KiB
Go
package sync
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.mchus.pro/mchus/quoteforge/internal/appmeta"
|
|
"git.mchus.pro/mchus/quoteforge/internal/db"
|
|
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
|
"git.mchus.pro/mchus/quoteforge/internal/models"
|
|
"git.mchus.pro/mchus/quoteforge/internal/repository"
|
|
"github.com/google/uuid"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
var ErrOffline = errors.New("database is offline")
|
|
|
|
// Service handles synchronization between MariaDB and local SQLite
|
|
type Service struct {
|
|
connMgr *db.ConnectionManager
|
|
localDB *localdb.LocalDB
|
|
directDB *gorm.DB
|
|
}
|
|
|
|
// NewService creates a new sync service
|
|
func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Service {
|
|
return &Service{
|
|
connMgr: connMgr,
|
|
localDB: localDB,
|
|
}
|
|
}
|
|
|
|
// NewServiceWithDB creates sync service that uses a direct DB handle (used in tests).
|
|
func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
|
|
return &Service{
|
|
localDB: localDB,
|
|
directDB: mariaDB,
|
|
}
|
|
}
|
|
|
|
// SyncStatus represents the current sync status
|
|
type SyncStatus struct {
|
|
LastSyncAt *time.Time `json:"last_sync_at"`
|
|
ServerPricelists int `json:"server_pricelists"`
|
|
LocalPricelists int `json:"local_pricelists"`
|
|
NeedsSync bool `json:"needs_sync"`
|
|
}
|
|
|
|
type UserSyncStatus struct {
|
|
Username string `json:"username"`
|
|
LastSyncAt time.Time `json:"last_sync_at"`
|
|
AppVersion string `json:"app_version,omitempty"`
|
|
IsOnline bool `json:"is_online"`
|
|
}
|
|
|
|
// ConfigImportResult represents server->local configuration import stats.
|
|
type ConfigImportResult struct {
|
|
Imported int `json:"imported"`
|
|
Updated int `json:"updated"`
|
|
Skipped int `json:"skipped"`
|
|
}
|
|
|
|
// ProjectImportResult represents server->local project import stats.
|
|
type ProjectImportResult struct {
|
|
Imported int `json:"imported"`
|
|
Updated int `json:"updated"`
|
|
Skipped int `json:"skipped"`
|
|
}
|
|
|
|
// ConfigurationChangePayload is stored in pending_changes.payload for configuration events.
|
|
// It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution.
|
|
type ConfigurationChangePayload struct {
|
|
EventID string `json:"event_id"`
|
|
IdempotencyKey string `json:"idempotency_key"`
|
|
ConfigurationUUID string `json:"configuration_uuid"`
|
|
ProjectUUID *string `json:"project_uuid,omitempty"`
|
|
PricelistID *uint `json:"pricelist_id,omitempty"`
|
|
Operation string `json:"operation"` // create/update/rollback/deactivate/reactivate/delete
|
|
CurrentVersionID string `json:"current_version_id,omitempty"`
|
|
CurrentVersionNo int `json:"current_version_no,omitempty"`
|
|
ConflictPolicy string `json:"conflict_policy,omitempty"` // currently: last_write_wins
|
|
Snapshot models.Configuration `json:"snapshot"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
CreatedBy *string `json:"created_by,omitempty"`
|
|
}
|
|
|
|
type ProjectChangePayload struct {
|
|
EventID string `json:"event_id"`
|
|
IdempotencyKey string `json:"idempotency_key"`
|
|
ProjectUUID string `json:"project_uuid"`
|
|
Operation string `json:"operation"`
|
|
Snapshot models.Project `json:"snapshot"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
}
|
|
|
|
// ImportConfigurationsToLocal imports configurations from MariaDB into local SQLite.
|
|
// Existing local configs with pending local changes are skipped to avoid data loss.
|
|
func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return nil, ErrOffline
|
|
}
|
|
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
result := &ConfigImportResult{}
|
|
|
|
offset := 0
|
|
const limit = 200
|
|
for {
|
|
serverConfigs, _, err := configRepo.ListAll(offset, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing server configurations: %w", err)
|
|
}
|
|
if len(serverConfigs) == 0 {
|
|
break
|
|
}
|
|
|
|
for i := range serverConfigs {
|
|
cfg := serverConfigs[i]
|
|
existing, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return nil, fmt.Errorf("getting local configuration %s: %w", cfg.UUID, err)
|
|
}
|
|
|
|
if existing != nil && err == nil && existing.SyncStatus == "pending" {
|
|
result.Skipped++
|
|
continue
|
|
}
|
|
if existing != nil && err == nil && !existing.IsActive {
|
|
// Keep local deactivation sticky: do not resurrect hidden entries from server pull.
|
|
result.Skipped++
|
|
continue
|
|
}
|
|
|
|
localCfg := localdb.ConfigurationToLocal(&cfg)
|
|
now := time.Now()
|
|
localCfg.SyncedAt = &now
|
|
localCfg.SyncStatus = "synced"
|
|
localCfg.UpdatedAt = now
|
|
|
|
if existing != nil && err == nil {
|
|
localCfg.ID = existing.ID
|
|
result.Updated++
|
|
} else {
|
|
result.Imported++
|
|
}
|
|
|
|
if err := s.localDB.SaveConfiguration(localCfg); err != nil {
|
|
return nil, fmt.Errorf("saving local configuration %s: %w", cfg.UUID, err)
|
|
}
|
|
}
|
|
|
|
offset += len(serverConfigs)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// ImportProjectsToLocal imports projects from MariaDB into local SQLite.
|
|
// Existing local projects with pending local changes are skipped to avoid data loss.
|
|
func (s *Service) ImportProjectsToLocal() (*ProjectImportResult, error) {
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return nil, ErrOffline
|
|
}
|
|
|
|
projectRepo := repository.NewProjectRepository(mariaDB)
|
|
result := &ProjectImportResult{}
|
|
|
|
offset := 0
|
|
const limit = 200
|
|
for {
|
|
serverProjects, _, err := projectRepo.List(offset, limit, true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing server projects: %w", err)
|
|
}
|
|
if len(serverProjects) == 0 {
|
|
break
|
|
}
|
|
|
|
now := time.Now()
|
|
for i := range serverProjects {
|
|
project := serverProjects[i]
|
|
|
|
existing, getErr := s.localDB.GetProjectByUUID(project.UUID)
|
|
if getErr != nil && !errors.Is(getErr, gorm.ErrRecordNotFound) {
|
|
return nil, fmt.Errorf("getting local project %s: %w", project.UUID, getErr)
|
|
}
|
|
|
|
if existing != nil && getErr == nil {
|
|
// Keep unsynced local changes intact.
|
|
if existing.SyncStatus == "pending" {
|
|
result.Skipped++
|
|
continue
|
|
}
|
|
|
|
existing.OwnerUsername = project.OwnerUsername
|
|
existing.Code = project.Code
|
|
existing.Name = project.Name
|
|
existing.TrackerURL = project.TrackerURL
|
|
existing.IsActive = project.IsActive
|
|
existing.IsSystem = project.IsSystem
|
|
existing.CreatedAt = project.CreatedAt
|
|
existing.UpdatedAt = project.UpdatedAt
|
|
serverID := project.ID
|
|
existing.ServerID = &serverID
|
|
existing.SyncStatus = "synced"
|
|
existing.SyncedAt = &now
|
|
|
|
if err := s.localDB.SaveProject(existing); err != nil {
|
|
return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err)
|
|
}
|
|
result.Updated++
|
|
continue
|
|
}
|
|
|
|
localProject := localdb.ProjectToLocal(&project)
|
|
localProject.SyncStatus = "synced"
|
|
localProject.SyncedAt = &now
|
|
if err := s.localDB.SaveProject(localProject); err != nil {
|
|
return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err)
|
|
}
|
|
result.Imported++
|
|
}
|
|
|
|
offset += len(serverProjects)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// GetStatus returns the current sync status
|
|
func (s *Service) GetStatus() (*SyncStatus, error) {
|
|
lastSync := s.localDB.GetLastSyncTime()
|
|
|
|
// Count server pricelists (only if already connected, don't reconnect)
|
|
serverCount := 0
|
|
connStatus := s.getConnectionStatus()
|
|
if connStatus.IsConnected {
|
|
if mariaDB, err := s.getDB(); err == nil && mariaDB != nil {
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
activeCount, err := pricelistRepo.CountActive()
|
|
if err == nil {
|
|
serverCount = int(activeCount)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Count local pricelists
|
|
localCount := s.localDB.CountLocalPricelists()
|
|
|
|
needsSync, _ := s.NeedSync()
|
|
|
|
return &SyncStatus{
|
|
LastSyncAt: lastSync,
|
|
ServerPricelists: serverCount,
|
|
LocalPricelists: int(localCount),
|
|
NeedsSync: needsSync,
|
|
}, nil
|
|
}
|
|
|
|
// NeedSync checks if synchronization is needed
|
|
// Returns true if there are new pricelists on server or last sync was >1 hour ago
|
|
func (s *Service) NeedSync() (bool, error) {
|
|
lastSync := s.localDB.GetLastSyncTime()
|
|
|
|
// If never synced, need sync
|
|
if lastSync == nil {
|
|
return true, nil
|
|
}
|
|
|
|
// If last sync was more than 1 hour ago, suggest sync
|
|
if time.Since(*lastSync) > time.Hour {
|
|
return true, nil
|
|
}
|
|
|
|
// Check if there are new pricelists on server (only if already connected)
|
|
connStatus := s.getConnectionStatus()
|
|
if !connStatus.IsConnected {
|
|
// If offline, can't check server, no need to sync
|
|
return false, nil
|
|
}
|
|
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
// If offline, can't check server, no need to sync
|
|
return false, nil
|
|
}
|
|
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
sources := []models.PricelistSource{
|
|
models.PricelistSourceEstimate,
|
|
models.PricelistSourceWarehouse,
|
|
models.PricelistSourceCompetitor,
|
|
}
|
|
for _, source := range sources {
|
|
latestServer, err := pricelistRepo.GetLatestActiveBySource(string(source))
|
|
if err != nil {
|
|
// No active pricelist for this source yet.
|
|
continue
|
|
}
|
|
|
|
latestLocal, err := s.localDB.GetLatestLocalPricelistBySource(string(source))
|
|
if err != nil {
|
|
// No local pricelist for an existing source on server.
|
|
return true, nil
|
|
}
|
|
|
|
// If server has newer pricelist for this source, need sync.
|
|
if latestServer.ID != latestLocal.ServerID {
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// SyncPricelists synchronizes all active pricelists from server to local SQLite
|
|
func (s *Service) SyncPricelists() (int, error) {
|
|
slog.Info("starting pricelist sync")
|
|
if _, err := s.EnsureReadinessForSync(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
|
|
// Get active pricelists from server (up to 100)
|
|
serverPricelists, _, err := pricelistRepo.ListActive(0, 100)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting active server pricelists: %w", err)
|
|
}
|
|
serverPricelistIDs := make([]uint, 0, len(serverPricelists))
|
|
for i := range serverPricelists {
|
|
serverPricelistIDs = append(serverPricelistIDs, serverPricelists[i].ID)
|
|
}
|
|
|
|
synced := 0
|
|
for _, pl := range serverPricelists {
|
|
// Check if pricelist already exists locally
|
|
existing, _ := s.localDB.GetLocalPricelistByServerID(pl.ID)
|
|
if existing != nil {
|
|
continue
|
|
}
|
|
|
|
// Create local pricelist
|
|
localPL := &localdb.LocalPricelist{
|
|
ServerID: pl.ID,
|
|
Source: pl.Source,
|
|
Version: pl.Version,
|
|
Name: pl.Notification, // Using notification as name
|
|
CreatedAt: pl.CreatedAt,
|
|
SyncedAt: time.Now(),
|
|
IsUsed: false,
|
|
}
|
|
|
|
if err := s.localDB.SaveLocalPricelist(localPL); err != nil {
|
|
slog.Warn("failed to save local pricelist", "version", pl.Version, "error", err)
|
|
continue
|
|
}
|
|
|
|
// Sync items for the newly created pricelist
|
|
itemCount, err := s.SyncPricelistItems(localPL.ID)
|
|
if err != nil {
|
|
slog.Warn("failed to sync pricelist items", "version", pl.Version, "error", err)
|
|
// Continue even if items sync fails - we have the pricelist metadata
|
|
} else {
|
|
slog.Debug("synced pricelist with items", "version", pl.Version, "items", itemCount)
|
|
}
|
|
|
|
synced++
|
|
}
|
|
|
|
removed, err := s.localDB.DeleteUnusedLocalPricelistsMissingOnServer(serverPricelistIDs)
|
|
if err != nil {
|
|
slog.Warn("failed to cleanup stale local pricelists", "error", err)
|
|
} else if removed > 0 {
|
|
slog.Info("deleted stale local pricelists", "deleted", removed)
|
|
}
|
|
|
|
// Backfill lot_category for used pricelists (older local caches may miss the column values).
|
|
s.backfillUsedPricelistItemCategories(pricelistRepo, serverPricelistIDs)
|
|
|
|
// Update last sync time
|
|
s.localDB.SetLastSyncTime(time.Now())
|
|
s.RecordSyncHeartbeat()
|
|
|
|
slog.Info("pricelist sync completed", "synced", synced, "total", len(serverPricelists))
|
|
return synced, nil
|
|
}
|
|
|
|
func (s *Service) backfillUsedPricelistItemCategories(pricelistRepo *repository.PricelistRepository, activeServerPricelistIDs []uint) {
|
|
if s.localDB == nil || pricelistRepo == nil {
|
|
return
|
|
}
|
|
|
|
activeSet := make(map[uint]struct{}, len(activeServerPricelistIDs))
|
|
for _, id := range activeServerPricelistIDs {
|
|
activeSet[id] = struct{}{}
|
|
}
|
|
|
|
type row struct {
|
|
ID uint `gorm:"column:id"`
|
|
}
|
|
var usedRows []row
|
|
if err := s.localDB.DB().Raw(`
|
|
SELECT DISTINCT pricelist_id AS id
|
|
FROM local_configurations
|
|
WHERE is_active = 1 AND pricelist_id IS NOT NULL
|
|
UNION
|
|
SELECT DISTINCT warehouse_pricelist_id AS id
|
|
FROM local_configurations
|
|
WHERE is_active = 1 AND warehouse_pricelist_id IS NOT NULL
|
|
UNION
|
|
SELECT DISTINCT competitor_pricelist_id AS id
|
|
FROM local_configurations
|
|
WHERE is_active = 1 AND competitor_pricelist_id IS NOT NULL
|
|
`).Scan(&usedRows).Error; err != nil {
|
|
slog.Warn("pricelist category backfill: failed to list used pricelists", "error", err)
|
|
return
|
|
}
|
|
|
|
for _, r := range usedRows {
|
|
serverID := r.ID
|
|
if serverID == 0 {
|
|
continue
|
|
}
|
|
if _, ok := activeSet[serverID]; !ok {
|
|
// Not present on server (or not active) - cannot backfill from remote.
|
|
continue
|
|
}
|
|
|
|
localPL, err := s.localDB.GetLocalPricelistByServerID(serverID)
|
|
if err != nil || localPL == nil {
|
|
continue
|
|
}
|
|
|
|
if s.localDB.CountLocalPricelistItems(localPL.ID) == 0 {
|
|
continue
|
|
}
|
|
|
|
missing, err := s.localDB.CountLocalPricelistItemsWithEmptyCategory(localPL.ID)
|
|
if err != nil {
|
|
slog.Warn("pricelist category backfill: failed to check local items", "server_id", serverID, "error", err)
|
|
continue
|
|
}
|
|
if missing == 0 {
|
|
continue
|
|
}
|
|
|
|
serverItems, _, err := pricelistRepo.GetItems(serverID, 0, 10000, "")
|
|
if err != nil {
|
|
slog.Warn("pricelist category backfill: failed to load server items", "server_id", serverID, "error", err)
|
|
continue
|
|
}
|
|
localItems := make([]localdb.LocalPricelistItem, len(serverItems))
|
|
for i := range serverItems {
|
|
localItems[i] = *localdb.PricelistItemToLocal(&serverItems[i], localPL.ID)
|
|
}
|
|
|
|
if err := s.localDB.ReplaceLocalPricelistItems(localPL.ID, localItems); err != nil {
|
|
slog.Warn("pricelist category backfill: failed to replace local items", "server_id", serverID, "error", err)
|
|
continue
|
|
}
|
|
slog.Info("pricelist category backfill: refreshed local items", "server_id", serverID, "items", len(localItems))
|
|
}
|
|
}
|
|
|
|
// RecordSyncHeartbeat updates shared sync heartbeat for current DB user.
|
|
// Only users with write rights are expected to be able to update this table.
|
|
func (s *Service) RecordSyncHeartbeat() {
|
|
username := strings.TrimSpace(s.localDB.GetDBUser())
|
|
if username == "" {
|
|
return
|
|
}
|
|
|
|
mariaDB, err := s.getDB()
|
|
if err != nil || mariaDB == nil {
|
|
return
|
|
}
|
|
|
|
if err := ensureUserSyncStatusTable(mariaDB); err != nil {
|
|
slog.Warn("sync heartbeat: failed to ensure table", "error", err)
|
|
return
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
if err := mariaDB.Exec(`
|
|
INSERT INTO qt_pricelist_sync_status (username, last_sync_at, updated_at, app_version)
|
|
VALUES (?, ?, ?, ?)
|
|
ON DUPLICATE KEY UPDATE
|
|
last_sync_at = VALUES(last_sync_at),
|
|
updated_at = VALUES(updated_at),
|
|
app_version = VALUES(app_version)
|
|
`, username, now, now, appmeta.Version()).Error; err != nil {
|
|
slog.Debug("sync heartbeat: skipped", "username", username, "error", err)
|
|
}
|
|
}
|
|
|
|
// ListUserSyncStatuses returns users who have recorded sync heartbeat.
|
|
func (s *Service) ListUserSyncStatuses(onlineThreshold time.Duration) ([]UserSyncStatus, error) {
|
|
mariaDB, err := s.getDB()
|
|
if err != nil || mariaDB == nil {
|
|
return nil, ErrOffline
|
|
}
|
|
|
|
if err := ensureUserSyncStatusTable(mariaDB); err != nil {
|
|
return nil, fmt.Errorf("ensure sync status table: %w", err)
|
|
}
|
|
|
|
type row struct {
|
|
Username string `gorm:"column:username"`
|
|
LastSyncAt time.Time `gorm:"column:last_sync_at"`
|
|
AppVersion string `gorm:"column:app_version"`
|
|
}
|
|
var rows []row
|
|
if err := mariaDB.Raw(`
|
|
SELECT username, last_sync_at, COALESCE(app_version, '') AS app_version
|
|
FROM qt_pricelist_sync_status
|
|
ORDER BY last_sync_at DESC, username ASC
|
|
`).Scan(&rows).Error; err != nil {
|
|
return nil, fmt.Errorf("load sync status rows: %w", err)
|
|
}
|
|
|
|
activeUsers, err := s.listConnectedDBUsers(mariaDB)
|
|
if err != nil {
|
|
slog.Debug("sync status: failed to load connected DB users", "error", err)
|
|
activeUsers = map[string]struct{}{}
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
result := make([]UserSyncStatus, 0, len(rows)+len(activeUsers))
|
|
for i := range rows {
|
|
r := rows[i]
|
|
username := strings.TrimSpace(r.Username)
|
|
if username == "" {
|
|
continue
|
|
}
|
|
|
|
isOnline := now.Sub(r.LastSyncAt) <= onlineThreshold
|
|
if _, connected := activeUsers[username]; connected {
|
|
isOnline = true
|
|
delete(activeUsers, username)
|
|
}
|
|
|
|
appVersion := strings.TrimSpace(r.AppVersion)
|
|
|
|
result = append(result, UserSyncStatus{
|
|
Username: username,
|
|
LastSyncAt: r.LastSyncAt,
|
|
AppVersion: appVersion,
|
|
IsOnline: isOnline,
|
|
})
|
|
}
|
|
|
|
for username := range activeUsers {
|
|
result = append(result, UserSyncStatus{
|
|
Username: username,
|
|
LastSyncAt: now,
|
|
AppVersion: "",
|
|
IsOnline: true,
|
|
})
|
|
}
|
|
|
|
sort.SliceStable(result, func(i, j int) bool {
|
|
if result[i].IsOnline != result[j].IsOnline {
|
|
return result[i].IsOnline
|
|
}
|
|
if result[i].LastSyncAt.Equal(result[j].LastSyncAt) {
|
|
return strings.ToLower(result[i].Username) < strings.ToLower(result[j].Username)
|
|
}
|
|
return result[i].LastSyncAt.After(result[j].LastSyncAt)
|
|
})
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (s *Service) listConnectedDBUsers(mariaDB *gorm.DB) (map[string]struct{}, error) {
|
|
type processUserRow struct {
|
|
Username string `gorm:"column:username"`
|
|
}
|
|
|
|
var rows []processUserRow
|
|
if err := mariaDB.Raw(`
|
|
SELECT DISTINCT TRIM(USER) AS username
|
|
FROM information_schema.PROCESSLIST
|
|
WHERE COALESCE(TRIM(USER), '') <> ''
|
|
AND DB = DATABASE()
|
|
`).Scan(&rows).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
users := make(map[string]struct{}, len(rows))
|
|
for i := range rows {
|
|
username := strings.TrimSpace(rows[i].Username)
|
|
if username == "" {
|
|
continue
|
|
}
|
|
users[username] = struct{}{}
|
|
}
|
|
return users, nil
|
|
}
|
|
|
|
func ensureUserSyncStatusTable(db *gorm.DB) error {
|
|
// Check if table exists instead of trying to create (avoids permission issues)
|
|
if !tableExists(db, "qt_pricelist_sync_status") {
|
|
if err := db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS qt_pricelist_sync_status (
|
|
username VARCHAR(100) NOT NULL,
|
|
last_sync_at DATETIME NOT NULL,
|
|
updated_at DATETIME NOT NULL,
|
|
app_version VARCHAR(64) NULL,
|
|
PRIMARY KEY (username),
|
|
INDEX idx_qt_pricelist_sync_status_last_sync (last_sync_at)
|
|
)
|
|
`).Error; err != nil {
|
|
return fmt.Errorf("create qt_pricelist_sync_status table: %w", err)
|
|
}
|
|
}
|
|
|
|
// Backward compatibility for environments where table was created without app_version.
|
|
// Only try to add column if table exists.
|
|
if tableExists(db, "qt_pricelist_sync_status") {
|
|
if err := db.Exec(`
|
|
ALTER TABLE qt_pricelist_sync_status
|
|
ADD COLUMN IF NOT EXISTS app_version VARCHAR(64) NULL
|
|
`).Error; err != nil {
|
|
// Log but don't fail if alter fails (column might already exist)
|
|
slog.Debug("failed to add app_version column", "error", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncPricelistItems synchronizes items for a specific pricelist
|
|
func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
|
|
// Get local pricelist
|
|
localPL, err := s.localDB.GetLocalPricelistByID(localPricelistID)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting local pricelist: %w", err)
|
|
}
|
|
|
|
// Check if items already exist
|
|
existingCount := s.localDB.CountLocalPricelistItems(localPricelistID)
|
|
if existingCount > 0 {
|
|
slog.Debug("pricelist items already synced", "pricelist_id", localPricelistID, "count", existingCount)
|
|
return int(existingCount), nil
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
|
|
// Get items from server
|
|
serverItems, _, err := pricelistRepo.GetItems(localPL.ServerID, 0, 10000, "")
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting server pricelist items: %w", err)
|
|
}
|
|
|
|
// Convert and save locally
|
|
localItems := make([]localdb.LocalPricelistItem, len(serverItems))
|
|
for i, item := range serverItems {
|
|
localItems[i] = *localdb.PricelistItemToLocal(&item, localPricelistID)
|
|
}
|
|
|
|
if err := s.localDB.SaveLocalPricelistItems(localItems); err != nil {
|
|
return 0, fmt.Errorf("saving local pricelist items: %w", err)
|
|
}
|
|
|
|
slog.Info("synced pricelist items", "pricelist_id", localPricelistID, "items", len(localItems))
|
|
return len(localItems), nil
|
|
}
|
|
|
|
// SyncPricelistItemsByServerID syncs items for a pricelist by its server ID
|
|
func (s *Service) SyncPricelistItemsByServerID(serverPricelistID uint) (int, error) {
|
|
localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("local pricelist not found for server ID %d", serverPricelistID)
|
|
}
|
|
return s.SyncPricelistItems(localPL.ID)
|
|
}
|
|
|
|
// GetLocalPriceForLot returns the price for a lot from a local pricelist
|
|
func (s *Service) GetLocalPriceForLot(localPricelistID uint, lotName string) (float64, error) {
|
|
return s.localDB.GetLocalPriceForLot(localPricelistID, lotName)
|
|
}
|
|
|
|
// GetPricelistForOffline returns a pricelist suitable for offline use
|
|
// If items are not synced, it will sync them first
|
|
func (s *Service) GetPricelistForOffline(serverPricelistID uint) (*localdb.LocalPricelist, error) {
|
|
// Ensure pricelist is synced
|
|
localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
// Try to sync pricelists first
|
|
if _, err := s.SyncPricelists(); err != nil {
|
|
return nil, fmt.Errorf("syncing pricelists: %w", err)
|
|
}
|
|
|
|
// Try again
|
|
localPL, err = s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pricelist not found on server: %w", err)
|
|
}
|
|
}
|
|
|
|
// Ensure items are synced
|
|
if _, err := s.SyncPricelistItems(localPL.ID); err != nil {
|
|
return nil, fmt.Errorf("syncing pricelist items: %w", err)
|
|
}
|
|
|
|
return localPL, nil
|
|
}
|
|
|
|
// SyncPricelistsIfNeeded checks for new pricelists and syncs if needed
|
|
// This should be called before creating a new configuration when online
|
|
func (s *Service) SyncPricelistsIfNeeded() error {
|
|
needSync, err := s.NeedSync()
|
|
if err != nil {
|
|
slog.Warn("failed to check if sync needed", "error", err)
|
|
return nil // Don't fail on check error
|
|
}
|
|
|
|
if !needSync {
|
|
slog.Debug("pricelists are up to date, no sync needed")
|
|
return nil
|
|
}
|
|
|
|
slog.Info("new pricelists detected, syncing...")
|
|
_, err = s.SyncPricelists()
|
|
if err != nil {
|
|
return fmt.Errorf("syncing pricelists: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PushPendingChanges pushes all pending changes to the server
|
|
func (s *Service) PushPendingChanges() (int, error) {
|
|
if _, err := s.EnsureReadinessForSync(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
removed, err := s.localDB.PurgeOrphanConfigurationPendingChanges()
|
|
if err != nil {
|
|
slog.Warn("failed to purge orphan configuration pending changes", "error", err)
|
|
} else if removed > 0 {
|
|
slog.Info("purged orphan configuration pending changes", "removed", removed)
|
|
}
|
|
|
|
changes, err := s.localDB.GetPendingChanges()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting pending changes: %w", err)
|
|
}
|
|
|
|
if len(changes) == 0 {
|
|
slog.Debug("no pending changes to push")
|
|
return 0, nil
|
|
}
|
|
|
|
slog.Info("pushing pending changes", "count", len(changes))
|
|
pushed := 0
|
|
var syncedIDs []int64
|
|
sortedChanges := prioritizeProjectChanges(changes)
|
|
|
|
for _, change := range sortedChanges {
|
|
err := s.pushSingleChange(&change)
|
|
if err != nil {
|
|
slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
|
|
// Increment attempts
|
|
s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error())
|
|
continue
|
|
}
|
|
|
|
syncedIDs = append(syncedIDs, change.ID)
|
|
pushed++
|
|
}
|
|
|
|
// Mark synced changes as complete by deleting them
|
|
if len(syncedIDs) > 0 {
|
|
if err := s.localDB.MarkChangesSynced(syncedIDs); err != nil {
|
|
slog.Error("failed to mark changes as synced", "error", err)
|
|
}
|
|
}
|
|
|
|
slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed)
|
|
return pushed, nil
|
|
}
|
|
|
|
// pushSingleChange pushes a single pending change to the server
|
|
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
|
switch change.EntityType {
|
|
case "project":
|
|
return s.pushProjectChange(change)
|
|
case "configuration":
|
|
return s.pushConfigurationChange(change)
|
|
default:
|
|
return fmt.Errorf("unknown entity type: %s", change.EntityType)
|
|
}
|
|
}
|
|
|
|
func prioritizeProjectChanges(changes []localdb.PendingChange) []localdb.PendingChange {
|
|
if len(changes) < 2 {
|
|
return changes
|
|
}
|
|
|
|
projectChanges := make([]localdb.PendingChange, 0, len(changes))
|
|
otherChanges := make([]localdb.PendingChange, 0, len(changes))
|
|
for _, change := range changes {
|
|
if change.EntityType == "project" {
|
|
projectChanges = append(projectChanges, change)
|
|
continue
|
|
}
|
|
otherChanges = append(otherChanges, change)
|
|
}
|
|
|
|
sorted := make([]localdb.PendingChange, 0, len(changes))
|
|
sorted = append(sorted, projectChanges...)
|
|
sorted = append(sorted, otherChanges...)
|
|
return sorted
|
|
}
|
|
|
|
func (s *Service) pushProjectChange(change *localdb.PendingChange) error {
|
|
payload, err := decodeProjectChangePayload(change)
|
|
if err != nil {
|
|
return fmt.Errorf("decode project payload: %w", err)
|
|
}
|
|
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
projectRepo := repository.NewProjectRepository(mariaDB)
|
|
project := payload.Snapshot
|
|
project.UUID = payload.ProjectUUID
|
|
if strings.TrimSpace(project.Code) == "" {
|
|
project.Code = strings.TrimSpace(derefString(project.Name))
|
|
if project.Code == "" {
|
|
project.Code = project.UUID
|
|
}
|
|
}
|
|
|
|
// Try upsert by UUID first
|
|
err = projectRepo.UpsertByUUID(&project)
|
|
if err != nil {
|
|
// Check if it's a duplicate (code, variant) constraint violation
|
|
// In this case, find existing project with same (code, variant) and link to it
|
|
var existing models.Project
|
|
lookupErr := mariaDB.Where("code = ? AND variant = ?", project.Code, project.Variant).First(&existing).Error
|
|
if lookupErr == nil {
|
|
// Found duplicate - link local project to existing server project
|
|
slog.Info("project duplicate found, linking to existing",
|
|
"local_uuid", project.UUID,
|
|
"server_uuid", existing.UUID,
|
|
"server_id", existing.ID,
|
|
"code", project.Code,
|
|
"variant", project.Variant)
|
|
project.ID = existing.ID
|
|
} else {
|
|
// Not a duplicate issue, return original error
|
|
return fmt.Errorf("upsert project on server: %w", err)
|
|
}
|
|
}
|
|
|
|
// Update local project with server ID
|
|
localProject, localErr := s.localDB.GetProjectByUUID(project.UUID)
|
|
if localErr == nil {
|
|
if project.ID > 0 {
|
|
serverID := project.ID
|
|
localProject.ServerID = &serverID
|
|
}
|
|
localProject.SyncStatus = "synced"
|
|
now := time.Now()
|
|
localProject.SyncedAt = &now
|
|
_ = s.localDB.SaveProject(localProject)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func derefString(value *string) string {
|
|
if value == nil {
|
|
return ""
|
|
}
|
|
return *value
|
|
}
|
|
|
|
func ptrString(value string) *string {
|
|
return &value
|
|
}
|
|
|
|
func decodeProjectChangePayload(change *localdb.PendingChange) (ProjectChangePayload, error) {
|
|
var payload ProjectChangePayload
|
|
if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ProjectUUID != "" {
|
|
if payload.Operation == "" {
|
|
payload.Operation = change.Operation
|
|
}
|
|
return payload, nil
|
|
}
|
|
|
|
var project models.Project
|
|
if err := json.Unmarshal([]byte(change.Payload), &project); err != nil {
|
|
return ProjectChangePayload{}, fmt.Errorf("unmarshal legacy project payload: %w", err)
|
|
}
|
|
|
|
return ProjectChangePayload{
|
|
ProjectUUID: project.UUID,
|
|
Operation: change.Operation,
|
|
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", project.UUID, change.Operation),
|
|
Snapshot: project,
|
|
}, nil
|
|
}
|
|
|
|
// pushConfigurationChange pushes a configuration change to the server
|
|
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
|
switch change.Operation {
|
|
case "create":
|
|
return s.pushConfigurationCreate(change)
|
|
case "update":
|
|
return s.pushConfigurationUpdate(change)
|
|
case "rollback":
|
|
return s.pushConfigurationRollback(change)
|
|
case "deactivate":
|
|
return s.pushConfigurationDeactivate(change)
|
|
case "reactivate":
|
|
return s.pushConfigurationReactivate(change)
|
|
case "delete":
|
|
return s.pushConfigurationDelete(change)
|
|
default:
|
|
return fmt.Errorf("unknown operation: %s", change.Operation)
|
|
}
|
|
}
|
|
|
|
// pushConfigurationCreate creates a configuration on the server
|
|
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
|
payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isStale {
|
|
slog.Debug("skipping stale create event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
|
|
return nil
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration owner: %w", err)
|
|
}
|
|
if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration project: %w", err)
|
|
}
|
|
if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration pricelist: %w", err)
|
|
}
|
|
|
|
// Create on server
|
|
if err := configRepo.Create(&cfg); err != nil {
|
|
// Idempotency fallback: configuration may already be created remotely.
|
|
serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
|
|
if getErr != nil {
|
|
return fmt.Errorf("creating configuration on server: %w", err)
|
|
}
|
|
cfg.ID = serverCfg.ID
|
|
if updateErr := configRepo.Update(&cfg); updateErr != nil {
|
|
return fmt.Errorf("create fallback update on server: %w", updateErr)
|
|
}
|
|
}
|
|
|
|
// Update local configuration with server ID
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err == nil {
|
|
serverID := cfg.ID
|
|
localCfg.ServerID = &serverID
|
|
localCfg.SyncStatus = "synced"
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
}
|
|
|
|
slog.Info("configuration created on server",
|
|
"uuid", cfg.UUID,
|
|
"server_id", cfg.ID,
|
|
"version_no", payload.CurrentVersionNo,
|
|
"version_id", payload.CurrentVersionID,
|
|
"idempotency_key", payload.IdempotencyKey,
|
|
)
|
|
return nil
|
|
}
|
|
|
|
// pushConfigurationUpdate updates a configuration on the server
|
|
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
|
payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isStale {
|
|
slog.Debug("skipping stale update event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
|
|
return nil
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration owner: %w", err)
|
|
}
|
|
if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration project: %w", err)
|
|
}
|
|
if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration pricelist: %w", err)
|
|
}
|
|
|
|
// Ensure we have a server ID before updating
|
|
// If the payload doesn't have ID, get it from local configuration
|
|
if cfg.ID == 0 {
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err != nil {
|
|
return fmt.Errorf("getting local configuration: %w", err)
|
|
}
|
|
|
|
if localCfg.ServerID == nil {
|
|
// Configuration hasn't been synced yet, try to find it on server by UUID.
|
|
// If not found (e.g. stale create was skipped), create it from current snapshot.
|
|
serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
|
|
if getErr != nil {
|
|
if !errors.Is(getErr, gorm.ErrRecordNotFound) {
|
|
return fmt.Errorf("loading configuration from server: %w", getErr)
|
|
}
|
|
if createErr := configRepo.Create(&cfg); createErr != nil {
|
|
// Idempotency fallback: configuration may have been created concurrently.
|
|
existing, existingErr := configRepo.GetByUUID(cfg.UUID)
|
|
if existingErr != nil {
|
|
return fmt.Errorf("creating missing configuration on server: %w", createErr)
|
|
}
|
|
cfg.ID = existing.ID
|
|
}
|
|
if cfg.ID == 0 {
|
|
existing, existingErr := configRepo.GetByUUID(cfg.UUID)
|
|
if existingErr != nil {
|
|
return fmt.Errorf("loading created configuration from server: %w", existingErr)
|
|
}
|
|
cfg.ID = existing.ID
|
|
}
|
|
} else {
|
|
cfg.ID = serverCfg.ID
|
|
}
|
|
|
|
// Update local with server ID
|
|
serverID := cfg.ID
|
|
localCfg.ServerID = &serverID
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
} else {
|
|
cfg.ID = *localCfg.ServerID
|
|
}
|
|
}
|
|
|
|
// Update on server
|
|
if err := configRepo.Update(&cfg); err != nil {
|
|
return fmt.Errorf("updating configuration on server: %w", err)
|
|
}
|
|
|
|
// Update local sync status
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err == nil {
|
|
localCfg.SyncStatus = "synced"
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
}
|
|
|
|
slog.Info("configuration updated on server",
|
|
"uuid", cfg.UUID,
|
|
"version_no", payload.CurrentVersionNo,
|
|
"version_id", payload.CurrentVersionID,
|
|
"idempotency_key", payload.IdempotencyKey,
|
|
"operation", payload.Operation,
|
|
"conflict_policy", payload.ConflictPolicy,
|
|
)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) ensureConfigurationOwner(mariaDB *gorm.DB, cfg *models.Configuration) error {
|
|
if cfg == nil {
|
|
return fmt.Errorf("configuration is nil")
|
|
}
|
|
|
|
ownerUsername := cfg.OwnerUsername
|
|
if ownerUsername == "" {
|
|
ownerUsername = s.localDB.GetDBUser()
|
|
cfg.OwnerUsername = ownerUsername
|
|
}
|
|
if ownerUsername == "" {
|
|
return fmt.Errorf("owner username is empty")
|
|
}
|
|
|
|
// user_id is legacy and no longer used for ownership in local-first mode.
|
|
// Keep it NULL on writes; ownership is represented by owner_username.
|
|
cfg.UserID = nil
|
|
cfg.AppVersion = appmeta.Version()
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Configuration) error {
|
|
if cfg == nil {
|
|
return fmt.Errorf("configuration is nil")
|
|
}
|
|
|
|
projectRepo := repository.NewProjectRepository(mariaDB)
|
|
|
|
if cfg.ProjectUUID != nil && *cfg.ProjectUUID != "" {
|
|
_, err := projectRepo.GetByUUID(*cfg.ProjectUUID)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return err
|
|
}
|
|
|
|
localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID)
|
|
if localErr != nil {
|
|
return err
|
|
}
|
|
modelProject := localdb.LocalToProject(localProject)
|
|
if modelProject.OwnerUsername == "" {
|
|
modelProject.OwnerUsername = cfg.OwnerUsername
|
|
}
|
|
if createErr := projectRepo.UpsertByUUID(modelProject); createErr != nil {
|
|
return createErr
|
|
}
|
|
if modelProject.ID > 0 {
|
|
serverID := modelProject.ID
|
|
localProject.ServerID = &serverID
|
|
localProject.SyncStatus = "synced"
|
|
now := time.Now()
|
|
localProject.SyncedAt = &now
|
|
_ = s.localDB.SaveProject(localProject)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
systemProject := &models.Project{}
|
|
err := mariaDB.
|
|
Where("LOWER(TRIM(COALESCE(name, ''))) = LOWER(?) AND is_system = ?", "Без проекта", true).
|
|
Order("CASE WHEN TRIM(COALESCE(owner_username, '')) = '' THEN 0 ELSE 1 END, created_at ASC, id ASC").
|
|
First(systemProject).Error
|
|
if err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return err
|
|
}
|
|
systemProject = &models.Project{
|
|
UUID: uuid.NewString(),
|
|
OwnerUsername: "",
|
|
Code: "Без проекта",
|
|
Name: ptrString("Без проекта"),
|
|
IsActive: true,
|
|
IsSystem: true,
|
|
}
|
|
if createErr := projectRepo.Create(systemProject); createErr != nil {
|
|
return createErr
|
|
}
|
|
}
|
|
|
|
cfg.ProjectUUID = &systemProject.UUID
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) ensureConfigurationPricelist(mariaDB *gorm.DB, cfg *models.Configuration) error {
|
|
if cfg == nil {
|
|
return fmt.Errorf("configuration is nil")
|
|
}
|
|
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
|
|
if cfg.PricelistID != nil && *cfg.PricelistID > 0 {
|
|
if _, err := pricelistRepo.GetByID(*cfg.PricelistID); err == nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
latest, err := pricelistRepo.GetLatestActive()
|
|
if err != nil {
|
|
cfg.PricelistID = nil
|
|
return nil
|
|
}
|
|
|
|
cfg.PricelistID = &latest.ID
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
|
|
// Last-write-wins for now: rollback is pushed as an update with rollback metadata.
|
|
return s.pushConfigurationUpdate(change)
|
|
}
|
|
|
|
func (s *Service) pushConfigurationDeactivate(change *localdb.PendingChange) error {
|
|
// Local deactivate is represented as the latest snapshot push.
|
|
return s.pushConfigurationUpdate(change)
|
|
}
|
|
|
|
func (s *Service) pushConfigurationReactivate(change *localdb.PendingChange) error {
|
|
// Local reactivate is represented as the latest snapshot push.
|
|
return s.pushConfigurationUpdate(change)
|
|
}
|
|
|
|
func (s *Service) resolveConfigurationPayloadForPush(change *localdb.PendingChange) (ConfigurationChangePayload, models.Configuration, bool, error) {
|
|
payload, err := decodeConfigurationChangePayload(change)
|
|
if err != nil {
|
|
return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("decode configuration payload: %w", err)
|
|
}
|
|
eventVersionNo := payload.CurrentVersionNo
|
|
|
|
currentCfg, currentVersionID, currentVersionNo, err := s.loadCurrentConfigurationState(payload.ConfigurationUUID)
|
|
if err != nil {
|
|
// Local config may be gone (e.g. stale queue item after delete/cleanup). Treat as no-op.
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return payload, payload.Snapshot, true, nil
|
|
}
|
|
// create->deactivate race: config may no longer be active/visible locally, skip stale create.
|
|
if change.Operation == "create" {
|
|
return payload, payload.Snapshot, true, nil
|
|
}
|
|
return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("load current local configuration state: %w", err)
|
|
}
|
|
|
|
if payload.ConflictPolicy == "" {
|
|
payload.ConflictPolicy = "last_write_wins"
|
|
}
|
|
|
|
if currentCfg.UUID != "" {
|
|
payload.Snapshot = currentCfg
|
|
if currentVersionID != "" {
|
|
payload.CurrentVersionID = currentVersionID
|
|
}
|
|
if currentVersionNo > 0 {
|
|
payload.CurrentVersionNo = currentVersionNo
|
|
}
|
|
payload.PricelistID = currentCfg.PricelistID
|
|
}
|
|
|
|
isStale := false
|
|
if eventVersionNo > 0 && currentVersionNo > eventVersionNo {
|
|
// Keep only latest intent in queue; older versions become no-op.
|
|
isStale = true
|
|
}
|
|
if !isStale && change.Operation == "create" {
|
|
localCfg, getErr := s.localDB.GetConfigurationByUUID(payload.ConfigurationUUID)
|
|
if getErr == nil && !localCfg.IsActive {
|
|
isStale = true
|
|
}
|
|
}
|
|
|
|
return payload, payload.Snapshot, isStale, nil
|
|
}
|
|
|
|
func decodeConfigurationChangePayload(change *localdb.PendingChange) (ConfigurationChangePayload, error) {
|
|
var payload ConfigurationChangePayload
|
|
if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ConfigurationUUID != "" && payload.Snapshot.UUID != "" {
|
|
if payload.Operation == "" {
|
|
payload.Operation = change.Operation
|
|
}
|
|
return payload, nil
|
|
}
|
|
|
|
// Backward compatibility: legacy queue stored raw models.Configuration JSON.
|
|
var cfg models.Configuration
|
|
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
|
return ConfigurationChangePayload{}, fmt.Errorf("unmarshal legacy configuration payload: %w", err)
|
|
}
|
|
|
|
return ConfigurationChangePayload{
|
|
EventID: "",
|
|
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", cfg.UUID, change.Operation),
|
|
ConfigurationUUID: cfg.UUID,
|
|
ProjectUUID: cfg.ProjectUUID,
|
|
PricelistID: cfg.PricelistID,
|
|
Operation: change.Operation,
|
|
ConflictPolicy: "last_write_wins",
|
|
Snapshot: cfg,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) loadCurrentConfigurationState(configurationUUID string) (models.Configuration, string, int, error) {
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(configurationUUID)
|
|
if err != nil {
|
|
return models.Configuration{}, "", 0, fmt.Errorf("get local configuration by uuid: %w", err)
|
|
}
|
|
|
|
cfg := *localdb.LocalToConfiguration(localCfg)
|
|
|
|
currentVersionID := ""
|
|
if localCfg.CurrentVersionID != nil {
|
|
currentVersionID = *localCfg.CurrentVersionID
|
|
}
|
|
|
|
currentVersionNo := 0
|
|
if currentVersionID != "" {
|
|
var version localdb.LocalConfigurationVersion
|
|
err = s.localDB.DB().
|
|
Where("id = ? AND configuration_uuid = ?", currentVersionID, configurationUUID).
|
|
First(&version).Error
|
|
if err == nil {
|
|
currentVersionNo = version.VersionNo
|
|
}
|
|
}
|
|
|
|
if currentVersionNo == 0 {
|
|
var latest localdb.LocalConfigurationVersion
|
|
err = s.localDB.DB().
|
|
Where("configuration_uuid = ?", configurationUUID).
|
|
Order("version_no DESC").
|
|
First(&latest).Error
|
|
if err == nil {
|
|
currentVersionNo = latest.VersionNo
|
|
currentVersionID = latest.ID
|
|
}
|
|
}
|
|
|
|
if currentVersionNo == 0 {
|
|
if err := s.repairMissingConfigurationVersion(localCfg); err != nil {
|
|
return models.Configuration{}, "", 0, fmt.Errorf("repair missing configuration version: %w", err)
|
|
}
|
|
var latest localdb.LocalConfigurationVersion
|
|
err = s.localDB.DB().
|
|
Where("configuration_uuid = ?", configurationUUID).
|
|
Order("version_no DESC").
|
|
First(&latest).Error
|
|
if err == nil {
|
|
currentVersionNo = latest.VersionNo
|
|
currentVersionID = latest.ID
|
|
}
|
|
}
|
|
|
|
if currentVersionNo == 0 {
|
|
return models.Configuration{}, "", 0, fmt.Errorf("no local configuration version found for %s", configurationUUID)
|
|
}
|
|
|
|
return cfg, currentVersionID, currentVersionNo, nil
|
|
}
|
|
|
|
func (s *Service) repairMissingConfigurationVersion(localCfg *localdb.LocalConfiguration) error {
|
|
if localCfg == nil {
|
|
return fmt.Errorf("local configuration is nil")
|
|
}
|
|
|
|
return s.localDB.DB().Transaction(func(tx *gorm.DB) error {
|
|
var cfg localdb.LocalConfiguration
|
|
if err := tx.Where("uuid = ?", localCfg.UUID).First(&cfg).Error; err != nil {
|
|
return fmt.Errorf("load local configuration: %w", err)
|
|
}
|
|
|
|
// If versions exist, just make sure current_version_id is set.
|
|
var latest localdb.LocalConfigurationVersion
|
|
if err := tx.Where("configuration_uuid = ?", cfg.UUID).
|
|
Order("version_no DESC").
|
|
First(&latest).Error; err == nil {
|
|
if cfg.CurrentVersionID == nil || *cfg.CurrentVersionID == "" {
|
|
if err := tx.Model(&localdb.LocalConfiguration{}).
|
|
Where("uuid = ?", cfg.UUID).
|
|
Update("current_version_id", latest.ID).Error; err != nil {
|
|
return fmt.Errorf("set current version id: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return fmt.Errorf("load latest version: %w", err)
|
|
}
|
|
|
|
snapshot, err := localdb.BuildConfigurationSnapshot(&cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("build configuration snapshot: %w", err)
|
|
}
|
|
|
|
note := "Auto-repaired missing local version"
|
|
version := localdb.LocalConfigurationVersion{
|
|
ID: uuid.NewString(),
|
|
ConfigurationUUID: cfg.UUID,
|
|
VersionNo: 1,
|
|
Data: snapshot,
|
|
ChangeNote: ¬e,
|
|
AppVersion: appmeta.Version(),
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
if err := tx.Create(&version).Error; err != nil {
|
|
return fmt.Errorf("create initial version: %w", err)
|
|
}
|
|
if err := tx.Model(&localdb.LocalConfiguration{}).
|
|
Where("uuid = ?", cfg.UUID).
|
|
Update("current_version_id", version.ID).Error; err != nil {
|
|
return fmt.Errorf("set current version id: %w", err)
|
|
}
|
|
|
|
slog.Warn("repaired missing local configuration version", "uuid", cfg.UUID, "version_no", version.VersionNo)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// NOTE: prepared for future conflict resolution:
|
|
// when server starts storing version metadata, we can compare payload.CurrentVersionNo
|
|
// against remote version and branch into custom strategies. For now use last-write-wins.
|
|
|
|
// pushConfigurationDelete deletes a configuration from the server
|
|
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
|
|
// Get the configuration from server by UUID to get the ID
|
|
cfg, err := configRepo.GetByUUID(change.EntityUUID)
|
|
if err != nil {
|
|
// Already deleted or not found, consider it successful
|
|
slog.Warn("configuration not found on server, considering delete successful", "uuid", change.EntityUUID)
|
|
return nil
|
|
}
|
|
|
|
// Delete from server
|
|
if err := configRepo.Delete(cfg.ID); err != nil {
|
|
return fmt.Errorf("deleting configuration from server: %w", err)
|
|
}
|
|
|
|
slog.Info("configuration deleted on server", "uuid", change.EntityUUID)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) getDB() (*gorm.DB, error) {
|
|
if s.directDB != nil {
|
|
return s.directDB, nil
|
|
}
|
|
if s.connMgr == nil {
|
|
return nil, ErrOffline
|
|
}
|
|
return s.connMgr.GetDB()
|
|
}
|
|
|
|
func (s *Service) getConnectionStatus() db.ConnectionStatus {
|
|
if s.directDB != nil {
|
|
return db.ConnectionStatus{IsConnected: true}
|
|
}
|
|
if s.connMgr == nil {
|
|
return db.ConnectionStatus{IsConnected: false}
|
|
}
|
|
return s.connMgr.GetStatus()
|
|
}
|