feat: add projects flow and consolidate default project handling
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
||||
"git.mchus.pro/mchus/quoteforge/internal/localdb"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/models"
|
||||
"git.mchus.pro/mchus/quoteforge/internal/repository"
|
||||
"github.com/google/uuid"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
@@ -19,8 +20,9 @@ var ErrOffline = errors.New("database is offline")
|
||||
|
||||
// Service handles synchronization between MariaDB and local SQLite
|
||||
type Service struct {
|
||||
connMgr *db.ConnectionManager
|
||||
localDB *localdb.LocalDB
|
||||
connMgr *db.ConnectionManager
|
||||
localDB *localdb.LocalDB
|
||||
directDB *gorm.DB
|
||||
}
|
||||
|
||||
// NewService creates a new sync service
|
||||
@@ -31,6 +33,14 @@ func NewService(connMgr *db.ConnectionManager, localDB *localdb.LocalDB) *Servic
|
||||
}
|
||||
}
|
||||
|
||||
// NewServiceWithDB creates sync service that uses a direct DB handle (used in tests).
|
||||
func NewServiceWithDB(mariaDB *gorm.DB, localDB *localdb.LocalDB) *Service {
|
||||
return &Service{
|
||||
localDB: localDB,
|
||||
directDB: mariaDB,
|
||||
}
|
||||
}
|
||||
|
||||
// SyncStatus represents the current sync status
|
||||
type SyncStatus struct {
|
||||
LastSyncAt *time.Time `json:"last_sync_at"`
|
||||
@@ -52,6 +62,7 @@ type ConfigurationChangePayload struct {
|
||||
EventID string `json:"event_id"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
ConfigurationUUID string `json:"configuration_uuid"`
|
||||
ProjectUUID *string `json:"project_uuid,omitempty"`
|
||||
Operation string `json:"operation"` // create/update/rollback/deactivate/reactivate/delete
|
||||
CurrentVersionID string `json:"current_version_id,omitempty"`
|
||||
CurrentVersionNo int `json:"current_version_no,omitempty"`
|
||||
@@ -61,10 +72,19 @@ type ConfigurationChangePayload struct {
|
||||
CreatedBy *string `json:"created_by,omitempty"`
|
||||
}
|
||||
|
||||
type ProjectChangePayload struct {
|
||||
EventID string `json:"event_id"`
|
||||
IdempotencyKey string `json:"idempotency_key"`
|
||||
ProjectUUID string `json:"project_uuid"`
|
||||
Operation string `json:"operation"`
|
||||
Snapshot models.Project `json:"snapshot"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
// ImportConfigurationsToLocal imports configurations from MariaDB into local SQLite.
|
||||
// Existing local configs with pending local changes are skipped to avoid data loss.
|
||||
func (s *Service) ImportConfigurationsToLocal() (*ConfigImportResult, error) {
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return nil, ErrOffline
|
||||
}
|
||||
@@ -130,9 +150,9 @@ func (s *Service) GetStatus() (*SyncStatus, error) {
|
||||
|
||||
// Count server pricelists (only if already connected, don't reconnect)
|
||||
serverCount := 0
|
||||
connStatus := s.connMgr.GetStatus()
|
||||
connStatus := s.getConnectionStatus()
|
||||
if connStatus.IsConnected {
|
||||
if mariaDB, err := s.connMgr.GetDB(); err == nil && mariaDB != nil {
|
||||
if mariaDB, err := s.getDB(); err == nil && mariaDB != nil {
|
||||
pricelistRepo := repository.NewPricelistRepository(mariaDB)
|
||||
activeCount, err := pricelistRepo.CountActive()
|
||||
if err == nil {
|
||||
@@ -170,13 +190,13 @@ func (s *Service) NeedSync() (bool, error) {
|
||||
}
|
||||
|
||||
// Check if there are new pricelists on server (only if already connected)
|
||||
connStatus := s.connMgr.GetStatus()
|
||||
connStatus := s.getConnectionStatus()
|
||||
if !connStatus.IsConnected {
|
||||
// If offline, can't check server, no need to sync
|
||||
return false, nil
|
||||
}
|
||||
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
// If offline, can't check server, no need to sync
|
||||
return false, nil
|
||||
@@ -208,7 +228,7 @@ func (s *Service) SyncPricelists() (int, error) {
|
||||
slog.Info("starting pricelist sync")
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -301,7 +321,7 @@ func (s *Service) SyncPricelistItems(localPricelistID uint) (int, error) {
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -418,8 +438,9 @@ func (s *Service) PushPendingChanges() (int, error) {
|
||||
slog.Info("pushing pending changes", "count", len(changes))
|
||||
pushed := 0
|
||||
var syncedIDs []int64
|
||||
sortedChanges := prioritizeProjectChanges(changes)
|
||||
|
||||
for _, change := range changes {
|
||||
for _, change := range sortedChanges {
|
||||
err := s.pushSingleChange(&change)
|
||||
if err != nil {
|
||||
slog.Warn("failed to push change", "id", change.ID, "type", change.EntityType, "operation", change.Operation, "error", err)
|
||||
@@ -446,6 +467,8 @@ func (s *Service) PushPendingChanges() (int, error) {
|
||||
// pushSingleChange pushes a single pending change to the server
|
||||
func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
||||
switch change.EntityType {
|
||||
case "project":
|
||||
return s.pushProjectChange(change)
|
||||
case "configuration":
|
||||
return s.pushConfigurationChange(change)
|
||||
default:
|
||||
@@ -453,6 +476,95 @@ func (s *Service) pushSingleChange(change *localdb.PendingChange) error {
|
||||
}
|
||||
}
|
||||
|
||||
func prioritizeProjectChanges(changes []localdb.PendingChange) []localdb.PendingChange {
|
||||
if len(changes) < 2 {
|
||||
return changes
|
||||
}
|
||||
|
||||
projectChanges := make([]localdb.PendingChange, 0, len(changes))
|
||||
otherChanges := make([]localdb.PendingChange, 0, len(changes))
|
||||
for _, change := range changes {
|
||||
if change.EntityType == "project" {
|
||||
projectChanges = append(projectChanges, change)
|
||||
continue
|
||||
}
|
||||
otherChanges = append(otherChanges, change)
|
||||
}
|
||||
|
||||
sorted := make([]localdb.PendingChange, 0, len(changes))
|
||||
sorted = append(sorted, projectChanges...)
|
||||
sorted = append(sorted, otherChanges...)
|
||||
return sorted
|
||||
}
|
||||
|
||||
func (s *Service) pushProjectChange(change *localdb.PendingChange) error {
|
||||
payload, err := decodeProjectChangePayload(change)
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode project payload: %w", err)
|
||||
}
|
||||
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
|
||||
projectRepo := repository.NewProjectRepository(mariaDB)
|
||||
project := payload.Snapshot
|
||||
project.UUID = payload.ProjectUUID
|
||||
|
||||
serverProject, err := projectRepo.GetByUUID(project.UUID)
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
if createErr := projectRepo.Create(&project); createErr != nil {
|
||||
return fmt.Errorf("create project on server: %w", createErr)
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("get project on server: %w", err)
|
||||
}
|
||||
} else {
|
||||
project.ID = serverProject.ID
|
||||
if updateErr := projectRepo.Update(&project); updateErr != nil {
|
||||
return fmt.Errorf("update project on server: %w", updateErr)
|
||||
}
|
||||
}
|
||||
|
||||
localProject, localErr := s.localDB.GetProjectByUUID(project.UUID)
|
||||
if localErr == nil {
|
||||
if project.ID > 0 {
|
||||
serverID := project.ID
|
||||
localProject.ServerID = &serverID
|
||||
}
|
||||
localProject.SyncStatus = "synced"
|
||||
now := time.Now()
|
||||
localProject.SyncedAt = &now
|
||||
_ = s.localDB.SaveProject(localProject)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func decodeProjectChangePayload(change *localdb.PendingChange) (ProjectChangePayload, error) {
|
||||
var payload ProjectChangePayload
|
||||
if err := json.Unmarshal([]byte(change.Payload), &payload); err == nil && payload.ProjectUUID != "" {
|
||||
if payload.Operation == "" {
|
||||
payload.Operation = change.Operation
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
var project models.Project
|
||||
if err := json.Unmarshal([]byte(change.Payload), &project); err != nil {
|
||||
return ProjectChangePayload{}, fmt.Errorf("unmarshal legacy project payload: %w", err)
|
||||
}
|
||||
|
||||
return ProjectChangePayload{
|
||||
ProjectUUID: project.UUID,
|
||||
Operation: change.Operation,
|
||||
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", project.UUID, change.Operation),
|
||||
Snapshot: project,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// pushConfigurationChange pushes a configuration change to the server
|
||||
func (s *Service) pushConfigurationChange(change *localdb.PendingChange) error {
|
||||
switch change.Operation {
|
||||
@@ -485,7 +597,7 @@ func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -495,6 +607,9 @@ func (s *Service) pushConfigurationCreate(change *localdb.PendingChange) error {
|
||||
if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
|
||||
return fmt.Errorf("resolve configuration owner: %w", err)
|
||||
}
|
||||
if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
|
||||
return fmt.Errorf("resolve configuration project: %w", err)
|
||||
}
|
||||
|
||||
// Create on server
|
||||
if err := configRepo.Create(&cfg); err != nil {
|
||||
@@ -540,7 +655,7 @@ func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
||||
}
|
||||
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -550,6 +665,9 @@ func (s *Service) pushConfigurationUpdate(change *localdb.PendingChange) error {
|
||||
if err := s.ensureConfigurationOwner(mariaDB, &cfg); err != nil {
|
||||
return fmt.Errorf("resolve configuration owner: %w", err)
|
||||
}
|
||||
if err := s.ensureConfigurationProject(mariaDB, &cfg); err != nil {
|
||||
return fmt.Errorf("resolve configuration project: %w", err)
|
||||
}
|
||||
|
||||
// Ensure we have a server ID before updating
|
||||
// If the payload doesn't have ID, get it from local configuration
|
||||
@@ -620,6 +738,69 @@ func (s *Service) ensureConfigurationOwner(mariaDB *gorm.DB, cfg *models.Configu
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) ensureConfigurationProject(mariaDB *gorm.DB, cfg *models.Configuration) error {
|
||||
if cfg == nil {
|
||||
return fmt.Errorf("configuration is nil")
|
||||
}
|
||||
|
||||
projectRepo := repository.NewProjectRepository(mariaDB)
|
||||
|
||||
if cfg.ProjectUUID != nil && *cfg.ProjectUUID != "" {
|
||||
_, err := projectRepo.GetByUUID(*cfg.ProjectUUID)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
localProject, localErr := s.localDB.GetProjectByUUID(*cfg.ProjectUUID)
|
||||
if localErr != nil {
|
||||
return err
|
||||
}
|
||||
modelProject := localdb.LocalToProject(localProject)
|
||||
if modelProject.OwnerUsername == "" {
|
||||
modelProject.OwnerUsername = cfg.OwnerUsername
|
||||
}
|
||||
if createErr := projectRepo.Create(modelProject); createErr != nil {
|
||||
return createErr
|
||||
}
|
||||
if modelProject.ID > 0 {
|
||||
serverID := modelProject.ID
|
||||
localProject.ServerID = &serverID
|
||||
localProject.SyncStatus = "synced"
|
||||
now := time.Now()
|
||||
localProject.SyncedAt = &now
|
||||
_ = s.localDB.SaveProject(localProject)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
systemProject := &models.Project{}
|
||||
err := mariaDB.
|
||||
Where("LOWER(TRIM(COALESCE(name, ''))) = LOWER(?) AND is_system = ?", "Без проекта", true).
|
||||
Order("CASE WHEN TRIM(COALESCE(owner_username, '')) = '' THEN 0 ELSE 1 END, created_at ASC, id ASC").
|
||||
First(systemProject).Error
|
||||
if err != nil {
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return err
|
||||
}
|
||||
systemProject = &models.Project{
|
||||
UUID: uuid.NewString(),
|
||||
OwnerUsername: "",
|
||||
Name: "Без проекта",
|
||||
IsActive: true,
|
||||
IsSystem: true,
|
||||
}
|
||||
if createErr := projectRepo.Create(systemProject); createErr != nil {
|
||||
return createErr
|
||||
}
|
||||
}
|
||||
|
||||
cfg.ProjectUUID = &systemProject.UUID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) pushConfigurationRollback(change *localdb.PendingChange) error {
|
||||
// Last-write-wins for now: rollback is pushed as an update with rollback metadata.
|
||||
return s.pushConfigurationUpdate(change)
|
||||
@@ -703,6 +884,7 @@ func decodeConfigurationChangePayload(change *localdb.PendingChange) (Configurat
|
||||
EventID: "",
|
||||
IdempotencyKey: fmt.Sprintf("%s:%s:legacy", cfg.UUID, change.Operation),
|
||||
ConfigurationUUID: cfg.UUID,
|
||||
ProjectUUID: cfg.ProjectUUID,
|
||||
Operation: change.Operation,
|
||||
ConflictPolicy: "last_write_wins",
|
||||
Snapshot: cfg,
|
||||
@@ -759,7 +941,7 @@ func (s *Service) loadCurrentConfigurationState(configurationUUID string) (model
|
||||
// pushConfigurationDelete deletes a configuration from the server
|
||||
func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
||||
// Get database connection
|
||||
mariaDB, err := s.connMgr.GetDB()
|
||||
mariaDB, err := s.getDB()
|
||||
if err != nil {
|
||||
return fmt.Errorf("database not available: %w", err)
|
||||
}
|
||||
@@ -783,3 +965,23 @@ func (s *Service) pushConfigurationDelete(change *localdb.PendingChange) error {
|
||||
slog.Info("configuration deleted on server", "uuid", change.EntityUUID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) getDB() (*gorm.DB, error) {
|
||||
if s.directDB != nil {
|
||||
return s.directDB, nil
|
||||
}
|
||||
if s.connMgr == nil {
|
||||
return nil, ErrOffline
|
||||
}
|
||||
return s.connMgr.GetDB()
|
||||
}
|
||||
|
||||
func (s *Service) getConnectionStatus() db.ConnectionStatus {
|
||||
if s.directDB != nil {
|
||||
return db.ConnectionStatus{IsConnected: true}
|
||||
}
|
||||
if s.connMgr == nil {
|
||||
return db.ConnectionStatus{IsConnected: false}
|
||||
}
|
||||
return s.connMgr.GetStatus()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user