// This file implements the sync readiness guard: it decides whether a sync
// with the server may run and reports this client's local state to the
// server-side qt_client_schema_state table.

package sync

import (
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"strings"
	"time"

	"git.mchus.pro/mchus/quoteforge/internal/appmeta"
	"git.mchus.pro/mchus/quoteforge/internal/localdb"
	"gorm.io/gorm"
)

// Readiness status values used in SyncReadiness.Status and passed to the
// local sync guard state.
const (
	ReadinessReady   = "ready"
	ReadinessBlocked = "blocked"
	ReadinessUnknown = "unknown"
)

// ErrSyncBlockedByReadiness is the sentinel for a sync that was refused by the
// readiness guard. *SyncBlockedError unwraps to it, so callers can test with
// errors.Is(err, ErrSyncBlockedByReadiness).
var ErrSyncBlockedByReadiness = errors.New("sync blocked by readiness guard")

// SyncReadiness is the outcome of a readiness check. ReasonCode, ReasonText
// and RequiredMinAppVersion describe why sync is blocked and are omitted from
// JSON when empty; LastCheckedAt is the time of the check.
type SyncReadiness struct {
	Status                string     `json:"status"`
	Blocked               bool       `json:"blocked"`
	ReasonCode            string     `json:"reason_code,omitempty"`
	ReasonText            string     `json:"reason_text,omitempty"`
	RequiredMinAppVersion *string    `json:"required_min_app_version,omitempty"`
	LastCheckedAt         *time.Time `json:"last_checked_at,omitempty"`
}

// SyncBlockedError is returned by EnsureReadinessForSync when the readiness
// check reports Blocked. It carries a copy of the readiness that caused it.
type SyncBlockedError struct {
	Readiness SyncReadiness
}

// Error returns the readiness ReasonText when it is not blank, and the
// ErrSyncBlockedByReadiness message otherwise (including for a nil receiver).
func (e *SyncBlockedError) Error() string {
	if e != nil {
		if text := e.Readiness.ReasonText; strings.TrimSpace(text) != "" {
			return text
		}
	}
	return ErrSyncBlockedByReadiness.Error()
}

// Unwrap returns ErrSyncBlockedByReadiness so that errors.Is matches the
// sentinel for every *SyncBlockedError.
func (e *SyncBlockedError) Unwrap() error {
	return ErrSyncBlockedByReadiness
}

// EnsureReadinessForSync runs GetReadiness and turns a blocked result into an
// error. When blocked, it returns both the readiness and a *SyncBlockedError
// holding a copy of it.
func (s *Service) EnsureReadinessForSync() (*SyncReadiness, error) {
	readiness, err := s.GetReadiness()
	if err != nil {
		return nil, err
	}
	if readiness.Blocked {
		return readiness, &SyncBlockedError{Readiness: *readiness}
	}
	return readiness, nil
}

// GetReadiness checks whether a sync can run right now.
//
// It returns a blocked readiness with code OFFLINE_UNVERIFIED_SCHEMA when the
// service is not online or the server database handle cannot be obtained.
// Otherwise it makes a one-time, best-effort attempt to create or expand the
// qt_client_schema_state table, reports the client state to the server, and
// stores and returns a ready result. Failures of those steps are only logged.
// The returned error is always nil in the current implementation.
func (s *Service) GetReadiness() (*SyncReadiness, error) {
	const (
		offlineCode = "OFFLINE_UNVERIFIED_SCHEMA"
		offlineText = "Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД."
	)

	now := time.Now().UTC()
	if !s.isOnline() {
		return s.blockedReadiness(now, offlineCode, offlineText, nil)
	}

	mariaDB, err := s.getDB()
	if err != nil || mariaDB == nil {
		if err != nil {
			// Keep the cause visible; the caller only ever sees the generic
			// blocked reason.
			slog.Warn("sync readiness: server database unavailable", "error", err)
		}
		return s.blockedReadiness(now, offlineCode, offlineText, nil)
	}

	s.schemaOnce.Do(func() {
		if err := ensureClientSchemaStateTable(mariaDB); err != nil {
			slog.Warn("qt_client_schema_state migration skipped (no DDL rights — run server migrate)", "error", err)
		}
	})

	// NOTE(review): the report is sent before the guard state is set to ready
	// below, so latestSyncErrorState presumably still sees a blocked guard
	// state left by an earlier check — confirm this ordering is intended.
	if err := s.reportClientSchemaState(mariaDB, now); err != nil {
		slog.Warn("failed to report client schema state", "error", err)
	}

	ready := &SyncReadiness{Status: ReadinessReady, Blocked: false, LastCheckedAt: &now}
	if setErr := s.localDB.SetSyncGuardState(ReadinessReady, "", "", nil, &now); setErr != nil {
		slog.Warn("failed to persist sync guard state", "error", setErr)
	}
	return ready, nil
}

