Renamed module path git.mchus.pro/mchus/quoteforge → git.mchus.pro/mchus/priceforge, renamed package quoteforge → priceforge, moved binary from cmd/qfs to cmd/pfs.
390 lines
11 KiB
Go
390 lines
11 KiB
Go
package sync
|
||
|
||
import (
|
||
"bufio"
|
||
"crypto/sha256"
|
||
"encoding/hex"
|
||
"errors"
|
||
"fmt"
|
||
"log/slog"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"git.mchus.pro/mchus/priceforge/internal/appmeta"
|
||
"git.mchus.pro/mchus/priceforge/internal/localdb"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
// Status values stored in SyncReadiness.Status and persisted through the
// local sync guard state.
const (
	// ReadinessReady marks a successful readiness check: sync may proceed.
	ReadinessReady = "ready"
	// ReadinessBlocked marks a failed readiness check: sync must not run.
	ReadinessBlocked = "blocked"
	// ReadinessUnknown is declared for callers; nothing in this file assigns it.
	ReadinessUnknown = "unknown"
)
|
||
|
||
// ErrSyncBlockedByReadiness is the sentinel for "the readiness guard refused
// to let sync run". Every *SyncBlockedError unwraps to it, so callers can use
// errors.Is(err, ErrSyncBlockedByReadiness) without caring about the details.
var ErrSyncBlockedByReadiness = errors.New("sync blocked by readiness guard")

// SyncReadiness is the result of a sync readiness check.
type SyncReadiness struct {
	// Status is one of the Readiness* constants.
	Status string `json:"status"`
	// Blocked is true when sync must not run.
	Blocked bool `json:"blocked"`
	// ReasonCode is a machine-readable reason, set when blocked.
	ReasonCode string `json:"reason_code,omitempty"`
	// ReasonText is a human-readable reason, set when blocked.
	ReasonText string `json:"reason_text,omitempty"`
	// RequiredMinAppVersion is set when the block is caused by an outdated app.
	RequiredMinAppVersion *string `json:"required_min_app_version,omitempty"`
	// LastCheckedAt is the time the check was performed.
	LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
}

// SyncBlockedError is returned when sync is refused by the readiness guard.
// It carries the readiness snapshot that caused the refusal.
type SyncBlockedError struct {
	Readiness SyncReadiness
}

// Error returns the readiness reason text when it is non-blank, and the
// sentinel's message otherwise (including for a nil receiver).
func (e *SyncBlockedError) Error() string {
	if e == nil {
		return ErrSyncBlockedByReadiness.Error()
	}
	if strings.TrimSpace(e.Readiness.ReasonText) != "" {
		return e.Readiness.ReasonText
	}
	return ErrSyncBlockedByReadiness.Error()
}

// Unwrap links the typed error to ErrSyncBlockedByReadiness so that
// errors.Is matches the sentinel while errors.As still yields the details.
func (e *SyncBlockedError) Unwrap() error {
	return ErrSyncBlockedByReadiness
}
|
||
|
||
func (s *Service) EnsureReadinessForSync() (*SyncReadiness, error) {
|
||
readiness, err := s.GetReadiness()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if readiness.Blocked {
|
||
return readiness, &SyncBlockedError{Readiness: *readiness}
|
||
}
|
||
return readiness, nil
|
||
}
|
||
|
||
func (s *Service) GetReadiness() (*SyncReadiness, error) {
|
||
now := time.Now().UTC()
|
||
if !s.isOnline() {
|
||
return s.blockedReadiness(
|
||
now,
|
||
"OFFLINE_UNVERIFIED_SCHEMA",
|
||
"Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД.",
|
||
nil,
|
||
)
|
||
}
|
||
|
||
mariaDB, err := s.getDB()
|
||
if err != nil || mariaDB == nil {
|
||
return s.blockedReadiness(
|
||
now,
|
||
"OFFLINE_UNVERIFIED_SCHEMA",
|
||
"Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД.",
|
||
nil,
|
||
)
|
||
}
|
||
|
||
migrations, err := listActiveClientMigrations(mariaDB)
|
||
if err != nil {
|
||
return s.blockedReadiness(
|
||
now,
|
||
"REMOTE_MIGRATION_REGISTRY_UNAVAILABLE",
|
||
"Синхронизация заблокирована: не удалось проверить централизованные миграции локальной БД.",
|
||
nil,
|
||
)
|
||
}
|
||
|
||
for i := range migrations {
|
||
m := migrations[i]
|
||
if strings.TrimSpace(m.MinAppVersion) != "" {
|
||
if compareVersions(appmeta.Version(), m.MinAppVersion) < 0 {
|
||
min := m.MinAppVersion
|
||
return s.blockedReadiness(
|
||
now,
|
||
"MIN_APP_VERSION_REQUIRED",
|
||
fmt.Sprintf("Требуется обновление приложения до версии %s для безопасной синхронизации.", m.MinAppVersion),
|
||
&min,
|
||
)
|
||
}
|
||
}
|
||
}
|
||
|
||
if err := s.applyMissingRemoteMigrations(migrations); err != nil {
|
||
if strings.Contains(strings.ToLower(err.Error()), "checksum") {
|
||
return s.blockedReadiness(
|
||
now,
|
||
"REMOTE_MIGRATION_CHECKSUM_MISMATCH",
|
||
"Синхронизация заблокирована: контрольная сумма миграции не совпадает.",
|
||
nil,
|
||
)
|
||
}
|
||
return s.blockedReadiness(
|
||
now,
|
||
"LOCAL_MIGRATION_APPLY_FAILED",
|
||
"Синхронизация заблокирована: не удалось применить миграции локальной БД.",
|
||
nil,
|
||
)
|
||
}
|
||
|
||
if err := s.reportClientSchemaState(mariaDB, now); err != nil {
|
||
slog.Warn("failed to report client schema state", "error", err)
|
||
}
|
||
|
||
ready := &SyncReadiness{Status: ReadinessReady, Blocked: false, LastCheckedAt: &now}
|
||
if setErr := s.localDB.SetSyncGuardState(ReadinessReady, "", "", nil, &now); setErr != nil {
|
||
slog.Warn("failed to persist sync guard state", "error", setErr)
|
||
}
|
||
return ready, nil
|
||
}
|
||
|
||
func (s *Service) blockedReadiness(now time.Time, code, text string, minVersion *string) (*SyncReadiness, error) {
|
||
readiness := &SyncReadiness{
|
||
Status: ReadinessBlocked,
|
||
Blocked: true,
|
||
ReasonCode: code,
|
||
ReasonText: text,
|
||
RequiredMinAppVersion: minVersion,
|
||
LastCheckedAt: &now,
|
||
}
|
||
if err := s.localDB.SetSyncGuardState(ReadinessBlocked, code, text, minVersion, &now); err != nil {
|
||
slog.Warn("failed to persist blocked sync guard state", "error", err)
|
||
}
|
||
return readiness, nil
|
||
}
|
||
|
||
func (s *Service) isOnline() bool {
|
||
if s.directDB != nil {
|
||
return true
|
||
}
|
||
if s.connMgr == nil {
|
||
return false
|
||
}
|
||
return s.connMgr.IsOnline()
|
||
}
|
||
|
||
// clientLocalMigration is one row of qt_client_local_migrations as selected
// by listActiveClientMigrations.
type clientLocalMigration struct {
	// ID is the registry primary key, also used as the locally applied marker key.
	ID string `gorm:"column:id"`
	// Name is the registry's name column.
	Name string `gorm:"column:name"`
	// SQLText is the script to execute against the local database.
	SQLText string `gorm:"column:sql_text"`
	// Checksum is compared against the SHA-256 hex digest of SQLText; may be blank.
	Checksum string `gorm:"column:checksum"`
	// MinAppVersion is blank when the registry value is NULL (COALESCE in the query).
	MinAppVersion string `gorm:"column:min_app_version"`
	// OrderNo is the primary sort key of the listing query.
	OrderNo int `gorm:"column:order_no"`
	// CreatedAt is the secondary sort key of the listing query.
	CreatedAt time.Time `gorm:"column:created_at"`
}
|
||
|
||
func listActiveClientMigrations(db *gorm.DB) ([]clientLocalMigration, error) {
|
||
if strings.EqualFold(db.Dialector.Name(), "sqlite") {
|
||
return []clientLocalMigration{}, nil
|
||
}
|
||
if err := ensureClientMigrationRegistryTable(db); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
rows := make([]clientLocalMigration, 0)
|
||
if err := db.Raw(`
|
||
SELECT id, name, sql_text, checksum, COALESCE(min_app_version, '') AS min_app_version, order_no, created_at
|
||
FROM qt_client_local_migrations
|
||
WHERE is_active = 1
|
||
ORDER BY order_no ASC, created_at ASC, id ASC
|
||
`).Scan(&rows).Error; err != nil {
|
||
return nil, fmt.Errorf("load client local migrations: %w", err)
|
||
}
|
||
|
||
return rows, nil
|
||
}
|
||
|
||
func ensureClientMigrationRegistryTable(db *gorm.DB) error {
|
||
if err := db.Exec(`
|
||
CREATE TABLE IF NOT EXISTS qt_client_local_migrations (
|
||
id VARCHAR(128) NOT NULL,
|
||
name VARCHAR(255) NOT NULL,
|
||
sql_text LONGTEXT NOT NULL,
|
||
checksum VARCHAR(128) NOT NULL,
|
||
min_app_version VARCHAR(64) NULL,
|
||
order_no INT NOT NULL DEFAULT 0,
|
||
is_active TINYINT(1) NOT NULL DEFAULT 1,
|
||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||
PRIMARY KEY (id),
|
||
INDEX idx_qt_client_local_migrations_active_order (is_active, order_no, created_at)
|
||
)
|
||
`).Error; err != nil {
|
||
return err
|
||
}
|
||
return db.Exec(`
|
||
CREATE TABLE IF NOT EXISTS qt_client_schema_state (
|
||
username VARCHAR(100) NOT NULL,
|
||
last_applied_migration_id VARCHAR(128) NULL,
|
||
app_version VARCHAR(64) NULL,
|
||
last_checked_at DATETIME NOT NULL,
|
||
updated_at DATETIME NOT NULL,
|
||
PRIMARY KEY (username),
|
||
INDEX idx_qt_client_schema_state_checked (last_checked_at)
|
||
)
|
||
`).Error
|
||
}
|
||
|
||
func (s *Service) applyMissingRemoteMigrations(migrations []clientLocalMigration) error {
|
||
for i := range migrations {
|
||
m := migrations[i]
|
||
computedChecksum := digestSQL(m.SQLText)
|
||
checksum := strings.TrimSpace(m.Checksum)
|
||
if checksum == "" {
|
||
checksum = computedChecksum
|
||
} else if !strings.EqualFold(checksum, computedChecksum) {
|
||
return fmt.Errorf("checksum mismatch for migration %s", m.ID)
|
||
}
|
||
|
||
applied, err := s.localDB.GetRemoteMigrationApplied(m.ID)
|
||
if err == nil {
|
||
if strings.TrimSpace(applied.Checksum) != checksum {
|
||
return fmt.Errorf("checksum mismatch for migration %s", m.ID)
|
||
}
|
||
continue
|
||
}
|
||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return fmt.Errorf("check local applied migration %s: %w", m.ID, err)
|
||
}
|
||
|
||
if strings.TrimSpace(m.SQLText) == "" {
|
||
if err := s.localDB.UpsertRemoteMigrationApplied(m.ID, checksum, appmeta.Version(), time.Now().UTC()); err != nil {
|
||
return fmt.Errorf("mark empty migration %s as applied: %w", m.ID, err)
|
||
}
|
||
continue
|
||
}
|
||
|
||
statements := splitSQLStatementsLite(m.SQLText)
|
||
if err := s.localDB.DB().Transaction(func(tx *gorm.DB) error {
|
||
for _, stmt := range statements {
|
||
if err := tx.Exec(stmt).Error; err != nil {
|
||
return fmt.Errorf("apply migration %s statement %q: %w", m.ID, stmt, err)
|
||
}
|
||
}
|
||
return nil
|
||
}); err != nil {
|
||
return err
|
||
}
|
||
|
||
if err := s.localDB.UpsertRemoteMigrationApplied(m.ID, checksum, appmeta.Version(), time.Now().UTC()); err != nil {
|
||
return fmt.Errorf("record applied migration %s: %w", m.ID, err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// splitSQLStatementsLite splits a SQL script into individual statements.
//
// Blank lines and lines whose first non-space characters are "--" are
// dropped, the remaining lines are joined with "\n", and the result is split
// on every ";". Each statement is trimmed; empty ones are discarded.
//
// Lines may be arbitrarily long: the script is read with bufio.Reader, which
// has no per-line limit, so a long line cannot silently truncate the script.
//
// This is deliberately a lightweight splitter: it does not understand string
// literals, trailing "--" comments or block comments, so a ";" inside any of
// those still ends a statement.
func splitSQLStatementsLite(script string) []string {
	reader := bufio.NewReader(strings.NewReader(script))

	lines := make([]string, 0, 64)
	for {
		line, err := reader.ReadString('\n')
		// Strip the line terminator ("\n" or "\r\n") but keep inner whitespace.
		line = strings.TrimSuffix(line, "\n")
		line = strings.TrimSuffix(line, "\r")
		if probe := strings.TrimSpace(line); probe != "" && !strings.HasPrefix(probe, "--") {
			lines = append(lines, line)
		}
		if err != nil {
			// An in-memory reader only ever fails with io.EOF; the final
			// unterminated line has been handled above.
			break
		}
	}

	raw := strings.Split(strings.Join(lines, "\n"), ";")
	stmts := make([]string, 0, len(raw))
	for _, stmt := range raw {
		if trimmed := strings.TrimSpace(stmt); trimmed != "" {
			stmts = append(stmts, trimmed)
		}
	}
	return stmts
}
|
||
|
||
// digestSQL returns the lowercase hexadecimal SHA-256 digest of sqlText,
// computed over its exact bytes with no trimming or normalization.
func digestSQL(sqlText string) string {
	hasher := sha256.New()
	hasher.Write([]byte(sqlText)) // hash.Hash.Write never returns an error
	return hex.EncodeToString(hasher.Sum(nil))
}
|
||
|
||
func compareVersions(left, right string) int {
|
||
leftParts := normalizeVersionParts(left)
|
||
rightParts := normalizeVersionParts(right)
|
||
maxLen := len(leftParts)
|
||
if len(rightParts) > maxLen {
|
||
maxLen = len(rightParts)
|
||
}
|
||
for i := 0; i < maxLen; i++ {
|
||
lv := 0
|
||
rv := 0
|
||
if i < len(leftParts) {
|
||
lv = leftParts[i]
|
||
}
|
||
if i < len(rightParts) {
|
||
rv = rightParts[i]
|
||
}
|
||
if lv < rv {
|
||
return -1
|
||
}
|
||
if lv > rv {
|
||
return 1
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
|
||
func (s *Service) reportClientSchemaState(mariaDB *gorm.DB, checkedAt time.Time) error {
|
||
if strings.EqualFold(mariaDB.Dialector.Name(), "sqlite") {
|
||
return nil
|
||
}
|
||
username := strings.TrimSpace(s.localDB.GetDBUser())
|
||
if username == "" {
|
||
return nil
|
||
}
|
||
lastMigrationID := ""
|
||
if id, err := s.localDB.GetLatestAppliedRemoteMigrationID(); err == nil {
|
||
lastMigrationID = id
|
||
}
|
||
return mariaDB.Exec(`
|
||
INSERT INTO qt_client_schema_state (username, last_applied_migration_id, app_version, last_checked_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?)
|
||
ON DUPLICATE KEY UPDATE
|
||
last_applied_migration_id = VALUES(last_applied_migration_id),
|
||
app_version = VALUES(app_version),
|
||
last_checked_at = VALUES(last_checked_at),
|
||
updated_at = VALUES(updated_at)
|
||
`, username, lastMigrationID, appmeta.Version(), checkedAt, checkedAt).Error
|
||
}
|
||
|
||
// normalizeVersionParts parses a version string into its numeric components.
//
// Surrounding whitespace and one leading "v" or "V" are removed, then any
// pre-release or build suffix (everything from the first "-" or "+") is
// dropped so that dots inside the suffix do not become extra components:
// "v1.2.3-rc.1" and "1.2.3+build.7" both yield [1 2 3]. The remainder is
// split on "."; each component contributes its leading decimal digits, and
// an empty, non-numeric or overflowing component counts as 0.
//
// The result always has at least one element; "" yields [0].
func normalizeVersionParts(v string) []int {
	trimmed := strings.TrimSpace(v)
	if trimmed != "" && (trimmed[0] == 'v' || trimmed[0] == 'V') {
		trimmed = trimmed[1:]
	}
	if cut := strings.IndexAny(trimmed, "-+"); cut >= 0 {
		trimmed = trimmed[:cut]
	}

	chunks := strings.Split(trimmed, ".")
	parts := make([]int, 0, len(chunks))
	for _, chunk := range chunks {
		clean := strings.TrimSpace(chunk)

		// Keep only the leading run of ASCII digits ("3beta" -> "3").
		end := 0
		for end < len(clean) && clean[end] >= '0' && clean[end] <= '9' {
			end++
		}

		n := 0
		if end > 0 {
			// Atoi only fails here on overflow; such a component stays 0.
			if parsed, err := strconv.Atoi(clean[:end]); err == nil {
				n = parsed
			}
		}
		parts = append(parts, n)
	}
	return parts
}
|
||
|
||
func toReadinessFromState(state *localdb.LocalSyncGuardState) *SyncReadiness {
|
||
if state == nil {
|
||
return nil
|
||
}
|
||
blocked := state.Status == ReadinessBlocked
|
||
return &SyncReadiness{
|
||
Status: state.Status,
|
||
Blocked: blocked,
|
||
ReasonCode: state.ReasonCode,
|
||
ReasonText: state.ReasonText,
|
||
RequiredMinAppVersion: state.RequiredMinAppVersion,
|
||
LastCheckedAt: state.LastCheckedAt,
|
||
}
|
||
}
|