Make full sync push pending and pull projects/configurations

This commit is contained in:
Mikhail Chusavitin
2026-02-06 15:25:07 +03:00
parent c02a7eac73
commit 3d5ab63970
3 changed files with 221 additions and 54 deletions

View File

@@ -527,44 +527,8 @@ func setupRouter(cfg *config.Config, local *localdb.LocalDB, connMgr *db.Connect
if !connMgr.IsOnline() { if !connMgr.IsOnline() {
return return
} }
serverDB, err := connMgr.GetDB() if _, err := syncService.ImportProjectsToLocal(); err != nil && !errors.Is(err, sync.ErrOffline) {
if err != nil || serverDB == nil { slog.Warn("failed to sync projects from server", "error", err)
return
}
projectRepo := repository.NewProjectRepository(serverDB)
serverProjects, _, err := projectRepo.List(0, 10000, true)
if err != nil {
return
}
now := time.Now()
for i := range serverProjects {
sp := serverProjects[i]
localProject, getErr := local.GetProjectByUUID(sp.UUID)
if getErr == nil && localProject != nil {
// Keep unsynced local changes intact.
if localProject.SyncStatus == "pending" {
continue
}
localProject.OwnerUsername = sp.OwnerUsername
localProject.Name = sp.Name
localProject.IsActive = sp.IsActive
localProject.IsSystem = sp.IsSystem
localProject.CreatedAt = sp.CreatedAt
localProject.UpdatedAt = sp.UpdatedAt
serverID := sp.ID
localProject.ServerID = &serverID
localProject.SyncStatus = "synced"
localProject.SyncedAt = &now
_ = local.SaveProject(localProject)
continue
}
lp := localdb.ProjectToLocal(&sp)
lp.SyncStatus = "synced"
lp.SyncedAt = &now
_ = local.SaveProject(lp)
} }
} }

View File

