491 lines
14 KiB
Go
491 lines
14 KiB
Go
package sync
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"git.mchus.pro/mchus/quoteforge/internal/db"
|
|
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
|
"git.mchus.pro/mchus/quoteforge/internal/models"
|
|
"git.mchus.pro/mchus/quoteforge/internal/repository"
|
|
)
|
|
|
|
// Service handles synchronization between MariaDB and local SQLite
|
|
type Service struct {
|
|
connMgr *db.ConnectionManager
|
|
localDB *localdb.LocalDB
|
|
}
|
|
|
|
// NewService creates a new sync service
|
|
func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Service {
|
|
return &Service{
|
|
connMgr: connMgr,
|
|
localDB: localDB,
|
|
}
|
|
}
|
|
|
|
// SyncStatus is a JSON-serializable snapshot of the synchronization state as
// assembled by Service.GetStatus.
type SyncStatus struct {
	// LastSyncAt is the recorded time of the last sync; nil if none is recorded.
	LastSyncAt *time.Time `json:"last_sync_at"`
	// ServerPricelists is the number of active pricelists on the server, or 0
	// when the server was not queried.
	ServerPricelists int `json:"server_pricelists"`
	// LocalPricelists is the number of pricelists stored locally.
	LocalPricelists int `json:"local_pricelists"`
	// NeedsSync mirrors the result of Service.NeedSync.
	NeedsSync bool `json:"needs_sync"`
}
|
|
|
|
// GetStatus returns the current sync status
|
|
func (s *Service) GetStatus() (*SyncStatus, error) {
|
|
lastSync := s.localDB.GetLastSyncTime()
|
|
|
|
// Count server pricelists (only if already connected, don't reconnect)
|
|
serverCount := 0
|
|
connStatus := s.connMgr.GetStatus()
|
|
if connStatus.IsConnected {
|
|
if mariaDB, err := s.connMgr.GetDB(); err == nil && mariaDB != nil {
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
activeCount, err := pricelistRepo.CountActive()
|
|
if err == nil {
|
|
serverCount = int(activeCount)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Count local pricelists
|
|
localCount := s.localDB.CountLocalPricelists()
|
|
|
|
needsSync, _ := s.NeedSync()
|
|
|
|
return &SyncStatus{
|
|
LastSyncAt: lastSync,
|
|
ServerPricelists: serverCount,
|
|
LocalPricelists: int(localCount),
|
|
NeedsSync: needsSync,
|
|
}, nil
|
|
}
|
|
|
|
// NeedSync checks if synchronization is needed
|
|
// Returns true if there are new pricelists on server or last sync was >1 hour ago
|
|
func (s *Service) NeedSync() (bool, error) {
|
|
lastSync := s.localDB.GetLastSyncTime()
|
|
|
|
// If never synced, need sync
|
|
if lastSync == nil {
|
|
return true, nil
|
|
}
|
|
|
|
// If last sync was more than 1 hour ago, suggest sync
|
|
if time.Since(*lastSync) > time.Hour {
|
|
return true, nil
|
|
}
|
|
|
|
// Check if there are new pricelists on server (only if already connected)
|
|
connStatus := s.connMgr.GetStatus()
|
|
if !connStatus.IsConnected {
|
|
// If offline, can't check server, no need to sync
|
|
return false, nil
|
|
}
|
|
|
|
mariaDB, err := s.connMgr.GetDB()
|
|
if err != nil {
|
|
// If offline, can't check server, no need to sync
|
|
return false, nil
|
|
}
|
|
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
latestServer, err := pricelistRepo.GetLatestActive()
|
|
if err != nil {
|
|
// If no pricelists on server, no need to sync
|
|
return false, nil
|
|
}
|
|
|
|
latestLocal, err := s.localDB.GetLatestLocalPricelist()
|
|
if err != nil {
|
|
// No local pricelists, need to sync
|
|
return true, nil
|
|
}
|
|
|
|
// If server has newer pricelist, need sync
|
|
if latestServer.ID != latestLocal.ServerID {
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
// SyncPricelists synchronizes all active pricelists from server to local SQLite
|
|
func (s *Service) SyncPricelists() (int, error) {
|
|
slog.Info("starting pricelist sync")
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.connMgr.GetDB()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
|
|
// Get active pricelists from server (up to 100)
|
|
serverPricelists, _, err := pricelistRepo.ListActive(0, 100)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting active server pricelists: %w", err)
|
|
}
|
|
|
|
synced := 0
|
|
var latestLocalID uint
|
|
var latestServerID uint
|
|
for _, pl := range serverPricelists {
|
|
// Check if pricelist already exists locally
|
|
existing, _ := s.localDB.GetLocalPricelistByServerID(pl.ID)
|
|
if existing != nil {
|
|
// Already synced, track latest by server ID
|
|
if pl.ID > latestServerID {
|
|
latestServerID = pl.ID
|
|
latestLocalID = existing.ID
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Create local pricelist
|
|
localPL := &localdb.LocalPricelist{
|
|
ServerID: pl.ID,
|
|
Version: pl.Version,
|
|
Name: pl.Notification, // Using notification as name
|
|
CreatedAt: pl.CreatedAt,
|
|
SyncedAt: time.Now(),
|
|
IsUsed: false,
|
|
}
|
|
|
|
if err := s.localDB.SaveLocalPricelist(localPL); err != nil {
|
|
slog.Warn("failed to save local pricelist", "version", pl.Version, "error", err)
|
|
continue
|
|
}
|
|
|
|
// Sync items for the newly created pricelist
|
|
itemCount, err := s.SyncPricelistItems(localPL.ID)
|
|
if err != nil {
|
|
slog.Warn("failed to sync pricelist items", "version", pl.Version, "error", err)
|
|
// Continue even if items sync fails - we have the pricelist metadata
|
|
} else {
|
|
slog.Debug("synced pricelist with items", "version", pl.Version, "items", itemCount)
|
|
}
|
|
|
|
if pl.ID > latestServerID {
|
|
latestServerID = pl.ID
|
|
latestLocalID = localPL.ID
|
|
}
|
|
synced++
|
|
}
|
|
|
|
// Update component prices from latest pricelist
|
|
if latestLocalID > 0 {
|
|
updated, err := s.localDB.UpdateComponentPricesFromPricelist(latestLocalID)
|
|
if err != nil {
|
|
slog.Warn("failed to update component prices from pricelist", "error", err)
|
|
} else {
|
|
slog.Info("updated component prices from latest pricelist", "updated", updated)
|
|
}
|
|
}
|
|
|
|
// Update last sync time
|
|
s.localDB.SetLastSyncTime(time.Now())
|
|
|
|
slog.Info("pricelist sync completed", "synced", synced, "total", len(serverPricelists))
|
|
return synced, nil
|
|
}
|
|
|
|
// SyncPricelistItems synchronizes items for a specific pricelist
|
|
func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
|
|
// Get local pricelist
|
|
localPL, err := s.localDB.GetLocalPricelistByID(localPricelistID)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting local pricelist: %w", err)
|
|
}
|
|
|
|
// Check if items already exist
|
|
existingCount := s.localDB.CountLocalPricelistItems(localPricelistID)
|
|
if existingCount > 0 {
|
|
slog.Debug("pricelist items already synced", "pricelist_id", localPricelistID, "count", existingCount)
|
|
return int(existingCount), nil
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.connMgr.GetDB()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
|
|
|
// Get items from server
|
|
serverItems, _, err := pricelistRepo.GetItems(localPL.ServerID, 0, 10000, "")
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting server pricelist items: %w", err)
|
|
}
|
|
|
|
// Convert and save locally
|
|
localItems := make([]localdb.LocalPricelistItem, len(serverItems))
|
|
for i, item := range serverItems {
|
|
localItems[i] = localdb.LocalPricelistItem{
|
|
PricelistID: localPricelistID,
|
|
LotName: item.LotName,
|
|
Price: item.Price,
|
|
}
|
|
}
|
|
|
|
if err := s.localDB.SaveLocalPricelistItems(localItems); err != nil {
|
|
return 0, fmt.Errorf("saving local pricelist items: %w", err)
|
|
}
|
|
|
|
slog.Info("synced pricelist items", "pricelist_id", localPricelistID, "items", len(localItems))
|
|
return len(localItems), nil
|
|
}
|
|
|
|
// SyncPricelistItemsByServerID syncs items for a pricelist by its server ID
|
|
func (s *Service) SyncPricelistItemsByServerID(serverPricelistID uint) (int, error) {
|
|
localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("local pricelist not found for server ID %d", serverPricelistID)
|
|
}
|
|
return s.SyncPricelistItems(localPL.ID)
|
|
}
|
|
|
|
// GetLocalPriceForLot returns the price for a lot from a local pricelist
|
|
func (s *Service) GetLocalPriceForLot(localPricelistID uint, lotName string) (float64, error) {
|
|
return s.localDB.GetLocalPriceForLot(localPricelistID, lotName)
|
|
}
|
|
|
|
// GetPricelistForOffline returns a pricelist suitable for offline use
|
|
// If items are not synced, it will sync them first
|
|
func (s *Service) GetPricelistForOffline(serverPricelistID uint) (*localdb.LocalPricelist, error) {
|
|
// Ensure pricelist is synced
|
|
localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
// Try to sync pricelists first
|
|
if _, err := s.SyncPricelists(); err != nil {
|
|
return nil, fmt.Errorf("syncing pricelists: %w", err)
|
|
}
|
|
|
|
// Try again
|
|
localPL, err = s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pricelist not found on server: %w", err)
|
|
}
|
|
}
|
|
|
|
// Ensure items are synced
|
|
if _, err := s.SyncPricelistItems(localPL.ID); err != nil {
|
|
return nil, fmt.Errorf("syncing pricelist items: %w", err)
|
|
}
|
|
|
|
return localPL, nil
|
|
}
|
|
|
|
// SyncPricelistsIfNeeded checks for new pricelists and syncs if needed
|
|
// This should be called before creating a new configuration when online
|
|
func (s *Service) SyncPricelistsIfNeeded() error {
|
|
needSync, err := s.NeedSync()
|
|
if err != nil {
|
|
slog.Warn("failed to check if sync needed", "error", err)
|
|
return nil // Don't fail on check error
|
|
}
|
|
|
|
if !needSync {
|
|
slog.Debug("pricelists are up to date, no sync needed")
|
|
return nil
|
|
}
|
|
|
|
slog.Info("new pricelists detected, syncing...")
|
|
_, err = s.SyncPricelists()
|
|
if err != nil {
|
|
return fmt.Errorf("syncing pricelists: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// PushPendingChanges pushes all pending changes to the server
|
|
func (s *Service) PushPendingChanges() (int, error) {
|
|
changes, err := s.localDB.GetPendingChanges()
|
|
if err != nil {
|
|
return 0, fmt.Errorf("getting pending changes: %w", err)
|
|
}
|
|
|
|
if len(changes) == 0 {
|
|
slog.Debug("no pending changes to push")
|
|
return 0, nil
|
|
}
|
|
|
|
slog.Info("pushing pending changes", "count", len(changes))
|
|
pushed := 0
|
|
var syncedIDs []int64
|
|
|
|
for _, change := range changes {
|
|
err := s.pushSingleChange(&change)
|
|
if err != nil {
|
|
slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
|
|
// Increment attempts
|
|
s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error())
|
|
continue
|
|
}
|
|
|
|
syncedIDs = append(syncedIDs, change.ID)
|
|
pushed++
|
|
}
|
|
|
|
// Mark synced changes as complete by deleting them
|
|
if len(syncedIDs) > 0 {
|
|
if err := s.localDB.MarkChangesSynced(syncedIDs); err != nil {
|
|
slog.Error("failed to mark changes as synced", "error", err)
|
|
}
|
|
}
|
|
|
|
slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed)
|
|
return pushed, nil
|
|
}
|
|
|
|
// pushSingleChange pushes a single pending change to the server
|
|
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
|
switch change.EntityType {
|
|
case "configuration":
|
|
return s.pushConfigurationChange(change)
|
|
default:
|
|
return fmt.Errorf("unknown entity type: %s", change.EntityType)
|
|
}
|
|
}
|
|
|
|
// pushConfigurationChange pushes a configuration change to the server
|
|
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
|
switch change.Operation {
|
|
case "create":
|
|
return s.pushConfigurationCreate(change)
|
|
case "update":
|
|
return s.pushConfigurationUpdate(change)
|
|
case "delete":
|
|
return s.pushConfigurationDelete(change)
|
|
default:
|
|
return fmt.Errorf("unknown operation: %s", change.Operation)
|
|
}
|
|
}
|
|
|
|
// pushConfigurationCreate creates a configuration on the server
|
|
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
|
var cfg models.Configuration
|
|
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
|
return fmt.Errorf("unmarshaling configuration: %w", err)
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.connMgr.GetDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
|
|
// Create on server
|
|
if err := configRepo.Create(&cfg); err != nil {
|
|
return fmt.Errorf("creating configuration on server: %w", err)
|
|
}
|
|
|
|
// Update local configuration with server ID
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err == nil {
|
|
serverID := cfg.ID
|
|
localCfg.ServerID = &serverID
|
|
localCfg.SyncStatus = "synced"
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
}
|
|
|
|
slog.Info("configuration created on server", "uuid", cfg.UUID, "server_id", cfg.ID)
|
|
return nil
|
|
}
|
|
|
|
// pushConfigurationUpdate updates a configuration on the server
|
|
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
|
var cfg models.Configuration
|
|
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
|
return fmt.Errorf("unmarshaling configuration: %w", err)
|
|
}
|
|
|
|
// Get database connection
|
|
mariaDB, err := s.connMgr.GetDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
|
|
// Ensure we have a server ID before updating
|
|
// If the payload doesn't have ID, get it from local configuration
|
|
if cfg.ID == 0 {
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err != nil {
|
|
return fmt.Errorf("getting local configuration: %w", err)
|
|
}
|
|
|
|
if localCfg.ServerID == nil {
|
|
// Configuration hasn't been synced yet, try to find it on server by UUID
|
|
serverCfg, err := configRepo.GetByUUID(cfg.UUID)
|
|
if err != nil {
|
|
return fmt.Errorf("configuration not yet synced to server: %w", err)
|
|
}
|
|
cfg.ID = serverCfg.ID
|
|
|
|
// Update local with server ID
|
|
serverID := serverCfg.ID
|
|
localCfg.ServerID = &serverID
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
} else {
|
|
cfg.ID = *localCfg.ServerID
|
|
}
|
|
}
|
|
|
|
// Update on server
|
|
if err := configRepo.Update(&cfg); err != nil {
|
|
return fmt.Errorf("updating configuration on server: %w", err)
|
|
}
|
|
|
|
// Update local sync status
|
|
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
|
if err == nil {
|
|
localCfg.SyncStatus = "synced"
|
|
s.localDB.SaveConfiguration(localCfg)
|
|
}
|
|
|
|
slog.Info("configuration updated on server", "uuid", cfg.UUID)
|
|
return nil
|
|
}
|
|
|
|
// pushConfigurationDelete deletes a configuration from the server
|
|
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
|
// Get database connection
|
|
mariaDB, err := s.connMgr.GetDB()
|
|
if err != nil {
|
|
return fmt.Errorf("database not available: %w", err)
|
|
}
|
|
|
|
// Create repository
|
|
configRepo := repository.NewConfigurationRepository(mariaDB)
|
|
|
|
// Get the configuration from server by UUID to get the ID
|
|
cfg, err := configRepo.GetByUUID(change.EntityUUID)
|
|
if err != nil {
|
|
// Already deleted or not found, consider it successful
|
|
slog.Warn("configuration not found on server, considering delete successful", "uuid", change.EntityUUID)
|
|
return nil
|
|
}
|
|
|
|
// Delete from server
|
|
if err := configRepo.Delete(cfg.ID); err != nil {
|
|
return fmt.Errorf("deleting configuration from server: %w", err)
|
|
}
|
|
|
|
slog.Info("configuration deleted from server", "uuid", change.EntityUUID)
|
|
return nil
|
|
}
|