Files
QuoteForge/internal/services/sync/readiness.go
2026-03-07 23:18:07 +03:00

316 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"errors"
"fmt"
"log/slog"
"os"
"strings"
"time"
"git.mchus.pro/mchus/quoteforge/internal/appmeta"
"git.mchus.pro/mchus/quoteforge/internal/localdb"
"gorm.io/gorm"
)
const (
ReadinessReady = "ready"
ReadinessBlocked = "blocked"
ReadinessUnknown = "unknown"
)
var ErrSyncBlockedByReadiness = errors.New("sync blocked by readiness guard")
type SyncReadiness struct {
Status string `json:"status"`
Blocked bool `json:"blocked"`
ReasonCode string `json:"reason_code,omitempty"`
ReasonText string `json:"reason_text,omitempty"`
RequiredMinAppVersion *string `json:"required_min_app_version,omitempty"`
LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
}
type SyncBlockedError struct {
Readiness SyncReadiness
}
func (e *SyncBlockedError) Error() string {
if e == nil {
return ErrSyncBlockedByReadiness.Error()
}
if strings.TrimSpace(e.Readiness.ReasonText) != "" {
return e.Readiness.ReasonText
}
return ErrSyncBlockedByReadiness.Error()
}
func (s *Service) EnsureReadinessForSync() (*SyncReadiness, error) {
readiness, err := s.GetReadiness()
if err != nil {
return nil, err
}
if readiness.Blocked {
return readiness, &SyncBlockedError{Readiness: *readiness}
}
return readiness, nil
}
func (s *Service) GetReadiness() (*SyncReadiness, error) {
now := time.Now().UTC()
if !s.isOnline() {
return s.blockedReadiness(
now,
"OFFLINE_UNVERIFIED_SCHEMA",
"Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД.",
nil,
)
}
mariaDB, err := s.getDB()
if err != nil || mariaDB == nil {
return s.blockedReadiness(
now,
"OFFLINE_UNVERIFIED_SCHEMA",
"Синхронизация недоступна: нет соединения с сервером и нельзя проверить миграции локальной БД.",
nil,
)
}
if err := s.reportClientSchemaState(mariaDB, now); err != nil {
slog.Warn("failed to report client schema state", "error", err)
}
ready := &SyncReadiness{Status: ReadinessReady, Blocked: false, LastCheckedAt: &now}
if setErr := s.localDB.SetSyncGuardState(ReadinessReady, "", "", nil, &now); setErr != nil {
slog.Warn("failed to persist sync guard state", "error", setErr)
}
return ready, nil
}
func (s *Service) blockedReadiness(now time.Time, code, text string, minVersion *string) (*SyncReadiness, error) {
readiness := &SyncReadiness{
Status: ReadinessBlocked,
Blocked: true,
ReasonCode: code,
ReasonText: text,
RequiredMinAppVersion: minVersion,
LastCheckedAt: &now,
}
if err := s.localDB.SetSyncGuardState(ReadinessBlocked, code, text, minVersion, &now); err != nil {
slog.Warn("failed to persist blocked sync guard state", "error", err)
}
return readiness, nil
}
func (s *Service) isOnline() bool {
if s.directDB != nil {
return true
}
if s.connMgr == nil {
return false
}
return s.connMgr.IsOnline()
}
func ensureClientSchemaStateTable(db *gorm.DB) error {
if !tableExists(db, "qt_client_schema_state") {
if err := db.Exec(`
CREATE TABLE IF NOT EXISTS qt_client_schema_state (
username VARCHAR(100) NOT NULL,
hostname VARCHAR(255) NOT NULL DEFAULT '',
app_version VARCHAR(64) NULL,
last_sync_at DATETIME NULL,
last_sync_status VARCHAR(32) NULL,
pending_changes_count INT NOT NULL DEFAULT 0,
pending_errors_count INT NOT NULL DEFAULT 0,
configurations_count INT NOT NULL DEFAULT 0,
projects_count INT NOT NULL DEFAULT 0,
estimate_pricelist_version VARCHAR(128) NULL,
warehouse_pricelist_version VARCHAR(128) NULL,
competitor_pricelist_version VARCHAR(128) NULL,
last_sync_error_code VARCHAR(128) NULL,
last_sync_error_text TEXT NULL,
last_checked_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
PRIMARY KEY (username, hostname),
INDEX idx_qt_client_schema_state_checked (last_checked_at)
)
`).Error; err != nil {
return fmt.Errorf("create qt_client_schema_state table: %w", err)
}
}
if tableExists(db, "qt_client_schema_state") {
if err := db.Exec(`
ALTER TABLE qt_client_schema_state
ADD COLUMN IF NOT EXISTS hostname VARCHAR(255) NOT NULL DEFAULT '' AFTER username
`).Error; err != nil {
return fmt.Errorf("add qt_client_schema_state.hostname: %w", err)
}
if err := db.Exec(`
ALTER TABLE qt_client_schema_state
DROP PRIMARY KEY,
ADD PRIMARY KEY (username, hostname)
`).Error; err != nil && !isDuplicatePrimaryKeyDefinition(err) {
return fmt.Errorf("set qt_client_schema_state primary key: %w", err)
}
for _, stmt := range []string{
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_at DATETIME NULL AFTER app_version",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_status VARCHAR(32) NULL AFTER last_sync_at",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_changes_count INT NOT NULL DEFAULT 0 AFTER last_sync_status",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS pending_errors_count INT NOT NULL DEFAULT 0 AFTER pending_changes_count",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS configurations_count INT NOT NULL DEFAULT 0 AFTER pending_errors_count",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS projects_count INT NOT NULL DEFAULT 0 AFTER configurations_count",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS estimate_pricelist_version VARCHAR(128) NULL AFTER projects_count",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS warehouse_pricelist_version VARCHAR(128) NULL AFTER estimate_pricelist_version",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS competitor_pricelist_version VARCHAR(128) NULL AFTER warehouse_pricelist_version",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_code VARCHAR(128) NULL AFTER competitor_pricelist_version",
"ALTER TABLE qt_client_schema_state ADD COLUMN IF NOT EXISTS last_sync_error_text TEXT NULL AFTER last_sync_error_code",
} {
if err := db.Exec(stmt).Error; err != nil {
return fmt.Errorf("expand qt_client_schema_state: %w", err)
}
}
}
return nil
}
func tableExists(db *gorm.DB, tableName string) bool {
var count int64
// For MariaDB/MySQL, check information_schema
if err := db.Raw(`
SELECT COUNT(*) FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = ?
`, tableName).Scan(&count).Error; err != nil {
return false
}
return count > 0
}
func (s *Service) reportClientSchemaState(mariaDB *gorm.DB, checkedAt time.Time) error {
if strings.EqualFold(mariaDB.Dialector.Name(), "sqlite") {
return nil
}
if err := ensureClientSchemaStateTable(mariaDB); err != nil {
return err
}
username := strings.TrimSpace(s.localDB.GetDBUser())
if username == "" {
return nil
}
hostname, err := os.Hostname()
if err != nil {
hostname = ""
}
hostname = strings.TrimSpace(hostname)
lastSyncAt := s.localDB.GetLastSyncTime()
lastSyncStatus := ReadinessReady
pendingChangesCount := s.localDB.CountPendingChanges()
pendingErrorsCount := s.localDB.CountErroredChanges()
configurationsCount := s.localDB.CountConfigurations()
projectsCount := s.localDB.CountProjects()
estimateVersion := latestPricelistVersion(s.localDB, "estimate")
warehouseVersion := latestPricelistVersion(s.localDB, "warehouse")
competitorVersion := latestPricelistVersion(s.localDB, "competitor")
lastSyncErrorCode, lastSyncErrorText := latestSyncErrorState(s.localDB)
return mariaDB.Exec(`
INSERT INTO qt_client_schema_state (
username, hostname, app_version,
last_sync_at, last_sync_status, pending_changes_count, pending_errors_count,
configurations_count, projects_count,
estimate_pricelist_version, warehouse_pricelist_version, competitor_pricelist_version,
last_sync_error_code, last_sync_error_text,
last_checked_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
app_version = VALUES(app_version),
last_sync_at = VALUES(last_sync_at),
last_sync_status = VALUES(last_sync_status),
pending_changes_count = VALUES(pending_changes_count),
pending_errors_count = VALUES(pending_errors_count),
configurations_count = VALUES(configurations_count),
projects_count = VALUES(projects_count),
estimate_pricelist_version = VALUES(estimate_pricelist_version),
warehouse_pricelist_version = VALUES(warehouse_pricelist_version),
competitor_pricelist_version = VALUES(competitor_pricelist_version),
last_sync_error_code = VALUES(last_sync_error_code),
last_sync_error_text = VALUES(last_sync_error_text),
last_checked_at = VALUES(last_checked_at),
updated_at = VALUES(updated_at)
`, username, hostname, appmeta.Version(),
lastSyncAt, lastSyncStatus, pendingChangesCount, pendingErrorsCount,
configurationsCount, projectsCount,
estimateVersion, warehouseVersion, competitorVersion,
lastSyncErrorCode, lastSyncErrorText,
checkedAt, checkedAt).Error
}
func isDuplicatePrimaryKeyDefinition(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "multiple primary key defined") ||
strings.Contains(msg, "duplicate key name 'primary'") ||
strings.Contains(msg, "duplicate entry")
}
func latestPricelistVersion(local *localdb.LocalDB, source string) *string {
if local == nil {
return nil
}
pl, err := local.GetLatestLocalPricelistBySource(source)
if err != nil || pl == nil {
return nil
}
version := strings.TrimSpace(pl.Version)
if version == "" {
return nil
}
return &version
}
func latestSyncErrorState(local *localdb.LocalDB) (*string, *string) {
if local == nil {
return nil, nil
}
if guard, err := local.GetSyncGuardState(); err == nil && guard != nil && strings.EqualFold(guard.Status, ReadinessBlocked) {
return optionalString(strings.TrimSpace(guard.ReasonCode)), optionalString(strings.TrimSpace(guard.ReasonText))
}
var pending localdb.PendingChange
if err := local.DB().
Where("TRIM(COALESCE(last_error, '')) <> ''").
Order("id DESC").
First(&pending).Error; err == nil {
return optionalString("PENDING_CHANGE_ERROR"), optionalString(strings.TrimSpace(pending.LastError))
}
return nil, nil
}
func optionalString(value string) *string {
if strings.TrimSpace(value) == "" {
return nil
}
v := strings.TrimSpace(value)
return &v
}
func toReadinessFromState(state *localdb.LocalSyncGuardState) *SyncReadiness {
if state == nil {
return nil
}
blocked := state.Status == ReadinessBlocked
return &SyncReadiness{
Status: state.Status,
Blocked: blocked,
ReasonCode: state.ReasonCode,
ReasonText: state.ReasonText,
RequiredMinAppVersion: state.RequiredMinAppVersion,
LastCheckedAt: state.LastCheckedAt,
}
}