Files
QuoteForge/internal/services/sync/readiness.go
2026-03-07 21:03:40 +03:00

548 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"bufio"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"time"
"git.mchus.pro/mchus/quoteforge/internal/appmeta"
"git.mchus.pro/mchus/quoteforge/internal/localdb"
"gorm.io/gorm"
)
// Status values for SyncReadiness.Status. ReadinessReady and ReadinessBlocked
// are also the status strings persisted through SetSyncGuardState.
const (
// ReadinessReady is reported when every readiness check passed.
ReadinessReady = "ready"
// ReadinessBlocked is reported when a check failed and sync must not run.
ReadinessBlocked = "blocked"
// ReadinessUnknown is declared for callers; it is not assigned anywhere in
// the visible part of this file.
ReadinessUnknown = "unknown"
)
// ErrSyncBlockedByReadiness provides the fallback message used by
// SyncBlockedError.Error when no reason text is available.
var ErrSyncBlockedByReadiness = errors.New("sync blocked by readiness guard")
// SyncReadiness is the JSON-serialisable result of a readiness check.
type SyncReadiness struct {
// Status is one of the Readiness* constants.
Status string `json:"status"`
// Blocked is true when sync must not proceed.
Blocked bool `json:"blocked"`
// ReasonCode is a machine-readable code such as "MIN_APP_VERSION_REQUIRED";
// it is omitted from JSON when empty.
ReasonCode string `json:"reason_code,omitempty"`
// ReasonText is the human-readable explanation; SyncBlockedError.Error
// returns it when it is not blank.
ReasonText string `json:"reason_text,omitempty"`
// RequiredMinAppVersion is set when the block is caused by a migration
// that requires a newer application version; nil otherwise.
RequiredMinAppVersion *string `json:"required_min_app_version,omitempty"`
// LastCheckedAt is the time at which the check was performed.
LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
}
// SyncBlockedError is the error returned by EnsureReadinessForSync when the
// readiness check reports a blocked state.
type SyncBlockedError struct {
// Readiness is a copy of the blocked readiness result.
Readiness SyncReadiness
}
// Error implements the error interface. It returns the readiness reason text
// when the receiver is non-nil and that text is not blank; in every other
// case it returns the message of ErrSyncBlockedByReadiness.
func (e *SyncBlockedError) Error() string {
	if e != nil {
		if text := e.Readiness.ReasonText; strings.TrimSpace(text) != "" {
			return text
		}
	}
	return ErrSyncBlockedByReadiness.Error()
}
// EnsureReadinessForSync runs GetReadiness and converts a blocked result into
// an error. When blocked, the readiness value is still returned together with
// a *SyncBlockedError carrying a copy of it. An error from GetReadiness is
// passed through with a nil readiness.
func (s *Service) EnsureReadinessForSync() (*SyncReadiness, error) {
	state, err := s.GetReadiness()
	switch {
	case err != nil:
		return nil, err
	case state.Blocked:
		return state, &SyncBlockedError{Readiness: *state}
	default:
		return state, nil
	}
}
// GetReadiness runs the pre-sync readiness checks and returns their outcome.
//
// The checks run in this order, and the first failure produces a blocked
// result via blockedReadiness:
//
//  1. the service must be online and the remote database must be obtainable
//     (OFFLINE_UNVERIFIED_SCHEMA);
//  2. the active client migrations must be loadable from the remote registry
//     (REMOTE_MIGRATION_REGISTRY_UNAVAILABLE);
//  3. the running application version must satisfy every migration's
//     min_app_version (MIN_APP_VERSION_REQUIRED);
//  4. missing migrations must apply cleanly to the local database
//     (REMOTE_MIGRATION_CHECKSUM_MISMATCH or LOCAL_MIGRATION_APPLY_FAILED).
//
// On success the client schema state is reported to the remote database and
// the "ready" state is persisted locally; failures of those two steps are
// only logged. Every return path in this function yields a nil error: a
// failed check is expressed through the Blocked flag of the result.
func (s *Service) GetReadiness() (*SyncReadiness, error) {
	const offlineText = "Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД."

	now := time.Now().UTC()
	if !s.isOnline() {
		return s.blockedReadiness(now, "OFFLINE_UNVERIFIED_SCHEMA", offlineText, nil)
	}

	mariaDB, err := s.getDB()
	if err != nil || mariaDB == nil {
		if err != nil {
			// The cause is not part of the user-facing result, so keep it in the log.
			slog.Warn("sync readiness: remote database unavailable", "error", err)
		}
		return s.blockedReadiness(now, "OFFLINE_UNVERIFIED_SCHEMA", offlineText, nil)
	}

	migrations, err := listActiveClientMigrations(mariaDB)
	if err != nil {
		slog.Warn("sync readiness: failed to load client migration registry", "error", err)
		return s.blockedReadiness(
			now,
			"REMOTE_MIGRATION_REGISTRY_UNAVAILABLE",
			"Синхронизация заблокирована: не удалось проверить централизованные миграции локальной БД.",
			nil,
		)
	}

	for i := range migrations {
		m := migrations[i]
		if strings.TrimSpace(m.MinAppVersion) == "" {
			continue
		}
		if compareVersions(appmeta.Version(), m.MinAppVersion) < 0 {
			required := m.MinAppVersion
			return s.blockedReadiness(
				now,
				"MIN_APP_VERSION_REQUIRED",
				fmt.Sprintf("Требуется обновление приложения до версии %s для безопасной синхронизации.", m.MinAppVersion),
				&required,
			)
		}
	}

	if err := s.applyMissingRemoteMigrations(migrations); err != nil {
		slog.Warn("sync readiness: failed to apply remote migrations", "error", err)
		// Checksum failures are reported as "checksum mismatch for migration <id>".
		// Match that prefix rather than any occurrence of the word "checksum":
		// apply errors quote the failing SQL statement, which may itself mention
		// a column or table called "checksum".
		if strings.HasPrefix(strings.ToLower(err.Error()), "checksum mismatch") {
			return s.blockedReadiness(
				now,
				"REMOTE_MIGRATION_CHECKSUM_MISMATCH",
				"Синхронизация заблокирована: контрольная сумма миграции не совпадает.",
				nil,
			)
		}
		return s.blockedReadiness(
			now,
			"LOCAL_MIGRATION_APPLY_FAILED",
			"Синхронизация заблокирована: не удалось применить миграции локальной БД.",
			nil,
		)
	}

	// Reporting and persisting are best-effort: a failure must not block sync.
	if err := s.reportClientSchemaState(mariaDB, now); err != nil {
		slog.Warn("failed to report client schema state", "error", err)
	}
	ready := &SyncReadiness{Status: ReadinessReady, Blocked: false, LastCheckedAt: &now}
	if setErr := s.localDB.SetSyncGuardState(ReadinessReady, "", "", nil, &now); setErr != nil {
		slog.Warn("failed to persist sync guard state", "error", setErr)
	}
	return ready, nil
}
// blockedReadiness builds a blocked SyncReadiness from the given reason and
// persists the same state through the local database. A persistence failure
// is logged and otherwise ignored; the returned error is always nil.
func (s *Service) blockedReadiness(now time.Time, code, text string, minVersion *string) (*SyncReadiness, error) {
	var result SyncReadiness
	result.Status = ReadinessBlocked
	result.Blocked = true
	result.ReasonCode = code
	result.ReasonText = text
	result.RequiredMinAppVersion = minVersion
	result.LastCheckedAt = &now

	persistErr := s.localDB.SetSyncGuardState(ReadinessBlocked, code, text, minVersion, &now)
	if persistErr != nil {
		slog.Warn("failed to persist blocked sync guard state", "error", persistErr)
	}
	return &result, nil
}
// isOnline reports whether a remote connection can be assumed. A configured
// direct database handle always counts as online; otherwise the answer comes
// from the connection manager, and a missing manager means offline.
func (s *Service) isOnline() bool {
	switch {
	case s.directDB != nil:
		return true
	case s.connMgr == nil:
		return false
	default:
		return s.connMgr.IsOnline()
	}
}
// clientLocalMigration is one row of the qt_client_local_migrations table as
// selected by listActiveClientMigrations; the gorm tags name the source columns.
type clientLocalMigration struct {
// ID identifies the migration and is the key under which it is recorded
// as applied locally.
ID string `gorm:"column:id"`
Name string `gorm:"column:name"`
// SQLText is the script executed against the local database.
SQLText string `gorm:"column:sql_text"`
// Checksum is compared with the SHA-256 hex digest of SQLText; it may be empty.
Checksum string `gorm:"column:checksum"`
// MinAppVersion is empty when the column is NULL (the query applies COALESCE).
MinAppVersion string `gorm:"column:min_app_version"`
// OrderNo is the primary sort key for the application order.
OrderNo int `gorm:"column:order_no"`
CreatedAt time.Time `gorm:"column:created_at"`
}
// listActiveClientMigrations loads every active row of
// qt_client_local_migrations, ordered by order_no, created_at and id.
// For a SQLite connection it returns an empty, non-nil slice without touching
// the database. Otherwise it first ensures the registry tables exist and then
// runs the query; a query failure is wrapped and returned.
func listActiveClientMigrations(db *gorm.DB) ([]clientLocalMigration, error) {
	if strings.EqualFold(db.Dialector.Name(), "sqlite") {
		return []clientLocalMigration{}, nil
	}
	if err := ensureClientMigrationRegistryTable(db); err != nil {
		return nil, err
	}

	const query = `
SELECT id, name, sql_text, checksum, COALESCE(min_app_version, '') AS min_app_version, order_no, created_at
FROM qt_client_local_migrations
WHERE is_active = 1
ORDER BY order_no ASC, created_at ASC, id ASC
`
	result := make([]clientLocalMigration, 0)
	scanErr := db.Raw(query).Scan(&result).Error
	if scanErr != nil {
		return nil, fmt.Errorf("load client local migrations: %w", scanErr)
	}
	return result, nil
}
// ensureClientMigrationRegistryTable makes sure the two remote tables used by
// the client migration flow are present and that qt_client_schema_state has
// the (username, hostname) primary key and all expected columns.
//
// It returns the first statement error, wrapped with the step that failed.
// The ALTER statements in the second half run on every call in which
// qt_client_schema_state exists, including right after it was created.
func ensureClientMigrationRegistryTable(db *gorm.DB) error {
// Probe for the table first so that CREATE TABLE is only issued when it is
// missing (avoids permission issues). tableExists also returns false when
// the probe query itself fails, in which case the CREATE is attempted.
if !tableExists(db, "qt_client_local_migrations") {
if err := db.Exec(`
CREATE TABLE IF NOT EXISTS qt_client_local_migrations (
id VARCHAR(128) NOT NULL,
name VARCHAR(255) NOT NULL,
sql_text LONGTEXT NOT NULL,
checksum VARCHAR(128) NOT NULL,
min_app_version VARCHAR(64) NULL,
order_no INT NOT NULL DEFAULT 0,
is_active TINYINT(1) NOT NULL DEFAULT 1,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
INDEX idx_qt_client_local_migrations_active_order (is_active, order_no, created_at)
)
`).Error; err != nil {
return fmt.Errorf("create qt_client_local_migrations table: %w", err)
}
}
// Same probe-then-create approach for the per-client state table.
if !tableExists(db, "qt_client_schema_state") {
if err := db.Exec(`
CREATE TABLE IF NOT EXISTS qt_client_schema_state (
username VARCHAR(100) NOT NULL,
hostname VARCHAR(255) NOT NULL DEFAULT '',
last_applied_migration_id VARCHAR(128) NULL,
app_version VARCHAR(64) NULL,
last_sync_at DATETIME NULL,
last_sync_status VARCHAR(32) NULL,
pending_changes_count INT NOT NULL DEFAULT 0,
pending_errors_count INT NOT NULL DEFAULT 0,
configurations_count INT NOT NULL DEFAULT 0,
projects_count INT NOT NULL DEFAULT 0,
estimate_pricelist_version VARCHAR(128) NULL,
warehouse_pricelist_version VARCHAR(128) NULL,
competitor_pricelist_version VARCHAR(128) NULL,
last_sync_error_code VARCHAR(128) NULL,
last_sync_error_text TEXT NULL,
last_checked_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
PRIMARY KEY (username, hostname),
INDEX idx_qt_client_schema_state_checked (last_checked_at)
)
`).Error; err != nil {
return fmt.Errorf("create qt_client_schema_state table: %w", err)
}
}
// Upgrade path for a qt_client_schema_state table that predates the current
// layout: add hostname, re-key on (username, hostname), then add the
// remaining reporting columns.
// NOTE(review): ADD COLUMN IF NOT EXISTS looks MariaDB-specific — confirm
// that plain MySQL servers are not a target for this code.
if tableExists(db, "qt_client_schema_state") {
if err := db.Exec(`
ALTER TABLE qt_client_schema_state
ADD COLUMN IF NOT EXISTS hostname VARCHAR(255) NOT NULL DEFAULT '' AFTER username
`).Error; err != nil {
return fmt.Errorf("add qt_client_schema_state.hostname: %w", err)
}
// Errors that isDuplicatePrimaryKeyDefinition recognises are tolerated
// here; any other failure aborts.
if err := db.Exec(`
ALTER TABLE qt_client_schema_state
DROP PRIMARY KEY,
ADD PRIMARY KEY (username, hostname)
`).Error; err != nil && !isDuplicatePrimaryKeyDefinition(err) {
return fmt.Errorf("set qt_client_schema_state primary key: %w", err)
}
// Each statement is a no-op when its column already exists. The AFTER
// clauses chain on the previous column, so the order of this list matters.
for _, stmt := range []string{
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_at DATETIME NULL AFTER app_version",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_status VARCHAR(32) NULL AFTER last_sync_at",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_changes_count INT NOT NULL DEFAULT 0 AFTER last_sync_status",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_errors_count INT NOT NULL DEFAULT 0 AFTER pending_changes_count",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS configurations_count INT NOT NULL DEFAULT 0 AFTER pending_errors_count",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS projects_count INT NOT NULL DEFAULT 0 AFTER configurations_count",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS estimate_pricelist_version VARCHAR(128) NULL AFTER projects_count",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS warehouse_pricelist_version VARCHAR(128) NULL AFTER estimate_pricelist_version",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS competitor_pricelist_version VARCHAR(128) NULL AFTER warehouse_pricelist_version",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_code VARCHAR(128) NULL AFTER competitor_pricelist_version",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_text TEXT NULL AFTER last_sync_error_code",
} {
if err := db.Exec(stmt).Error; err != nil {
return fmt.Errorf("expand qt_client_schema_state: %w", err)
}
}
}
return nil
}
// tableExists reports whether tableName is listed in information_schema.TABLES
// for the current database (MariaDB/MySQL). A failing lookup query is treated
// as "table does not exist".
func tableExists(db *gorm.DB, tableName string) bool {
	const query = `
SELECT COUNT(*) FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?
`
	var matches int64
	lookupErr := db.Raw(query, tableName).Scan(&matches).Error
	return lookupErr == nil && matches > 0
}
// applyMissingRemoteMigrations applies, in the given order, every migration
// that is not yet recorded as applied in the local database.
//
// For each migration it:
//   - verifies the registry checksum against the SHA-256 hex digest of the
//     SQL text (an empty registry checksum is replaced by the computed one);
//   - skips the migration if it is already recorded locally, after verifying
//     that the recorded checksum matches;
//   - otherwise executes the statements inside one local transaction and
//     records the migration as applied.
//
// Checksums are hex digests, so all comparisons are case-insensitive. The
// checksum errors keep the exact text "checksum mismatch for migration <id>",
// which GetReadiness relies on to classify the failure.
//
// NOTE(review): the "applied" record is written after the transaction has
// returned, not inside it. If that write fails, the migration has run but is
// not recorded and will be attempted again on the next call — confirm that
// migrations are written to be re-runnable.
func (s *Service) applyMissingRemoteMigrations(migrations []clientLocalMigration) error {
	for i := range migrations {
		m := &migrations[i]

		computedChecksum := digestSQL(m.SQLText)
		checksum := strings.TrimSpace(m.Checksum)
		if checksum == "" {
			checksum = computedChecksum
		} else if !strings.EqualFold(checksum, computedChecksum) {
			return fmt.Errorf("checksum mismatch for migration %s", m.ID)
		}

		applied, err := s.localDB.GetRemoteMigrationApplied(m.ID)
		if err == nil {
			// Same case-insensitive comparison as above: a change in hex case
			// between the stored and the registry value is not a mismatch.
			if !strings.EqualFold(strings.TrimSpace(applied.Checksum), checksum) {
				return fmt.Errorf("checksum mismatch for migration %s", m.ID)
			}
			continue
		}
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			return fmt.Errorf("check local applied migration %s: %w", m.ID, err)
		}

		// A migration without SQL is only recorded, never executed.
		if strings.TrimSpace(m.SQLText) == "" {
			if err := s.localDB.UpsertRemoteMigrationApplied(m.ID, checksum, appmeta.Version(), time.Now().UTC()); err != nil {
				return fmt.Errorf("mark empty migration %s as applied: %w", m.ID, err)
			}
			continue
		}

		statements := splitSQLStatementsLite(m.SQLText)
		if err := s.localDB.DB().Transaction(func(tx *gorm.DB) error {
			for _, stmt := range statements {
				if err := tx.Exec(stmt).Error; err != nil {
					return fmt.Errorf("apply migration %s statement %q: %w", m.ID, stmt, err)
				}
			}
			return nil
		}); err != nil {
			return err
		}
		if err := s.localDB.UpsertRemoteMigrationApplied(m.ID, checksum, appmeta.Version(), time.Now().UTC()); err != nil {
			return fmt.Errorf("record applied migration %s: %w", m.ID, err)
		}
	}
	return nil
}
// splitSQLStatementsLite splits a SQL script into individual statements.
//
// Blank lines and lines whose first non-space characters are "--" are
// dropped, the remaining lines are re-joined with "\n", and the result is cut
// at every ';'. Each statement is trimmed and empty statements are skipped.
//
// This is deliberately a "lite" splitter: it does not understand string
// literals, block comments or trailing "--" comments, so a ';' inside a
// quoted value still ends the statement.
//
// Lines are read with bufio.Reader.ReadString, which has no line-length
// limit. A bufio.Scanner would stop at the first line exceeding its buffer
// and silently drop the rest of the script.
func splitSQLStatementsLite(script string) []string {
	reader := bufio.NewReader(strings.NewReader(script))
	lines := make([]string, 0, 64)
	for {
		// The final line may arrive together with a non-nil error (EOF), so
		// the data is processed before the error is inspected.
		line, err := reader.ReadString('\n')
		if line != "" {
			line = strings.TrimSuffix(line, "\n")
			line = strings.TrimSuffix(line, "\r")
			trimmed := strings.TrimSpace(line)
			if trimmed != "" && !strings.HasPrefix(trimmed, "--") {
				lines = append(lines, line)
			}
		}
		if err != nil {
			// Reading from an in-memory string can only end with EOF.
			break
		}
	}

	raw := strings.Split(strings.Join(lines, "\n"), ";")
	stmts := make([]string, 0, len(raw))
	for _, stmt := range raw {
		if trimmed := strings.TrimSpace(stmt); trimmed != "" {
			stmts = append(stmts, trimmed)
		}
	}
	return stmts
}
// digestSQL returns the lower-case hexadecimal SHA-256 digest of sqlText.
func digestSQL(sqlText string) string {
	sum := sha256.Sum256([]byte(sqlText))
	encoded := make([]byte, hex.EncodedLen(len(sum)))
	hex.Encode(encoded, sum[:])
	return string(encoded)
}
// compareVersions compares two dotted version strings component by component
// after normalizeVersionParts has turned them into integers. A version with
// fewer components is padded with zeros. It returns -1 when left is lower,
// 1 when left is higher and 0 when both are equal.
func compareVersions(left, right string) int {
	a := normalizeVersionParts(left)
	b := normalizeVersionParts(right)

	// component yields the idx-th part, or 0 past the end of the slice.
	component := func(parts []int, idx int) int {
		if idx < len(parts) {
			return parts[idx]
		}
		return 0
	}

	for idx := 0; idx < len(a) || idx < len(b); idx++ {
		x, y := component(a, idx), component(b, idx)
		switch {
		case x < y:
			return -1
		case x > y:
			return 1
		}
	}
	return 0
}
// reportClientSchemaState upserts this client's row in the remote
// qt_client_schema_state table, keyed by (username, hostname).
//
// Nothing is written when the remote connection is SQLite or when the local
// database reports a blank user name. A failing hostname lookup results in an
// empty hostname, and a failing lookup of the latest applied migration in an
// empty migration id. The reported sync status is always ReadinessReady.
// checkedAt is stored as both last_checked_at and updated_at.
func (s *Service) reportClientSchemaState(mariaDB *gorm.DB, checkedAt time.Time) error {
	if strings.EqualFold(mariaDB.Dialector.Name(), "sqlite") {
		return nil
	}
	user := strings.TrimSpace(s.localDB.GetDBUser())
	if user == "" {
		return nil
	}

	host := ""
	if name, hostErr := os.Hostname(); hostErr == nil {
		host = strings.TrimSpace(name)
	}

	appliedID := ""
	if id, idErr := s.localDB.GetLatestAppliedRemoteMigrationID(); idErr == nil {
		appliedID = id
	}

	syncedAt := s.localDB.GetLastSyncTime()
	pendingChanges := s.localDB.CountPendingChanges()
	pendingErrors := s.localDB.CountErroredChanges()
	configurations := s.localDB.CountConfigurations()
	projects := s.localDB.CountProjects()
	estimate := latestPricelistVersion(s.localDB, "estimate")
	warehouse := latestPricelistVersion(s.localDB, "warehouse")
	competitor := latestPricelistVersion(s.localDB, "competitor")
	errCode, errText := latestSyncErrorState(s.localDB)

	const upsert = `
INSERT INTO qt_client_schema_state (
username, hostname, last_applied_migration_id, app_version,
last_sync_at, last_sync_status, pending_changes_count, pending_errors_count,
configurations_count, projects_count,
estimate_pricelist_version, warehouse_pricelist_version, competitor_pricelist_version,
last_sync_error_code, last_sync_error_text,
last_checked_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
last_applied_migration_id = VALUES(last_applied_migration_id),
app_version = VALUES(app_version),
last_sync_at = VALUES(last_sync_at),
last_sync_status = VALUES(last_sync_status),
pending_changes_count = VALUES(pending_changes_count),
pending_errors_count = VALUES(pending_errors_count),
configurations_count = VALUES(configurations_count),
projects_count = VALUES(projects_count),
estimate_pricelist_version = VALUES(estimate_pricelist_version),
warehouse_pricelist_version = VALUES(warehouse_pricelist_version),
competitor_pricelist_version = VALUES(competitor_pricelist_version),
last_sync_error_code = VALUES(last_sync_error_code),
last_sync_error_text = VALUES(last_sync_error_text),
last_checked_at = VALUES(last_checked_at),
updated_at = VALUES(updated_at)
`
	result := mariaDB.Exec(upsert,
		user, host, appliedID, appmeta.Version(),
		syncedAt, ReadinessReady, pendingChanges, pendingErrors,
		configurations, projects,
		estimate, warehouse, competitor,
		errCode, errText,
		checkedAt, checkedAt,
	)
	return result.Error
}
// isDuplicatePrimaryKeyDefinition reports whether err's message, compared
// case-insensitively, contains one of the phrases that
// ensureClientMigrationRegistryTable tolerates when re-keying the table.
// A nil error yields false.
func isDuplicatePrimaryKeyDefinition(err error) bool {
	if err == nil {
		return false
	}
	lowered := strings.ToLower(err.Error())
	for _, marker := range []string{
		"multiple primary key defined",
		"duplicate key name 'primary'",
		"duplicate entry",
	} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
// latestPricelistVersion returns the trimmed version of the latest local
// pricelist for the given source. It returns nil when local is nil, when the
// lookup fails or yields no pricelist, or when the version is blank.
func latestPricelistVersion(local *localdb.LocalDB, source string) *string {
	if local == nil {
		return nil
	}
	pricelist, lookupErr := local.GetLatestLocalPricelistBySource(source)
	if lookupErr != nil || pricelist == nil {
		return nil
	}
	if trimmed := strings.TrimSpace(pricelist.Version); trimmed != "" {
		return &trimmed
	}
	return nil
}
// latestSyncErrorState returns the most relevant sync error as a
// (code, text) pair, or (nil, nil) when there is none or local is nil.
//
// A stored sync guard state with status "blocked" takes precedence and
// supplies its own reason code and text. Otherwise the pending change with
// the highest id and a non-blank last_error is reported under the code
// "PENDING_CHANGE_ERROR".
func latestSyncErrorState(local *localdb.LocalDB) (*string, *string) {
	if local == nil {
		return nil, nil
	}

	guard, guardErr := local.GetSyncGuardState()
	if guardErr == nil && guard != nil && strings.EqualFold(guard.Status, ReadinessBlocked) {
		code := strings.TrimSpace(guard.ReasonCode)
		text := strings.TrimSpace(guard.ReasonText)
		return optionalString(code), optionalString(text)
	}

	var failed localdb.PendingChange
	query := local.DB().
		Where("TRIM(COALESCE(last_error, '')) <> ''").
		Order("id DESC")
	if findErr := query.First(&failed).Error; findErr != nil {
		return nil, nil
	}
	return optionalString("PENDING_CHANGE_ERROR"), optionalString(strings.TrimSpace(failed.LastError))
}
// optionalString returns a pointer to value with surrounding whitespace
// removed, or nil when nothing remains after trimming.
func optionalString(value string) *string {
	trimmed := strings.TrimSpace(value)
	if trimmed == "" {
		return nil
	}
	return &trimmed
}
// normalizeVersionParts converts a version string into its numeric
// components, e.g. "v1.10.3" -> [1 10 3].
//
// Rules:
//   - surrounding whitespace and one leading "v" or "V" are removed;
//   - everything from the first '-' or '+' onwards (a pre-release or build
//     suffix such as "-rc.1" or "+build.7") is ignored, so the dots inside
//     the suffix do not create additional components;
//   - the remainder is split on '.', and each piece contributes its leading
//     run of decimal digits;
//   - a piece that is empty, has no leading digits or does not fit into an
//     int contributes 0.
//
// The result always has at least one element ("" yields [0]).
func normalizeVersionParts(v string) []int {
	core := strings.TrimSpace(v)
	if core != "" && (core[0] == 'v' || core[0] == 'V') {
		core = core[1:]
	}
	if cut := strings.IndexAny(core, "-+"); cut >= 0 {
		core = core[:cut]
	}

	chunks := strings.Split(core, ".")
	parts := make([]int, 0, len(chunks))
	for _, chunk := range chunks {
		digits := strings.TrimSpace(chunk)
		end := 0
		for end < len(digits) && digits[end] >= '0' && digits[end] <= '9' {
			end++
		}
		n := 0
		if end > 0 {
			// Atoi only fails on overflow here; such a component counts as 0.
			if parsed, err := strconv.Atoi(digits[:end]); err == nil {
				n = parsed
			}
		}
		parts = append(parts, n)
	}
	return parts
}
// toReadinessFromState converts a persisted sync guard state into a
// SyncReadiness. A nil state yields nil. Blocked is derived from an exact
// match of the stored status with ReadinessBlocked; all other fields are
// copied as they are.
func toReadinessFromState(state *localdb.LocalSyncGuardState) *SyncReadiness {
	if state == nil {
		return nil
	}
	result := new(SyncReadiness)
	result.Status = state.Status
	result.Blocked = state.Status == ReadinessBlocked
	result.ReasonCode = state.ReasonCode
	result.ReasonText = state.ReasonText
	result.RequiredMinAppVersion = state.RequiredMinAppVersion
	result.LastCheckedAt = state.LastCheckedAt
	return result
}