From 949479550c4c2416a28c6380f7ad156c1c4109ef Mon Sep 17 00:00:00 2001 From: Michael Chus Date: Thu, 25 Jun 2026 10:08:20 +0300 Subject: [PATCH] =?UTF-8?q?fix:=20=D1=83=D1=81=D1=82=D1=80=D0=B0=D0=BD?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D0=B5=20race=20condition=20=D0=B8=20=D1=83?= =?UTF-8?q?=D0=BB=D1=83=D1=87=D1=88=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=B4=D0=B8?= =?UTF-8?q?=D0=B0=D0=B3=D0=BD=D0=BE=D1=81=D1=82=D0=B8=D0=BA=D0=B8=20=D1=81?= =?UTF-8?q?=D0=B8=D0=BD=D1=85=D1=80=D0=BE=D0=BD=D0=B8=D0=B7=D0=B0=D1=86?= =?UTF-8?q?=D0=B8=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - SyncPricelists() теперь захватывает pricelistMu, предотвращая параллельный запуск фонового тикера и ручного sync (было причиной UNIQUE constraint ошибки) - Дедупликация lot_name в fetchServerPricelistItems на случай дублей на сервере - PushPendingChanges пишет запись в sync_log (тип "changes") при каждом запуске - syncPricelists вызывает reportClientSchemaState через defer — состояние клиента отправляется на сервер независимо от исхода синхронизации Co-Authored-By: Claude Sonnet 4.6 --- internal/services/sync/service.go | 42 +++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/internal/services/sync/service.go b/internal/services/sync/service.go index c02e3a8..fe8e32f 100644 --- a/internal/services/sync/service.go +++ b/internal/services/sync/service.go @@ -322,6 +322,12 @@ func (s *Service) NeedSync() (bool, error) { // SyncPricelists synchronizes all active pricelists from server to local SQLite func (s *Service) SyncPricelists() (int, error) { + s.pricelistMu.Lock() + defer s.pricelistMu.Unlock() + return s.syncPricelists() +} + +func (s *Service) syncPricelists() (int, error) { slog.Info("starting pricelist sync") plSyncStart := time.Now() if _, err := s.EnsureReadinessForSync(); err != nil { @@ -336,6 +342,12 @@ func (s *Service) SyncPricelists() (int, error) { return 0, fmt.Errorf("database not available: %w", err) } + defer func() { + if reportErr := s.reportClientSchemaState(mariaDB, time.Now().UTC()); reportErr != nil { + slog.Warn("failed to report client state after pricelist sync", "error", reportErr) + } + }() + // Create repository pricelistRepo := repository.NewPricelistRepository(mariaDB) @@ -764,9 +776,16 @@ func (s *Service) fetchServerPricelistItems(serverPricelistID uint) ([]localdb.L return nil, fmt.Errorf("getting server pricelist items: %w", err) } - localItems := make([]localdb.LocalPricelistItem, len(serverItems)) - for i, item := range serverItems { - localItems[i] = *localdb.PricelistItemToLocal(&item, 0) + seen := make(map[string]struct{}, len(serverItems)) + localItems := make([]localdb.LocalPricelistItem, 0, len(serverItems)) + for i := range serverItems { + lotName := serverItems[i].LotName + if _, dup := seen[lotName]; dup { + slog.Warn("duplicate lot_name in server pricelist, skipping", "pricelist_id", serverPricelistID, "lot_name", lotName) + continue + } + seen[lotName] = struct{}{} + localItems = append(localItems, *localdb.PricelistItemToLocal(&serverItems[i], 0)) } return localItems, nil @@ -843,7 +862,7 @@ func (s *Service) SyncPricelistsIfNeeded() error { } slog.Info("new pricelists detected, syncing...") - _, err = s.SyncPricelists() + _, err = s.syncPricelists() if err != nil { return fmt.Errorf("syncing pricelists: %w", err) } @@ -888,7 +907,10 @@ func (s *Service) PushPendingChanges() (int, error) { } slog.Info("pushing pending changes", "count", len(changes)) + pushStart := time.Now() pushed := 0 + failed := 0 + var firstErr string var syncedIDs []int64 sortedChanges := prioritizeProjectChanges(changes) @@ -899,6 +921,10 @@ func (s *Service) PushPendingChanges() (int, error) { slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err) newAttempts := change.Attempts + 1 s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error()) + if firstErr == "" { + firstErr = err.Error() + } + failed++ if newAttempts >= maxPendingChangeAttempts { slog.Error("abandoning pending change after max attempts", "id", change.ID, "type", change.EntityType, "op", change.Operation, @@ -919,7 +945,13 @@ func (s *Service) PushPendingChanges() (int, error) { } } - slog.Info("pending changes pushed", "pushed", pushed, "failed", len(changes)-pushed) + if failed > 0 { + s.localDB.AppendSyncLog("changes", "error", firstErr, pushed, pushStart, time.Since(pushStart).Milliseconds()) + } else { + s.localDB.AppendSyncLog("changes", "ok", "", pushed, pushStart, time.Since(pushStart).Milliseconds()) + } + + slog.Info("pending changes pushed", "pushed", pushed, "failed", failed) return pushed, nil }