Files
PriceForge/internal/services/sync/service.go
Mikhail Chusavitin f48615e8a9 Modularize Go files, extract JS to static, implement competitor pricelists
Go refactoring:
- Split handlers/pricing.go (2446→291 lines) into 5 focused files
- Split services/stock_import.go (1334→~400 lines) into stock_mappings.go + stock_parse.go
- Split services/sync/service.go (1290→~250 lines) into 3 files

JS extraction:
- Move all inline <script> blocks to web/static/js/ (6 files)
- Templates reduced: admin_pricing 2873→521, lot 1531→304, vendor_mappings 1063→169, etc.

Competitor pricelists (migrations 033-039):
- qt_competitors + partnumber_log_competitors tables
- Excel import with column mapping, dedup, bulk insert
- p/n→lot resolution via weighted_median, discount applied
- Unmapped p/ns written to qt_vendor_partnumber_seen
- Quote counts (unique/total) shown on /admin/competitors
- price_method="weighted_median", price_period_days=0 stored explicitly

Fix price_method/price_period_days for warehouse items:
- warehouse: weighted_avg, period=0
- competitor: weighted_median, period=0
- Removes misleading DB defaults (was: median/90)

Update bible: architecture.md, pricelist.md, history.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-13 07:44:10 +03:00

360 lines
10 KiB
Go

