6df262b8ee
- ensureConfigurationProject: если project не найден ни на сервере, ни локально (stale UUID после удаления), падаем в fallback «Без проекта» вместо вечной ошибки - PushPendingChanges: автоматически вызывает RepairPendingChanges() перед циклом, чтобы локально-исправимые проблемы чинились до попытки отправки - maxPendingChangeAttempts=20: после 20 неудачных попыток change считается unrecoverable и удаляется из очереди (логируется ERROR) - pushSingleChange/pushConfigurationChange: unknown entity type / operation теперь дропается с warn вместо вечного error в цикле - latestSyncErrorState: last_sync_error_text в qt_client_schema_state теперь содержит JSON-массив с type/uuid/op/attempts/error по всем застрявшим changes (до 20 штук) вместо текста только последней ошибки Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
379 lines
14 KiB
Go
379 lines
14 KiB
Go
package sync
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.mchus.pro/mchus/quoteforge/internal/appmeta"
|
|
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// Readiness status values carried in SyncReadiness.Status and handed to the
// local DB sync-guard state.
const (
	// ReadinessReady marks a check that found nothing preventing sync.
	ReadinessReady = "ready"
	// ReadinessBlocked marks a check that refused sync; the reason is carried
	// alongside the status.
	ReadinessBlocked = "blocked"
	// ReadinessUnknown is declared for callers; nothing in this file assigns it.
	ReadinessUnknown = "unknown"
)
|
|
|
|
// ErrSyncBlockedByReadiness is the sentinel whose message SyncBlockedError
// falls back to when no specific reason text is available.
var ErrSyncBlockedByReadiness = errors.New("sync blocked by readiness guard")
|
|
|
|
// SyncReadiness is the result of a readiness check: a status string, a
// blocked flag, and optional details explaining why sync was refused.
type SyncReadiness struct {
	// Status is one of the Readiness* constants.
	Status string `json:"status"`
	// Blocked is true when sync must not proceed.
	Blocked bool `json:"blocked"`

	// ReasonCode and ReasonText describe why sync is blocked; both are omitted
	// from JSON when empty.
	ReasonCode string `json:"reason_code,omitempty"`
	ReasonText string `json:"reason_text,omitempty"`

	// RequiredMinAppVersion is optional; it is only ever passed through by
	// the code in this file.
	RequiredMinAppVersion *string `json:"required_min_app_version,omitempty"`
	// LastCheckedAt is the time at which the readiness check ran.
	LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
}
|
|
|
|
type SyncBlockedError struct {
|
|
Readiness SyncReadiness
|
|
}
|
|
|
|
func (e *SyncBlockedError) Error() string {
|
|
if e == nil {
|
|
return ErrSyncBlockedByReadiness.Error()
|
|
}
|
|
if strings.TrimSpace(e.Readiness.ReasonText) != "" {
|
|
return e.Readiness.ReasonText
|
|
}
|
|
return ErrSyncBlockedByReadiness.Error()
|
|
}
|
|
|
|
func (s *Service) EnsureReadinessForSync() (*SyncReadiness, error) {
|
|
readiness, err := s.GetReadiness()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if readiness.Blocked {
|
|
return readiness, &SyncBlockedError{Readiness: *readiness}
|
|
}
|
|
return readiness, nil
|
|
}
|
|
|
|
func (s *Service) GetReadiness() (*SyncReadiness, error) {
|
|
now := time.Now().UTC()
|
|
if !s.isOnline() {
|
|
return s.blockedReadiness(
|
|
now,
|
|
"OFFLINE_UNVERIFIED_SCHEMA",
|
|
"Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД.",
|
|
nil,
|
|
)
|
|
}
|
|
|
|
mariaDB, err := s.getDB()
|
|
if err != nil || mariaDB == nil {
|
|
return s.blockedReadiness(
|
|
now,
|
|
"OFFLINE_UNVERIFIED_SCHEMA",
|
|
"Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД.",
|
|
nil,
|
|
)
|
|
}
|
|
|
|
s.schemaOnce.Do(func() {
|
|
if err := ensureClientSchemaStateTable(mariaDB); err != nil {
|
|
slog.Warn("qt_client_schema_state migration skipped (no DDL rights — run server migrate)", "error", err)
|
|
}
|
|
})
|
|
if err := s.reportClientSchemaState(mariaDB, now); err != nil {
|
|
slog.Warn("failed to report client schema state", "error", err)
|
|
}
|
|
|
|
ready := &SyncReadiness{Status: ReadinessReady, Blocked: false, LastCheckedAt: &now}
|
|
if setErr := s.localDB.SetSyncGuardState(ReadinessReady, "", "", nil, &now); setErr != nil {
|
|
slog.Warn("failed to persist sync guard state", "error", setErr)
|
|
}
|
|
return ready, nil
|
|
}
|
|
|
|
func (s *Service) blockedReadiness(now time.Time, code, text string, minVersion *string) (*SyncReadiness, error) {
|
|
readiness := &SyncReadiness{
|
|
Status: ReadinessBlocked,
|
|
Blocked: true,
|
|
ReasonCode: code,
|
|
ReasonText: text,
|
|
RequiredMinAppVersion: minVersion,
|
|
LastCheckedAt: &now,
|
|
}
|
|
if err := s.localDB.SetSyncGuardState(ReadinessBlocked, code, text, minVersion, &now); err != nil {
|
|
slog.Warn("failed to persist blocked sync guard state", "error", err)
|
|
}
|
|
return readiness, nil
|
|
}
|
|
|
|
func (s *Service) isOnline() bool {
|
|
if s.directDB != nil {
|
|
return true
|
|
}
|
|
if s.connMgr == nil {
|
|
return false
|
|
}
|
|
return s.connMgr.IsOnline()
|
|
}
|
|
|
|
func ensureClientSchemaStateTable(db *gorm.DB) error {
|
|
if !tableExists(db, "qt_client_schema_state") {
|
|
if err := db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS qt_client_schema_state (
|
|
username VARCHAR(100) NOT NULL,
|
|
hostname VARCHAR(255) NOT NULL DEFAULT '',
|
|
app_version VARCHAR(64) NULL,
|
|
last_sync_at DATETIME NULL,
|
|
last_sync_status VARCHAR(32) NULL,
|
|
pending_changes_count INT NOT NULL DEFAULT 0,
|
|
pending_errors_count INT NOT NULL DEFAULT 0,
|
|
configurations_count INT NOT NULL DEFAULT 0,
|
|
projects_count INT NOT NULL DEFAULT 0,
|
|
estimate_pricelist_version VARCHAR(128) NULL,
|
|
warehouse_pricelist_version VARCHAR(128) NULL,
|
|
competitor_pricelist_version VARCHAR(128) NULL,
|
|
last_sync_error_code VARCHAR(128) NULL,
|
|
last_sync_error_text TEXT NULL,
|
|
last_checked_at DATETIME NOT NULL,
|
|
updated_at DATETIME NOT NULL,
|
|
PRIMARY KEY (username, hostname),
|
|
INDEX idx_qt_client_schema_state_checked (last_checked_at)
|
|
)
|
|
`).Error; err != nil {
|
|
return fmt.Errorf("create qt_client_schema_state table: %w", err)
|
|
}
|
|
}
|
|
|
|
if tableExists(db, "qt_client_schema_state") {
|
|
// Each ALTER is guarded by a column existence check so users without DDL
|
|
// rights don't get a permission error on every sync cycle — the server
|
|
// migration tool is the authoritative path for schema changes.
|
|
if !columnExists(db, "qt_client_schema_state", "hostname") {
|
|
if err := db.Exec(`
|
|
ALTER TABLE qt_client_schema_state
|
|
ADD COLUMN IF NOT EXISTS hostname VARCHAR(255) NOT NULL DEFAULT '' AFTER username
|
|
`).Error; err != nil {
|
|
return fmt.Errorf("add qt_client_schema_state.hostname: %w", err)
|
|
}
|
|
if err := db.Exec(`
|
|
ALTER TABLE qt_client_schema_state
|
|
DROP PRIMARY KEY,
|
|
ADD PRIMARY KEY (username, hostname)
|
|
`).Error; err != nil && !isDuplicatePrimaryKeyDefinition(err) {
|
|
return fmt.Errorf("set qt_client_schema_state primary key: %w", err)
|
|
}
|
|
}
|
|
|
|
type colMigration struct {
|
|
column string
|
|
stmt string
|
|
}
|
|
migrations := []colMigration{
|
|
{"last_sync_at", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_at DATETIME NULL AFTER app_version"},
|
|
{"last_sync_status", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_status VARCHAR(32) NULL AFTER last_sync_at"},
|
|
{"pending_changes_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_changes_count INT NOT NULL DEFAULT 0 AFTER last_sync_status"},
|
|
{"pending_errors_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_errors_count INT NOT NULL DEFAULT 0 AFTER pending_changes_count"},
|
|
{"configurations_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS configurations_count INT NOT NULL DEFAULT 0 AFTER pending_errors_count"},
|
|
{"projects_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS projects_count INT NOT NULL DEFAULT 0 AFTER configurations_count"},
|
|
{"estimate_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS estimate_pricelist_version VARCHAR(128) NULL AFTER projects_count"},
|
|
{"warehouse_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS warehouse_pricelist_version VARCHAR(128) NULL AFTER estimate_pricelist_version"},
|
|
{"competitor_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS competitor_pricelist_version VARCHAR(128) NULL AFTER warehouse_pricelist_version"},
|
|
{"last_sync_error_code", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_code VARCHAR(128) NULL AFTER competitor_pricelist_version"},
|
|
{"last_sync_error_text", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_text TEXT NULL AFTER last_sync_error_code"},
|
|
{"local_pricelist_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS local_pricelist_count INT NOT NULL DEFAULT 0 AFTER last_sync_error_text"},
|
|
{"pricelist_items_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pricelist_items_count INT NOT NULL DEFAULT 0 AFTER local_pricelist_count"},
|
|
{"components_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS components_count INT NOT NULL DEFAULT 0 AFTER pricelist_items_count"},
|
|
{"db_size_bytes", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS db_size_bytes BIGINT NOT NULL DEFAULT 0 AFTER components_count"},
|
|
}
|
|
for _, m := range migrations {
|
|
if columnExists(db, "qt_client_schema_state", m.column) {
|
|
continue
|
|
}
|
|
if err := db.Exec(m.stmt).Error; err != nil {
|
|
return fmt.Errorf("expand qt_client_schema_state: %w", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func columnExists(db *gorm.DB, tableName, columnName string) bool {
|
|
var count int64
|
|
if err := db.Raw(`
|
|
SELECT COUNT(*) FROM information_schema.COLUMNS
|
|
WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ? AND COLUMN_NAME = ?
|
|
`, tableName, columnName).Scan(&count).Error; err != nil {
|
|
return false
|
|
}
|
|
return count > 0
|
|
}
|
|
|
|
func tableExists(db *gorm.DB, tableName string) bool {
|
|
var count int64
|
|
// For MariaDB/MySQL, check information_schema
|
|
if err := db.Raw(`
|
|
SELECT COUNT(*) FROM information_schema.TABLES
|
|
WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?
|
|
`, tableName).Scan(&count).Error; err != nil {
|
|
return false
|
|
}
|
|
return count > 0
|
|
}
|
|
|
|
func (s *Service) reportClientSchemaState(mariaDB *gorm.DB, checkedAt time.Time) error {
|
|
if strings.EqualFold(mariaDB.Dialector.Name(), "sqlite") {
|
|
return nil
|
|
}
|
|
username := strings.TrimSpace(s.localDB.GetDBUser())
|
|
if username == "" {
|
|
return nil
|
|
}
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = ""
|
|
}
|
|
hostname = strings.TrimSpace(hostname)
|
|
lastSyncAt := s.localDB.GetLastSyncTime()
|
|
lastSyncStatus := ReadinessReady
|
|
pendingChangesCount := s.localDB.CountPendingChanges()
|
|
pendingErrorsCount := s.localDB.CountErroredChanges()
|
|
configurationsCount := s.localDB.CountConfigurations()
|
|
projectsCount := s.localDB.CountProjects()
|
|
estimateVersion := latestPricelistVersion(s.localDB, "estimate")
|
|
warehouseVersion := latestPricelistVersion(s.localDB, "warehouse")
|
|
competitorVersion := latestPricelistVersion(s.localDB, "competitor")
|
|
lastSyncErrorCode, lastSyncErrorText := latestSyncErrorState(s.localDB)
|
|
localPricelistCount := s.localDB.CountLocalPricelists()
|
|
pricelistItemsCount := s.localDB.CountAllPricelistItems()
|
|
componentsCount := s.localDB.CountComponents()
|
|
dbSizeBytes := s.localDB.DBFileSizeBytes()
|
|
return mariaDB.Exec(`
|
|
INSERT INTO qt_client_schema_state (
|
|
username, hostname, app_version,
|
|
last_sync_at, last_sync_status, pending_changes_count, pending_errors_count,
|
|
configurations_count, projects_count,
|
|
estimate_pricelist_version, warehouse_pricelist_version, competitor_pricelist_version,
|
|
last_sync_error_code, last_sync_error_text,
|
|
local_pricelist_count, pricelist_items_count, components_count, db_size_bytes,
|
|
last_checked_at, updated_at
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON DUPLICATE KEY UPDATE
|
|
app_version = VALUES(app_version),
|
|
last_sync_at = VALUES(last_sync_at),
|
|
last_sync_status = VALUES(last_sync_status),
|
|
pending_changes_count = VALUES(pending_changes_count),
|
|
pending_errors_count = VALUES(pending_errors_count),
|
|
configurations_count = VALUES(configurations_count),
|
|
projects_count = VALUES(projects_count),
|
|
estimate_pricelist_version = VALUES(estimate_pricelist_version),
|
|
warehouse_pricelist_version = VALUES(warehouse_pricelist_version),
|
|
competitor_pricelist_version = VALUES(competitor_pricelist_version),
|
|
last_sync_error_code = VALUES(last_sync_error_code),
|
|
last_sync_error_text = VALUES(last_sync_error_text),
|
|
local_pricelist_count = VALUES(local_pricelist_count),
|
|
pricelist_items_count = VALUES(pricelist_items_count),
|
|
components_count = VALUES(components_count),
|
|
db_size_bytes = VALUES(db_size_bytes),
|
|
last_checked_at = VALUES(last_checked_at),
|
|
updated_at = VALUES(updated_at)
|
|
`, username, hostname, appmeta.Version(),
|
|
lastSyncAt, lastSyncStatus, pendingChangesCount, pendingErrorsCount,
|
|
configurationsCount, projectsCount,
|
|
estimateVersion, warehouseVersion, competitorVersion,
|
|
lastSyncErrorCode, lastSyncErrorText,
|
|
localPricelistCount, pricelistItemsCount, componentsCount, dbSizeBytes,
|
|
checkedAt, checkedAt).Error
|
|
}
|
|
|
|
// isDuplicatePrimaryKeyDefinition reports whether err's message, compared
// case-insensitively, contains one of the phrases this file treats as a
// tolerable outcome of re-declaring the primary key. A nil error yields false.
func isDuplicatePrimaryKeyDefinition(err error) bool {
	if err == nil {
		return false
	}
	lowered := strings.ToLower(err.Error())
	for _, marker := range [...]string{
		"multiple primary key defined",
		"duplicate key name 'primary'",
		"duplicate entry",
	} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
|
|
|
|
func latestPricelistVersion(local *localdb.LocalDB, source string) *string {
|
|
if local == nil {
|
|
return nil
|
|
}
|
|
pl, err := local.GetLatestLocalPricelistBySource(source)
|
|
if err != nil || pl == nil {
|
|
return nil
|
|
}
|
|
version := strings.TrimSpace(pl.Version)
|
|
if version == "" {
|
|
return nil
|
|
}
|
|
return &version
|
|
}
|
|
|
|
func latestSyncErrorState(local *localdb.LocalDB) (*string, *string) {
|
|
if local == nil {
|
|
return nil, nil
|
|
}
|
|
if guard, err := local.GetSyncGuardState(); err == nil && guard != nil && strings.EqualFold(guard.Status, ReadinessBlocked) {
|
|
return optionalString(strings.TrimSpace(guard.ReasonCode)), optionalString(strings.TrimSpace(guard.ReasonText))
|
|
}
|
|
|
|
var errored []localdb.PendingChange
|
|
if err := local.DB().
|
|
Where("TRIM(COALESCE(last_error, '')) <> ''").
|
|
Order("id DESC").
|
|
Limit(20).
|
|
Find(&errored).Error; err != nil || len(errored) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
type errorEntry struct {
|
|
Type string `json:"type"`
|
|
UUID string `json:"uuid"`
|
|
Op string `json:"op"`
|
|
Attempts int `json:"attempts"`
|
|
Error string `json:"error"`
|
|
}
|
|
entries := make([]errorEntry, 0, len(errored))
|
|
for _, ch := range errored {
|
|
entries = append(entries, errorEntry{
|
|
Type: ch.EntityType,
|
|
UUID: ch.EntityUUID,
|
|
Op: ch.Operation,
|
|
Attempts: ch.Attempts,
|
|
Error: strings.TrimSpace(ch.LastError),
|
|
})
|
|
}
|
|
detail, jsonErr := json.Marshal(entries)
|
|
if jsonErr != nil {
|
|
return optionalString("PENDING_CHANGE_ERROR"), optionalString(strings.TrimSpace(errored[0].LastError))
|
|
}
|
|
return optionalString("PENDING_CHANGE_ERROR"), optionalString(string(detail))
|
|
}
|
|
|
|
// optionalString returns a pointer to value with surrounding whitespace
// removed, or nil when nothing remains after trimming.
func optionalString(value string) *string {
	trimmed := strings.TrimSpace(value)
	if trimmed == "" {
		return nil
	}
	return &trimmed
}
|
|
|
|
func toReadinessFromState(state *localdb.LocalSyncGuardState) *SyncReadiness {
|
|
if state == nil {
|
|
return nil
|
|
}
|
|
blocked := state.Status == ReadinessBlocked
|
|
return &SyncReadiness{
|
|
Status: state.Status,
|
|
Blocked: blocked,
|
|
ReasonCode: state.ReasonCode,
|
|
ReasonText: state.ReasonText,
|
|
RequiredMinAppVersion: state.RequiredMinAppVersion,
|
|
LastCheckedAt: state.LastCheckedAt,
|
|
}
|
|
}
|