1486 lines
45 KiB
Go
1486 lines
45 KiB
Go
package sync
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.mchus.pro/mchus/quoteforge/internal/appmeta"
|
|
"git.mchus.pro/mchus/quoteforge/internal/db"
|
|
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
|
"git.mchus.pro/mchus/quoteforge/internal/models"
|
|
"git.mchus.pro/mchus/quoteforge/internal/repository"
|
|
"github.com/google/uuid"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// ErrOffline is returned by sync operations that need the server database
// when no connection to it can be obtained.
var ErrOffline = errors.New("database is offline")
|
|
|
|
// Service handles synchronization between MariaDB and local SQLite
|
|
type Service struct {
|
|
connMgr *db.ConnectionManager
|
|
localDB *localdb.LocalDB
|
|
directDB *gorm.DB
|
|
}
|
|
|
|
// NewService creates a new sync service
|
|
func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Service {
|
|
return &Service{
|
|
connMgr: connMgr,
|
|
localDB: localDB,
|
|
}
|
|
}
|
|
|
|
// NewServiceWithDB creates sync service that uses a direct DB handle (used in tests).
|
|
func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
|
|
return &Service{
|
|
localDB: localDB,
|
|
directDB: mariaDB,
|
|
}
|
|
}
|
|
|
|
// SyncStatus is the pricelist synchronization summary produced by GetStatus.
type SyncStatus struct {
	// LastSyncAt is the last recorded sync time; nil when none is recorded.
	LastSyncAt *time.Time `json:"last_sync_at"`
	// ServerPricelists is the number of active pricelists counted on the server.
	ServerPricelists int `json:"server_pricelists"`
	// LocalPricelists is the number of pricelists stored locally.
	LocalPricelists int `json:"local_pricelists"`
	// NeedsSync carries the result of NeedSync.
	NeedsSync bool `json:"needs_sync"`
}
|
|
|
|
// UserSyncStatus describes one user's sync heartbeat as reported by
// ListUserSyncStatuses.
type UserSyncStatus struct {
	// Username is the database user name.
	Username string `json:"username"`
	// LastSyncAt is the time of the user's last recorded sync heartbeat.
	LastSyncAt time.Time `json:"last_sync_at"`
	// AppVersion is the application version stored with the heartbeat; omitted from JSON when empty.
	AppVersion string `json:"app_version,omitempty"`
	// IsOnline reports whether the user is considered online.
	IsOnline bool `json:"is_online"`
}
|
|
|
|
// ConfigImportResult holds the counters of a server->local configuration import.
type ConfigImportResult struct {
	// Imported counts configurations that had no local copy before the import.
	Imported int `json:"imported"`
	// Updated counts configurations whose existing local copy was overwritten.
	Updated int `json:"updated"`
	// Skipped counts configurations left untouched locally.
	Skipped int `json:"skipped"`
}
|
|
|
|
// ProjectImportResult holds the counters of a server->local project import.
type ProjectImportResult struct {
	// Imported counts projects that had no local copy before the import.
	Imported int `json:"imported"`
	// Updated counts projects whose existing local copy was overwritten.
	Updated int `json:"updated"`
	// Skipped counts projects left untouched locally.
	Skipped int `json:"skipped"`
}
|
|
|
|
// ConfigurationChangePayload is stored in pending_changes.payload for configuration events.
|
|
// It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution.
|
|
type ConfigurationChangePayload struct {
|
|
EventID string `json:"event_id"`
|
|
IdempotencyKey string `json:"idempotency_key"`
|
|
ConfigurationUUID string `json:"configuration_uuid"`
|
|
ProjectUUID *string `json:"project_uuid,omitempty"`
|
|
PricelistID *uint `json:"pricelist_id,omitempty"`
|
|
Operation string `json:"operation"` // create/update/rollback/deactivate/reactivate/delete
|
|
CurrentVersionID string `json:"current_version_id,omitempty"`
|
|
CurrentVersionNo int `json:"current_version_no,omitempty"`
|
|
ConflictPolicy string `json:"conflict_policy,omitempty"` // currently: last_write_wins
|
|
Snapshot models.Configuration `json:"snapshot"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
CreatedBy *string `json:"created_by,omitempty"`
|
|
}
|
|
|
|
type ProjectChangePayload struct {
|
|
EventID string `json:"event_id"`
|
|
IdempotencyKey string `json:"idempotency_key"`
|
|
ProjectUUID string `json:"project_uuid"`
|
|
Operation string `json:"operation"`
|
|
Snapshot models.Project `json:"snapshot"`
|
|
CreatedAt time.Time `json:"created_at"`
|
|
}
|
|
|
|
// ImportConfigurationsToLocal imports configurations from MariaDB into local SQLite.
|
|
// Existing local configs with pending local changes are skipped to avoid data loss.
|
|
func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return nil, ErrOffline
|
|
}
|
|
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
result := &ConfigImportResult{}
|
|
|
|
offset := 0
|
|
const limit = 200
|
|
for {
|
|
serverConfigs, _, err := configRepo.ListAll(offset, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing server configurations: %w", err)
|
|
}
|
|
if len(serverConfigs) == 0 {
|
|
break
|
|
}
|
|
|
|
for i := range serverConfigs {
|
|
cfg := serverConfigs[i]
|
|
existing, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return nil, fmt.Errorf("getting local configuration %s: %w", cfg.UUID, err)
|
|
}
|
|
|
|
if existing != nil && err == nil && existing.SyncStatus == "pending" {
|
|
result.Skipped++
|
|
continue
|
|
}
|
|
if existing != nil && err == nil && !existing.IsActive {
|
|
// Keep local deactivation sticky: do not resurrect hidden entries from server pull.
|
|
result.Skipped++
|
|
continue
|
|
}
|
|
|
|
localCfg := localdb.ConfigurationToLocal(&cfg)
|
|
now := time.Now()
|
|
localCfg.SyncedAt = &now
|
|
localCfg.SyncStatus = "synced"
|
|
localCfg.UpdatedAt = now
|
|
|
|
if existing != nil && err == nil {
|
|
localCfg.ID = existing.ID
|
|
if localCfg.Line <= 0 && existing.Line > 0 {
|
|
localCfg.Line = existing.Line
|
|
}
|
|
result.Updated++
|
|
} else {
|
|
result.Imported++
|
|
}
|
|
|
|
if err := s.localDB.SaveConfiguration(localCfg); err != nil {
|
|
return nil, fmt.Errorf("saving local configuration %s: %w", cfg.UUID, err)
|
|
}
|
|
}
|
|
|
|
offset += len(serverConfigs)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// ImportProjectsToLocal imports projects from MariaDB into local SQLite.
|
|
// Existing local projects with pending local changes are skipped to avoid data loss.
|
|
func (s *Service) ImportProjectsToLocal() (*ProjectImportResult, error) {
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return nil, ErrOffline
|
|
}
|
|
|
|
projectRepo := repository.NewProjectRepository(mariaDB)
|
|
result := &ProjectImportResult{}
|
|
|
|
offset := 0
|
|
const limit = 200
|
|
for {
|
|
serverProjects, _, err := projectRepo.List(offset, limit, true)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("listing server projects: %w", err)
|
|
}
|
|
if len(serverProjects) == 0 {
|
|
break
|
|
}
|
|
|
|
now := time.Now()
|
|
for i := range serverProjects {
|
|
project := serverProjects[i]
|
|
|
|
existing, getErr := s.localDB.GetProjectByUUID(project.UUID)
|
|
if getErr != nil && !errors.Is(getErr, gorm.ErrRecordNotFound) {
|
|
return nil, fmt.Errorf("getting local project %s: %w", project.UUID, getErr)
|
|
}
|
|
|
|
if existing != nil && getErr == nil {
|
|
// Keep unsynced local changes intact.
|
|
if existing.SyncStatus == "pending" {
|
|
result.Skipped++
|
|
continue
|
|
}
|
|
|
|
existing.OwnerUsername = project.OwnerUsername
|
|
existing.Code = project.Code
|
|
existing.Name = project.Name
|
|
existing.TrackerURL = project.TrackerURL
|
|
existing.IsActive = project.IsActive
|
|
existing.IsSystem = project.IsSystem
|
|
existing.CreatedAt = project.CreatedAt
|
|
existing.UpdatedAt = project.UpdatedAt
|
|
serverID := project.ID
|
|
existing.ServerID = &serverID
|
|
existing.SyncStatus = "synced"
|
|
existing.SyncedAt = &now
|
|
|
|
if err := s.localDB.SaveProject(existing); err != nil {
|
|
return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err)
|
|
}
|
|
result.Updated++
|
|
continue
|
|
}
|
|
|
|
localProject := localdb.ProjectToLocal(&project)
|
|
localProject.SyncStatus = "synced"
|
|
localProject.SyncedAt = &now
|
|
if err := s.localDB.SaveProject(localProject); err != nil {
|
|
return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err)
|
|
}
|
|
result.Imported++
|
|
}
|
|
|
|
offset += len(serverProjects)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// GetStatus returns the current sync status
|
|
func (s *Service) GetStatus() (*SyncStatus, error) {
|
|
lastSync := s.localDB.GetLastSyncTime()
|
|
|
|
// Count server pricelists (only if already connected, don't reconnect)
|
|
serverCount := 0
|
|
connStatus := s.getConnectionStatus()
|
|
if connStatus.IsConnected {
|
|
if mariaDB, err := s.getDB(); err == nil && mariaDB != nil {
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
activeCount, err := pricelistRepo.CountActive()
|
|
if err == nil {
|
|
serverCount = int(activeCount)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Count local pricelists
|
|
localCount := s.localDB.CountLocalPricelists()
|
|
|
|
needsSync, _ := s.NeedSync()
|
|
|
|
return &SyncStatus{
|
|
LastSyncAt: lastSync,
|
|
ServerPricelists: serverCount,
|
|
LocalPricelists: int(localCount),
|
|
NeedsSync: needsSync,
|
|
}, nil
|
|
}
|
|
|
|
// NeedSync checks if synchronization is needed
|
|
// Returns true if there are new pricelists on server or last sync was >1 hour ago
|
|
func (s *Service) NeedSync() (bool, error) {
|
|
lastSync := s.localDB.GetLastSyncTime()
|
|
|
|
// If never synced, need sync
|
|
if lastSync == nil {
|
|
return true, nil
|
|
}
|
|
|
|
// If last sync was more than 1 hour ago, suggest sync
|
|
if time.Since(*lastSync) > time.Hour {
|
|
return true, nil
|
|
}
|
|
|
|
// Check if there are new pricelists on server (only if already connected)
|
|
connStatus := s.getConnectionStatus()
|
|
if !connStatus.IsConnected {
|
|
// If offline, can't check server, no need to sync
|
|
return false, nil
|
|
}
|
|
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
// If offline, can't check server, no need to sync
|
|
return false, nil
|
|
}
|
|
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
sources := []models.PricelistSource{
|
|
models.PricelistSourceEstimate,
|
|
models.PricelistSourceWarehouse,
|
|
models.PricelistSourceCompetitor,
|
|
}
|
|
for _, source := range sources {
|
|
latestServer, err := pricelistRepo.GetLatestActiveBySource(string(source))
|
|
if err != nil {
|
|
// No active pricelist for this source yet.
|
|
continue
|
|
}
|
|
|
|
latestLocal, err := s.localDB.GetLatestLocalPricelistBySource(string(source))
|
|
if err != nil {
|
|
// No local pricelist for an existing source on server.
|
|
return true, nil
|
|
}
|
|
|
|
// If server has newer pricelist for this source, need sync.
|
|
if latestServer.ID != latestLocal.ServerID {
|
|
return true, nil
|
|
}
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// SyncPricelists synchronizes all active pricelists from server to local SQLite
|
|
func (s *Service) SyncPricelists() (int, error) {
|
|
slog.Info("starting pricelist sync")
|
|
if _, err := s.EnsureReadinessForSync(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
|
|
// Get active pricelists from server (up to 100)
|
|
serverPricelists, _, err := pricelistRepo.ListActive(0, 100)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting active server pricelists: %w", err)
|
|
}
|
|
serverPricelistIDs := make([]uint, 0, len(serverPricelists))
|
|
for i := range serverPricelists {
|
|
serverPricelistIDs = append(serverPricelistIDs, serverPricelists[i].ID)
|
|
}
|
|
|
|
synced := 0
|
|
for _, pl := range serverPricelists {
|
|
// Check if pricelist already exists locally
|
|
existing, _ := s.localDB.GetLocalPricelistByServerID(pl.ID)
|
|
if existing != nil {
|
|
// Backfill items for legacy/partial local caches where only pricelist metadata exists.
|
|
if s.localDB.CountLocalPricelistItems(existing.ID) == 0 {
|
|
itemCount, err := s.SyncPricelistItems(existing.ID)
|
|
if err != nil {
|
|
slog.Warn("failed to sync missing pricelist items for existing local pricelist", "version", pl.Version, "error", err)
|
|
} else {
|
|
slog.Info("synced missing pricelist items for existing local pricelist", "version", pl.Version, "items", itemCount)
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Create local pricelist
|
|
localPL := &localdb.LocalPricelist{
|
|
ServerID: pl.ID,
|
|
Source: pl.Source,
|
|
Version: pl.Version,
|
|
Name: pl.Notification, // Using notification as name
|
|
CreatedAt: pl.CreatedAt,
|
|
SyncedAt: time.Now(),
|
|
IsUsed: false,
|
|
}
|
|
|
|
if err := s.localDB.SaveLocalPricelist(localPL); err != nil {
|
|
slog.Warn("failed to save local pricelist", "version", pl.Version, "error", err)
|
|
continue
|
|
}
|
|
|
|
// Sync items for the newly created pricelist
|
|
itemCount, err := s.SyncPricelistItems(localPL.ID)
|
|
if err != nil {
|
|
slog.Warn("failed to sync pricelist items", "version", pl.Version, "error", err)
|
|
// Continue even if items sync fails - we have the pricelist metadata
|
|
} else {
|
|
slog.Debug("synced pricelist with items", "version", pl.Version, "items", itemCount)
|
|
}
|
|
|
|
synced++
|
|
}
|
|
|
|
removed, err := s.localDB.DeleteUnusedLocalPricelistsMissingOnServer(serverPricelistIDs)
|
|
if err != nil {
|
|
slog.Warn("failed to cleanup stale local pricelists", "error", err)
|
|
} else if removed > 0 {
|
|
slog.Info("deleted stale local pricelists", "deleted", removed)
|
|
}
|
|
|
|
// Backfill lot_category for used pricelists (older local caches may miss the column values).
|
|
s.backfillUsedPricelistItemCategories(pricelistRepo, serverPricelistIDs)
|
|
|
|
// Update last sync time
|
|
s.localDB.SetLastSyncTime(time.Now())
|
|
s.RecordSyncHeartbeat()
|
|
|
|
slog.Info("pricelist sync completed", "synced", synced, "total", len(serverPricelists))
|
|
return synced, nil
|
|
}
|
|
|
|
func (s *Service) backfillUsedPricelistItemCategories(pricelistRepo *repository.PricelistRepository, activeServerPricelistIDs []uint) {
|
|
if s.localDB == nil || pricelistRepo == nil {
|
|
return
|
|
}
|
|
|
|
activeSet := make(map[uint]struct{}, len(activeServerPricelistIDs))
|
|
for _, id := range activeServerPricelistIDs {
|
|
activeSet[id] = struct{}{}
|
|
}
|
|
|
|
type row struct {
|
|
ID uint `gorm:"column:id"`
|
|
}
|
|
var usedRows []row
|
|
if err := s.localDB.DB().Raw(`
|
|
SELECT DISTINCT pricelist_id AS id
|
|
FROM local_configurations
|
|
WHERE is_active = 1 AND pricelist_id IS NOT NULL
|
|
UNION
|
|
SELECT DISTINCT warehouse_pricelist_id AS id
|
|
FROM local_configurations
|
|
WHERE is_active = 1 AND warehouse_pricelist_id IS NOT NULL
|
|
UNION
|
|
SELECT DISTINCT competitor_pricelist_id AS id
|
|
FROM local_configurations
|
|
WHERE is_active = 1 AND competitor_pricelist_id IS NOT NULL
|
|
`).Scan(&usedRows).Error; err != nil {
|
|
slog.Warn("pricelist category backfill: failed to list used pricelists", "error", err)
|
|
return
|
|
}
|
|
|
|
for _, r := range usedRows {
|
|
serverID := r.ID
|
|
if serverID == 0 {
|
|
continue
|
|
}
|
|
if _, ok := activeSet[serverID]; !ok {
|
|
// Not present on server (or not active) - cannot backfill from remote.
|
|
continue
|
|
}
|
|
|
|
localPL, err := s.localDB.GetLocalPricelistByServerID(serverID)
|
|
if err != nil || localPL == nil {
|
|
continue
|
|
}
|
|
|
|
if s.localDB.CountLocalPricelistItems(localPL.ID) == 0 {
|
|
continue
|
|
}
|
|
|
|
missing, err := s.localDB.CountLocalPricelistItemsWithEmptyCategory(localPL.ID)
|
|
if err != nil {
|
|
slog.Warn("pricelist category backfill: failed to check local items", "server_id", serverID, "error", err)
|
|
continue
|
|
}
|
|
if missing == 0 {
|
|
continue
|
|
}
|
|
|
|
serverItems, _, err := pricelistRepo.GetItems(serverID, 0, 10000, "")
|
|
if err != nil {
|
|
slog.Warn("pricelist category backfill: failed to load server items", "server_id", serverID, "error", err)
|
|
continue
|
|
}
|
|
localItems := make([]localdb.LocalPricelistItem, len(serverItems))
|
|
for i := range serverItems {
|
|
localItems[i] = *localdb.PricelistItemToLocal(&serverItems[i], localPL.ID)
|
|
}
|
|
|
|
if err := s.localDB.ReplaceLocalPricelistItems(localPL.ID, localItems); err != nil {
|
|
slog.Warn("pricelist category backfill: failed to replace local items", "server_id", serverID, "error", err)
|
|
continue
|
|
}
|
|
slog.Info("pricelist category backfill: refreshed local items", "server_id", serverID, "items", len(localItems))
|
|
}
|
|
}
|
|
|
|
// RecordSyncHeartbeat updates shared sync heartbeat for current DB user.
|
|
// Only users with write rights are expected to be able to update this table.
|
|
func (s *Service) RecordSyncHeartbeat() {
|
|
username := strings.TrimSpace(s.localDB.GetDBUser())
|
|
if username == "" {
|
|
return
|
|
}
|
|
|
|
mariaDB, err := s.getDB()
|
|
if err != nil || mariaDB == nil {
|
|
return
|
|
}
|
|
|
|
if err := ensureUserSyncStatusTable(mariaDB); err != nil {
|
|
slog.Warn("sync heartbeat: failed to ensure table", "error", err)
|
|
return
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
if err := mariaDB.Exec(`
|
|
INSERT INTO qt_pricelist_sync_status (username, last_sync_at, updated_at, app_version)
|
|
VALUES (?, ?, ?, ?)
|
|
ON DUPLICATE KEY UPDATE
|
|
last_sync_at = VALUES(last_sync_at),
|
|
updated_at = VALUES(updated_at),
|
|
app_version = VALUES(app_version)
|
|
`, username, now, now, appmeta.Version()).Error; err != nil {
|
|
slog.Debug("sync heartbeat: skipped", "username", username, "error", err)
|
|
}
|
|
}
|
|
|
|
// ListUserSyncStatuses returns users who have recorded sync heartbeat.
|
|
func (s *Service) ListUserSyncStatuses(onlineThreshold time.Duration) ([]UserSyncStatus, error) {
|
|
mariaDB, err := s.getDB()
|
|
if err != nil || mariaDB == nil {
|
|
return nil, ErrOffline
|
|
}
|
|
|
|
if err := ensureUserSyncStatusTable(mariaDB); err != nil {
|
|
return nil, fmt.Errorf("ensure sync status table: %w", err)
|
|
}
|
|
|
|
type row struct {
|
|
Username string `gorm:"column:username"`
|
|
LastSyncAt time.Time `gorm:"column:last_sync_at"`
|
|
AppVersion string `gorm:"column:app_version"`
|
|
}
|
|
var rows []row
|
|
if err := mariaDB.Raw(`
|
|
SELECT username, last_sync_at, COALESCE(app_version, '') AS app_version
|
|
FROM qt_pricelist_sync_status
|
|
ORDER BY last_sync_at DESC, username ASC
|
|
`).Scan(&rows).Error; err != nil {
|
|
return nil, fmt.Errorf("load sync status rows: %w", err)
|
|
}
|
|
|
|
activeUsers, err := s.listConnectedDBUsers(mariaDB)
|
|
if err != nil {
|
|
slog.Debug("sync status: failed to load connected DB users", "error", err)
|
|
activeUsers = map[string]struct{}{}
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
result := make([]UserSyncStatus, 0, len(rows)+len(activeUsers))
|
|
for i := range rows {
|
|
r := rows[i]
|
|
username := strings.TrimSpace(r.Username)
|
|
if username == "" {
|
|
continue
|
|
}
|
|
|
|
isOnline := now.Sub(r.LastSyncAt) <= onlineThreshold
|
|
if _, connected := activeUsers[username]; connected {
|
|
isOnline = true
|
|
delete(activeUsers, username)
|
|
}
|
|
|
|
appVersion := strings.TrimSpace(r.AppVersion)
|
|
|
|
result = append(result, UserSyncStatus{
|
|
Username: username,
|
|
LastSyncAt: r.LastSyncAt,
|
|
AppVersion: appVersion,
|
|
IsOnline: isOnline,
|
|
})
|
|
}
|
|
|
|
for username := range activeUsers {
|
|
result = append(result, UserSyncStatus{
|
|
Username: username,
|
|
LastSyncAt: now,
|
|
AppVersion: "",
|
|
IsOnline: true,
|
|
})
|
|
}
|
|
|
|
sort.SliceStable(result, func(i, j int) bool {
|
|
if result[i].IsOnline != result[j].IsOnline {
|
|
return result[i].IsOnline
|
|
}
|
|
if result[i].LastSyncAt.Equal(result[j].LastSyncAt) {
|
|
return strings.ToLower(result[i].Username) < strings.ToLower(result[j].Username)
|
|
}
|
|
return result[i].LastSyncAt.After(result[j].LastSyncAt)
|
|
})
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (s *Service) listConnectedDBUsers(mariaDB *gorm.DB) (map[string]struct{}, error) {
|
|
type processUserRow struct {
|
|
Username string `gorm:"column:username"`
|
|
}
|
|
|
|
var rows []processUserRow
|
|
if err := mariaDB.Raw(`
|
|
SELECT DISTINCT TRIM(USER) AS username
|
|
FROM information_schema.PROCESSLIST
|
|
WHERE COALESCE(TRIM(USER), '') <> ''
|
|
AND DB = DATABASE()
|
|
`).Scan(&rows).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
users := make(map[string]struct{}, len(rows))
|
|
for i := range rows {
|
|
username := strings.TrimSpace(rows[i].Username)
|
|
if username == "" {
|
|
continue
|
|
}
|
|
users[username] = struct{}{}
|
|
}
|
|
return users, nil
|
|
}
|
|
|
|
func ensureUserSyncStatusTable(db *gorm.DB) error {
|
|
// Check if table exists instead of trying to create (avoids permission issues)
|
|
if !tableExists(db, "qt_pricelist_sync_status") {
|
|
if err := db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS qt_pricelist_sync_status (
|
|
username VARCHAR(100) NOT NULL,
|
|
last_sync_at DATETIME NOT NULL,
|
|
updated_at DATETIME NOT NULL,
|
|
app_version VARCHAR(64) NULL,
|
|
PRIMARY KEY (username),
|
|
INDEX idx_qt_pricelist_sync_status_last_sync (last_sync_at)
|
|
)
|
|
`).Error; err != nil {
|
|
return fmt.Errorf("create qt_pricelist_sync_status table: %w", err)
|
|
}
|
|
}
|
|
|
|
// Backward compatibility for environments where table was created without app_version.
|
|
// Only try to add column if table exists.
|
|
if tableExists(db, "qt_pricelist_sync_status") {
|
|
if err := db.Exec(`
|
|
ALTER TABLE qt_pricelist_sync_status
|
|
ADD COLUMN IF NOT EXISTS app_version VARCHAR(64) NULL
|
|
`).Error; err != nil {
|
|
// Log but don't fail if alter fails (column might already exist)
|
|
slog.Debug("failed to add app_version column", "error", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SyncPricelistItems synchronizes items for a specific pricelist
|
|
func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
|
|
// Get local pricelist
|
|
localPL, err := s.localDB.GetLocalPricelistByID(localPricelistID)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting local pricelist: %w", err)
|
|
}
|
|
|
|
// Check if items already exist
|
|
existingCount := s.localDB.CountLocalPricelistItems(localPricelistID)
|
|
if existingCount > 0 {
|
|
slog.Debug("pricelist items already synced", "pricelist_id", localPricelistID, "count", existingCount)
|
|
return int(existingCount), nil
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
|
|
// Get items from server
|
|
serverItems, _, err := pricelistRepo.GetItems(localPL.ServerID, 0, 10000, "")
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting server pricelist items: %w", err)
|
|
}
|
|
|
|
// Convert and save locally
|
|
localItems := make([]localdb.LocalPricelistItem, len(serverItems))
|
|
for i, item := range serverItems {
|
|
localItems[i] = *localdb.PricelistItemToLocal(&item, localPricelistID)
|
|
}
|
|
|
|
if err := s.localDB.SaveLocalPricelistItems(localItems); err != nil {
|
|
return 0, fmt.Errorf("saving local pricelist items: %w", err)
|
|
}
|
|
|
|
slog.Info("synced pricelist items", "pricelist_id", localPricelistID, "items", len(localItems))
|
|
return len(localItems), nil
|
|
}
|
|
|
|
// SyncPricelistItemsByServerID syncs items for a pricelist by its server ID
|
|
func (s *Service) SyncPricelistItemsByServerID(serverPricelistID uint) (int, error) {
|
|
localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("local pricelist not found for server ID %d", serverPricelistID)
|
|
}
|
|
return s.SyncPricelistItems(localPL.ID)
|
|
}
|
|
|
|
// GetLocalPriceForLot returns the price for a lot from a local pricelist
|
|
func (s *Service) GetLocalPriceForLot(localPricelistID uint, lotName string) (float64, error) {
|
|
return s.localDB.GetLocalPriceForLot(localPricelistID, lotName)
|
|
}
|
|
|
|
// GetPricelistForOffline returns a pricelist suitable for offline use
|
|
// If items are not synced, it will sync them first
|
|
func (s *Service) GetPricelistForOffline(serverPricelistID uint) (*localdb.LocalPricelist, error) {
|
|
// Ensure pricelist is synced
|
|
localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
// Try to sync pricelists first
|
|
if _, err := s.SyncPricelists(); err != nil {
|
|
return nil, fmt.Errorf("syncing pricelists: %w", err)
|
|
}
|
|
|
|
// Try again
|
|
localPL, err = s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pricelist not found on server: %w", err)
|
|
}
|
|
}
|
|
|
|
// Ensure items are synced
|
|
if _, err := s.SyncPricelistItems(localPL.ID); err != nil {
|
|
return nil, fmt.Errorf("syncing pricelist items: %w", err)
|
|
}
|
|
|
|
return localPL, nil
|
|
}
|
|
|
|
// SyncPricelistsIfNeeded checks for new pricelists and syncs if needed
|
|
// This should be called before creating a new configuration when online
|
|
func (s *Service) SyncPricelistsIfNeeded() error {
|
|
needSync, err := s.NeedSync()
|
|
if err != nil {
|
|
slog.Warn("failed to check if sync needed", "error", err)
|
|
return nil // Don't fail on check error
|
|
}
|
|
|
|
if !needSync {
|
|
slog.Debug("pricelists are up to date, no sync needed")
|
|
return nil
|
|
}
|
|
|
|
slog.Info("new pricelists detected, syncing...")
|
|
_, err = s.SyncPricelists()
|
|
if err != nil {
|
|
return fmt.Errorf("syncing pricelists: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PushPendingChanges pushes all pending changes to the server
|
|
func (s *Service) PushPendingChanges() (int, error) {
|
|
if _, err := s.EnsureReadinessForSync(); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
removed, err := s.localDB.PurgeOrphanConfigurationPendingChanges()
|
|
if err != nil {
|
|
slog.Warn("failed to purge orphan configuration pending changes", "error", err)
|
|
} else if removed > 0 {
|
|
slog.Info("purged orphan configuration pending changes", "removed", removed)
|
|
}
|
|
|
|
changes, err := s.localDB.GetPendingChanges()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting pending changes: %w", err)
|
|
}
|
|
|
|
if len(changes) == 0 {
|
|
slog.Debug("no pending changes to push")
|
|
return 0, nil
|
|
}
|
|
|
|
slog.Info("pushing pending changes", "count", len(changes))
|
|
pushed := 0
|
|
var syncedIDs []int64
|
|
sortedChanges := prioritizeProjectChanges(changes)
|
|
|
|
for _, change := range sortedChanges {
|
|
err := s.pushSingleChange(&change)
|
|
if err != nil {
|
|
slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
|
|
// Increment attempts
|
|
s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error())
|
|
continue
|
|
}
|
|
|
|
syncedIDs = append(syncedIDs, change.ID)
|
|
pushed++
|
|
}
|
|
|
|
// Mark synced changes as complete by deleting them
|
|
if len(syncedIDs) > 0 {
|
|
if err := s.localDB.MarkChangesSynced(syncedIDs); err != nil {
|
|
slog.Error("failed to mark changes as synced", "error", err)
|
|
}
|
|
}
|
|
|
|
slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed)
|
|
return pushed, nil
|
|
}
|
|
|
|
// pushSingleChange pushes a single pending change to the server
|
|
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
|
switch change.EntityType {
|
|
case "project":
|
|
return s.pushProjectChange(change)
|
|
case "configuration":
|
|
return s.pushConfigurationChange(change)
|
|
default:
|
|
return fmt.Errorf("unknown entity type: %s", change.EntityType)
|
|
}
|
|
}
|
|
|
|
func prioritizeProjectChanges(changes []localdb.PendingChange) []localdb.PendingChange {
|
|
if len(changes) < 2 {
|
|
return changes
|
|
}
|
|
|
|
projectChanges := make([]localdb.PendingChange, 0, len(changes))
|
|
otherChanges := make([]localdb.PendingChange, 0, len(changes))
|
|
for _, change := range changes {
|
|
if change.EntityType == "project" {
|
|
projectChanges = append(projectChanges, change)
|
|
continue
|
|
}
|
|
otherChanges = append(otherChanges, change)
|
|
}
|
|
|
|
sorted := make([]localdb.PendingChange, 0, len(changes))
|
|
sorted = append(sorted, projectChanges...)
|
|
sorted = append(sorted, otherChanges...)
|
|
return sorted
|
|
}
|
|
|
|
func (s *Service) pushProjectChange(change *localdb.PendingChange) error {
|
|
payload, err := decodeProjectChangePayload(change)
|
|
if err != nil {
|
|
return fmt.Errorf("decode project payload: %w", err)
|
|
}
|
|
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
projectRepo := repository.NewProjectRepository(mariaDB)
|
|
project := payload.Snapshot
|
|
project.UUID = payload.ProjectUUID
|
|
if strings.TrimSpace(project.Code) == "" {
|
|
project.Code = strings.TrimSpace(derefString(project.Name))
|
|
if project.Code == "" {
|
|
project.Code = project.UUID
|
|
}
|
|
}
|
|
|
|
// Try upsert by UUID first
|
|
err = projectRepo.UpsertByUUID(&project)
|
|
if err != nil {
|
|
// Check if it's a duplicate (code, variant) constraint violation
|
|
// In this case, find existing project with same (code, variant) and link to it
|
|
var existing models.Project
|
|
lookupErr := mariaDB.Where("code = ? AND variant = ?", project.Code, project.Variant).First(&existing).Error
|
|
if lookupErr == nil {
|
|
// Found duplicate - link local project to existing server project
|
|
slog.Info("project duplicate found, linking to existing",
|
|
"local_uuid", project.UUID,
|
|
"server_uuid", existing.UUID,
|
|
"server_id", existing.ID,
|
|
"code", project.Code,
|
|
"variant", project.Variant)
|
|
project.ID = existing.ID
|
|
} else {
|
|
// Not a duplicate issue, return original error
|
|
return fmt.Errorf("upsert project on server: %w", err)
|
|
}
|
|
}
|
|
|
|
// Update local project with server ID
|
|
localProject, localErr := s.localDB.GetProjectByUUID(project.UUID)
|
|
if localErr == nil {
|
|
if project.ID > 0 {
|
|
serverID := project.ID
|
|
localProject.ServerID = &serverID
|
|
}
|
|
localProject.SyncStatus = "synced"
|
|
now := time.Now()
|
|
localProject.SyncedAt = &now
|
|
_ = s.localDB.SaveProject(localProject)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// derefString returns the string value points to, or "" when value is nil.
func derefString(value *string) string {
	if value != nil {
		return *value
	}
	return ""
}
|
|
|
|
// ptrString returns a pointer to a fresh copy of value.
func ptrString(value string) *string {
	p := new(string)
	*p = value
	return p
}
|
|
|
|
func decodeProjectChangePayload(change *localdb.PendingChange) (ProjectChangePayload, error) {
|
|
var payload ProjectChangePayload
|
|
if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ProjectUUID != "" {
|
|
if payload.Operation == "" {
|
|
payload.Operation = change.Operation
|
|
}
|
|
return payload, nil
|
|
}
|
|
|
|
var project models.Project
|
|
if err := json.Unmarshal([]byte(change.Payload), &project); err != nil {
|
|
return ProjectChangePayload{}, fmt.Errorf("unmarshal legacy project payload: %w", err)
|
|
}
|
|
|
|
return ProjectChangePayload{
|
|
ProjectUUID: project.UUID,
|
|
Operation: change.Operation,
|
|
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", project.UUID, change.Operation),
|
|
Snapshot: project,
|
|
}, nil
|
|
}
|
|
|
|
// pushConfigurationChange pushes a configuration change to the server
|
|
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
|
switch change.Operation {
|
|
case "create":
|
|
return s.pushConfigurationCreate(change)
|
|
case "update":
|
|
return s.pushConfigurationUpdate(change)
|
|
case "rollback":
|
|
return s.pushConfigurationRollback(change)
|
|
case "deactivate":
|
|
return s.pushConfigurationDeactivate(change)
|
|
case "reactivate":
|
|
return s.pushConfigurationReactivate(change)
|
|
case "delete":
|
|
return s.pushConfigurationDelete(change)
|
|
default:
|
|
return fmt.Errorf("unknown operation: %s", change.Operation)
|
|
}
|
|
}
|
|
|
|
// pushConfigurationCreate creates a configuration on the server
|
|
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
|
payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isStale {
|
|
slog.Debug("skipping stale create event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
|
|
return nil
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration owner: %w", err)
|
|
}
|
|
if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration project: %w", err)
|
|
}
|
|
if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration pricelist: %w", err)
|
|
}
|
|
|
|
// Create on server
|
|
if err := configRepo.Create(&cfg); err != nil {
|
|
// Idempotency fallback: configuration may already be created remotely.
|
|
serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
|
|
if getErr != nil {
|
|
return fmt.Errorf("creating configuration on server: %w", err)
|
|
}
|
|
cfg.ID = serverCfg.ID
|
|
if updateErr := configRepo.Update(&cfg); updateErr != nil {
|
|
return fmt.Errorf("create fallback update on server: %w", updateErr)
|
|
}
|
|
}
|
|
|
|
// Update local configuration with server ID
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err == nil {
|
|
serverID := cfg.ID
|
|
localCfg.ServerID = &serverID
|
|
localCfg.SyncStatus = "synced"
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
}
|
|
|
|
slog.Info("configuration created on server",
|
|
"uuid", cfg.UUID,
|
|
"server_id", cfg.ID,
|
|
"version_no", payload.CurrentVersionNo,
|
|
"version_id", payload.CurrentVersionID,
|
|
"idempotency_key", payload.IdempotencyKey,
|
|
)
|
|
return nil
|
|
}
|
|
|
|
// pushConfigurationUpdate updates a configuration on the server
|
|
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
|
payload, cfg, isStale, err := s.resolveConfigurationPayloadForPush(change)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isStale {
|
|
slog.Debug("skipping stale update event, newer version exists", "uuid", payload.ConfigurationUUID, "idempotency_key", payload.IdempotencyKey)
|
|
return nil
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration owner: %w", err)
|
|
}
|
|
if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration project: %w", err)
|
|
}
|
|
if err := s.ensureConfigurationPricelist(mariaDB, &cfg); err != nil {
|
|
return fmt.Errorf("resolve configuration pricelist: %w", err)
|
|
}
|
|
|
|
// Ensure we have a server ID before updating
|
|
// If the payload doesn't have ID, get it from local configuration
|
|
if cfg.ID == 0 {
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err != nil {
|
|
return fmt.Errorf("getting local configuration: %w", err)
|
|
}
|
|
|
|
if localCfg.ServerID == nil {
|
|
// Configuration hasn't been synced yet, try to find it on server by UUID.
|
|
// If not found (e.g. stale create was skipped), create it from current snapshot.
|
|
serverCfg, getErr := configRepo.GetByUUID(cfg.UUID)
|
|
if getErr != nil {
|
|
if !errors.Is(getErr, gorm.ErrRecordNotFound) {
|
|
return fmt.Errorf("loading configuration from server: %w", getErr)
|
|
}
|
|
if createErr := configRepo.Create(&cfg); createErr != nil {
|
|
// Idempotency fallback: configuration may have been created concurrently.
|
|
existing, existingErr := configRepo.GetByUUID(cfg.UUID)
|
|
if existingErr != nil {
|
|
return fmt.Errorf("creating missing configuration on server: %w", createErr)
|
|
}
|
|
cfg.ID = existing.ID
|
|
}
|
|
if cfg.ID == 0 {
|
|
existing, existingErr := configRepo.GetByUUID(cfg.UUID)
|
|
if existingErr != nil {
|
|
return fmt.Errorf("loading created configuration from server: %w", existingErr)
|
|
}
|
|
cfg.ID = existing.ID
|
|
}
|
|
} else {
|
|
cfg.ID = serverCfg.ID
|
|
}
|
|
|
|
// Update local with server ID
|
|
serverID := cfg.ID
|
|
localCfg.ServerID = &serverID
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
} else {
|
|
cfg.ID = *localCfg.ServerID
|
|
}
|
|
}
|
|
|
|
// Update on server
|
|
if err := configRepo.Update(&cfg); err != nil {
|
|
return fmt.Errorf("updating configuration on server: %w", err)
|
|
}
|
|
|
|
// Update local sync status
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err == nil {
|
|
localCfg.SyncStatus = "synced"
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
}
|
|
|
|
slog.Info("configuration updated on server",
|
|
"uuid", cfg.UUID,
|
|
"version_no", payload.CurrentVersionNo,
|
|
"version_id", payload.CurrentVersionID,
|
|
"idempotency_key", payload.IdempotencyKey,
|
|
"operation", payload.Operation,
|
|
"conflict_policy", payload.ConflictPolicy,
|
|
)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) ensureConfigurationOwner(mariaDB *gorm.DB, cfg *models.Configuration) error {
|
|
if cfg == nil {
|
|
return fmt.Errorf("configuration is nil")
|
|
}
|
|
|
|
ownerUsername := cfg.OwnerUsername
|
|
if ownerUsername == "" {
|
|
ownerUsername = s.localDB.GetDBUser()
|
|
cfg.OwnerUsername = ownerUsername
|
|
}
|
|
if ownerUsername == "" {
|
|
return fmt.Errorf("owner username is empty")
|
|
}
|
|
|
|
// user_id is legacy and no longer used for ownership in local-first mode.
|
|
// Keep it NULL on writes; ownership is represented by owner_username.
|
|
cfg.UserID = nil
|
|
cfg.AppVersion = appmeta.Version()
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Configuration) error {
|
|
if cfg == nil {
|
|
return fmt.Errorf("configuration is nil")
|
|
}
|
|
|
|
projectRepo := repository.NewProjectRepository(mariaDB)
|
|
|
|
if cfg.ProjectUUID != nil && *cfg.ProjectUUID != "" {
|
|
_, err := projectRepo.GetByUUID(*cfg.ProjectUUID)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return err
|
|
}
|
|
|
|
localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID)
|
|
if localErr != nil {
|
|
return err
|
|
}
|
|
modelProject := localdb.LocalToProject(localProject)
|
|
if modelProject.OwnerUsername == "" {
|
|
modelProject.OwnerUsername = cfg.OwnerUsername
|
|
}
|
|
if createErr := projectRepo.UpsertByUUID(modelProject); createErr != nil {
|
|
return createErr
|
|
}
|
|
if modelProject.ID > 0 {
|
|
serverID := modelProject.ID
|
|
localProject.ServerID = &serverID
|
|
localProject.SyncStatus = "synced"
|
|
now := time.Now()
|
|
localProject.SyncedAt = &now
|
|
_ = s.localDB.SaveProject(localProject)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
systemProject := &models.Project{}
|
|
err := mariaDB.
|
|
Where("LOWER(TRIM(COALESCE(name, ''))) = LOWER(?) AND is_system = ?", "Без проекта", true).
|
|
Order("CASE WHEN TRIM(COALESCE(owner_username, '')) = '' THEN 0 ELSE 1 END, created_at ASC, id ASC").
|
|
First(systemProject).Error
|
|
if err != nil {
|
|
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return err
|
|
}
|
|
systemProject = &models.Project{
|
|
UUID: uuid.NewString(),
|
|
OwnerUsername: "",
|
|
Code: "Без проекта",
|
|
Name: ptrString("Без проекта"),
|
|
IsActive: true,
|
|
IsSystem: true,
|
|
}
|
|
if createErr := projectRepo.Create(systemProject); createErr != nil {
|
|
return createErr
|
|
}
|
|
}
|
|
|
|
cfg.ProjectUUID = &systemProject.UUID
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) ensureConfigurationPricelist(mariaDB *gorm.DB, cfg *models.Configuration) error {
|
|
if cfg == nil {
|
|
return fmt.Errorf("configuration is nil")
|
|
}
|
|
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
|
|
if cfg.PricelistID != nil && *cfg.PricelistID > 0 {
|
|
if _, err := pricelistRepo.GetByID(*cfg.PricelistID); err == nil {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
latest, err := pricelistRepo.GetLatestActive()
|
|
if err != nil {
|
|
cfg.PricelistID = nil
|
|
return nil
|
|
}
|
|
|
|
cfg.PricelistID = &latest.ID
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
|
|
// Last-write-wins for now: rollback is pushed as an update with rollback metadata.
|
|
return s.pushConfigurationUpdate(change)
|
|
}
|
|
|
|
func (s *Service) pushConfigurationDeactivate(change *localdb.PendingChange) error {
|
|
// Local deactivate is represented as the latest snapshot push.
|
|
return s.pushConfigurationUpdate(change)
|
|
}
|
|
|
|
func (s *Service) pushConfigurationReactivate(change *localdb.PendingChange) error {
|
|
// Local reactivate is represented as the latest snapshot push.
|
|
return s.pushConfigurationUpdate(change)
|
|
}
|
|
|
|
func (s *Service) resolveConfigurationPayloadForPush(change *localdb.PendingChange) (ConfigurationChangePayload, models.Configuration, bool, error) {
|
|
payload, err := decodeConfigurationChangePayload(change)
|
|
if err != nil {
|
|
return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("decode configuration payload: %w", err)
|
|
}
|
|
eventVersionNo := payload.CurrentVersionNo
|
|
|
|
currentCfg, currentVersionID, currentVersionNo, err := s.loadCurrentConfigurationState(payload.ConfigurationUUID)
|
|
if err != nil {
|
|
// Local config may be gone (e.g. stale queue item after delete/cleanup). Treat as no-op.
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return payload, payload.Snapshot, true, nil
|
|
}
|
|
// create->deactivate race: config may no longer be active/visible locally, skip stale create.
|
|
if change.Operation == "create" {
|
|
return payload, payload.Snapshot, true, nil
|
|
}
|
|
return ConfigurationChangePayload{}, models.Configuration{}, false, fmt.Errorf("load current local configuration state: %w", err)
|
|
}
|
|
|
|
if payload.ConflictPolicy == "" {
|
|
payload.ConflictPolicy = "last_write_wins"
|
|
}
|
|
|
|
if currentCfg.UUID != "" {
|
|
payload.Snapshot = currentCfg
|
|
if currentVersionID != "" {
|
|
payload.CurrentVersionID = currentVersionID
|
|
}
|
|
if currentVersionNo > 0 {
|
|
payload.CurrentVersionNo = currentVersionNo
|
|
}
|
|
payload.PricelistID = currentCfg.PricelistID
|
|
}
|
|
|
|
isStale := false
|
|
if eventVersionNo > 0 && currentVersionNo > eventVersionNo {
|
|
// Keep only latest intent in queue; older versions become no-op.
|
|
isStale = true
|
|
}
|
|
if !isStale && change.Operation == "create" {
|
|
localCfg, getErr := s.localDB.GetConfigurationByUUID(payload.ConfigurationUUID)
|
|
if getErr == nil && !localCfg.IsActive {
|
|
isStale = true
|
|
}
|
|
}
|
|
|
|
return payload, payload.Snapshot, isStale, nil
|
|
}
|
|
|
|
func decodeConfigurationChangePayload(change *localdb.PendingChange) (ConfigurationChangePayload, error) {
|
|
var payload ConfigurationChangePayload
|
|
if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ConfigurationUUID != "" && payload.Snapshot.UUID != "" {
|
|
if payload.Operation == "" {
|
|
payload.Operation = change.Operation
|
|
}
|
|
return payload, nil
|
|
}
|
|
|
|
// Backward compatibility: legacy queue stored raw models.Configuration JSON.
|
|
var cfg models.Configuration
|
|
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
|
return ConfigurationChangePayload{}, fmt.Errorf("unmarshal legacy configuration payload: %w", err)
|
|
}
|
|
|
|
return ConfigurationChangePayload{
|
|
EventID: "",
|
|
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", cfg.UUID, change.Operation),
|
|
ConfigurationUUID: cfg.UUID,
|
|
ProjectUUID: cfg.ProjectUUID,
|
|
PricelistID: cfg.PricelistID,
|
|
Operation: change.Operation,
|
|
ConflictPolicy: "last_write_wins",
|
|
Snapshot: cfg,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) loadCurrentConfigurationState(configurationUUID string) (models.Configuration, string, int, error) {
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(configurationUUID)
|
|
if err != nil {
|
|
return models.Configuration{}, "", 0, fmt.Errorf("get local configuration by uuid: %w", err)
|
|
}
|
|
|
|
cfg := *localdb.LocalToConfiguration(localCfg)
|
|
|
|
currentVersionID := ""
|
|
if localCfg.CurrentVersionID != nil {
|
|
currentVersionID = *localCfg.CurrentVersionID
|
|
}
|
|
|
|
currentVersionNo := 0
|
|
if currentVersionID != "" {
|
|
var version localdb.LocalConfigurationVersion
|
|
err = s.localDB.DB().
|
|
Where("id = ? AND configuration_uuid = ?", currentVersionID, configurationUUID).
|
|
First(&version).Error
|
|
if err == nil {
|
|
currentVersionNo = version.VersionNo
|
|
}
|
|
}
|
|
|
|
if currentVersionNo == 0 {
|
|
var latest localdb.LocalConfigurationVersion
|
|
err = s.localDB.DB().
|
|
Where("configuration_uuid = ?", configurationUUID).
|
|
Order("version_no DESC").
|
|
First(&latest).Error
|
|
if err == nil {
|
|
currentVersionNo = latest.VersionNo
|
|
currentVersionID = latest.ID
|
|
}
|
|
}
|
|
|
|
if currentVersionNo == 0 {
|
|
if err := s.repairMissingConfigurationVersion(localCfg); err != nil {
|
|
return models.Configuration{}, "", 0, fmt.Errorf("repair missing configuration version: %w", err)
|
|
}
|
|
var latest localdb.LocalConfigurationVersion
|
|
err = s.localDB.DB().
|
|
Where("configuration_uuid = ?", configurationUUID).
|
|
Order("version_no DESC").
|
|
First(&latest).Error
|
|
if err == nil {
|
|
currentVersionNo = latest.VersionNo
|
|
currentVersionID = latest.ID
|
|
}
|
|
}
|
|
|
|
if currentVersionNo == 0 {
|
|
return models.Configuration{}, "", 0, fmt.Errorf("no local configuration version found for %s", configurationUUID)
|
|
}
|
|
|
|
return cfg, currentVersionID, currentVersionNo, nil
|
|
}
|
|
|
|
func (s *Service) repairMissingConfigurationVersion(localCfg *localdb.LocalConfiguration) error {
|
|
if localCfg == nil {
|
|
return fmt.Errorf("local configuration is nil")
|
|
}
|
|
|
|
return s.localDB.DB().Transaction(func(tx *gorm.DB) error {
|
|
var cfg localdb.LocalConfiguration
|
|
if err := tx.Where("uuid = ?", localCfg.UUID).First(&cfg).Error; err != nil {
|
|
return fmt.Errorf("load local configuration: %w", err)
|
|
}
|
|
|
|
// If versions exist, just make sure current_version_id is set.
|
|
var latest localdb.LocalConfigurationVersion
|
|
if err := tx.Where("configuration_uuid = ?", cfg.UUID).
|
|
Order("version_no DESC").
|
|
First(&latest).Error; err == nil {
|
|
if cfg.CurrentVersionID == nil || *cfg.CurrentVersionID == "" {
|
|
if err := tx.Model(&localdb.LocalConfiguration{}).
|
|
Where("uuid = ?", cfg.UUID).
|
|
Update("current_version_id", latest.ID).Error; err != nil {
|
|
return fmt.Errorf("set current version id: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
|
|
return fmt.Errorf("load latest version: %w", err)
|
|
}
|
|
|
|
snapshot, err := localdb.BuildConfigurationSnapshot(&cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("build configuration snapshot: %w", err)
|
|
}
|
|
|
|
note := "Auto-repaired missing local version"
|
|
version := localdb.LocalConfigurationVersion{
|
|
ID: uuid.NewString(),
|
|
ConfigurationUUID: cfg.UUID,
|
|
VersionNo: 1,
|
|
Data: snapshot,
|
|
ChangeNote: ¬e,
|
|
AppVersion: appmeta.Version(),
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
if err := tx.Create(&version).Error; err != nil {
|
|
return fmt.Errorf("create initial version: %w", err)
|
|
}
|
|
if err := tx.Model(&localdb.LocalConfiguration{}).
|
|
Where("uuid = ?", cfg.UUID).
|
|
Update("current_version_id", version.ID).Error; err != nil {
|
|
return fmt.Errorf("set current version id: %w", err)
|
|
}
|
|
|
|
slog.Warn("repaired missing local configuration version", "uuid", cfg.UUID, "version_no", version.VersionNo)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// NOTE: placeholder for future conflict resolution. Once the server stores
// version metadata, payload.CurrentVersionNo can be compared against the remote
// version to branch into custom strategies. For now pushes are last-write-wins.
|
|
|
|
// pushConfigurationDelete deletes a configuration from the server
|
|
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
|
// Get database connection
|
|
mariaDB, err := s.getDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
|
|
// Get the configuration from server by UUID to get the ID
|
|
cfg, err := configRepo.GetByUUID(change.EntityUUID)
|
|
if err != nil {
|
|
// Already deleted or not found, consider it successful
|
|
slog.Warn("configuration not found on server, considering delete successful", "uuid", change.EntityUUID)
|
|
return nil
|
|
}
|
|
|
|
// Delete from server
|
|
if err := configRepo.Delete(cfg.ID); err != nil {
|
|
return fmt.Errorf("deleting configuration from server: %w", err)
|
|
}
|
|
|
|
slog.Info("configuration deleted on server", "uuid", change.EntityUUID)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) getDB() (*gorm.DB, error) {
|
|
if s.directDB != nil {
|
|
return s.directDB, nil
|
|
}
|
|
if s.connMgr == nil {
|
|
return nil, ErrOffline
|
|
}
|
|
return s.connMgr.GetDB()
|
|
}
|
|
|
|
func (s *Service) getConnectionStatus() db.ConnectionStatus {
|
|
if s.directDB != nil {
|
|
return db.ConnectionStatus{IsConnected: true}
|
|
}
|
|
if s.connMgr == nil {
|
|
return db.ConnectionStatus{IsConnected: false}
|
|
}
|
|
return s.connMgr.GetStatus()
|
|
}
|