diff --git a/cmd/qfs/main.go b/cmd/qfs/main.go index 43746c4..4525d56 100644 --- a/cmd/qfs/main.go +++ b/cmd/qfs/main.go @@ -527,44 +527,8 @@ func setupRouter(cfg *config.Config, local *localdb.LocalDB, connMgr *db.Connect if !connMgr.IsOnline() { return } - serverDB, err := connMgr.GetDB() - if err != nil || serverDB == nil { - return - } - - projectRepo := repository.NewProjectRepository(serverDB) - serverProjects, _, err := projectRepo.List(0, 10000, true) - if err != nil { - return - } - - now := time.Now() - for i := range serverProjects { - sp := serverProjects[i] - localProject, getErr := local.GetProjectByUUID(sp.UUID) - if getErr == nil && localProject != nil { - // Keep unsynced local changes intact. - if localProject.SyncStatus == "pending" { - continue - } - localProject.OwnerUsername = sp.OwnerUsername - localProject.Name = sp.Name - localProject.IsActive = sp.IsActive - localProject.IsSystem = sp.IsSystem - localProject.CreatedAt = sp.CreatedAt - localProject.UpdatedAt = sp.UpdatedAt - serverID := sp.ID - localProject.ServerID = &serverID - localProject.SyncStatus = "synced" - localProject.SyncedAt = &now - _ = local.SaveProject(localProject) - continue - } - - lp := localdb.ProjectToLocal(&sp) - lp.SyncStatus = "synced" - lp.SyncedAt = &now - _ = local.SaveProject(lp) + if _, err := syncService.ImportProjectsToLocal(); err != nil && !errors.Is(err, sync.ErrOffline) { + slog.Warn("failed to sync projects from server", "error", err) } } diff --git a/internal/handlers/sync.go b/internal/handlers/sync.go index 35bb5a8..f9f3a9a 100644 --- a/internal/handlers/sync.go +++ b/internal/handlers/sync.go @@ -182,14 +182,23 @@ func (h *SyncHandler) SyncPricelists(c *gin.Context) { // SyncAllResponse represents result of full sync type SyncAllResponse struct { - Success bool `json:"success"` - Message string `json:"message"` - ComponentsSynced int `json:"components_synced"` - PricelistsSynced int `json:"pricelists_synced"` - Duration string `json:"duration"` + Success bool `json:"success"` + Message string `json:"message"` + PendingPushed int `json:"pending_pushed"` + ComponentsSynced int `json:"components_synced"` + PricelistsSynced int `json:"pricelists_synced"` + ProjectsImported int `json:"projects_imported"` + ProjectsUpdated int `json:"projects_updated"` + ProjectsSkipped int `json:"projects_skipped"` + ConfigurationsImported int `json:"configurations_imported"` + ConfigurationsUpdated int `json:"configurations_updated"` + ConfigurationsSkipped int `json:"configurations_skipped"` + Duration string `json:"duration"` } -// SyncAll syncs both components and pricelists +// SyncAll performs full bidirectional sync: +// - push pending local changes (projects/configurations) to server +// - pull components, pricelists, projects, and configurations from server // POST /api/sync/all func (h *SyncHandler) SyncAll(c *gin.Context) { if !h.checkOnline() { @@ -201,7 +210,18 @@ func (h *SyncHandler) SyncAll(c *gin.Context) { } startTime := time.Now() - var componentsSynced, pricelistsSynced int + var pendingPushed, componentsSynced, pricelistsSynced int + + // Push local pending changes first (projects/configurations) + pendingPushed, err := h.syncService.PushPendingChanges() + if err != nil { + slog.Error("pending push failed during full sync", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{ + "success": false, + "error": "Pending changes push failed: " + err.Error(), + }) + return + } // Sync components mariaDB, err := h.connMgr.GetDB() @@ -231,17 +251,54 @@ func (h *SyncHandler) SyncAll(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{ "success": false, "error": "Pricelist sync failed: " + err.Error(), + "pending_pushed": pendingPushed, "components_synced": componentsSynced, }) return } + projectsResult, err := h.syncService.ImportProjectsToLocal() + if err != nil { + slog.Error("project import failed during full sync", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{ + "success": false, + "error": "Project import failed: " + err.Error(), + "pending_pushed": pendingPushed, + "components_synced": componentsSynced, + "pricelists_synced": pricelistsSynced, + }) + return + } + + configsResult, err := h.syncService.ImportConfigurationsToLocal() + if err != nil { + slog.Error("configuration import failed during full sync", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{ + "success": false, + "error": "Configuration import failed: " + err.Error(), + "pending_pushed": pendingPushed, + "components_synced": componentsSynced, + "pricelists_synced": pricelistsSynced, + "projects_imported": projectsResult.Imported, + "projects_updated": projectsResult.Updated, + "projects_skipped": projectsResult.Skipped, + }) + return + } + c.JSON(http.StatusOK, SyncAllResponse{ - Success: true, - Message: "Full sync completed successfully", - ComponentsSynced: componentsSynced, - PricelistsSynced: pricelistsSynced, - Duration: time.Since(startTime).String(), + Success: true, + Message: "Full sync completed successfully", + PendingPushed: pendingPushed, + ComponentsSynced: componentsSynced, + PricelistsSynced: pricelistsSynced, + ProjectsImported: projectsResult.Imported, + ProjectsUpdated: projectsResult.Updated, + ProjectsSkipped: projectsResult.Skipped, + ConfigurationsImported: configsResult.Imported, + ConfigurationsUpdated: configsResult.Updated, + ConfigurationsSkipped: configsResult.Skipped, + Duration: time.Since(startTime).String(), }) h.syncService.RecordSyncHeartbeat() } @@ -396,6 +453,9 @@ func (h *SyncHandler) GetUsersStatus(c *gin.Context) { return } + // Keep current client heartbeat fresh so app version is available in the table. + h.syncService.RecordSyncHeartbeat() + users, err := h.syncService.ListUserSyncStatuses(threshold) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ diff --git a/internal/services/sync/service.go b/internal/services/sync/service.go index 126e356..4c03f2b 100644 --- a/internal/services/sync/service.go +++ b/internal/services/sync/service.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "sort" "strings" "time" @@ -64,6 +65,13 @@ type ConfigImportResult struct { Skipped int `json:"skipped"` } +// ProjectImportResult represents server->local project import stats. +type ProjectImportResult struct { + Imported int `json:"imported"` + Updated int `json:"updated"` + Skipped int `json:"skipped"` +} + // ConfigurationChangePayload is stored in pending_changes.payload for configuration events. // It carries version metadata so sync can push the latest snapshot and prepare for conflict resolution. type ConfigurationChangePayload struct { @@ -153,6 +161,77 @@ func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) { return result, nil } +// ImportProjectsToLocal imports projects from MariaDB into local SQLite. +// Existing local projects with pending local changes are skipped to avoid data loss. +func (s *Service) ImportProjectsToLocal() (*ProjectImportResult, error) { + mariaDB, err := s.getDB() + if err != nil { + return nil, ErrOffline + } + + projectRepo := repository.NewProjectRepository(mariaDB) + result := &ProjectImportResult{} + + offset := 0 + const limit = 200 + for { + serverProjects, _, err := projectRepo.List(offset, limit, true) + if err != nil { + return nil, fmt.Errorf("listing server projects: %w", err) + } + if len(serverProjects) == 0 { + break + } + + now := time.Now() + for i := range serverProjects { + project := serverProjects[i] + + existing, getErr := s.localDB.GetProjectByUUID(project.UUID) + if getErr != nil && !errors.Is(getErr, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("getting local project %s: %w", project.UUID, getErr) + } + + if existing != nil && getErr == nil { + // Keep unsynced local changes intact. + if existing.SyncStatus == "pending" { + result.Skipped++ + continue + } + + existing.OwnerUsername = project.OwnerUsername + existing.Name = project.Name + existing.IsActive = project.IsActive + existing.IsSystem = project.IsSystem + existing.CreatedAt = project.CreatedAt + existing.UpdatedAt = project.UpdatedAt + serverID := project.ID + existing.ServerID = &serverID + existing.SyncStatus = "synced" + existing.SyncedAt = &now + + if err := s.localDB.SaveProject(existing); err != nil { + return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err) + } + result.Updated++ + continue + } + + localProject := localdb.ProjectToLocal(&project) + localProject.SyncStatus = "synced" + localProject.SyncedAt = &now + if err := s.localDB.SaveProject(localProject); err != nil { + return nil, fmt.Errorf("saving local project %s: %w", project.UUID, err) + } + result.Imported++ + } + + offset += len(serverProjects) + } + + return result, nil +} + // GetStatus returns the current sync status func (s *Service) GetStatus() (*SyncStatus, error) { lastSync := s.localDB.GetLastSyncTime() @@ -371,21 +450,85 @@ func (s *Service) ListUserSyncStatuses(onlineThreshold time.Duration) ([]UserSyn return nil, fmt.Errorf("load sync status rows: %w", err) } + activeUsers, err := s.listConnectedDBUsers(mariaDB) + if err != nil { + slog.Debug("sync status: failed to load connected DB users", "error", err) + activeUsers = map[string]struct{}{} + } + now := time.Now().UTC() - result := make([]UserSyncStatus, 0, len(rows)) + result := make([]UserSyncStatus, 0, len(rows)+len(activeUsers)) for i := range rows { r := rows[i] + username := strings.TrimSpace(r.Username) + if username == "" { + continue + } + + isOnline := now.Sub(r.LastSyncAt) <= onlineThreshold + if _, connected := activeUsers[username]; connected { + isOnline = true + delete(activeUsers, username) + } + + appVersion := strings.TrimSpace(r.AppVersion) + result = append(result, UserSyncStatus{ - Username: r.Username, + Username: username, LastSyncAt: r.LastSyncAt, - AppVersion: strings.TrimSpace(r.AppVersion), - IsOnline: now.Sub(r.LastSyncAt) <= onlineThreshold, + AppVersion: appVersion, + IsOnline: isOnline, }) } + for username := range activeUsers { + result = append(result, UserSyncStatus{ + Username: username, + LastSyncAt: now, + AppVersion: "", + IsOnline: true, + }) + } + + sort.SliceStable(result, func(i, j int) bool { + if result[i].IsOnline != result[j].IsOnline { + return result[i].IsOnline + } + if result[i].LastSyncAt.Equal(result[j].LastSyncAt) { + return strings.ToLower(result[i].Username) < strings.ToLower(result[j].Username) + } + return result[i].LastSyncAt.After(result[j].LastSyncAt) + }) + return result, nil } +func (s *Service) listConnectedDBUsers(mariaDB *gorm.DB) (map[string]struct{}, error) { + type processUserRow struct { + Username string `gorm:"column:username"` + } + + var rows []processUserRow + if err := mariaDB.Raw(` + SELECT DISTINCT TRIM(USER) AS username + FROM information_schema.PROCESSLIST + WHERE COALESCE(TRIM(USER), '') <> '' + AND DB = DATABASE() + `).Scan(&rows).Error; err != nil { + return nil, err + } + + users := make(map[string]struct{}, len(rows)) + for i := range rows { + username := strings.TrimSpace(rows[i].Username) + if username == "" { + continue + } + users[username] = struct{}{} + } + return users, nil +} + func ensureUserSyncStatusTable(db *gorm.DB) error { if err := db.Exec(` CREATE TABLE IF NOT EXISTS qt_pricelist_sync_status (