feat: add projects flow and consolidate default project handling
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
||||
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/models"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/repository"
|
||||
"github.com/google/uuid"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
@@ -19,8 +20,9 @@ var ErrOffline = errors.New("database is offline")
|
||||
|
||||
// Service handles synchronization between MariaDB and local SQLite
|
||||
type Service struct {
|
||||
connMgr *db.ConnectionManager
|
||||
localDB *localdb.LocalDB
|
||||
connMgr *db.ConnectionManager
|
||||
localDB *localdb.LocalDB
|
||||
directDB *gorm.DB
|
||||
}
|
||||
|
||||
// NewService creates a new sync service
|
||||
@@ -31,6 +33,14 @@ func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Servic
|
||||
}
|
||||
}
|
||||
|
||||
// NewServiceWithDB creates sync service that uses a direct DB handle (used in tests).
|
||||
func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
|
||||
return &Service{
|
||||
localDB: localDB,
|
||||
directDB: mariaDB,
|
||||
}
|
||||
}
|
||||
|
||||
// SyncStatus represents the current sync status
|
||||
type SyncStatus struct {
|
||||
LastSyncAt *time.Time `json:"last_sync_at"`
|
||||
@@ -52,6 +62,7 @@ type ConfigurationChangePayload struct {
|
||||
EventID string `json:"event_id"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
ConfigurationUUID string `json:"configuration_uuid"`
|
||||
ProjectUUID *string `json:"project_uuid,omitempty"`
|
||||
Operation string `json:"operation"` // create/update/rollback/deactivate/reactivate/delete
|
||||
CurrentVersionID string `json:"current_version_id,omitempty"`
|
||||
CurrentVersionNo int `json:"current_version_no,omitempty"`
|
||||
@@ -61,10 +72,19 @@ type ConfigurationChangePayload struct {
|
||||
CreatedBy *string `json:"created_by,omitempty"`
|
||||
}
|
||||
|
||||
type ProjectChangePayload struct {
|
||||
EventID string `json:"event_id"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
ProjectUUID string `json:"project_uuid"`
|
||||
Operation string `json:"operation"`
|
||||
Snapshot models.Project `json:"snapshot"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
// ImportConfigurationsToLocal imports configurations from MariaDB into local SQLite.
|
||||
// Existing local configs with pending local changes are skipped to avoid data loss.
|
||||
func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return nil, ErrOffline
|
||||
}
|
||||
@@ -130,9 +150,9 @@ func (s *Service) GetStatus() (*SyncStatus, error) {
|
||||
|
||||
// Count server pricelists (only if already connected, don't reconnect)
|
||||
serverCount := 0
|
||||
connStatus := s.connMgr.GetStatus()
|
||||
connStatus := s.getConnectionStatus()
|
||||
if connStatus.IsConnected {
|
||||
if mariaDB, err := s.connMgr.GetDB(); err == nil && mariaDB != nil {
|
||||
if mariaDB, err := s.getDB(); err == nil && mariaDB != nil {
|
||||
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
||||
activeCount, err := pricelistRepo.CountActive()
|
||||
if err == nil {
|
||||
@@ -170,13 +190,13 @@ func (s *Service) NeedSync() (bool, error) {
|
||||
}
|
||||
|
||||
// Check if there are new pricelists on server (only if already connected)
|
||||
connStatus := s.connMgr.GetStatus()
|
||||
connStatus := s.getConnectionStatus()
|
||||
if !connStatus.IsConnected {
|
||||
// If offline, can't check server, no need to sync
|
||||
return false, nil
|
||||
}
|
||||
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
// If offline, can't check server, no need to sync
|
||||
return false, nil
|
||||
@@ -208,7 +228,7 @@ func (s *Service) SyncPricelists() (int, error) {
|
||||
slog.Info("starting pricelist sync")
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -301,7 +321,7 @@ func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -418,8 +438,9 @@ func (s *Service) PushPendingChanges() (int, error) {
|
||||
slog.Info("pushing pending changes", "count", len(changes))
|
||||
pushed := 0
|
||||
var syncedIDs []int64
|
||||
sortedChanges := prioritizeProjectChanges(changes)
|
||||
|
||||
for _, change := range changes {
|
||||
for _, change := range sortedChanges {
|
||||
err := s.pushSingleChange(&change)
|
||||
if err != nil {
|
||||
slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
|
||||
@@ -446,6 +467,8 @@ func (s *Service) PushPendingChanges() (int, error) {
|
||||
// pushSingleChange pushes a single pending change to the server
|
||||
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
||||
switch change.EntityType {
|
||||
case "project":
|
||||
return s.pushProjectChange(change)
|
||||
case "configuration":
|
||||
return s.pushConfigurationChange(change)
|
||||
default:
|
||||
@@ -453,6 +476,95 @@ func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
||||
}
|
||||
}
|
||||
|
||||
func prioritizeProjectChanges(changes []localdb.PendingChange) []localdb.PendingChange {
|
||||
if len(changes) < 2 {
|
||||
return changes
|
||||
}
|
||||
|
||||
projectChanges := make([]localdb.PendingChange, 0, len(changes))
|
||||
otherChanges := make([]localdb.PendingChange, 0, len(changes))
|
||||
for _, change := range changes {
|
||||
if change.EntityType == "project" {
|
||||
projectChanges = append(projectChanges, change)
|
||||
continue
|
||||
}
|
||||
otherChanges = append(otherChanges, change)
|
||||
}
|
||||
|
||||
sorted := make([]localdb.PendingChange, 0, len(changes))
|
||||
sorted = append(sorted, projectChanges...)
|
||||
sorted = append(sorted, otherChanges...)
|
||||
return sorted
|
||||
}
|
||||
|
||||
func (s *Service) pushProjectChange(change *localdb.PendingChange) error {
|
||||
payload, err := decodeProjectChangePayload(change)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode project payload: %w", err)
|
||||
}
|
||||
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
|
||||
projectRepo := repository.NewProjectRepository(mariaDB)
|
||||
project := payload.Snapshot
|
||||
project.UUID = payload.ProjectUUID
|
||||
|
||||
serverProject, err := projectRepo.GetByUUID(project.UUID)
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
if createErr := projectRepo.Create(&project); createErr != nil {
|
||||
return fmt.Errorf("create project on server: %w", createErr)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("get project on server: %w", err)
|
||||
}
|
||||
} else {
|
||||
project.ID = serverProject.ID
|
||||
if updateErr := projectRepo.Update(&project); updateErr != nil {
|
||||
return fmt.Errorf("update project on server: %w", updateErr)
|
||||
}
|
||||
}
|
||||
|
||||
localProject, localErr := s.localDB.GetProjectByUUID(project.UUID)
|
||||
if localErr == nil {
|
||||
if project.ID > 0 {
|
||||
serverID := project.ID
|
||||
localProject.ServerID = &serverID
|
||||
}
|
||||
localProject.SyncStatus = "synced"
|
||||
now := time.Now()
|
||||
localProject.SyncedAt = &now
|
||||
_ = s.localDB.SaveProject(localProject)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func decodeProjectChangePayload(change *localdb.PendingChange) (ProjectChangePayload, error) {
|
||||
var payload ProjectChangePayload
|
||||
if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ProjectUUID != "" {
|
||||
if payload.Operation == "" {
|
||||
payload.Operation = change.Operation
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
var project models.Project
|
||||
if err := json.Unmarshal([]byte(change.Payload), &project); err != nil {
|
||||
return ProjectChangePayload{}, fmt.Errorf("unmarshal legacy project payload: %w", err)
|
||||
}
|
||||
|
||||
return ProjectChangePayload{
|
||||
ProjectUUID: project.UUID,
|
||||
Operation: change.Operation,
|
||||
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", project.UUID, change.Operation),
|
||||
Snapshot: project,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// pushConfigurationChange pushes a configuration change to the server
|
||||
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
||||
switch change.Operation {
|
||||
@@ -485,7 +597,7 @@ func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -495,6 +607,9 @@ func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
||||
if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
|
||||
return fmt.Errorf("resolve configuration owner: %w", err)
|
||||
}
|
||||
if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
|
||||
return fmt.Errorf("resolve configuration project: %w", err)
|
||||
}
|
||||
|
||||
// Create on server
|
||||
if err := configRepo.Create(&cfg); err != nil {
|
||||
@@ -540,7 +655,7 @@ func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -550,6 +665,9 @@ func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
||||
if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
|
||||
return fmt.Errorf("resolve configuration owner: %w", err)
|
||||
}
|
||||
if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
|
||||
return fmt.Errorf("resolve configuration project: %w", err)
|
||||
}
|
||||
|
||||
// Ensure we have a server ID before updating
|
||||
// If the payload doesn't have ID, get it from local configuration
|
||||
@@ -620,6 +738,69 @@ func (s *Service) ensureConfigurationOwner(mariaDB *gorm.DB, cfg *models.Configu
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Configuration) error {
|
||||
if cfg == nil {
|
||||
return fmt.Errorf("configuration is nil")
|
||||
}
|
||||
|
||||
projectRepo := repository.NewProjectRepository(mariaDB)
|
||||
|
||||
if cfg.ProjectUUID != nil && *cfg.ProjectUUID != "" {
|
||||
_, err := projectRepo.GetByUUID(*cfg.ProjectUUID)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID)
|
||||
if localErr != nil {
|
||||
return err
|
||||
}
|
||||
modelProject := localdb.LocalToProject(localProject)
|
||||
if modelProject.OwnerUsername == "" {
|
||||
modelProject.OwnerUsername = cfg.OwnerUsername
|
||||
}
|
||||
if createErr := projectRepo.Create(modelProject); createErr != nil {
|
||||
return createErr
|
||||
}
|
||||
if modelProject.ID > 0 {
|
||||
serverID := modelProject.ID
|
||||
localProject.ServerID = &serverID
|
||||
localProject.SyncStatus = "synced"
|
||||
now := time.Now()
|
||||
localProject.SyncedAt = &now
|
||||
_ = s.localDB.SaveProject(localProject)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
systemProject := &models.Project{}
|
||||
err := mariaDB.
|
||||
Where("LOWER(TRIM(COALESCE(name, ''))) = LOWER(?) AND is_system = ?", "Без проекта", true).
|
||||
Order("CASE WHEN TRIM(COALESCE(owner_username, '')) = '' THEN 0 ELSE 1 END, created_at ASC, id ASC").
|
||||
First(systemProject).Error
|
||||
if err != nil {
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return err
|
||||
}
|
||||
systemProject = &models.Project{
|
||||
UUID: uuid.NewString(),
|
||||
OwnerUsername: "",
|
||||
Name: "Без проекта",
|
||||
IsActive: true,
|
||||
IsSystem: true,
|
||||
}
|
||||
if createErr := projectRepo.Create(systemProject); createErr != nil {
|
||||
return createErr
|
||||
}
|
||||
}
|
||||
|
||||
cfg.ProjectUUID = &systemProject.UUID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
|
||||
// Last-write-wins for now: rollback is pushed as an update with rollback metadata.
|
||||
return s.pushConfigurationUpdate(change)
|
||||
@@ -703,6 +884,7 @@ func decodeConfigurationChangePayload(change *localdb.PendingChange) (Configurat
|
||||
EventID: "",
|
||||
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", cfg.UUID, change.Operation),
|
||||
ConfigurationUUID: cfg.UUID,
|
||||
ProjectUUID: cfg.ProjectUUID,
|
||||
Operation: change.Operation,
|
||||
ConflictPolicy: "last_write_wins",
|
||||
Snapshot: cfg,
|
||||
@@ -759,7 +941,7 @@ func (s *Service) loadCurrentConfigurationState(configurationUUID string) (model
|
||||
// pushConfigurationDelete deletes a configuration from the server
|
||||
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -783,3 +965,23 @@ func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
||||
slog.Info("configuration deleted on server", "uuid", change.EntityUUID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) getDB() (*gorm.DB, error) {
|
||||
if s.directDB != nil {
|
||||
return s.directDB, nil
|
||||
}
|
||||
if s.connMgr == nil {
|
||||
return nil, ErrOffline
|
||||
}
|
||||
return s.connMgr.GetDB()
|
||||
}
|
||||
|
||||
func (s *Service) getConnectionStatus() db.ConnectionStatus {
|
||||
if s.directDB != nil {
|
||||
return db.ConnectionStatus{IsConnected: true}
|
||||
}
|
||||
if s.connMgr == nil {
|
||||
return db.ConnectionStatus{IsConnected: false}
|
||||
}
|
||||
return s.connMgr.GetStatus()
|
||||
}
|
||||
|
||||
25
internal/services/sync/service_order_test.go
Normal file
25
internal/services/sync/service_order_test.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
||||
)
|
||||
|
||||
func TestPrioritizeProjectChanges(t *testing.T) {
|
||||
changes := []localdb.PendingChange{
|
||||
{ID: 1, EntityType: "configuration"},
|
||||
{ID: 2, EntityType: "project"},
|
||||
{ID: 3, EntityType: "configuration"},
|
||||
{ID: 4, EntityType: "project"},
|
||||
}
|
||||
|
||||
sorted := prioritizeProjectChanges(changes)
|
||||
if len(sorted) != 4 {
|
||||
t.Fatalf("unexpected sorted length: %d", len(sorted))
|
||||
}
|
||||
|
||||
if sorted[0].EntityType != "project" || sorted[1].EntityType != "project" {
|
||||
t.Fatalf("expected project changes first, got order: %s, %s", sorted[0].EntityType, sorted[1].EntityType)
|
||||
}
|
||||
}
|
||||
273
internal/services/sync/service_projects_push_test.go
Normal file
273
internal/services/sync/service_projects_push_test.go
Normal file
@@ -0,0 +1,273 @@
|
||||
package sync_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/models"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/services"
|
||||
syncsvc "git.mchus.pro/mchus/quoteforge/internal/services/sync"
|
||||
"github.com/glebarez/sqlite"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func TestPushPendingChangesProjectsBeforeConfigurations(t *testing.T) {
|
||||
local := newLocalDBForSyncTest(t)
|
||||
serverDB := newServerDBForSyncTest(t)
|
||||
|
||||
localSync := syncsvc.NewService(nil, local)
|
||||
projectService := services.NewProjectService(local)
|
||||
configService := services.NewLocalConfigurationService(local, localSync, &services.QuoteService{}, func() bool { return false })
|
||||
|
||||
project, err := projectService.Create("tester", &services.CreateProjectRequest{Name: "Project A"})
|
||||
if err != nil {
|
||||
t.Fatalf("create project: %v", err)
|
||||
}
|
||||
|
||||
cfg, err := configService.Create("tester", &services.CreateConfigRequest{
|
||||
Name: "Cfg A",
|
||||
Items: models.ConfigItems{{LotName: "CPU_A", Quantity: 1, UnitPrice: 1000}},
|
||||
ServerCount: 1,
|
||||
ProjectUUID: &project.UUID,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create config: %v", err)
|
||||
}
|
||||
|
||||
pushService := syncsvc.NewServiceWithDB(serverDB, local)
|
||||
pushed, err := pushService.PushPendingChanges()
|
||||
if err != nil {
|
||||
t.Fatalf("push pending changes: %v", err)
|
||||
}
|
||||
if pushed < 2 {
|
||||
t.Fatalf("expected at least 2 pushed changes, got %d", pushed)
|
||||
}
|
||||
|
||||
var serverProject models.Project
|
||||
if err := serverDB.Where("uuid = ?", project.UUID).First(&serverProject).Error; err != nil {
|
||||
t.Fatalf("project not pushed to server: %v", err)
|
||||
}
|
||||
|
||||
var serverCfg models.Configuration
|
||||
if err := serverDB.Where("uuid = ?", cfg.UUID).First(&serverCfg).Error; err != nil {
|
||||
t.Fatalf("configuration not pushed to server: %v", err)
|
||||
}
|
||||
if serverCfg.ProjectUUID == nil || *serverCfg.ProjectUUID != project.UUID {
|
||||
t.Fatalf("expected project_uuid=%s on pushed config, got %v", project.UUID, serverCfg.ProjectUUID)
|
||||
}
|
||||
|
||||
if got := local.CountPendingChanges(); got != 0 {
|
||||
t.Fatalf("expected pending queue to be empty, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushPendingChangesSkipsStaleUpdateAndAppliesLatest(t *testing.T) {
|
||||
local := newLocalDBForSyncTest(t)
|
||||
serverDB := newServerDBForSyncTest(t)
|
||||
|
||||
localSync := syncsvc.NewService(nil, local)
|
||||
configService := services.NewLocalConfigurationService(local, localSync, &services.QuoteService{}, func() bool { return false })
|
||||
pushService := syncsvc.NewServiceWithDB(serverDB, local)
|
||||
|
||||
created, err := configService.Create("tester", &services.CreateConfigRequest{
|
||||
Name: "Cfg v1",
|
||||
Items: models.ConfigItems{{LotName: "CPU_A", Quantity: 1, UnitPrice: 1000}},
|
||||
ServerCount: 1,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create config: %v", err)
|
||||
}
|
||||
if _, err := pushService.PushPendingChanges(); err != nil {
|
||||
t.Fatalf("initial push: %v", err)
|
||||
}
|
||||
|
||||
if _, err := configService.UpdateNoAuth(created.UUID, &services.CreateConfigRequest{
|
||||
Name: "Cfg v2",
|
||||
Items: models.ConfigItems{{LotName: "CPU_A", Quantity: 2, UnitPrice: 1000}},
|
||||
ServerCount: 1,
|
||||
ProjectUUID: created.ProjectUUID,
|
||||
}); err != nil {
|
||||
t.Fatalf("update config: %v", err)
|
||||
}
|
||||
|
||||
localCfg, err := local.GetConfigurationByUUID(created.UUID)
|
||||
if err != nil {
|
||||
t.Fatalf("get local config: %v", err)
|
||||
}
|
||||
cfgSnapshot := localdb.LocalToConfiguration(localCfg)
|
||||
stalePayload := syncsvc.ConfigurationChangePayload{
|
||||
EventID: "stale-event",
|
||||
IdempotencyKey: fmt.Sprintf("%s:v1:update", created.UUID),
|
||||
ConfigurationUUID: created.UUID,
|
||||
ProjectUUID: cfgSnapshot.ProjectUUID,
|
||||
Operation: "update",
|
||||
CurrentVersionID: "stale-v1",
|
||||
CurrentVersionNo: 1,
|
||||
ConflictPolicy: "last_write_wins",
|
||||
Snapshot: *cfgSnapshot,
|
||||
CreatedAt: time.Now().UTC().Add(-2 * time.Second),
|
||||
}
|
||||
raw, err := json.Marshal(stalePayload)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal stale payload: %v", err)
|
||||
}
|
||||
if err := local.DB().Create(&localdb.PendingChange{
|
||||
EntityType: "configuration",
|
||||
EntityUUID: created.UUID,
|
||||
Operation: "update",
|
||||
Payload: string(raw),
|
||||
CreatedAt: time.Now().Add(-1 * time.Second),
|
||||
}).Error; err != nil {
|
||||
t.Fatalf("insert stale pending change: %v", err)
|
||||
}
|
||||
|
||||
if _, err := pushService.PushPendingChanges(); err != nil {
|
||||
t.Fatalf("push pending with stale event: %v", err)
|
||||
}
|
||||
|
||||
var serverCfg models.Configuration
|
||||
if err := serverDB.Where("uuid = ?", created.UUID).First(&serverCfg).Error; err != nil {
|
||||
t.Fatalf("get server config: %v", err)
|
||||
}
|
||||
if serverCfg.Name != "Cfg v2" {
|
||||
t.Fatalf("expected latest name to win, got %q", serverCfg.Name)
|
||||
}
|
||||
if got := local.CountPendingChanges(); got != 0 {
|
||||
t.Fatalf("expected empty pending queue, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushPendingChangesCreateIsIdempotent(t *testing.T) {
|
||||
local := newLocalDBForSyncTest(t)
|
||||
serverDB := newServerDBForSyncTest(t)
|
||||
|
||||
localSync := syncsvc.NewService(nil, local)
|
||||
configService := services.NewLocalConfigurationService(local, localSync, &services.QuoteService{}, func() bool { return false })
|
||||
pushService := syncsvc.NewServiceWithDB(serverDB, local)
|
||||
|
||||
created, err := configService.Create("tester", &services.CreateConfigRequest{
|
||||
Name: "Cfg Idempotent",
|
||||
Items: models.ConfigItems{{LotName: "CPU_B", Quantity: 1, UnitPrice: 500}},
|
||||
ServerCount: 1,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create config: %v", err)
|
||||
}
|
||||
if _, err := pushService.PushPendingChanges(); err != nil {
|
||||
t.Fatalf("initial push: %v", err)
|
||||
}
|
||||
|
||||
localCfg, err := local.GetConfigurationByUUID(created.UUID)
|
||||
if err != nil {
|
||||
t.Fatalf("get local config: %v", err)
|
||||
}
|
||||
currentVersionNo, currentVersionID := getCurrentVersionInfo(t, local, created.UUID, localCfg.CurrentVersionID)
|
||||
cfgSnapshot := localdb.LocalToConfiguration(localCfg)
|
||||
duplicatePayload := syncsvc.ConfigurationChangePayload{
|
||||
EventID: "duplicate-create-event",
|
||||
IdempotencyKey: fmt.Sprintf("%s:v%d:create", created.UUID, currentVersionNo),
|
||||
ConfigurationUUID: created.UUID,
|
||||
ProjectUUID: cfgSnapshot.ProjectUUID,
|
||||
Operation: "create",
|
||||
CurrentVersionID: currentVersionID,
|
||||
CurrentVersionNo: currentVersionNo,
|
||||
ConflictPolicy: "last_write_wins",
|
||||
Snapshot: *cfgSnapshot,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
}
|
||||
raw, err := json.Marshal(duplicatePayload)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal duplicate payload: %v", err)
|
||||
}
|
||||
if err := local.AddPendingChange("configuration", created.UUID, "create", string(raw)); err != nil {
|
||||
t.Fatalf("add duplicate create pending change: %v", err)
|
||||
}
|
||||
|
||||
if pushed, err := pushService.PushPendingChanges(); err != nil {
|
||||
t.Fatalf("push duplicate create: %v", err)
|
||||
} else if pushed != 1 {
|
||||
t.Fatalf("expected 1 pushed change for duplicate create, got %d", pushed)
|
||||
}
|
||||
|
||||
var count int64
|
||||
if err := serverDB.Model(&models.Configuration{}).Where("uuid = ?", created.UUID).Count(&count).Error; err != nil {
|
||||
t.Fatalf("count server configs: %v", err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Fatalf("expected one server row after idempotent create, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func newLocalDBForSyncTest(t *testing.T) *localdb.LocalDB {
|
||||
t.Helper()
|
||||
localPath := filepath.Join(t.TempDir(), "local.db")
|
||||
local, err := localdb.New(localPath)
|
||||
if err != nil {
|
||||
t.Fatalf("init local db: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { _ = local.Close() })
|
||||
return local
|
||||
}
|
||||
|
||||
func newServerDBForSyncTest(t *testing.T) *gorm.DB {
|
||||
t.Helper()
|
||||
serverPath := filepath.Join(t.TempDir(), "server.db")
|
||||
db, err := gorm.Open(sqlite.Open(serverPath), &gorm.Config{})
|
||||
if err != nil {
|
||||
t.Fatalf("open server sqlite: %v", err)
|
||||
}
|
||||
if err := db.Exec(`
|
||||
CREATE TABLE qt_projects (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
uuid TEXT NOT NULL UNIQUE,
|
||||
owner_username TEXT NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
is_active INTEGER NOT NULL DEFAULT 1,
|
||||
is_system INTEGER NOT NULL DEFAULT 0,
|
||||
created_at DATETIME,
|
||||
updated_at DATETIME
|
||||
);`).Error; err != nil {
|
||||
t.Fatalf("create qt_projects: %v", err)
|
||||
}
|
||||
if err := db.Exec(`
|
||||
CREATE TABLE qt_configurations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
uuid TEXT NOT NULL UNIQUE,
|
||||
user_id INTEGER NULL,
|
||||
owner_username TEXT NOT NULL,
|
||||
project_uuid TEXT NULL,
|
||||
app_version TEXT NULL,
|
||||
name TEXT NOT NULL,
|
||||
items TEXT NOT NULL,
|
||||
total_price REAL NULL,
|
||||
custom_price REAL NULL,
|
||||
notes TEXT NULL,
|
||||
is_template INTEGER NOT NULL DEFAULT 0,
|
||||
server_count INTEGER NOT NULL DEFAULT 1,
|
||||
price_updated_at DATETIME NULL,
|
||||
created_at DATETIME
|
||||
);`).Error; err != nil {
|
||||
t.Fatalf("create qt_configurations: %v", err)
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
func getCurrentVersionInfo(t *testing.T, local *localdb.LocalDB, configurationUUID string, currentVersionID *string) (int, string) {
|
||||
t.Helper()
|
||||
if currentVersionID == nil || *currentVersionID == "" {
|
||||
t.Fatalf("current version id is empty for %s", configurationUUID)
|
||||
}
|
||||
|
||||
var version localdb.LocalConfigurationVersion
|
||||
if err := local.DB().
|
||||
Where("id = ? AND configuration_uuid = ?", *currentVersionID, configurationUUID).
|
||||
First(&version).Error; err != nil {
|
||||
t.Fatalf("get current version info: %v", err)
|
||||
}
|
||||
|
||||
return version.VersionNo, version.ID
|
||||
}
|
||||
Reference in New Issue
Block a user