// blockedReadiness builds a blocked SyncReadiness from the given reason and
// stores it in the local sync guard state. A failure to store it is logged and
// does not change the result; the returned error is always nil.
func (s *Service) blockedReadiness(now time.Time, code, text string, minVersion *string) (*SyncReadiness, error) {
	readiness := &SyncReadiness{
		Status:                ReadinessBlocked,
		Blocked:               true,
		ReasonCode:            code,
		ReasonText:            text,
		RequiredMinAppVersion: minVersion,
		LastCheckedAt:         &now,
	}
	if err := s.localDB.SetSyncGuardState(ReadinessBlocked, code, text, minVersion, &now); err != nil {
		slog.Warn("failed to persist blocked sync guard state", "error", err)
	}
	return readiness, nil
}

// isOnline reports whether the server is considered reachable: always true
// when a direct DB is configured, false when there is no connection manager,
// and otherwise whatever the connection manager reports.
func (s *Service) isOnline() bool {
	switch {
	case s.directDB != nil:
		return true
	case s.connMgr == nil:
		return false
	default:
		return s.connMgr.IsOnline()
	}
}

// ensureClientSchemaStateTable creates the qt_client_schema_state table when
// it is missing and adds any columns that are absent from an existing table.
// It returns the first DDL error, wrapped with the step that failed.
func ensureClientSchemaStateTable(db *gorm.DB) error {
	const table = "qt_client_schema_state"

	if !tableExists(db, table) {
		if err := db.Exec(`
			CREATE TABLE IF NOT EXISTS qt_client_schema_state (
				username VARCHAR(100) NOT NULL,
				hostname VARCHAR(255) NOT NULL DEFAULT '',
				app_version VARCHAR(64) NULL,
				last_sync_at DATETIME NULL,
				last_sync_status VARCHAR(32) NULL,
				pending_changes_count INT NOT NULL DEFAULT 0,
				pending_errors_count INT NOT NULL DEFAULT 0,
				configurations_count INT NOT NULL DEFAULT 0,
				projects_count INT NOT NULL DEFAULT 0,
				estimate_pricelist_version VARCHAR(128) NULL,
				warehouse_pricelist_version VARCHAR(128) NULL,
				competitor_pricelist_version VARCHAR(128) NULL,
				last_sync_error_code VARCHAR(128) NULL,
				last_sync_error_text TEXT NULL,
				last_checked_at DATETIME NOT NULL,
				updated_at DATETIME NOT NULL,
				PRIMARY KEY (username, hostname),
				INDEX idx_qt_client_schema_state_checked (last_checked_at)
			)
		`).Error; err != nil {
			return fmt.Errorf("create qt_client_schema_state table: %w", err)
		}
	}

	// Re-check after the create attempt: if the table is still not visible
	// there is nothing to expand.
	if !tableExists(db, table) {
		return nil
	}

	// Each ALTER is guarded by a column existence check so users without DDL
	// rights don't get a permission error on every sync cycle — the server
	// migration tool is the authoritative path for schema changes.
	if !columnExists(db, table, "hostname") {
		if err := db.Exec(`
			ALTER TABLE qt_client_schema_state
			ADD COLUMN IF NOT EXISTS hostname VARCHAR(255) NOT NULL DEFAULT '' AFTER username
		`).Error; err != nil {
			return fmt.Errorf("add qt_client_schema_state.hostname: %w", err)
		}
		if err := db.Exec(`
			ALTER TABLE qt_client_schema_state
			DROP PRIMARY KEY,
			ADD PRIMARY KEY (username, hostname)
		`).Error; err != nil && !isDuplicatePrimaryKeyDefinition(err) {
			return fmt.Errorf("set qt_client_schema_state primary key: %w", err)
		}
	}

	type colMigration struct {
		column string
		stmt   string
	}
	migrations := []colMigration{
		{"last_sync_at", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_at DATETIME NULL AFTER app_version"},
		{"last_sync_status", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_status VARCHAR(32) NULL AFTER last_sync_at"},
		{"pending_changes_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_changes_count INT NOT NULL DEFAULT 0 AFTER last_sync_status"},
		{"pending_errors_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_errors_count INT NOT NULL DEFAULT 0 AFTER pending_changes_count"},
		{"configurations_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS configurations_count INT NOT NULL DEFAULT 0 AFTER pending_errors_count"},
		{"projects_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS projects_count INT NOT NULL DEFAULT 0 AFTER configurations_count"},
		{"estimate_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS estimate_pricelist_version VARCHAR(128) NULL AFTER projects_count"},
		{"warehouse_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS warehouse_pricelist_version VARCHAR(128) NULL AFTER estimate_pricelist_version"},
		{"competitor_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS competitor_pricelist_version VARCHAR(128) NULL AFTER warehouse_pricelist_version"},
		{"last_sync_error_code", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_code VARCHAR(128) NULL AFTER competitor_pricelist_version"},
		{"last_sync_error_text", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_text TEXT NULL AFTER last_sync_error_code"},
		{"local_pricelist_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS local_pricelist_count INT NOT NULL DEFAULT 0 AFTER last_sync_error_text"},
		{"pricelist_items_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pricelist_items_count INT NOT NULL DEFAULT 0 AFTER local_pricelist_count"},
		{"components_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS components_count INT NOT NULL DEFAULT 0 AFTER pricelist_items_count"},
		{"db_size_bytes", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS db_size_bytes BIGINT NOT NULL DEFAULT 0 AFTER components_count"},
	}
	for _, m := range migrations {
		if columnExists(db, table, m.column) {
			continue
		}
		if err := db.Exec(m.stmt).Error; err != nil {
			return fmt.Errorf("expand qt_client_schema_state: %w", err)
		}
	}
	return nil
}

// columnExists reports whether information_schema lists columnName for
// tableName in the current database. A query error is reported as false.
func columnExists(db *gorm.DB, tableName, columnName string) bool {
	var count int64
	if err := db.Raw(`
		SELECT COUNT(*) FROM information_schema.COLUMNS
		WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ? AND COLUMN_NAME = ?
	`, tableName, columnName).Scan(&count).Error; err != nil {
		return false
	}
	return count > 0
}

// tableExists reports whether information_schema lists tableName in the
// current database. A query error is reported as false.
func tableExists(db *gorm.DB, tableName string) bool {
	var count int64
	// For MariaDB/MySQL, check information_schema
	if err := db.Raw(`
		SELECT COUNT(*) FROM information_schema.TABLES
		WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?
	`, tableName).Scan(&count).Error; err != nil {
		return false
	}
	return count > 0
}

// reportClientSchemaState upserts this client's row in qt_client_schema_state,
// keyed by (username, hostname), with counters and versions read from the
// local database. It does nothing when the dialector is SQLite or when the
// local DB user name is blank. An os.Hostname failure is reported as an empty
// hostname. checkedAt is written to both last_checked_at and updated_at.
func (s *Service) reportClientSchemaState(mariaDB *gorm.DB, checkedAt time.Time) error {
	if strings.EqualFold(mariaDB.Dialector.Name(), "sqlite") {
		return nil
	}
	username := strings.TrimSpace(s.localDB.GetDBUser())
	if username == "" {
		return nil
	}

	hostname, err := os.Hostname()
	if err != nil {
		hostname = ""
	}
	hostname = strings.TrimSpace(hostname)

	lastSyncAt := s.localDB.GetLastSyncTime()
	lastSyncStatus := ReadinessReady
	pendingChangesCount := s.localDB.CountPendingChanges()
	pendingErrorsCount := s.localDB.CountErroredChanges()
	configurationsCount := s.localDB.CountConfigurations()
	projectsCount := s.localDB.CountProjects()
	estimateVersion := latestPricelistVersion(s.localDB, "estimate")
	warehouseVersion := latestPricelistVersion(s.localDB, "warehouse")
	competitorVersion := latestPricelistVersion(s.localDB, "competitor")
	lastSyncErrorCode, lastSyncErrorText := latestSyncErrorState(s.localDB)
	localPricelistCount := s.localDB.CountLocalPricelists()
	pricelistItemsCount := s.localDB.CountAllPricelistItems()
	componentsCount := s.localDB.CountComponents()
	dbSizeBytes := s.localDB.DBFileSizeBytes()

	return mariaDB.Exec(`
		INSERT INTO qt_client_schema_state (
			username, hostname, app_version,
			last_sync_at, last_sync_status, pending_changes_count, pending_errors_count,
			configurations_count, projects_count,
			estimate_pricelist_version, warehouse_pricelist_version, competitor_pricelist_version,
			last_sync_error_code, last_sync_error_text,
			local_pricelist_count, pricelist_items_count, components_count, db_size_bytes,
			last_checked_at, updated_at
		)
		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
		ON DUPLICATE KEY UPDATE
			app_version = VALUES(app_version),
			last_sync_at = VALUES(last_sync_at),
			last_sync_status = VALUES(last_sync_status),
			pending_changes_count = VALUES(pending_changes_count),
			pending_errors_count = VALUES(pending_errors_count),
			configurations_count = VALUES(configurations_count),
			projects_count = VALUES(projects_count),
			estimate_pricelist_version = VALUES(estimate_pricelist_version),
			warehouse_pricelist_version = VALUES(warehouse_pricelist_version),
			competitor_pricelist_version = VALUES(competitor_pricelist_version),
			last_sync_error_code = VALUES(last_sync_error_code),
			last_sync_error_text = VALUES(last_sync_error_text),
			local_pricelist_count = VALUES(local_pricelist_count),
			pricelist_items_count = VALUES(pricelist_items_count),
			components_count = VALUES(components_count),
			db_size_bytes = VALUES(db_size_bytes),
			last_checked_at = VALUES(last_checked_at),
			updated_at = VALUES(updated_at)
	`,
		username, hostname, appmeta.Version(),
		lastSyncAt, lastSyncStatus, pendingChangesCount, pendingErrorsCount,
		configurationsCount, projectsCount,
		estimateVersion, warehouseVersion, competitorVersion,
		lastSyncErrorCode, lastSyncErrorText,
		localPricelistCount, pricelistItemsCount, componentsCount, dbSizeBytes,
		checkedAt, checkedAt,
	).Error
}

// isDuplicatePrimaryKeyDefinition reports whether err's message contains,
// case-insensitively, one of the phrases this package tolerates when
// redefining the primary key: "multiple primary key defined",
// "duplicate key name 'primary'" or "duplicate entry".
func isDuplicatePrimaryKeyDefinition(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())
	for _, phrase := range []string{
		"multiple primary key defined",
		"duplicate key name 'primary'",
		"duplicate entry",
	} {
		if strings.Contains(msg, phrase) {
			return true
		}
	}
	return false
}

// latestPricelistVersion returns the trimmed version of the latest local
// pricelist for source, or nil when local is nil, the lookup fails, no
// pricelist is returned, or the version is blank.
func latestPricelistVersion(local *localdb.LocalDB, source string) *string {
	if local == nil {
		return nil
	}
	pl, err := local.GetLatestLocalPricelistBySource(source)
	if err != nil || pl == nil {
		return nil
	}
	return optionalString(pl.Version)
}

// latestSyncErrorState returns the error code and text to report for this
// client.
//
// When the stored guard state is blocked, its reason code and text are
// returned. Otherwise the 20 most recent pending changes with a non-blank
// last_error are encoded as a JSON array and returned under the code
// PENDING_CHANGE_ERROR. Both results are nil when local is nil, the query
// fails, or there are no errored changes.
func latestSyncErrorState(local *localdb.LocalDB) (*string, *string) {
	if local == nil {
		return nil, nil
	}
	if guard, err := local.GetSyncGuardState(); err == nil && guard != nil && strings.EqualFold(guard.Status, ReadinessBlocked) {
		return optionalString(guard.ReasonCode), optionalString(guard.ReasonText)
	}

	var errored []localdb.PendingChange
	err := local.DB().
		Where("TRIM(COALESCE(last_error, '')) <> ''").
		Order("id DESC").
		Limit(20).
		Find(&errored).Error
	if err != nil || len(errored) == 0 {
		return nil, nil
	}

	type errorEntry struct {
		Type     string `json:"type"`
		UUID     string `json:"uuid"`
		Op       string `json:"op"`
		Attempts int    `json:"attempts"`
		Error    string `json:"error"`
	}
	entries := make([]errorEntry, 0, len(errored))
	for _, ch := range errored {
		entries = append(entries, errorEntry{
			Type:     ch.EntityType,
			UUID:     ch.EntityUUID,
			Op:       ch.Operation,
			Attempts: ch.Attempts,
			Error:    strings.TrimSpace(ch.LastError),
		})
	}

	detail, jsonErr := json.Marshal(entries)
	if jsonErr != nil {
		// Fall back to the newest error message alone when encoding fails.
		return optionalString("PENDING_CHANGE_ERROR"), optionalString(errored[0].LastError)
	}
	return optionalString("PENDING_CHANGE_ERROR"), optionalString(string(detail))
}

// optionalString returns a pointer to value with surrounding whitespace
// removed, or nil when nothing remains.
func optionalString(value string) *string {
	trimmed := strings.TrimSpace(value)
	if trimmed == "" {
		return nil
	}
	return &trimmed
}

// toReadinessFromState converts a stored guard state into a SyncReadiness.
// Blocked is set when the stored status equals ReadinessBlocked exactly. A nil
// state yields nil.
func toReadinessFromState(state *localdb.LocalSyncGuardState) *SyncReadiness {
	if state == nil {
		return nil
	}
	return &SyncReadiness{
		Status:                state.Status,
		Blocked:               state.Status == ReadinessBlocked,
		ReasonCode:            state.ReasonCode,
		ReasonText:            state.ReasonText,
		RequiredMinAppVersion: state.RequiredMinAppVersion,
		LastCheckedAt:         state.LastCheckedAt,
	}
}