Fix incomplete pricelist sync status
This commit is contained in:
@@ -45,10 +45,15 @@ func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
|
||||
|
||||
// SyncStatus represents the current sync status
|
||||
type SyncStatus struct {
|
||||
LastSyncAt *time.Time `json:"last_sync_at"`
|
||||
ServerPricelists int `json:"server_pricelists"`
|
||||
LocalPricelists int `json:"local_pricelists"`
|
||||
NeedsSync bool `json:"needs_sync"`
|
||||
LastSyncAt *time.Time `json:"last_sync_at"`
|
||||
LastAttemptAt *time.Time `json:"last_attempt_at,omitempty"`
|
||||
LastSyncStatus string `json:"last_sync_status,omitempty"`
|
||||
LastSyncError string `json:"last_sync_error,omitempty"`
|
||||
ServerPricelists int `json:"server_pricelists"`
|
||||
LocalPricelists int `json:"local_pricelists"`
|
||||
NeedsSync bool `json:"needs_sync"`
|
||||
IncompleteServerSync bool `json:"incomplete_server_sync"`
|
||||
KnownServerChangesMiss bool `json:"known_server_changes_missing"`
|
||||
}
|
||||
|
||||
type UserSyncStatus struct {
|
||||
@@ -240,6 +245,9 @@ func (s *Service) ImportProjectsToLocal() (*ProjectImportResult, error) {
|
||||
// GetStatus returns the current sync status
|
||||
func (s *Service) GetStatus() (*SyncStatus, error) {
|
||||
lastSync := s.localDB.GetLastSyncTime()
|
||||
lastAttempt := s.localDB.GetLastPricelistSyncAttemptAt()
|
||||
lastSyncStatus := s.localDB.GetLastPricelistSyncStatus()
|
||||
lastSyncError := s.localDB.GetLastPricelistSyncError()
|
||||
|
||||
// Count server pricelists (only if already connected, don't reconnect)
|
||||
serverCount := 0
|
||||
@@ -260,10 +268,15 @@ func (s *Service) GetStatus() (*SyncStatus, error) {
|
||||
needsSync, _ := s.NeedSync()
|
||||
|
||||
return &SyncStatus{
|
||||
LastSyncAt: lastSync,
|
||||
ServerPricelists: serverCount,
|
||||
LocalPricelists: int(localCount),
|
||||
NeedsSync: needsSync,
|
||||
LastSyncAt: lastSync,
|
||||
LastAttemptAt: lastAttempt,
|
||||
LastSyncStatus: lastSyncStatus,
|
||||
LastSyncError: lastSyncError,
|
||||
ServerPricelists: serverCount,
|
||||
LocalPricelists: int(localCount),
|
||||
NeedsSync: needsSync,
|
||||
IncompleteServerSync: needsSync && strings.EqualFold(lastSyncStatus, "failed"),
|
||||
KnownServerChangesMiss: needsSync,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -333,6 +346,7 @@ func (s *Service) SyncPricelists() (int, error) {
|
||||
// Get database connection
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
s.recordPricelistSyncFailure(err)
|
||||
return 0, fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
|
||||
@@ -342,6 +356,7 @@ func (s *Service) SyncPricelists() (int, error) {
|
||||
// Get active pricelists from server (up to 100)
|
||||
serverPricelists, _, err := pricelistRepo.ListActive(0, 100)
|
||||
if err != nil {
|
||||
s.recordPricelistSyncFailure(err)
|
||||
return 0, fmt.Errorf("getting active server pricelists: %w", err)
|
||||
}
|
||||
serverPricelistIDs := make([]uint, 0, len(serverPricelists))
|
||||
@@ -350,6 +365,7 @@ func (s *Service) SyncPricelists() (int, error) {
|
||||
}
|
||||
|
||||
synced := 0
|
||||
var syncErr error
|
||||
for _, pl := range serverPricelists {
|
||||
// Check if pricelist already exists locally
|
||||
existing, _ := s.localDB.GetLocalPricelistByServerID(pl.ID)
|
||||
@@ -358,6 +374,9 @@ func (s *Service) SyncPricelists() (int, error) {
|
||||
if s.localDB.CountLocalPricelistItems(existing.ID) == 0 {
|
||||
itemCount, err := s.SyncPricelistItems(existing.ID)
|
||||
if err != nil {
|
||||
if syncErr == nil {
|
||||
syncErr = fmt.Errorf("sync items for existing pricelist %s: %w", pl.Version, err)
|
||||
}
|
||||
slog.Warn("failed to sync missing pricelist items for existing local pricelist", "version", pl.Version, "error", err)
|
||||
} else {
|
||||
slog.Info("synced missing pricelist items for existing local pricelist", "version", pl.Version, "items", itemCount)
|
||||
@@ -377,19 +396,15 @@ func (s *Service) SyncPricelists() (int, error) {
|
||||
IsUsed: false,
|
||||
}
|
||||
|
||||
if err := s.localDB.SaveLocalPricelist(localPL); err != nil {
|
||||
slog.Warn("failed to save local pricelist", "version", pl.Version, "error", err)
|
||||
itemCount, err := s.syncNewPricelistSnapshot(localPL)
|
||||
if err != nil {
|
||||
if syncErr == nil {
|
||||
syncErr = fmt.Errorf("sync new pricelist %s: %w", pl.Version, err)
|
||||
}
|
||||
slog.Warn("failed to sync pricelist snapshot", "version", pl.Version, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Sync items for the newly created pricelist
|
||||
itemCount, err := s.SyncPricelistItems(localPL.ID)
|
||||
if err != nil {
|
||||
slog.Warn("failed to sync pricelist items", "version", pl.Version, "error", err)
|
||||
// Continue even if items sync fails - we have the pricelist metadata
|
||||
} else {
|
||||
slog.Debug("synced pricelist with items", "version", pl.Version, "items", itemCount)
|
||||
}
|
||||
slog.Debug("synced pricelist with items", "version", pl.Version, "items", itemCount)
|
||||
|
||||
synced++
|
||||
}
|
||||
@@ -404,14 +419,78 @@ func (s *Service) SyncPricelists() (int, error) {
|
||||
// Backfill lot_category for used pricelists (older local caches may miss the column values).
|
||||
s.backfillUsedPricelistItemCategories(pricelistRepo, serverPricelistIDs)
|
||||
|
||||
if syncErr != nil {
|
||||
s.recordPricelistSyncFailure(syncErr)
|
||||
return synced, syncErr
|
||||
}
|
||||
|
||||
// Update last sync time
|
||||
s.localDB.SetLastSyncTime(time.Now())
|
||||
now := time.Now()
|
||||
s.localDB.SetLastSyncTime(now)
|
||||
s.recordPricelistSyncSuccess(now)
|
||||
s.RecordSyncHeartbeat()
|
||||
|
||||
slog.Info("pricelist sync completed", "synced", synced, "total", len(serverPricelists))
|
||||
return synced, nil
|
||||
}
|
||||
|
||||
func (s *Service) recordPricelistSyncSuccess(at time.Time) {
|
||||
if s.localDB == nil {
|
||||
return
|
||||
}
|
||||
if err := s.localDB.SetPricelistSyncResult("success", "", at); err != nil {
|
||||
slog.Warn("failed to persist pricelist sync success state", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) recordPricelistSyncFailure(syncErr error) {
|
||||
if s.localDB == nil || syncErr == nil {
|
||||
return
|
||||
}
|
||||
if err := s.localDB.SetPricelistSyncResult("failed", syncErr.Error(), time.Now()); err != nil {
|
||||
slog.Warn("failed to persist pricelist sync failure state", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) syncNewPricelistSnapshot(localPL *localdb.LocalPricelist) (int, error) {
|
||||
if localPL == nil {
|
||||
return 0, fmt.Errorf("local pricelist is nil")
|
||||
}
|
||||
|
||||
localItems, err := s.fetchServerPricelistItems(localPL.ServerID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if err := s.localDB.DB().Transaction(func(tx *gorm.DB) error {
|
||||
if err := tx.Create(localPL).Error; err != nil {
|
||||
return fmt.Errorf("save local pricelist: %w", err)
|
||||
}
|
||||
if len(localItems) == 0 {
|
||||
return nil
|
||||
}
|
||||
for i := range localItems {
|
||||
localItems[i].PricelistID = localPL.ID
|
||||
}
|
||||
batchSize := 500
|
||||
for i := 0; i < len(localItems); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(localItems) {
|
||||
end = len(localItems)
|
||||
}
|
||||
if err := tx.CreateInBatches(localItems[i:end], batchSize).Error; err != nil {
|
||||
return fmt.Errorf("save local pricelist items: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
slog.Info("synced pricelist items", "pricelist_id", localPL.ID, "items", len(localItems))
|
||||
return len(localItems), nil
|
||||
}
|
||||
|
||||
func (s *Service) backfillUsedPricelistItemCategories(pricelistRepo *repository.PricelistRepository, activeServerPricelistIDs []uint) {
|
||||
if s.localDB == nil || pricelistRepo == nil {
|
||||
return
|
||||
@@ -670,30 +749,13 @@ func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
|
||||
return int(existingCount), nil
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.getDB()
|
||||
localItems, err := s.fetchServerPricelistItems(localPL.ServerID)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("database not available: %w", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Create repository
|
||||
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
||||
|
||||
// Get items from server
|
||||
serverItems, _, err := pricelistRepo.GetItems(localPL.ServerID, 0, 10000, "")
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("getting server pricelist items: %w", err)
|
||||
for i := range localItems {
|
||||
localItems[i].PricelistID = localPricelistID
|
||||
}
|
||||
|
||||
// Convert and save locally
|
||||
localItems := make([]localdb.LocalPricelistItem, len(serverItems))
|
||||
for i, item := range serverItems {
|
||||
localItems[i] = *localdb.PricelistItemToLocal(&item, localPricelistID)
|
||||
}
|
||||
if err := s.enrichLocalPricelistItemsWithStock(mariaDB, localItems); err != nil {
|
||||
slog.Warn("pricelist stock enrichment skipped", "pricelist_id", localPricelistID, "error", err)
|
||||
}
|
||||
|
||||
if err := s.localDB.SaveLocalPricelistItems(localItems); err != nil {
|
||||
return 0, fmt.Errorf("saving local pricelist items: %w", err)
|
||||
}
|
||||
@@ -702,6 +764,33 @@ func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
|
||||
return len(localItems), nil
|
||||
}
|
||||
|
||||
func (s *Service) fetchServerPricelistItems(serverPricelistID uint) ([]localdb.LocalPricelistItem, error) {
|
||||
// Get database connection
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
|
||||
// Create repository
|
||||
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
||||
|
||||
// Get items from server
|
||||
serverItems, _, err := pricelistRepo.GetItems(serverPricelistID, 0, 10000, "")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("getting server pricelist items: %w", err)
|
||||
}
|
||||
|
||||
localItems := make([]localdb.LocalPricelistItem, len(serverItems))
|
||||
for i, item := range serverItems {
|
||||
localItems[i] = *localdb.PricelistItemToLocal(&item, 0)
|
||||
}
|
||||
if err := s.enrichLocalPricelistItemsWithStock(mariaDB, localItems); err != nil {
|
||||
slog.Warn("pricelist stock enrichment skipped", "server_pricelist_id", serverPricelistID, "error", err)
|
||||
}
|
||||
|
||||
return localItems, nil
|
||||
}
|
||||
|
||||
// SyncPricelistItemsByServerID syncs items for a pricelist by its server ID
|
||||
func (s *Service) SyncPricelistItemsByServerID(serverPricelistID uint) (int, error) {
|
||||
localPL, err := s.localDB.GetLocalPricelistByServerID(serverPricelistID)
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
package sync_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/models"
|
||||
syncsvc "git.mchus.pro/mchus/quoteforge/internal/services/sync"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func TestSyncPricelistsDeletesMissingUnusedLocalPricelists(t *testing.T) {
|
||||
@@ -83,3 +86,58 @@ func TestSyncPricelistsDeletesMissingUnusedLocalPricelists(t *testing.T) {
|
||||
t.Fatalf("expected server pricelist to be synced locally: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncPricelistsDoesNotPersistHeaderWithoutItems(t *testing.T) {
|
||||
local := newLocalDBForSyncTest(t)
|
||||
serverDB := newServerDBForSyncTest(t)
|
||||
if err := serverDB.AutoMigrate(&models.Pricelist{}, &models.PricelistItem{}); err != nil {
|
||||
t.Fatalf("migrate server pricelist tables: %v", err)
|
||||
}
|
||||
|
||||
serverPL := models.Pricelist{
|
||||
Source: "estimate",
|
||||
Version: "2026-03-17-001",
|
||||
Notification: "server",
|
||||
CreatedBy: "tester",
|
||||
IsActive: true,
|
||||
CreatedAt: time.Now().Add(-1 * time.Hour),
|
||||
}
|
||||
if err := serverDB.Create(&serverPL).Error; err != nil {
|
||||
t.Fatalf("create server pricelist: %v", err)
|
||||
}
|
||||
if err := serverDB.Create(&models.PricelistItem{PricelistID: serverPL.ID, LotName: "CPU_A", Price: 10}).Error; err != nil {
|
||||
t.Fatalf("create server pricelist item: %v", err)
|
||||
}
|
||||
|
||||
const callbackName = "test:fail_qt_pricelist_items_query"
|
||||
if err := serverDB.Callback().Query().Before("gorm:query").Register(callbackName, func(db *gorm.DB) {
|
||||
if db.Statement != nil && db.Statement.Table == "qt_pricelist_items" {
|
||||
_ = db.AddError(errors.New("forced pricelist item fetch failure"))
|
||||
}
|
||||
}); err != nil {
|
||||
t.Fatalf("register query callback: %v", err)
|
||||
}
|
||||
defer serverDB.Callback().Query().Remove(callbackName)
|
||||
|
||||
svc := syncsvc.NewServiceWithDB(serverDB, local)
|
||||
synced, err := svc.SyncPricelists()
|
||||
if err == nil {
|
||||
t.Fatalf("expected sync error when item fetch fails")
|
||||
}
|
||||
if synced != 0 {
|
||||
t.Fatalf("expected synced=0 on incomplete sync, got %d", synced)
|
||||
}
|
||||
if !strings.Contains(err.Error(), "forced pricelist item fetch failure") {
|
||||
t.Fatalf("expected item fetch error, got %v", err)
|
||||
}
|
||||
|
||||
if _, err := local.GetLocalPricelistByServerID(serverPL.ID); err == nil {
|
||||
t.Fatalf("expected pricelist header not to be persisted without items")
|
||||
}
|
||||
if got := local.CountLocalPricelists(); got != 0 {
|
||||
t.Fatalf("expected no local pricelists after failed sync, got %d", got)
|
||||
}
|
||||
if ts := local.GetLastSyncTime(); ts != nil {
|
||||
t.Fatalf("expected last_pricelist_sync to stay unset on incomplete sync, got %v", ts)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user