fix: устранение race condition и улучшение диагностики синхронизации

- SyncPricelists() теперь захватывает pricelistMu, предотвращая параллельный
  запуск фонового тикера и ручного sync (было причиной UNIQUE constraint ошибки)
- Дедупликация lot_name в fetchServerPricelistItems на случай дублей на сервере
- PushPendingChanges пишет запись в sync_log (тип "changes") при каждом запуске
- syncPricelists вызывает reportClientSchemaState через defer — состояние
  клиента отправляется на сервер независимо от исхода синхронизации

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-25 10:08:20 +03:00
parent 677b5d898f
commit 949479550c

View File

@@ -322,6 +322,12 @@ func (s *Service) NeedSync() (bool, error) {
// SyncPricelists synchronizes all active pricelists from server to local SQLite
func (s *Service) SyncPricelists() (int, error) {
s.pricelistMu.Lock()
defer s.pricelistMu.Unlock()
return s.syncPricelists()
}
func (s *Service) syncPricelists() (int, error) {
slog.Info("starting pricelist sync")
plSyncStart := time.Now()
if _, err := s.EnsureReadinessForSync(); err != nil {
@@ -336,6 +342,12 @@ func (s *Service) SyncPricelists() (int, error) {
return 0, fmt.Errorf("database not available: %w", err)
}
defer func() {
if reportErr := s.reportClientSchemaState(mariaDB, time.Now().UTC()); reportErr != nil {
slog.Warn("failed to report client state after pricelist sync", "error", reportErr)
}
}()
// Create repository
pricelistRepo := repository.NewPricelistRepository(mariaDB)
@@ -764,9 +776,16 @@ func (s *Service) fetchServerPricelistItems(serverPricelistID uint) ([]localdb.L
return nil, fmt.Errorf("getting server pricelist items: %w", err)
}
localItems := make([]localdb.LocalPricelistItem, len(serverItems))
for i, item := range serverItems {
localItems[i] = *localdb.PricelistItemToLocal(&item, 0)
seen := make(map[string]struct{}, len(serverItems))
localItems := make([]localdb.LocalPricelistItem, 0, len(serverItems))
for i := range serverItems {
lotName := serverItems[i].LotName
if _, dup := seen[lotName]; dup {
slog.Warn("duplicate lot_name in server pricelist, skipping", "pricelist_id", serverPricelistID, "lot_name", lotName)
continue
}
seen[lotName] = struct{}{}
localItems = append(localItems, *localdb.PricelistItemToLocal(&serverItems[i], 0))
}
return localItems, nil
@@ -843,7 +862,7 @@ func (s *Service) SyncPricelistsIfNeeded() error {
}
slog.Info("new pricelists detected, syncing...")
_, err = s.SyncPricelists()
_, err = s.syncPricelists()
if err != nil {
return fmt.Errorf("syncing pricelists: %w", err)
}
@@ -888,7 +907,10 @@ func (s *Service) PushPendingChanges() (int, error) {
}
slog.Info("pushing pending changes", "count", len(changes))
pushStart := time.Now()
pushed := 0
failed := 0
var firstErr string
var syncedIDs []int64
sortedChanges := prioritizeProjectChanges(changes)
@@ -899,6 +921,10 @@ func (s *Service) PushPendingChanges() (int, error) {
slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
newAttempts := change.Attempts + 1
s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error())
if firstErr == "" {
firstErr = err.Error()
}
failed++
if newAttempts >= maxPendingChangeAttempts {
slog.Error("abandoning pending change after max attempts",
"id", change.ID, "type", change.EntityType, "op", change.Operation,
@@ -919,7 +945,13 @@ func (s *Service) PushPendingChanges() (int, error) {
}
}
slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed)
if failed > 0 {
s.localDB.AppendSyncLog("changes", "error", firstErr, pushed, pushStart, time.Since(pushStart).Milliseconds())
} else {
s.localDB.AppendSyncLog("changes", "ok", "", pushed, pushStart, time.Since(pushStart).Milliseconds())
}
slog.Info("pending changes pushed", "pushed", pushed, "failed", failed)
return pushed, nil
}