Files
QuoteForge/internal/services/sync/readiness.go
Michael Chus 0072f2a15f fix: ALTER spam в логах — DDL на qt_client_schema_state только при нужде
Раньше ensureClientSchemaStateTable запускался на каждом цикле синка
(каждые 5 минут) и пытался ALTER TABLE, даже если все колонки уже были.
Для пользователей без DDL-прав это давало WARN-спам в каждом цикле.

Два изменения:
- schemaOnce (sync.Once) на Service: ensureClientSchemaStateTable
  вызывается не более одного раза за жизнь процесса
- columnExists() проверяет information_schema.COLUMNS перед каждым
  ALTER — если колонка уже есть, ALTER пропускается без ошибки

Если таблица уже мигрирована сервером, клиент молча пропускает все DDL.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-02 13:02:40 +03:00

355 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"errors"
"fmt"
"log/slog"
"os"
"strings"
"time"
"git.mchus.pro/mchus/quoteforge/internal/appmeta"
"git.mchus.pro/mchus/quoteforge/internal/localdb"
"gorm.io/gorm"
)
const (
ReadinessReady = "ready"
ReadinessBlocked = "blocked"
ReadinessUnknown = "unknown"
)
var ErrSyncBlockedByReadiness = errors.New("sync blocked by readiness guard")
type SyncReadiness struct {
Status string `json:"status"`
Blocked bool `json:"blocked"`
ReasonCode string `json:"reason_code,omitempty"`
ReasonText string `json:"reason_text,omitempty"`
RequiredMinAppVersion *string `json:"required_min_app_version,omitempty"`
LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
}
type SyncBlockedError struct {
Readiness SyncReadiness
}
func (e *SyncBlockedError) Error() string {
if e == nil {
return ErrSyncBlockedByReadiness.Error()
}
if strings.TrimSpace(e.Readiness.ReasonText) != "" {
return e.Readiness.ReasonText
}
return ErrSyncBlockedByReadiness.Error()
}
func (s *Service) EnsureReadinessForSync() (*SyncReadiness, error) {
readiness, err := s.GetReadiness()
if err != nil {
return nil, err
}
if readiness.Blocked {
return readiness, &SyncBlockedError{Readiness: *readiness}
}
return readiness, nil
}
func (s *Service) GetReadiness() (*SyncReadiness, error) {
now := time.Now().UTC()
if !s.isOnline() {
return s.blockedReadiness(
now,
"OFFLINE_UNVERIFIED_SCHEMA",
"Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД.",
nil,
)
}
mariaDB, err := s.getDB()
if err != nil || mariaDB == nil {
return s.blockedReadiness(
now,
"OFFLINE_UNVERIFIED_SCHEMA",
"Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД.",
nil,
)
}
s.schemaOnce.Do(func() {
if err := ensureClientSchemaStateTable(mariaDB); err != nil {
slog.Warn("qt_client_schema_state migration skipped (no DDL rights — run server migrate)", "error", err)
}
})
if err := s.reportClientSchemaState(mariaDB, now); err != nil {
slog.Warn("failed to report client schema state", "error", err)
}
ready := &SyncReadiness{Status: ReadinessReady, Blocked: false, LastCheckedAt: &now}
if setErr := s.localDB.SetSyncGuardState(ReadinessReady, "", "", nil, &now); setErr != nil {
slog.Warn("failed to persist sync guard state", "error", setErr)
}
return ready, nil
}
func (s *Service) blockedReadiness(now time.Time, code, text string, minVersion *string) (*SyncReadiness, error) {
readiness := &SyncReadiness{
Status: ReadinessBlocked,
Blocked: true,
ReasonCode: code,
ReasonText: text,
RequiredMinAppVersion: minVersion,
LastCheckedAt: &now,
}
if err := s.localDB.SetSyncGuardState(ReadinessBlocked, code, text, minVersion, &now); err != nil {
slog.Warn("failed to persist blocked sync guard state", "error", err)
}
return readiness, nil
}
func (s *Service) isOnline() bool {
if s.directDB != nil {
return true
}
if s.connMgr == nil {
return false
}
return s.connMgr.IsOnline()
}
func ensureClientSchemaStateTable(db *gorm.DB) error {
if !tableExists(db, "qt_client_schema_state") {
if err := db.Exec(`
CREATE TABLE IF NOT EXISTS qt_client_schema_state (
username VARCHAR(100) NOT NULL,
hostname VARCHAR(255) NOT NULL DEFAULT '',
app_version VARCHAR(64) NULL,
last_sync_at DATETIME NULL,
last_sync_status VARCHAR(32) NULL,
pending_changes_count INT NOT NULL DEFAULT 0,
pending_errors_count INT NOT NULL DEFAULT 0,
configurations_count INT NOT NULL DEFAULT 0,
projects_count INT NOT NULL DEFAULT 0,
estimate_pricelist_version VARCHAR(128) NULL,
warehouse_pricelist_version VARCHAR(128) NULL,
competitor_pricelist_version VARCHAR(128) NULL,
last_sync_error_code VARCHAR(128) NULL,
last_sync_error_text TEXT NULL,
last_checked_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
PRIMARY KEY (username, hostname),
INDEX idx_qt_client_schema_state_checked (last_checked_at)
)
`).Error; err != nil {
return fmt.Errorf("create qt_client_schema_state table: %w", err)
}
}
if tableExists(db, "qt_client_schema_state") {
// Each ALTER is guarded by a column existence check so users without DDL
// rights don't get a permission error on every sync cycle — the server
// migration tool is the authoritative path for schema changes.
if !columnExists(db, "qt_client_schema_state", "hostname") {
if err := db.Exec(`
ALTER TABLE qt_client_schema_state
ADD COLUMN IF NOT EXISTS hostname VARCHAR(255) NOT NULL DEFAULT '' AFTER username
`).Error; err != nil {
return fmt.Errorf("add qt_client_schema_state.hostname: %w", err)
}
if err := db.Exec(`
ALTER TABLE qt_client_schema_state
DROP PRIMARY KEY,
ADD PRIMARY KEY (username, hostname)
`).Error; err != nil && !isDuplicatePrimaryKeyDefinition(err) {
return fmt.Errorf("set qt_client_schema_state primary key: %w", err)
}
}
type colMigration struct {
column string
stmt string
}
migrations := []colMigration{
{"last_sync_at", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_at DATETIME NULL AFTER app_version"},
{"last_sync_status", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_status VARCHAR(32) NULL AFTER last_sync_at"},
{"pending_changes_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_changes_count INT NOT NULL DEFAULT 0 AFTER last_sync_status"},
{"pending_errors_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_errors_count INT NOT NULL DEFAULT 0 AFTER pending_changes_count"},
{"configurations_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS configurations_count INT NOT NULL DEFAULT 0 AFTER pending_errors_count"},
{"projects_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS projects_count INT NOT NULL DEFAULT 0 AFTER configurations_count"},
{"estimate_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS estimate_pricelist_version VARCHAR(128) NULL AFTER projects_count"},
{"warehouse_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS warehouse_pricelist_version VARCHAR(128) NULL AFTER estimate_pricelist_version"},
{"competitor_pricelist_version", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS competitor_pricelist_version VARCHAR(128) NULL AFTER warehouse_pricelist_version"},
{"last_sync_error_code", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_code VARCHAR(128) NULL AFTER competitor_pricelist_version"},
{"last_sync_error_text", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_text TEXT NULL AFTER last_sync_error_code"},
{"local_pricelist_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS local_pricelist_count INT NOT NULL DEFAULT 0 AFTER last_sync_error_text"},
{"pricelist_items_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pricelist_items_count INT NOT NULL DEFAULT 0 AFTER local_pricelist_count"},
{"components_count", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS components_count INT NOT NULL DEFAULT 0 AFTER pricelist_items_count"},
{"db_size_bytes", "ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS db_size_bytes BIGINT NOT NULL DEFAULT 0 AFTER components_count"},
}
for _, m := range migrations {
if columnExists(db, "qt_client_schema_state", m.column) {
continue
}
if err := db.Exec(m.stmt).Error; err != nil {
return fmt.Errorf("expand qt_client_schema_state: %w", err)
}
}
}
return nil
}
func columnExists(db *gorm.DB, tableName, columnName string) bool {
var count int64
if err := db.Raw(`
SELECT COUNT(*) FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ? AND COLUMN_NAME = ?
`, tableName, columnName).Scan(&count).Error; err != nil {
return false
}
return count > 0
}
func tableExists(db *gorm.DB, tableName string) bool {
var count int64
// For MariaDB/MySQL, check information_schema
if err := db.Raw(`
SELECT COUNT(*) FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?
`, tableName).Scan(&count).Error; err != nil {
return false
}
return count > 0
}
func (s *Service) reportClientSchemaState(mariaDB *gorm.DB, checkedAt time.Time) error {
if strings.EqualFold(mariaDB.Dialector.Name(), "sqlite") {
return nil
}
username := strings.TrimSpace(s.localDB.GetDBUser())
if username == "" {
return nil
}
hostname, err := os.Hostname()
if err != nil {
hostname = ""
}
hostname = strings.TrimSpace(hostname)
lastSyncAt := s.localDB.GetLastSyncTime()
lastSyncStatus := ReadinessReady
pendingChangesCount := s.localDB.CountPendingChanges()
pendingErrorsCount := s.localDB.CountErroredChanges()
configurationsCount := s.localDB.CountConfigurations()
projectsCount := s.localDB.CountProjects()
estimateVersion := latestPricelistVersion(s.localDB, "estimate")
warehouseVersion := latestPricelistVersion(s.localDB, "warehouse")
competitorVersion := latestPricelistVersion(s.localDB, "competitor")
lastSyncErrorCode, lastSyncErrorText := latestSyncErrorState(s.localDB)
localPricelistCount := s.localDB.CountLocalPricelists()
pricelistItemsCount := s.localDB.CountAllPricelistItems()
componentsCount := s.localDB.CountComponents()
dbSizeBytes := s.localDB.DBFileSizeBytes()
return mariaDB.Exec(`
INSERT INTO qt_client_schema_state (
username, hostname, app_version,
last_sync_at, last_sync_status, pending_changes_count, pending_errors_count,
configurations_count, projects_count,
estimate_pricelist_version, warehouse_pricelist_version, competitor_pricelist_version,
last_sync_error_code, last_sync_error_text,
local_pricelist_count, pricelist_items_count, components_count, db_size_bytes,
last_checked_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
app_version = VALUES(app_version),
last_sync_at = VALUES(last_sync_at),
last_sync_status = VALUES(last_sync_status),
pending_changes_count = VALUES(pending_changes_count),
pending_errors_count = VALUES(pending_errors_count),
configurations_count = VALUES(configurations_count),
projects_count = VALUES(projects_count),
estimate_pricelist_version = VALUES(estimate_pricelist_version),
warehouse_pricelist_version = VALUES(warehouse_pricelist_version),
competitor_pricelist_version = VALUES(competitor_pricelist_version),
last_sync_error_code = VALUES(last_sync_error_code),
last_sync_error_text = VALUES(last_sync_error_text),
local_pricelist_count = VALUES(local_pricelist_count),
pricelist_items_count = VALUES(pricelist_items_count),
components_count = VALUES(components_count),
db_size_bytes = VALUES(db_size_bytes),
last_checked_at = VALUES(last_checked_at),
updated_at = VALUES(updated_at)
`, username, hostname, appmeta.Version(),
lastSyncAt, lastSyncStatus, pendingChangesCount, pendingErrorsCount,
configurationsCount, projectsCount,
estimateVersion, warehouseVersion, competitorVersion,
lastSyncErrorCode, lastSyncErrorText,
localPricelistCount, pricelistItemsCount, componentsCount, dbSizeBytes,
checkedAt, checkedAt).Error
}
func isDuplicatePrimaryKeyDefinition(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "multiple primary key defined") ||
strings.Contains(msg, "duplicate key name 'primary'") ||
strings.Contains(msg, "duplicate entry")
}
func latestPricelistVersion(local *localdb.LocalDB, source string) *string {
if local == nil {
return nil
}
pl, err := local.GetLatestLocalPricelistBySource(source)
if err != nil || pl == nil {
return nil
}
version := strings.TrimSpace(pl.Version)
if version == "" {
return nil
}
return &version
}
func latestSyncErrorState(local *localdb.LocalDB) (*string, *string) {
if local == nil {
return nil, nil
}
if guard, err := local.GetSyncGuardState(); err == nil && guard != nil && strings.EqualFold(guard.Status, ReadinessBlocked) {
return optionalString(strings.TrimSpace(guard.ReasonCode)), optionalString(strings.TrimSpace(guard.ReasonText))
}
var pending localdb.PendingChange
if err := local.DB().
Where("TRIM(COALESCE(last_error, '')) <> ''").
Order("id DESC").
First(&pending).Error; err == nil {
return optionalString("PENDING_CHANGE_ERROR"), optionalString(strings.TrimSpace(pending.LastError))
}
return nil, nil
}
func optionalString(value string) *string {
if strings.TrimSpace(value) == "" {
return nil
}
v := strings.TrimSpace(value)
return &v
}
func toReadinessFromState(state *localdb.LocalSyncGuardState) *SyncReadiness {
if state == nil {
return nil
}
blocked := state.Status == ReadinessBlocked
return &SyncReadiness{
Status: state.Status,
Blocked: blocked,
ReasonCode: state.ReasonCode,
ReasonText: state.ReasonText,
RequiredMinAppVersion: state.RequiredMinAppVersion,
LastCheckedAt: state.LastCheckedAt,
}
}