- Added background task manager with goroutine execution and panic recovery
- Replaced SSE streaming with background task execution for:
  * Price recalculation (RecalculateAll)
  * Stock import (ImportStockLog)
  * Pricelist creation (CreateWithProgress)
- Implemented unified polling for task status and DB connection in frontend
- Added task indicator in top bar showing running tasks count
- Added toast notifications for task completion/error
- Tasks automatically cleaned up after 10 minutes
- Tasks show progress (0-100%) with descriptive messages
- Updated handler constructors to receive task manager
- Added API endpoints for task status (/api/tasks, /api/tasks/:id)

Fixes issue with SSE disconnection on slow connections during long-running operations
575 lines
16 KiB
Go
575 lines
16 KiB
Go
package pricing
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.mchus.pro/mchus/priceforge/internal/config"
|
|
"git.mchus.pro/mchus/priceforge/internal/models"
|
|
"git.mchus.pro/mchus/priceforge/internal/repository"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type Service struct {
|
|
componentRepo *repository.ComponentRepository
|
|
priceRepo *repository.PriceRepository
|
|
config config.PricingConfig
|
|
db *gorm.DB
|
|
}
|
|
|
|
// RecalculateProgress is the snapshot passed to the progress callback of a
// bulk price recalculation.
type RecalculateProgress struct {
	Current int    // position reached in the run, as counted by the sender
	Total   int    // size of the run, as counted by the sender
	LotName string // lot being processed; empty for phase-level notifications
	Updated int    // prices written so far
	Errors  int    // failures counted so far
}
|
|
|
|
func NewService(
|
|
componentRepo *repository.ComponentRepository,
|
|
priceRepo *repository.PriceRepository,
|
|
cfg config.PricingConfig,
|
|
) *Service {
|
|
var db *gorm.DB
|
|
if componentRepo != nil {
|
|
db = componentRepo.DB()
|
|
}
|
|
|
|
return &Service{
|
|
componentRepo: componentRepo,
|
|
priceRepo: priceRepo,
|
|
config: cfg,
|
|
db: db,
|
|
}
|
|
}
|
|
|
|
// GetEffectivePrice returns the current effective price for a component
|
|
// Priority: active override > calculated price > nil
|
|
func (s *Service) GetEffectivePrice(lotName string) (*float64, error) {
|
|
// Check for active override first
|
|
override, err := s.priceRepo.GetPriceOverride(lotName)
|
|
if err == nil && override != nil {
|
|
return &override.Price, nil
|
|
}
|
|
|
|
// Get component metadata
|
|
component, err := s.componentRepo.GetByLotName(lotName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return component.CurrentPrice, nil
|
|
}
|
|
|
|
// CalculatePrice calculates price using the specified method
|
|
func (s *Service) CalculatePrice(lotName string, method models.PriceMethod, periodDays int) (float64, error) {
|
|
if periodDays == 0 {
|
|
periodDays = s.config.DefaultPeriodDays
|
|
}
|
|
|
|
points, err := s.priceRepo.GetPriceHistory(lotName, periodDays)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(points) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
prices := make([]float64, len(points))
|
|
for i, p := range points {
|
|
prices[i] = p.Price
|
|
}
|
|
|
|
switch method {
|
|
case models.PriceMethodAverage:
|
|
return CalculateAverage(prices), nil
|
|
case models.PriceMethodWeightedMedian:
|
|
return CalculateWeightedMedian(points, periodDays), nil
|
|
case models.PriceMethodMedian:
|
|
fallthrough
|
|
default:
|
|
return CalculateMedian(prices), nil
|
|
}
|
|
}
|
|
|
|
// UpdateComponentPrice recalculates and updates the price for a component
|
|
func (s *Service) UpdateComponentPrice(lotName string) error {
|
|
component, err := s.componentRepo.GetByLotName(lotName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
price, err := s.CalculatePrice(lotName, component.PriceMethod, component.PricePeriodDays)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
now := time.Now()
|
|
if price > 0 {
|
|
component.CurrentPrice = &price
|
|
component.PriceUpdatedAt = &now
|
|
}
|
|
|
|
return s.componentRepo.Update(component)
|
|
}
|
|
|
|
// SetManualPrice sets a manual price override
|
|
func (s *Service) SetManualPrice(lotName string, price float64, reason string, userID uint) error {
|
|
override := &models.PriceOverride{
|
|
LotName: lotName,
|
|
Price: price,
|
|
ValidFrom: time.Now(),
|
|
Reason: reason,
|
|
CreatedBy: userID,
|
|
}
|
|
return s.priceRepo.CreatePriceOverride(override)
|
|
}
|
|
|
|
// UpdatePriceMethod changes the pricing method for a component
|
|
func (s *Service) UpdatePriceMethod(lotName string, method models.PriceMethod, periodDays int) error {
|
|
component, err := s.componentRepo.GetByLotName(lotName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
component.PriceMethod = method
|
|
if periodDays > 0 {
|
|
component.PricePeriodDays = periodDays
|
|
}
|
|
|
|
if err := s.componentRepo.Update(component); err != nil {
|
|
return err
|
|
}
|
|
|
|
return s.UpdateComponentPrice(lotName)
|
|
}
|
|
|
|
// GetPriceStats returns statistics for a component's price history
|
|
func (s *Service) GetPriceStats(lotName string, periodDays int) (*PriceStats, error) {
|
|
if periodDays == 0 {
|
|
periodDays = s.config.DefaultPeriodDays
|
|
}
|
|
|
|
points, err := s.priceRepo.GetPriceHistory(lotName, periodDays)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(points) == 0 {
|
|
return &PriceStats{QuoteCount: 0}, nil
|
|
}
|
|
|
|
prices := make([]float64, len(points))
|
|
for i, p := range points {
|
|
prices[i] = p.Price
|
|
}
|
|
|
|
return &PriceStats{
|
|
QuoteCount: len(points),
|
|
MinPrice: CalculatePercentile(prices, 0),
|
|
MaxPrice: CalculatePercentile(prices, 100),
|
|
MedianPrice: CalculateMedian(prices),
|
|
AveragePrice: CalculateAverage(prices),
|
|
StdDeviation: CalculateStdDev(prices),
|
|
LatestPrice: points[0].Price,
|
|
LatestDate: points[0].Date,
|
|
OldestDate: points[len(points)-1].Date,
|
|
Percentile25: CalculatePercentile(prices, 25),
|
|
Percentile75: CalculatePercentile(prices, 75),
|
|
}, nil
|
|
}
|
|
|
|
// PriceStats is the JSON-serializable summary of a component's price history
// produced by GetPriceStats.
type PriceStats struct {
	// QuoteCount is the number of history points the summary is based on.
	QuoteCount int `json:"quote_count"`

	// Aggregates over the prices of those points.
	MinPrice     float64 `json:"min_price"`
	MaxPrice     float64 `json:"max_price"`
	MedianPrice  float64 `json:"median_price"`
	AveragePrice float64 `json:"average_price"`
	StdDeviation float64 `json:"std_deviation"`

	// Price and date of the first history point, and date of the last one.
	LatestPrice float64   `json:"latest_price"`
	LatestDate  time.Time `json:"latest_date"`
	OldestDate  time.Time `json:"oldest_date"`

	// 25th and 75th percentiles of the prices.
	Percentile25 float64 `json:"percentile_25"`
	Percentile75 float64 `json:"percentile_75"`
}
|
|
|
|
// RecalculateAllPrices recalculates prices for all components
|
|
func (s *Service) RecalculateAllPrices() (updated int, errors int) {
|
|
return s.RecalculateAllPricesWithProgress(nil)
|
|
}
|
|
|
|
// RecalculateAllPricesWithProgress recalculates prices and reports progress.
|
|
func (s *Service) RecalculateAllPricesWithProgress(onProgress func(RecalculateProgress)) (updated int, errors int) {
|
|
if s.db == nil {
|
|
return 0, 0
|
|
}
|
|
|
|
// Logic mirrors "Обновить цены" in admin pricing.
|
|
var components []models.LotMetadata
|
|
if err := s.db.Find(&components).Error; err != nil {
|
|
return 0, len(components)
|
|
}
|
|
total := len(components)
|
|
|
|
var allLotNames []string
|
|
if err := s.db.Model(&models.LotMetadata{}).Pluck("lot_name", &allLotNames).Error; err != nil {
|
|
// Log error but continue
|
|
allLotNames = []string{}
|
|
}
|
|
|
|
type lotDate struct {
|
|
Lot string
|
|
Date time.Time
|
|
}
|
|
var latestDates []lotDate
|
|
if err := s.db.Raw(`SELECT lot, MAX(date) as date FROM lot_log GROUP BY lot`).Scan(&latestDates).Error; err != nil {
|
|
// Log error but continue
|
|
latestDates = []lotDate{}
|
|
}
|
|
lotLatestDate := make(map[string]time.Time, len(latestDates))
|
|
for _, ld := range latestDates {
|
|
lotLatestDate[ld.Lot] = ld.Date
|
|
}
|
|
|
|
var skipped, manual, unchanged int
|
|
now := time.Now()
|
|
current := 0
|
|
failedComponents := make([]models.LotMetadata, 0) // Track failed components for retry
|
|
lastReportedProgress := 0
|
|
|
|
for _, comp := range components {
|
|
current++
|
|
fmt.Printf("[Price] 🔄 Processing %d/%d: %s\n", current, total, comp.LotName)
|
|
|
|
reportProgress := func() {
|
|
if onProgress != nil {
|
|
// Report progress every 10 components OR on last component to avoid SSE overload
|
|
if current%10 == 0 || current == total || current-lastReportedProgress >= 10 {
|
|
lastReportedProgress = current
|
|
onProgress(RecalculateProgress{
|
|
Current: current,
|
|
Total: total,
|
|
LotName: comp.LotName,
|
|
Updated: updated,
|
|
Errors: errors,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
if comp.ManualPrice != nil && *comp.ManualPrice > 0 {
|
|
manual++
|
|
fmt.Printf("[Price] ⏭️ Skipped %s (manual price set)\n", comp.LotName)
|
|
reportProgress()
|
|
continue
|
|
}
|
|
|
|
method := comp.PriceMethod
|
|
if method == "" {
|
|
method = models.PriceMethodMedian
|
|
}
|
|
|
|
var sourceLots []string
|
|
if comp.MetaPrices != "" {
|
|
sourceLots = expandMetaPricesWithCache(comp.MetaPrices, comp.LotName, allLotNames)
|
|
} else {
|
|
sourceLots = []string{comp.LotName}
|
|
}
|
|
if len(sourceLots) == 0 {
|
|
skipped++
|
|
reportProgress()
|
|
continue
|
|
}
|
|
|
|
if comp.PriceUpdatedAt != nil {
|
|
hasNewData := false
|
|
for _, lot := range sourceLots {
|
|
if latestDate, ok := lotLatestDate[lot]; ok && latestDate.After(*comp.PriceUpdatedAt) {
|
|
hasNewData = true
|
|
break
|
|
}
|
|
}
|
|
if !hasNewData {
|
|
unchanged++
|
|
reportProgress()
|
|
continue
|
|
}
|
|
}
|
|
|
|
var prices []float64
|
|
// Add timeout for SQL queries to prevent hanging on slow connections
|
|
fmt.Printf("[Price] 🔍 Querying prices for %s (lots: %d, period: %d days)\n", comp.LotName, len(sourceLots), comp.PricePeriodDays)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
dbWithTimeout := s.db.WithContext(ctx)
|
|
|
|
queryStart := time.Now()
|
|
var queryErr error
|
|
if comp.PricePeriodDays > 0 {
|
|
queryErr = dbWithTimeout.Raw(
|
|
`SELECT price FROM lot_log WHERE lot IN ? AND date >= DATE_SUB(NOW(), INTERVAL ? DAY) ORDER BY price`,
|
|
sourceLots, comp.PricePeriodDays,
|
|
).Pluck("price", &prices).Error
|
|
} else {
|
|
queryErr = dbWithTimeout.Raw(
|
|
`SELECT price FROM lot_log WHERE lot IN ? ORDER BY price`,
|
|
sourceLots,
|
|
).Pluck("price", &prices).Error
|
|
}
|
|
queryDuration := time.Since(queryStart)
|
|
cancel()
|
|
|
|
// If query failed or timed out, count as error and save for retry
|
|
if queryErr != nil {
|
|
if ctx.Err() == context.DeadlineExceeded {
|
|
slog.Error("Price query timeout", "lot", comp.LotName, "duration", queryDuration)
|
|
fmt.Printf("[Price] ❌ TIMEOUT querying %s after %v\n", comp.LotName, queryDuration)
|
|
} else {
|
|
slog.Error("Price query error", "lot", comp.LotName, "error", queryErr, "duration", queryDuration)
|
|
fmt.Printf("[Price] ❌ ERROR querying %s after %v: %v\n", comp.LotName, queryDuration, queryErr)
|
|
}
|
|
errors++
|
|
failedComponents = append(failedComponents, comp)
|
|
reportProgress()
|
|
continue
|
|
}
|
|
fmt.Printf("[Price] ✅ Found %d prices for %s in %v\n", len(prices), comp.LotName, queryDuration)
|
|
|
|
if len(prices) == 0 && comp.PricePeriodDays > 0 {
|
|
fmt.Printf("[Price] 🔄 No prices in period, trying all-time for %s\n", comp.LotName)
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
|
|
queryStart2 := time.Now()
|
|
queryErr = s.db.WithContext(ctx2).Raw(`SELECT price FROM lot_log WHERE lot IN ? ORDER BY price`, sourceLots).Pluck("price", &prices).Error
|
|
queryDuration2 := time.Since(queryStart2)
|
|
cancel2()
|
|
if queryErr != nil {
|
|
if ctx2.Err() == context.DeadlineExceeded {
|
|
fmt.Printf("[Price] ❌ TIMEOUT all-time query for %s after %v\n", comp.LotName, queryDuration2)
|
|
} else {
|
|
fmt.Printf("[Price] ❌ ERROR all-time query for %s: %v\n", comp.LotName, queryErr)
|
|
}
|
|
errors++
|
|
failedComponents = append(failedComponents, comp)
|
|
reportProgress()
|
|
continue
|
|
}
|
|
fmt.Printf("[Price] ✅ Found %d all-time prices for %s\n", len(prices), comp.LotName)
|
|
}
|
|
if len(prices) == 0 {
|
|
skipped++
|
|
reportProgress()
|
|
continue
|
|
}
|
|
|
|
var basePrice float64
|
|
switch method {
|
|
case models.PriceMethodAverage:
|
|
basePrice = CalculateAverage(prices)
|
|
default:
|
|
basePrice = CalculateMedian(prices)
|
|
}
|
|
if basePrice <= 0 {
|
|
skipped++
|
|
reportProgress()
|
|
continue
|
|
}
|
|
|
|
finalPrice := basePrice
|
|
if comp.PriceCoefficient != 0 {
|
|
finalPrice = finalPrice * (1 + comp.PriceCoefficient/100)
|
|
}
|
|
|
|
fmt.Printf("[Price] 💾 Updating %s: %.2f\n", comp.LotName, finalPrice)
|
|
|
|
// Add timeout for UPDATE query
|
|
ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second)
|
|
updateStart := time.Now()
|
|
updateErr := s.db.WithContext(ctx3).Model(&models.LotMetadata{}).
|
|
Where("lot_name = ?", comp.LotName).
|
|
Updates(map[string]interface{}{
|
|
"current_price": finalPrice,
|
|
"price_updated_at": now,
|
|
}).Error
|
|
updateDuration := time.Since(updateStart)
|
|
cancel3()
|
|
|
|
if updateErr != nil {
|
|
if ctx3.Err() == context.DeadlineExceeded {
|
|
fmt.Printf("[Price] ❌ TIMEOUT updating %s after %v\n", comp.LotName, updateDuration)
|
|
} else {
|
|
fmt.Printf("[Price] ❌ ERROR updating %s: %v\n", comp.LotName, updateErr)
|
|
}
|
|
errors++
|
|
failedComponents = append(failedComponents, comp)
|
|
} else {
|
|
fmt.Printf("[Price] ✅ Updated %s in %v (total: %d)\n", comp.LotName, updateDuration, updated+1)
|
|
updated++
|
|
}
|
|
|
|
reportProgress()
|
|
}
|
|
|
|
// Retry failed components once
|
|
if len(failedComponents) > 0 {
|
|
fmt.Printf("[Price] 🔁 Retry phase: %d failed components\n", len(failedComponents))
|
|
slog.Info("Starting retry phase", "failed_count", len(failedComponents))
|
|
if onProgress != nil {
|
|
onProgress(RecalculateProgress{
|
|
Current: total,
|
|
Total: total,
|
|
LotName: "",
|
|
Updated: updated,
|
|
Errors: errors,
|
|
})
|
|
}
|
|
}
|
|
|
|
retryCount := 0
|
|
for _, comp := range failedComponents {
|
|
retryCount++
|
|
fmt.Printf("[Price] 🔁 Retry %d/%d: %s\n", retryCount, len(failedComponents), comp.LotName)
|
|
if onProgress != nil {
|
|
onProgress(RecalculateProgress{
|
|
Current: total + retryCount,
|
|
Total: total + len(failedComponents),
|
|
LotName: comp.LotName,
|
|
Updated: updated,
|
|
Errors: errors,
|
|
})
|
|
}
|
|
|
|
method := comp.PriceMethod
|
|
if method == "" {
|
|
method = models.PriceMethodMedian
|
|
}
|
|
|
|
var sourceLots []string
|
|
if comp.MetaPrices != "" {
|
|
sourceLots = expandMetaPricesWithCache(comp.MetaPrices, comp.LotName, allLotNames)
|
|
} else {
|
|
sourceLots = []string{comp.LotName}
|
|
}
|
|
if len(sourceLots) == 0 {
|
|
continue
|
|
}
|
|
|
|
// Retry with shorter timeout (5 seconds)
|
|
var prices []float64
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
var queryErr error
|
|
if comp.PricePeriodDays > 0 {
|
|
queryErr = s.db.WithContext(ctx).Raw(
|
|
`SELECT price FROM lot_log WHERE lot IN ? AND date >= DATE_SUB(NOW(), INTERVAL ? DAY) ORDER BY price`,
|
|
sourceLots, comp.PricePeriodDays,
|
|
).Pluck("price", &prices).Error
|
|
} else {
|
|
queryErr = s.db.WithContext(ctx).Raw(
|
|
`SELECT price FROM lot_log WHERE lot IN ? ORDER BY price`,
|
|
sourceLots,
|
|
).Pluck("price", &prices).Error
|
|
}
|
|
cancel()
|
|
|
|
if queryErr != nil {
|
|
continue
|
|
}
|
|
|
|
if len(prices) == 0 && comp.PricePeriodDays > 0 {
|
|
ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
|
|
queryErr = s.db.WithContext(ctx2).Raw(`SELECT price FROM lot_log WHERE lot IN ? ORDER BY price`, sourceLots).Pluck("price", &prices).Error
|
|
cancel2()
|
|
if queryErr != nil {
|
|
continue
|
|
}
|
|
}
|
|
|
|
if len(prices) == 0 {
|
|
continue
|
|
}
|
|
|
|
var basePrice float64
|
|
switch method {
|
|
case models.PriceMethodAverage:
|
|
basePrice = CalculateAverage(prices)
|
|
default:
|
|
basePrice = CalculateMedian(prices)
|
|
}
|
|
if basePrice <= 0 {
|
|
continue
|
|
}
|
|
|
|
finalPrice := basePrice
|
|
if comp.PriceCoefficient != 0 {
|
|
finalPrice = finalPrice * (1 + comp.PriceCoefficient/100)
|
|
}
|
|
|
|
ctx3, cancel3 := context.WithTimeout(context.Background(), 3*time.Second)
|
|
updateErr := s.db.WithContext(ctx3).Model(&models.LotMetadata{}).
|
|
Where("lot_name = ?", comp.LotName).
|
|
Updates(map[string]interface{}{
|
|
"current_price": finalPrice,
|
|
"price_updated_at": now,
|
|
}).Error
|
|
cancel3()
|
|
|
|
if updateErr == nil {
|
|
fmt.Printf("[Price] ✅ Retry SUCCESS for %s (price: %.2f)\n", comp.LotName, finalPrice)
|
|
updated++
|
|
errors-- // Decrease error count on successful retry
|
|
} else {
|
|
fmt.Printf("[Price] ❌ Retry FAILED for %s: %v\n", comp.LotName, updateErr)
|
|
}
|
|
}
|
|
|
|
if len(failedComponents) > 0 {
|
|
fmt.Printf("[Price] 🔁 Retry phase complete: %d recovered, %d still failed\n", len(failedComponents)-(errors-len(failedComponents)), errors)
|
|
}
|
|
|
|
if onProgress != nil && total == 0 {
|
|
onProgress(RecalculateProgress{
|
|
Current: 0,
|
|
Total: 0,
|
|
LotName: "",
|
|
Updated: updated,
|
|
Errors: errors,
|
|
})
|
|
}
|
|
|
|
fmt.Printf("[Price] ✅ Complete: %d updated, %d errors, %d skipped, %d manual, %d unchanged\n", updated, errors, skipped, manual, unchanged)
|
|
slog.Info("Price recalculation complete", "updated", updated, "errors", errors, "skipped", skipped, "manual", manual, "unchanged", unchanged)
|
|
|
|
return updated, errors
|
|
}
|
|
|
|
// expandMetaPricesWithCache turns a comma-separated meta-price specification
// into a de-duplicated list of lot names, in order of first appearance.
//
// Each entry is trimmed of surrounding whitespace. Empty entries and entries
// equal to excludeLot are dropped. An entry ending in "*" is a prefix pattern
// and expands to every name in allLotNames with that prefix (excludeLot
// excepted); any other entry is taken verbatim, whether or not it appears in
// allLotNames. The result is nil when nothing matches.
func expandMetaPricesWithCache(metaPrices, excludeLot string, allLotNames []string) []string {
	var lots []string
	taken := map[string]bool{}

	// add appends name unless it has already been collected.
	add := func(name string) {
		if taken[name] {
			return
		}
		taken[name] = true
		lots = append(lots, name)
	}

	for _, raw := range strings.Split(metaPrices, ",") {
		entry := strings.TrimSpace(raw)
		switch {
		case entry == "", entry == excludeLot:
			// Nothing to collect for blank entries or the excluded lot itself.
		case strings.HasSuffix(entry, "*"):
			prefix := strings.TrimSuffix(entry, "*")
			for _, name := range allLotNames {
				if name != excludeLot && strings.HasPrefix(name, prefix) {
					add(name)
				}
			}
		default:
			add(entry)
		}
	}

	return lots
}
|