package sync
import (
"errors"
"fmt"
"log/slog"
"sort"
"strings"
"time"
"git.mchus.pro/mchus/priceforge/internal/appmeta"
"git.mchus.pro/mchus/priceforge/internal/db"
"git.mchus.pro/mchus/priceforge/internal/localdb"
"git.mchus.pro/mchus/priceforge/internal/models"
"git.mchus.pro/mchus/priceforge/internal/repository"
"gorm.io/gorm"
)
// ErrOffline is returned when no server database handle is available to the
// sync service.
var ErrOffline = errors.New("database is offline")
// Service synchronizes data between the MariaDB server and the local SQLite
// database.
type Service struct {
	// connMgr supplies the server DB handle and connection status when
	// directDB is not set. It may be nil.
	connMgr *db.ConnectionManager
	// localDB is the local database handle.
	localDB *localdb.LocalDB
	// directDB, when non-nil, is used instead of connMgr for every server
	// access and makes the service report itself as connected.
	directDB *gorm.DB
}
// NewService returns a Service that obtains its server connection from
// connMgr and stores local state in localDB.
func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Service {
	svc := new(Service)
	svc.connMgr = connMgr
	svc.localDB = localDB
	return svc
}
// NewServiceWithDB returns a Service bound to an already-open server handle
// instead of a connection manager (used in tests).
func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
	svc := new(Service)
	svc.localDB = localDB
	svc.directDB = mariaDB
	return svc
}
// SyncStatus is the snapshot of synchronization state reported by GetStatus.
type SyncStatus struct {
	// LastSyncAt is the last recorded sync time; nil means never synced.
	LastSyncAt *time.Time `json:"last_sync_at"`
	// ServerPricelists is the number of active pricelists counted on the
	// server; it stays 0 when the server could not be queried.
	ServerPricelists int `json:"server_pricelists"`
	// LocalPricelists is the number of pricelists stored locally.
	LocalPricelists int `json:"local_pricelists"`
	// NeedsSync mirrors the result of NeedSync.
	NeedsSync bool `json:"needs_sync"`
}
// UserSyncStatus describes one user as reported by ListUserSyncStatuses.
type UserSyncStatus struct {
	// Username is the trimmed database user name.
	Username string `json:"username"`
	// LastSyncAt is the user's last recorded heartbeat time.
	LastSyncAt time.Time `json:"last_sync_at"`
	// AppVersion is the application version stored with the heartbeat;
	// omitted from JSON when empty.
	AppVersion string `json:"app_version,omitempty"`
	// IsOnline reports whether the user is considered online.
	IsOnline bool `json:"is_online"`
}
// ConfigImportResult holds the counters produced by a server->local
// configuration import.
type ConfigImportResult struct {
	// Imported is serialized as "imported".
	Imported int `json:"imported"`
	// Updated is serialized as "updated".
	Updated int `json:"updated"`
	// Skipped is serialized as "skipped".
	Skipped int `json:"skipped"`
}
// ProjectImportResult holds the counters produced by a server->local project
// import.
type ProjectImportResult struct {
	// Imported is serialized as "imported".
	Imported int `json:"imported"`
	// Updated is serialized as "updated".
	Updated int `json:"updated"`
	// Skipped is serialized as "skipped".
	Skipped int `json:"skipped"`
}
// ConfigurationChangePayload is the JSON document stored in
// pending_changes.payload for configuration events. Besides the snapshot it
// carries version metadata so that sync can push the latest state and prepare
// for conflict resolution.
type ConfigurationChangePayload struct {
	// EventID identifies this change event.
	EventID string `json:"event_id"`
	// IdempotencyKey is carried with the event; presumably used to
	// deduplicate repeated pushes — confirm against the push code.
	IdempotencyKey string `json:"idempotency_key"`
	// ConfigurationUUID is the UUID of the configuration the event refers to.
	ConfigurationUUID string `json:"configuration_uuid"`
	// ProjectUUID is optional; omitted from JSON when nil.
	ProjectUUID *string `json:"project_uuid,omitempty"`
	// PricelistID is optional; omitted from JSON when nil.
	PricelistID *uint `json:"pricelist_id,omitempty"`
	// Operation is one of create/update/rollback/deactivate/reactivate/delete.
	Operation string `json:"operation"`
	// CurrentVersionID is omitted from JSON when empty.
	CurrentVersionID string `json:"current_version_id,omitempty"`
	// CurrentVersionNo is omitted from JSON when zero.
	CurrentVersionNo int `json:"current_version_no,omitempty"`
	// ConflictPolicy names the conflict strategy; currently: last_write_wins.
	ConflictPolicy string `json:"conflict_policy,omitempty"`
	// Snapshot is the configuration state carried with the event.
	Snapshot models.Configuration `json:"snapshot"`
	// CreatedAt is serialized as "created_at".
	CreatedAt time.Time `json:"created_at"`
	// CreatedBy is optional; omitted from JSON when nil.
	CreatedBy *string `json:"created_by,omitempty"`
}
// ProjectChangePayload is the JSON document describing a project change
// event. NOTE(review): presumably stored in pending_changes.payload like
// ConfigurationChangePayload — confirm against the code that enqueues it.
type ProjectChangePayload struct {
	// EventID identifies this change event.
	EventID string `json:"event_id"`
	// IdempotencyKey is carried with the event; presumably used to
	// deduplicate repeated pushes — confirm against the push code.
	IdempotencyKey string `json:"idempotency_key"`
	// ProjectUUID is the UUID of the project the event refers to.
	ProjectUUID string `json:"project_uuid"`
	// Operation names the kind of change.
	Operation string `json:"operation"`
	// Snapshot is the project state carried with the event.
	Snapshot models.Project `json:"snapshot"`
	// CreatedAt is serialized as "created_at".
	CreatedAt time.Time `json:"created_at"`
}
// GetStatus assembles the current sync status: the last recorded sync time,
// the number of active pricelists on the server, the number of local
// pricelists and whether a sync is advisable.
//
// The server count is only attempted when the connection status already
// reports connected; if it cannot be obtained it is left at 0. The returned
// error is always nil.
func (s *Service) GetStatus() (*SyncStatus, error) {
	status := &SyncStatus{LastSyncAt: s.localDB.GetLastSyncTime()}

	// Query the server only over an existing connection; any failure here
	// simply leaves ServerPricelists at its zero value.
	if s.getConnectionStatus().IsConnected {
		if serverDB, dbErr := s.getDB(); dbErr == nil && serverDB != nil {
			active, countErr := repository.NewPricelistRepository(serverDB).CountActive()
			if countErr == nil {
				status.ServerPricelists = int(active)
			}
		}
	}

	status.LocalPricelists = int(s.localDB.CountLocalPricelists())

	// The error from NeedSync is deliberately dropped: status reporting is
	// best-effort and falls back to the boolean NeedSync returned.
	status.NeedsSync, _ = s.NeedSync()

	return status, nil
}
// NeedSync reports whether a pricelist synchronization should be performed.
//
// It returns true when:
//   - no sync has ever been recorded locally;
//   - the last recorded sync is more than one hour old;
//   - for any pricelist source (estimate, warehouse, competitor) the server
//     has an active pricelist and either no local pricelist could be loaded
//     for that source or the latest local one refers to a different server ID.
//
// The server comparison is only performed when the connection status already
// reports connected and a usable DB handle is available; otherwise the
// result is false. The returned error is currently always nil.
func (s *Service) NeedSync() (bool, error) {
	// Age of the last sync after which a new one is suggested regardless of
	// the server state.
	const staleAfter = time.Hour

	lastSync := s.localDB.GetLastSyncTime()
	if lastSync == nil {
		// Never synced.
		return true, nil
	}
	if time.Since(*lastSync) > staleAfter {
		return true, nil
	}

	// Only consult the server over an already-established connection.
	if !s.getConnectionStatus().IsConnected {
		// Offline: the server cannot be checked, so do not request a sync.
		return false, nil
	}
	mariaDB, err := s.getDB()
	if err != nil || mariaDB == nil {
		// A nil handle without an error is treated as offline as well, the
		// same way the other getDB callers in this file do; this keeps a nil
		// *gorm.DB from reaching the repository.
		return false, nil
	}

	pricelistRepo := repository.NewPricelistRepository(mariaDB)
	sources := []models.PricelistSource{
		models.PricelistSourceEstimate,
		models.PricelistSourceWarehouse,
		models.PricelistSourceCompetitor,
	}
	for _, source := range sources {
		latestServer, err := pricelistRepo.GetLatestActiveBySource(string(source))
		if err != nil {
			// Interpreted as "no active pricelist for this source yet".
			// NOTE(review): every error, including transient ones, takes
			// this path — confirm that is intended.
			continue
		}

		latestLocal, err := s.localDB.GetLatestLocalPricelistBySource(string(source))
		if err != nil {
			// The server has a pricelist for this source but none could be
			// loaded locally.
			return true, nil
		}

		// The latest local pricelist does not correspond to the latest
		// active one on the server.
		if latestServer.ID != latestLocal.ServerID {
			return true, nil
		}
	}

	return false, nil
}
// RecordSyncHeartbeat upserts the current DB user's row in
// qt_pricelist_sync_status with the current UTC time and the application
// version.
//
// The call is best-effort: it returns silently when the user name is blank
// or no DB handle is available, and only logs when the table cannot be
// ensured or the upsert fails. Only users with write rights are expected to
// be able to update this table.
func (s *Service) RecordSyncHeartbeat() {
	user := strings.TrimSpace(s.localDB.GetDBUser())
	if user == "" {
		return
	}

	serverDB, err := s.getDB()
	if err != nil || serverDB == nil {
		return
	}

	if ensureErr := ensureUserSyncStatusTable(serverDB); ensureErr != nil {
		slog.Warn("sync heartbeat: failed to ensure table", "error", ensureErr)
		return
	}

	// Insert the row on first heartbeat, refresh it afterwards.
	const upsertHeartbeat = `
INSERT INTO qt_pricelist_sync_status (username, last_sync_at, updated_at, app_version)
VALUES (?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
last_sync_at = VALUES(last_sync_at),
updated_at = VALUES(updated_at),
app_version = VALUES(app_version)
`
	stamp := time.Now().UTC()
	result := serverDB.Exec(upsertHeartbeat, user, stamp, stamp, appmeta.Version())
	if result.Error != nil {
		// Logged at debug level only; the heartbeat is simply skipped.
		slog.Debug("sync heartbeat: skipped", "username", user, "error", result.Error)
	}
}
// ListUserSyncStatuses returns one entry per user that either has a row in
// qt_pricelist_sync_status or currently appears among the connected DB users.
//
// A user is reported online when the time since their last recorded sync is
// within onlineThreshold, or when they are among the connected DB users.
// Connected users without a heartbeat row are included with LastSyncAt set
// to the current UTC time and an empty AppVersion. The result is ordered
// online users first, then by most recent LastSyncAt, then case-insensitively
// by username.
//
// It returns ErrOffline when no DB handle is available, and a wrapped error
// when the table cannot be ensured or the rows cannot be loaded. A failure
// to list connected users is only logged.
func (s *Service) ListUserSyncStatuses(onlineThreshold time.Duration) ([]UserSyncStatus, error) {
	serverDB, err := s.getDB()
	if err != nil || serverDB == nil {
		return nil, ErrOffline
	}
	if err := ensureUserSyncStatusTable(serverDB); err != nil {
		return nil, fmt.Errorf("ensure sync status table: %w", err)
	}

	type heartbeatRow struct {
		Username   string    `gorm:"column:username"`
		LastSyncAt time.Time `gorm:"column:last_sync_at"`
		AppVersion string    `gorm:"column:app_version"`
	}
	const selectHeartbeats = `
SELECT username, last_sync_at, COALESCE(app_version, '') AS app_version
FROM qt_pricelist_sync_status
ORDER BY last_sync_at DESC, username ASC
`
	var heartbeats []heartbeatRow
	if err := serverDB.Raw(selectHeartbeats).Scan(&heartbeats).Error; err != nil {
		return nil, fmt.Errorf("load sync status rows: %w", err)
	}

	connected, err := s.listConnectedDBUsers(serverDB)
	if err != nil {
		// Fall back to heartbeat data alone.
		slog.Debug("sync status: failed to load connected DB users", "error", err)
		connected = map[string]struct{}{}
	}

	now := time.Now().UTC()
	statuses := make([]UserSyncStatus, 0, len(heartbeats)+len(connected))

	// Users with a heartbeat row; matched names are removed from connected
	// so that only heartbeat-less users remain for the second pass.
	for _, hb := range heartbeats {
		name := strings.TrimSpace(hb.Username)
		if name == "" {
			continue
		}
		online := now.Sub(hb.LastSyncAt) <= onlineThreshold
		if _, ok := connected[name]; ok {
			online = true
			delete(connected, name)
		}
		statuses = append(statuses, UserSyncStatus{
			Username:   name,
			LastSyncAt: hb.LastSyncAt,
			AppVersion: strings.TrimSpace(hb.AppVersion),
			IsOnline:   online,
		})
	}

	// Connected users that never recorded a heartbeat.
	for name := range connected {
		statuses = append(statuses, UserSyncStatus{
			Username:   name,
			LastSyncAt: now,
			IsOnline:   true,
		})
	}

	sort.SliceStable(statuses, func(a, b int) bool {
		left, right := statuses[a], statuses[b]
		switch {
		case left.IsOnline != right.IsOnline:
			return left.IsOnline
		case !left.LastSyncAt.Equal(right.LastSyncAt):
			return left.LastSyncAt.After(right.LastSyncAt)
		default:
			return strings.ToLower(left.Username) < strings.ToLower(right.Username)
		}
	})

	return statuses, nil
}
// listConnectedDBUsers returns the set of distinct, non-blank user names that
// information_schema.PROCESSLIST lists for the current database.
func (s *Service) listConnectedDBUsers(mariaDB *gorm.DB) (map[string]struct{}, error) {
	type sessionUser struct {
		Username string `gorm:"column:username"`
	}
	const selectSessionUsers = `
SELECT DISTINCT TRIM(USER) AS username
FROM information_schema.PROCESSLIST
WHERE COALESCE(TRIM(USER), '') <> ''
AND DB = DATABASE()
`
	var sessions []sessionUser
	if err := mariaDB.Raw(selectSessionUsers).Scan(&sessions).Error; err != nil {
		return nil, err
	}

	connected := make(map[string]struct{}, len(sessions))
	for _, session := range sessions {
		// Trim again on the Go side and drop anything that ends up blank.
		if name := strings.TrimSpace(session.Username); name != "" {
			connected[name] = struct{}{}
		}
	}
	return connected, nil
}
// ensureUserSyncStatusTable creates qt_pricelist_sync_status when it does not
// exist and then adds the app_version column when it is missing. It returns
// the first error reported by either statement.
func ensureUserSyncStatusTable(conn *gorm.DB) error {
	const createTable = `
CREATE TABLE IF NOT EXISTS qt_pricelist_sync_status (
username VARCHAR(100) NOT NULL,
last_sync_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
app_version VARCHAR(64) NULL,
PRIMARY KEY (username),
INDEX idx_qt_pricelist_sync_status_last_sync (last_sync_at)
)
`
	// Backward compatibility for environments where the table was created
	// without app_version.
	const addAppVersion = `
ALTER TABLE qt_pricelist_sync_status
ADD COLUMN IF NOT EXISTS app_version VARCHAR(64) NULL
`
	if err := conn.Exec(createTable).Error; err != nil {
		return err
	}
	return conn.Exec(addAppVersion).Error
}
// getDB returns the server DB handle: the direct handle when one is set,
// otherwise whatever the connection manager provides. It returns ErrOffline
// when neither is configured.
func (s *Service) getDB() (*gorm.DB, error) {
	switch {
	case s.directDB != nil:
		return s.directDB, nil
	case s.connMgr != nil:
		return s.connMgr.GetDB()
	default:
		return nil, ErrOffline
	}
}
// getConnectionStatus reports the server connection state. A direct handle
// always counts as connected; without one the connection manager's status is
// returned, or a disconnected status when there is no manager either.
func (s *Service) getConnectionStatus() db.ConnectionStatus {
	if s.directDB == nil && s.connMgr != nil {
		return s.connMgr.GetStatus()
	}
	// Either a direct handle is set (connected) or nothing is configured
	// (disconnected).
	return db.ConnectionStatus{IsConnected: s.directDB != nil}
}