@@ -184,12 +184,21 @@ func (h *SyncHandler) SyncPricelists(c *gin.Context) {
type SyncAllResponse struct { type SyncAllResponse struct {
Success bool `json:"success"` Success bool `json:"success"`
Message string `json:"message"` Message string `json:"message"`
PendingPushed int `json:"pending_pushed"`
ComponentsSynced int `json:"components_synced"` ComponentsSynced int `json:"components_synced"`
PricelistsSynced int `json:"pricelists_synced"` PricelistsSynced int `json:"pricelists_synced"`
ProjectsImported int `json:"projects_imported"`
ProjectsUpdated int `json:"projects_updated"`
ProjectsSkipped int `json:"projects_skipped"`
ConfigurationsImported int `json:"configurations_imported"`
ConfigurationsUpdated int `json:"configurations_updated"`
ConfigurationsSkipped int `json:"configurations_skipped"`
Duration string `json:"duration"` Duration string `json:"duration"`
} }
// SyncAll syncs both components and pricelists // SyncAll performs full bidirectional sync:
// - push pending local changes (projects/configurations) to server
// - pull components, pricelists, projects, and configurations from server
// POST /api/sync/all // POST /api/sync/all
func (h *SyncHandler) SyncAll(c *gin.Context) { func (h *SyncHandler) SyncAll(c *gin.Context) {
if !h.checkOnline() { if !h.checkOnline() {
@@ -201,7 +210,18 @@ func (h *SyncHandler) SyncAll(c *gin.Context) {
} }
startTime := time.Now() startTime := time.Now()
var componentsSynced, pricelistsSynced int var pendingPushed, componentsSynced, pricelistsSynced int
// Push local pending changes first (projects/configurations)
pendingPushed, err := h.syncService.PushPendingChanges()
if err != nil {
slog.Error("pending push failed during full sync", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{
"success": false,
"error": "Pending changes push failed: " + err.Error(),
})
return
}
// Sync components // Sync components
mariaDB, err := h.connMgr.GetDB() mariaDB, err := h.connMgr.GetDB()
@@ -231,16 +251,53 @@ func (h *SyncHandler) SyncAll(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{ c.JSON(http.StatusInternalServerError, gin.H{
"success": false, "success": false,
"error": "Pricelist sync failed: " + err.Error(), "error": "Pricelist sync failed: " + err.Error(),
"pending_pushed": pendingPushed,
"components_synced": componentsSynced, "components_synced": componentsSynced,
}) })
return return
} }
projectsResult, err := h.syncService.ImportProjectsToLocal()
if err != nil {
slog.Error("project import failed during full sync", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{
"success": false,
"error": "Project import failed: " + err.Error(),
"pending_pushed": pendingPushed,
"components_synced": componentsSynced,
"pricelists_synced": pricelistsSynced,
})
return
}
configsResult, err := h.syncService.ImportConfigurationsToLocal()
if err != nil {
slog.Error("configuration import failed during full sync", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{
"success": false,
"error": "Configuration import failed: " + err.Error(),
"pending_pushed": pendingPushed,
"components_synced": componentsSynced,
"pricelists_synced": pricelistsSynced,
"projects_imported": projectsResult.Imported,
"projects_updated": projectsResult.Updated,
"projects_skipped": projectsResult.Skipped,
})
return
}
c.JSON(http.StatusOK, SyncAllResponse{ c.JSON(http.StatusOK, SyncAllResponse{
Success: true, Success: true,
Message: "Full sync completed successfully", Message: "Full sync completed successfully",
PendingPushed: pendingPushed,
ComponentsSynced: componentsSynced, ComponentsSynced: componentsSynced,
PricelistsSynced: pricelistsSynced, PricelistsSynced: pricelistsSynced,
ProjectsImported: projectsResult.Imported,
ProjectsUpdated: projectsResult.Updated,
ProjectsSkipped: projectsResult.Skipped,
ConfigurationsImported: configsResult.Imported,
ConfigurationsUpdated: configsResult.Updated,
ConfigurationsSkipped: configsResult.Skipped,
Duration: time.Since(startTime).String(), Duration: time.Since(startTime).String(),
}) })
h.syncService.RecordSyncHeartbeat() h.syncService.RecordSyncHeartbeat()
@@ -396,6 +453,9 @@ func (h *SyncHandler) GetUsersStatus(c *gin.Context) {
return return
} }
// Keep current client heartbeat fresh so app version is available in the table.
h.syncService.RecordSyncHeartbeat()
users, err := h.syncService.ListUserSyncStatuses(threshold) users, err := h.syncService.ListUserSyncStatuses(threshold)
if err != nil { if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{ c.JSON(http.StatusInternalServerError, gin.H{

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"log/slog" "log/slog"
"sort"
"strings" "strings"
"time" "time"
@@ -64,6 +65,13 @@ type ConfigImportResult struct {
Skipped int `json:"skipped"` Skipped int `json:"skipped"`
} }
// ProjectImportResult represents server->local project import stats.
type ProjectImportResult struct {
Imported int `json:"imported"`
Updated int `json:"updated"`
Skipped int `json:"skipped"`
}
// ConfigurationChangePayload is stored in pending_changes.payload for configuration events. // ConfigurationChangePayload is stored in pending_changes.payload for configuration events.
// It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution. // It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution.
type ConfigurationChangePayload struct { type ConfigurationChangePayload struct {
@@ -153,6 +161,77 @@ func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
return result, nil return result, nil
} }
// ImportProjectsToLocal imports projects from MariaDB into local SQLite.
// Existing local projects with pending local changes are skipped to avoid data loss.
func (s *Service) ImportProjectsToLocal() (*ProjectImportResult, error) {
mariaDB, err := s.getDB()
if err != nil {
return nil, ErrOffline
}
projectRepo := repository.NewProjectRepository(mariaDB)
result := &ProjectImportResult{}
offset := 0
const limit = 200
for {
serverProjects, _, err := projectRepo.List(offset, limit, true)
if err != nil {
return nil, fmt.Errorf("listing server projects: %w", err)
}
if len(serverProjects) == 0 {
break
}
now := time.Now()
for i := range serverProjects {
project := serverProjects[i]
existing, getErr := s.localDB.GetProjectByUUID(project.UUID)
if getErr != nil && !errors.Is(getErr, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("getting local project %s: %w", project.UUID, getErr)
}
if existing != nil && getErr == nil {
// Keep unsynced local changes intact.
if existing.SyncStatus == "pending" {
result.Skipped++
continue
}
existing.OwnerUsername = project.OwnerUsername
existing.Name = project.Name
existing.IsActive = project.IsActive
existing.IsSystem = project.IsSystem
existing.CreatedAt = project.CreatedAt
existing.UpdatedAt = project.UpdatedAt
serverID := project.ID
existing.ServerID = &serverID
existing.SyncStatus = "synced"
existing.SyncedAt = &now
if err := s.localDB.SaveProject(existing); err != nil {
return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err)
}
result.Updated++
continue
}
localProject := localdb.ProjectToLocal(&project)
localProject.SyncStatus = "synced"
localProject.SyncedAt = &now
if err := s.localDB.SaveProject(localProject); err != nil {
return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err)
}
result.Imported++
}
offset += len(serverProjects)
}
return result, nil
}
// GetStatus returns the current sync status // GetStatus returns the current sync status
func (s *Service) GetStatus() (*SyncStatus, error) { func (s *Service) GetStatus() (*SyncStatus, error) {
lastSync := s.localDB.GetLastSyncTime() lastSync := s.localDB.GetLastSyncTime()
@@ -371,21 +450,85 @@ func (s *Service) ListUserSyncStatuses(onlineThreshold time.Duration) ([]UserSyn
return nil, fmt.Errorf("load sync status rows: %w", err) return nil, fmt.Errorf("load sync status rows: %w", err)
} }
activeUsers, err := s.listConnectedDBUsers(mariaDB)
if err != nil {
slog.Debug("sync status: failed to load connected DB users", "error", err)
activeUsers = map[string]struct{}{}
}
now := time.Now().UTC() now := time.Now().UTC()
result := make([]UserSyncStatus, 0, len(rows)) result := make([]UserSyncStatus, 0, len(rows)+len(activeUsers))
for i := range rows { for i := range rows {
r := rows[i] r := rows[i]
username := strings.TrimSpace(r.Username)
if username == "" {
continue
}
isOnline := now.Sub(r.LastSyncAt) <= onlineThreshold
if _, connected := activeUsers[username]; connected {
isOnline = true
delete(activeUsers, username)
}
appVersion := strings.TrimSpace(r.AppVersion)
result = append(result, UserSyncStatus{ result = append(result, UserSyncStatus{
Username: r.Username, Username: username,
LastSyncAt: r.LastSyncAt, LastSyncAt: r.LastSyncAt,
AppVersion: strings.TrimSpace(r.AppVersion), AppVersion: appVersion,
IsOnline: now.Sub(r.LastSyncAt) <= onlineThreshold, IsOnline: isOnline,
}) })
} }
for username := range activeUsers {
result = append(result, UserSyncStatus{
Username: username,
LastSyncAt: now,
AppVersion: "",
IsOnline: true,
})
}
sort.SliceStable(result, func(i, j int) bool {
if result[i].IsOnline != result[j].IsOnline {
return result[i].IsOnline
}
if result[i].LastSyncAt.Equal(result[j].LastSyncAt) {
return strings.ToLower(result[i].Username) < strings.ToLower(result[j].Username)
}
return result[i].LastSyncAt.After(result[j].LastSyncAt)
})
return result, nil return result, nil
} }
func (s *Service) listConnectedDBUsers(mariaDB *gorm.DB) (map[string]struct{}, error) {
type processUserRow struct {
Username string `gorm:"column:username"`
}
var rows []processUserRow
if err := mariaDB.Raw(`
SELECT DISTINCT TRIM(USER) AS username
FROM information_schema.PROCESSLIST
WHERE COALESCE(TRIM(USER), '') <> ''
AND DB = DATABASE()
`).Scan(&rows).Error; err != nil {
return nil, err
}
users := make(map[string]struct{}, len(rows))
for i := range rows {
username := strings.TrimSpace(rows[i].Username)
if username == "" {
continue
}
users[username] = struct{}{}
}
return users, nil
}
func ensureUserSyncStatusTable(db *gorm.DB) error { func ensureUserSyncStatusTable(db *gorm.DB) error {
if err := db.Exec(` if err := db.Exec(`
CREATE TABLE IF NOT EXISTS qt_pricelist_sync_status ( CREATE TABLE IF NOT EXISTS qt_pricelist_sync_status (