fix: self-heal застрявших pending changes при broken project reference
- ensureConfigurationProject: если project не найден ни на сервере, ни локально (stale UUID после удаления), падаем в fallback «Без проекта» вместо вечной ошибки - PushPendingChanges: автоматически вызывает RepairPendingChanges() перед циклом, чтобы локально-исправимые проблемы чинились до попытки отправки - maxPendingChangeAttempts=20: после 20 неудачных попыток change считается unrecoverable и удаляется из очереди (логируется ERROR) - pushSingleChange/pushConfigurationChange: unknown entity type / operation теперь дропается с warn вместо вечного error в цикле - latestSyncErrorState: last_sync_error_text в qt_client_schema_state теперь содержит JSON-массив с type/uuid/op/attempts/error по всем застрявшим changes (до 20 штук) вместо текста только последней ошибки Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
@@ -320,14 +321,37 @@ func latestSyncErrorState(local *localdb.LocalDB) (*string, *string) {
|
||||
return optionalString(strings.TrimSpace(guard.ReasonCode)), optionalString(strings.TrimSpace(guard.ReasonText))
|
||||
}
|
||||
|
||||
var pending localdb.PendingChange
|
||||
var errored []localdb.PendingChange
|
||||
if err := local.DB().
|
||||
Where("TRIM(COALESCE(last_error, '')) <> ''").
|
||||
Order("id DESC").
|
||||
First(&pending).Error; err == nil {
|
||||
return optionalString("PENDING_CHANGE_ERROR"), optionalString(strings.TrimSpace(pending.LastError))
|
||||
Limit(20).
|
||||
Find(&errored).Error; err != nil || len(errored) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, nil
|
||||
|
||||
type errorEntry struct {
|
||||
Type string `json:"type"`
|
||||
UUID string `json:"uuid"`
|
||||
Op string `json:"op"`
|
||||
Attempts int `json:"attempts"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
entries := make([]errorEntry, 0, len(errored))
|
||||
for _, ch := range errored {
|
||||
entries = append(entries, errorEntry{
|
||||
Type: ch.EntityType,
|
||||
UUID: ch.EntityUUID,
|
||||
Op: ch.Operation,
|
||||
Attempts: ch.Attempts,
|
||||
Error: strings.TrimSpace(ch.LastError),
|
||||
})
|
||||
}
|
||||
detail, jsonErr := json.Marshal(entries)
|
||||
if jsonErr != nil {
|
||||
return optionalString("PENDING_CHANGE_ERROR"), optionalString(strings.TrimSpace(errored[0].LastError))
|
||||
}
|
||||
return optionalString("PENDING_CHANGE_ERROR"), optionalString(string(detail))
|
||||
}
|
||||
|
||||
func optionalString(value string) *string {
|
||||
|
||||
@@ -851,6 +851,11 @@ func (s *Service) SyncPricelistsIfNeeded() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// maxPendingChangeAttempts is the number of failed attempts after which a pending change
|
||||
// is considered unrecoverable and removed from the queue. Applies only to changes that
|
||||
// fail with a non-transient error (e.g. corrupt payload, unknown operation).
|
||||
const maxPendingChangeAttempts = 20
|
||||
|
||||
// PushPendingChanges pushes all pending changes to the server
|
||||
func (s *Service) PushPendingChanges() (int, error) {
|
||||
if _, err := s.EnsureReadinessForSync(); err != nil {
|
||||
@@ -864,6 +869,14 @@ func (s *Service) PushPendingChanges() (int, error) {
|
||||
slog.Info("purged orphan configuration pending changes", "removed", removed)
|
||||
}
|
||||
|
||||
// Auto-repair locally-fixable problems (e.g. stale project references)
|
||||
// before attempting to push, so that repaired changes succeed on this cycle.
|
||||
if repaired, _, repairErr := s.localDB.RepairPendingChanges(); repairErr != nil {
|
||||
slog.Warn("auto-repair of errored pending changes failed", "error", repairErr)
|
||||
} else if repaired > 0 {
|
||||
slog.Info("auto-repaired errored pending changes", "repaired", repaired)
|
||||
}
|
||||
|
||||
changes, err := s.localDB.GetPendingChanges()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("getting pending changes: %w", err)
|
||||
@@ -884,8 +897,14 @@ func (s *Service) PushPendingChanges() (int, error) {
|
||||
if err != nil {
|
||||
s.markConnectionBroken(err)
|
||||
slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
|
||||
// Increment attempts
|
||||
newAttempts := change.Attempts + 1
|
||||
s.localDB.IncrementPendingChangeAttempts(change.ID, err.Error())
|
||||
if newAttempts >= maxPendingChangeAttempts {
|
||||
slog.Error("abandoning pending change after max attempts",
|
||||
"id", change.ID, "type", change.EntityType, "op", change.Operation,
|
||||
"attempts", newAttempts, "last_error", err.Error())
|
||||
syncedIDs = append(syncedIDs, change.ID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -912,7 +931,11 @@ func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
||||
case "configuration":
|
||||
return s.pushConfigurationChange(change)
|
||||
default:
|
||||
return fmt.Errorf("unknown entity type: %s", change.EntityType)
|
||||
// Unknown entity type: this change was queued by a newer or different build
|
||||
// and cannot be processed. Remove it from the queue.
|
||||
slog.Warn("dropping pending change with unknown entity type",
|
||||
"id", change.ID, "type", change.EntityType, "op", change.Operation)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1045,7 +1068,10 @@ func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
||||
case "delete":
|
||||
return s.pushConfigurationDelete(change)
|
||||
default:
|
||||
return fmt.Errorf("unknown operation: %s", change.Operation)
|
||||
// Unknown operation: queued by a newer or different build. Drop from queue.
|
||||
slog.Warn("dropping pending change with unknown operation",
|
||||
"id", change.ID, "type", change.EntityType, "op", change.Operation)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1245,24 +1271,30 @@ func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Confi
|
||||
|
||||
localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID)
|
||||
if localErr != nil {
|
||||
return err
|
||||
// Project not found locally either: stale reference (project was deleted).
|
||||
// Fall through to system project so this configuration is not stuck forever.
|
||||
slog.Warn("configuration references missing project, assigning to system project",
|
||||
"cfg_uuid", cfg.UUID,
|
||||
"project_uuid", *cfg.ProjectUUID,
|
||||
)
|
||||
} else {
|
||||
modelProject := localdb.LocalToProject(localProject)
|
||||
if modelProject.OwnerUsername == "" {
|
||||
modelProject.OwnerUsername = cfg.OwnerUsername
|
||||
}
|
||||
if createErr := projectRepo.UpsertByUUID(modelProject); createErr != nil {
|
||||
return createErr
|
||||
}
|
||||
if modelProject.ID > 0 {
|
||||
serverID := modelProject.ID
|
||||
localProject.ServerID = &serverID
|
||||
localProject.SyncStatus = "synced"
|
||||
now := time.Now()
|
||||
localProject.SyncedAt = &now
|
||||
_ = s.localDB.SaveProjectPreservingUpdatedAt(localProject)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
modelProject := localdb.LocalToProject(localProject)
|
||||
if modelProject.OwnerUsername == "" {
|
||||
modelProject.OwnerUsername = cfg.OwnerUsername
|
||||
}
|
||||
if createErr := projectRepo.UpsertByUUID(modelProject); createErr != nil {
|
||||
return createErr
|
||||
}
|
||||
if modelProject.ID > 0 {
|
||||
serverID := modelProject.ID
|
||||
localProject.ServerID = &serverID
|
||||
localProject.SyncStatus = "synced"
|
||||
now := time.Now()
|
||||
localProject.SyncedAt = &now
|
||||
_ = s.localDB.SaveProjectPreservingUpdatedAt(localProject)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
systemProject := &models.Project{}
|
||||
|
||||
Reference in New Issue
Block a user