From 6df262b8ee7b8b84d2bbdf7c579c8c77543b4a6a Mon Sep 17 00:00:00 2001 From: Mikhail Chusavitin Date: Tue, 16 Jun 2026 17:28:07 +0300 Subject: [PATCH] =?UTF-8?q?fix:=20self-heal=20=D0=B7=D0=B0=D1=81=D1=82?= =?UTF-8?q?=D1=80=D1=8F=D0=B2=D1=88=D0=B8=D1=85=20pending=20changes=20?= =?UTF-8?q?=D0=BF=D1=80=D0=B8=20broken=20project=20reference?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ensureConfigurationProject: если project не найден ни на сервере, ни локально (stale UUID после удаления), падаем в fallback «Без проекта» вместо вечной ошибки - PushPendingChanges: автоматически вызывает RepairPendingChanges() перед циклом, чтобы локально-исправимые проблемы чинились до попытки отправки - maxPendingChangeAttempts=20: после 20 неудачных попыток change считается unrecoverable и удаляется из очереди (логируется ERROR) - pushSingleChange/pushConfigurationChange: unknown entity type / operation теперь дропается с warn вместо вечного error в цикле - latestSyncErrorState: last_sync_error_text в qt_client_schema_state теперь содержит JSON-массив с type/uuid/op/attempts/error по всем застрявшим changes (до 20 штук) вместо текста только последней ошибки Co-Authored-By: Claude Sonnet 4.6 --- internal/services/sync/readiness.go | 32 +++++++++++-- internal/services/sync/service.go | 72 +++++++++++++++++++++-------- 2 files changed, 80 insertions(+), 24 deletions(-) diff --git a/internal/services/sync/readiness.go b/internal/services/sync/readiness.go index a0a2879..cd2c84d 100644 --- a/internal/services/sync/readiness.go +++ b/internal/services/sync/readiness.go @@ -1,6 +1,7 @@ package sync import ( + "encoding/json" "errors" "fmt" "log/slog" @@ -320,14 +321,37 @@ func latestSyncErrorState(local *localdb.LocalDB) (*string, *string) { return optionalString(strings.TrimSpace(guard.ReasonCode)), optionalString(strings.TrimSpace(guard.ReasonText)) } - var pending localdb.PendingChange + var errored []localdb.PendingChange if err := local.DB(). Where("TRIM(COALESCE(last_error, '')) <> ''"). Order("id DESC"). - First(&pending).Error; err == nil { - return optionalString("PENDING_CHANGE_ERROR"), optionalString(strings.TrimSpace(pending.LastError)) + Limit(20). + Find(&errored).Error; err != nil || len(errored) == 0 { + return nil, nil } - return nil, nil + + type errorEntry struct { + Type string `json:"type"` + UUID string `json:"uuid"` + Op string `json:"op"` + Attempts int `json:"attempts"` + Error string `json:"error"` + } + entries := make([]errorEntry, 0, len(errored)) + for _, ch := range errored { + entries = append(entries, errorEntry{ + Type: ch.EntityType, + UUID: ch.EntityUUID, + Op: ch.Operation, + Attempts: ch.Attempts, + Error: strings.TrimSpace(ch.LastError), + }) + } + detail, jsonErr := json.Marshal(entries) + if jsonErr != nil { + return optionalString("PENDING_CHANGE_ERROR"), optionalString(strings.TrimSpace(errored[0].LastError)) + } + return optionalString("PENDING_CHANGE_ERROR"), optionalString(string(detail)) } func optionalString(value string) *string { diff --git a/internal/services/sync/service.go b/internal/services/sync/service.go index faac173..c02e3a8 100644 --- a/internal/services/sync/service.go +++ b/internal/services/sync/service.go @@ -851,6 +851,11 @@ func (s *Service) SyncPricelistsIfNeeded() error { return nil } +// maxPendingChangeAttempts is the number of failed attempts after which a pending change +// is considered unrecoverable and removed from the queue. Applies only to changes that +// fail with a non-transient error (e.g. corrupt payload, unknown operation). +const maxPendingChangeAttempts = 20 + // PushPendingChanges pushes all pending changes to the server func (s *Service) PushPendingChanges() (int, error) { if _, err := s.EnsureReadinessForSync(); err != nil { @@ -864,6 +869,14 @@ func (s *Service) PushPendingChanges() (int, error) { slog.Info("purged orphan configuration pending changes", "removed", removed) } + // Auto-repair locally-fixable problems (e.g. stale project references) + // before attempting to push, so that repaired changes succeed on this cycle. + if repaired, _, repairErr := s.localDB.RepairPendingChanges(); repairErr != nil { + slog.Warn("auto-repair of errored pending changes failed", "error", repairErr) + } else if repaired > 0 { + slog.Info("auto-repaired errored pending changes", "repaired", repaired) + } + changes, err := s.localDB.GetPendingChanges() if err != nil { return 0, fmt.Errorf("getting pending changes: %w", err) @@ -884,8 +897,14 @@ func (s *Service) PushPendingChanges() (int, error) { if err != nil { s.markConnectionBroken(err) slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err) - // Increment attempts + newAttempts := change.Attempts + 1 s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error()) + if newAttempts >= maxPendingChangeAttempts { + slog.Error("abandoning pending change after max attempts", + "id", change.ID, "type", change.EntityType, "op", change.Operation, + "attempts", newAttempts, "last_error", err.Error()) + syncedIDs = append(syncedIDs, change.ID) + } continue } @@ -912,7 +931,11 @@ func (s *Service) pushSingleChange(change *localdb.PendingChange) error { case "configuration": return s.pushConfigurationChange(change) default: - return fmt.Errorf("unknown entity type: %s", change.EntityType) + // Unknown entity type: this change was queued by a newer or different build + // and cannot be processed. Remove it from the queue. + slog.Warn("dropping pending change with unknown entity type", + "id", change.ID, "type", change.EntityType, "op", change.Operation) + return nil } } @@ -1045,7 +1068,10 @@ func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error { case "delete": return s.pushConfigurationDelete(change) default: - return fmt.Errorf("unknown operation: %s", change.Operation) + // Unknown operation: queued by a newer or different build. Drop from queue. + slog.Warn("dropping pending change with unknown operation", + "id", change.ID, "type", change.EntityType, "op", change.Operation) + return nil } } @@ -1245,24 +1271,30 @@ func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Confi localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID) if localErr != nil { - return err + // Project not found locally either: stale reference (project was deleted). + // Fall through to system project so this configuration is not stuck forever. + slog.Warn("configuration references missing project, assigning to system project", + "cfg_uuid", cfg.UUID, + "project_uuid", *cfg.ProjectUUID, + ) + } else { + modelProject := localdb.LocalToProject(localProject) + if modelProject.OwnerUsername == "" { + modelProject.OwnerUsername = cfg.OwnerUsername + } + if createErr := projectRepo.UpsertByUUID(modelProject); createErr != nil { + return createErr + } + if modelProject.ID > 0 { + serverID := modelProject.ID + localProject.ServerID = &serverID + localProject.SyncStatus = "synced" + now := time.Now() + localProject.SyncedAt = &now + _ = s.localDB.SaveProjectPreservingUpdatedAt(localProject) + } + return nil } - modelProject := localdb.LocalToProject(localProject) - if modelProject.OwnerUsername == "" { - modelProject.OwnerUsername = cfg.OwnerUsername - } - if createErr := projectRepo.UpsertByUUID(modelProject); createErr != nil { - return createErr - } - if modelProject.ID > 0 { - serverID := modelProject.ID - localProject.ServerID = &serverID - localProject.SyncStatus = "synced" - now := time.Now() - localProject.SyncedAt = &now - _ = s.localDB.SaveProjectPreservingUpdatedAt(localProject) - } - return nil } systemProject := &models.Project{}