Add background sync worker and complete local-first architecture
Implements automatic background synchronization every 5 minutes: - Worker pushes pending changes to server (PushPendingChanges) - Worker pulls new pricelists (SyncPricelistsIfNeeded) - Graceful shutdown with context cancellation - Automatic online/offline detection via DB ping New files: - internal/services/sync/worker.go - Background sync worker - internal/services/local_configuration.go - Local-first CRUD - internal/localdb/converters.go - MariaDB ↔ SQLite converters Extended sync infrastructure: - Pending changes queue (pending_changes table) - Push/pull sync endpoints (/api/sync/push, /pending) - ConfigurationGetter interface for handler compatibility - LocalConfigurationService replaces ConfigurationService All configuration operations now run through SQLite with automatic background sync to MariaDB when online. Phase 2.5 nearly complete. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,24 +1,28 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/models"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/repository"
|
||||
)
|
||||
|
||||
// Service handles synchronization between MariaDB and local SQLite
|
||||
type Service struct {
|
||||
pricelistRepo *repository.PricelistRepository
|
||||
configRepo *repository.ConfigurationRepository
|
||||
localDB *localdb.LocalDB
|
||||
}
|
||||
|
||||
// NewService creates a new sync service
|
||||
func NewService(pricelistRepo *repository.PricelistRepository, localDB *localdb.LocalDB) *Service {
|
||||
func NewService(pricelistRepo *repository.PricelistRepository, configRepo *repository.ConfigurationRepository, localDB *localdb.LocalDB) *Service {
|
||||
return &Service{
|
||||
pricelistRepo: pricelistRepo,
|
||||
configRepo: configRepo,
|
||||
localDB: localDB,
|
||||
}
|
||||
}
|
||||
@@ -213,3 +217,157 @@ func (s *Service) GetPricelistForOffline(serverPricelistID uint) (*localdb.Local
|
||||
|
||||
return localPL, nil
|
||||
}
|
||||
|
||||
// SyncPricelistsIfNeeded checks for new pricelists and syncs if needed
|
||||
// This should be called before creating a new configuration when online
|
||||
func (s *Service) SyncPricelistsIfNeeded() error {
|
||||
needSync, err := s.NeedSync()
|
||||
if err != nil {
|
||||
slog.Warn("failed to check if sync needed", "error", err)
|
||||
return nil // Don't fail on check error
|
||||
}
|
||||
|
||||
if !needSync {
|
||||
slog.Debug("pricelists are up to date, no sync needed")
|
||||
return nil
|
||||
}
|
||||
|
||||
slog.Info("new pricelists detected, syncing...")
|
||||
_, err = s.SyncPricelists()
|
||||
if err != nil {
|
||||
return fmt.Errorf("syncing pricelists: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PushPendingChanges pushes all pending changes to the server
|
||||
func (s *Service) PushPendingChanges() (int, error) {
|
||||
changes, err := s.localDB.GetPendingChanges()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("getting pending changes: %w", err)
|
||||
}
|
||||
|
||||
if len(changes) == 0 {
|
||||
slog.Debug("no pending changes to push")
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
slog.Info("pushing pending changes", "count", len(changes))
|
||||
pushed := 0
|
||||
var syncedIDs []int64
|
||||
|
||||
for _, change := range changes {
|
||||
err := s.pushSingleChange(&change)
|
||||
if err != nil {
|
||||
slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
|
||||
// Increment attempts
|
||||
s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
syncedIDs = append(syncedIDs, change.ID)
|
||||
pushed++
|
||||
}
|
||||
|
||||
// Mark synced changes as complete by deleting them
|
||||
if len(syncedIDs) > 0 {
|
||||
if err := s.localDB.MarkChangesSynced(syncedIDs); err != nil {
|
||||
slog.Error("failed to mark changes as synced", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed)
|
||||
return pushed, nil
|
||||
}
|
||||
|
||||
// pushSingleChange pushes a single pending change to the server
|
||||
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
||||
switch change.EntityType {
|
||||
case "configuration":
|
||||
return s.pushConfigurationChange(change)
|
||||
default:
|
||||
return fmt.Errorf("unknown entity type: %s", change.EntityType)
|
||||
}
|
||||
}
|
||||
|
||||
// pushConfigurationChange pushes a configuration change to the server
|
||||
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
||||
switch change.Operation {
|
||||
case "create":
|
||||
return s.pushConfigurationCreate(change)
|
||||
case "update":
|
||||
return s.pushConfigurationUpdate(change)
|
||||
case "delete":
|
||||
return s.pushConfigurationDelete(change)
|
||||
default:
|
||||
return fmt.Errorf("unknown operation: %s", change.Operation)
|
||||
}
|
||||
}
|
||||
|
||||
// pushConfigurationCreate creates a configuration on the server
|
||||
func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
||||
var cfg models.Configuration
|
||||
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
||||
return fmt.Errorf("unmarshaling configuration: %w", err)
|
||||
}
|
||||
|
||||
// Create on server
|
||||
if err := s.configRepo.Create(&cfg); err != nil {
|
||||
return fmt.Errorf("creating configuration on server: %w", err)
|
||||
}
|
||||
|
||||
// Update local configuration with server ID
|
||||
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
||||
if err == nil {
|
||||
serverID := cfg.ID
|
||||
localCfg.ServerID = &serverID
|
||||
localCfg.SyncStatus = "synced"
|
||||
s.localDB.SaveConfiguration(localCfg)
|
||||
}
|
||||
|
||||
slog.Info("configuration created on server", "uuid", cfg.UUID, "server_id", cfg.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// pushConfigurationUpdate updates a configuration on the server
|
||||
func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
||||
var cfg models.Configuration
|
||||
if err := json.Unmarshal([]byte(change.Payload), &cfg); err != nil {
|
||||
return fmt.Errorf("unmarshaling configuration: %w", err)
|
||||
}
|
||||
|
||||
// Update on server
|
||||
if err := s.configRepo.Update(&cfg); err != nil {
|
||||
return fmt.Errorf("updating configuration on server: %w", err)
|
||||
}
|
||||
|
||||
// Update local sync status
|
||||
localCfg, err := s.localDB.GetConfigurationByUUID(cfg.UUID)
|
||||
if err == nil {
|
||||
localCfg.SyncStatus = "synced"
|
||||
s.localDB.SaveConfiguration(localCfg)
|
||||
}
|
||||
|
||||
slog.Info("configuration updated on server", "uuid", cfg.UUID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// pushConfigurationDelete deletes a configuration from the server
|
||||
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
||||
// Get the configuration from server by UUID to get the ID
|
||||
cfg, err := s.configRepo.GetByUUID(change.EntityUUID)
|
||||
if err != nil {
|
||||
// Already deleted or not found, consider it successful
|
||||
slog.Warn("configuration not found on server, considering delete successful", "uuid", change.EntityUUID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete from server
|
||||
if err := s.configRepo.Delete(cfg.ID); err != nil {
|
||||
return fmt.Errorf("deleting configuration from server: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("configuration deleted from server", "uuid", change.EntityUUID)
|
||||
return nil
|
||||
}
|
||||
|
||||
95
internal/services/sync/worker.go
Normal file
95
internal/services/sync/worker.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// Worker performs background synchronization at regular intervals
|
||||
type Worker struct {
|
||||
service *Service
|
||||
db *gorm.DB
|
||||
interval time.Duration
|
||||
logger *slog.Logger
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// NewWorker creates a new background sync worker
|
||||
func NewWorker(service *Service, db *gorm.DB, interval time.Duration) *Worker {
|
||||
return &Worker{
|
||||
service: service,
|
||||
db: db,
|
||||
interval: interval,
|
||||
logger: slog.Default(),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// isOnline checks if the database connection is available
|
||||
func (w *Worker) isOnline() bool {
|
||||
sqlDB, err := w.db.DB()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return sqlDB.Ping() == nil
|
||||
}
|
||||
|
||||
// Start begins the background sync loop in a goroutine
|
||||
func (w *Worker) Start(ctx context.Context) {
|
||||
w.logger.Info("starting background sync worker", "interval", w.interval)
|
||||
|
||||
ticker := time.NewTicker(w.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Run once immediately
|
||||
w.runSync()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
w.logger.Info("background sync worker stopped by context")
|
||||
return
|
||||
case <-w.stopCh:
|
||||
w.logger.Info("background sync worker stopped")
|
||||
return
|
||||
case <-ticker.C:
|
||||
w.runSync()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop gracefully stops the worker
|
||||
func (w *Worker) Stop() {
|
||||
w.logger.Info("stopping background sync worker")
|
||||
close(w.stopCh)
|
||||
}
|
||||
|
||||
// runSync performs a single sync iteration
|
||||
func (w *Worker) runSync() {
|
||||
// Check if online
|
||||
if !w.isOnline() {
|
||||
w.logger.Debug("offline, skipping background sync")
|
||||
return
|
||||
}
|
||||
|
||||
w.logger.Debug("running background sync")
|
||||
|
||||
// Push pending changes first
|
||||
pushed, err := w.service.PushPendingChanges()
|
||||
if err != nil {
|
||||
w.logger.Warn("failed to push pending changes", "error", err)
|
||||
} else if pushed > 0 {
|
||||
w.logger.Info("pushed pending changes", "count", pushed)
|
||||
}
|
||||
|
||||
// Then check for new pricelists
|
||||
err = w.service.SyncPricelistsIfNeeded()
|
||||
if err != nil {
|
||||
w.logger.Warn("failed to sync pricelists", "error", err)
|
||||
}
|
||||
|
||||
w.logger.Debug("background sync completed")
|
||||
}
|
||||
Reference in New Issue
Block a user