package scheduler import ( "context" "database/sql" "errors" "fmt" "log/slog" "strings" "time" "git.mchus.pro/mchus/priceforge/internal/config" "git.mchus.pro/mchus/priceforge/internal/models" "git.mchus.pro/mchus/priceforge/internal/repository" alertsvc "git.mchus.pro/mchus/priceforge/internal/services/alerts" pricingsvc "git.mchus.pro/mchus/priceforge/internal/services/pricing" "gorm.io/gorm" ) type Scheduler struct { db *gorm.DB alertService *alertsvc.Service pricingService *pricingsvc.Service statsRepo *repository.StatsRepository cfg config.SchedulerConfig logger *slog.Logger } type job struct { Name string Interval time.Duration Run func(context.Context) error } func New( db *gorm.DB, alertService *alertsvc.Service, pricingService *pricingsvc.Service, statsRepo *repository.StatsRepository, cfg config.SchedulerConfig, ) *Scheduler { return &Scheduler{ db: db, alertService: alertService, pricingService: pricingService, statsRepo: statsRepo, cfg: cfg, logger: slog.Default(), } } func (s *Scheduler) Start(ctx context.Context) { if s == nil || s.db == nil || !s.cfg.Enabled { return } pollInterval := s.cfg.PollInterval if pollInterval <= 0 { pollInterval = time.Minute } s.logger.Info("starting embedded scheduler", "poll_interval", pollInterval) s.runDueJobs(ctx) ticker := time.NewTicker(pollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): s.logger.Info("embedded scheduler stopped") return case <-ticker.C: s.runDueJobs(ctx) } } } func (s *Scheduler) runDueJobs(ctx context.Context) { now := time.Now() for _, job := range s.jobs() { if job.Interval <= 0 { continue } if err := s.tryRunJob(ctx, job, now); err != nil { s.logger.Warn("scheduler job failed", "job", job.Name, "err", err) } } } func (s *Scheduler) jobs() []job { return []job{ { Name: "alerts", Interval: s.cfg.AlertsInterval, Run: func(ctx context.Context) error { if s.alertService == nil { return nil } return s.alertService.CheckAndGenerateAlerts() }, }, { Name: "update_prices", Interval: s.cfg.UpdatePricesInterval, Run: func(ctx context.Context) error { if s.pricingService == nil { return nil } updated, errs := s.pricingService.RecalculateAllPrices() if errs > 0 { return fmt.Errorf("recalculation finished with %d errors (updated=%d)", errs, updated) } return nil }, }, { Name: "update_popularity", Interval: s.cfg.UpdatePopularityInterval, Run: func(ctx context.Context) error { if s.statsRepo == nil { return nil } return s.statsRepo.UpdatePopularityScores() }, }, { Name: "reset_weekly_counters", Interval: s.cfg.ResetWeeklyCountersInterval, Run: func(ctx context.Context) error { if s.statsRepo == nil { return nil } return s.statsRepo.ResetWeeklyCounters() }, }, { Name: "reset_monthly_counters", Interval: s.cfg.ResetMonthlyCountersInterval, Run: func(ctx context.Context) error { if s.statsRepo == nil { return nil } return s.statsRepo.ResetMonthlyCounters() }, }, } } func (s *Scheduler) tryRunJob(ctx context.Context, job job, now time.Time) error { acquired, release, err := acquireDBLock(ctx, s.db, "priceforge:scheduler:"+job.Name) if err != nil { return fmt.Errorf("acquire db lock: %w", err) } if !acquired { return nil } defer release() state, err := s.getRunState(job.Name) if err != nil { return fmt.Errorf("load run state: %w", err) } if !jobDue(state.LastFinishedAt, now, job.Interval) { return nil } if err := s.markJobStarted(job.Name, now); err != nil { return fmt.Errorf("mark started: %w", err) } runErr := job.Run(ctx) finishedAt := time.Now() if err := s.markJobFinished(job.Name, finishedAt, runErr); err != nil { return fmt.Errorf("mark finished: %w", err) } if runErr != nil { return runErr } s.logger.Info("scheduler job completed", "job", job.Name, "finished_at", finishedAt) return nil } func (s *Scheduler) getRunState(jobName string) (*models.SchedulerRun, error) { var run models.SchedulerRun err := s.db.Where("job_name = ?", jobName).First(&run).Error if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return &models.SchedulerRun{JobName: jobName}, nil } return nil, err } return &run, nil } func (s *Scheduler) markJobStarted(jobName string, startedAt time.Time) error { run := &models.SchedulerRun{ JobName: jobName, LastStartedAt: &startedAt, LastStatus: "running", LastError: "", } return s.db. Where("job_name = ?", jobName). Assign(run). FirstOrCreate(run). Error } func (s *Scheduler) markJobFinished(jobName string, finishedAt time.Time, runErr error) error { status := "success" lastError := "" if runErr != nil { status = "failed" lastError = truncateText(runErr.Error(), 2000) } return s.db.Model(&models.SchedulerRun{}). Where("job_name = ?", jobName). Updates(map[string]any{ "last_finished_at": finishedAt, "last_status": status, "last_error": lastError, }).Error } func acquireDBLock(ctx context.Context, db *gorm.DB, key string) (bool, func(), error) { sqlDB, err := db.DB() if err != nil { return false, nil, err } conn, err := sqlDB.Conn(ctx) if err != nil { return false, nil, err } var acquired sql.NullInt64 if err := conn.QueryRowContext(ctx, "SELECT GET_LOCK(?, 0)", key).Scan(&acquired); err != nil { _ = conn.Close() return false, nil, err } if !acquired.Valid || acquired.Int64 != 1 { _ = conn.Close() return false, nil, nil } release := func() { var released sql.NullInt64 if err := conn.QueryRowContext(context.Background(), "SELECT RELEASE_LOCK(?)", key).Scan(&released); err != nil { slog.Warn("failed to release scheduler db lock", "key", key, "err", err) } _ = conn.Close() } return true, release, nil } func jobDue(lastFinishedAt *time.Time, now time.Time, interval time.Duration) bool { if interval <= 0 { return false } if lastFinishedAt == nil || lastFinishedAt.IsZero() { return true } return !lastFinishedAt.Add(interval).After(now) } func truncateText(v string, limit int) string { if limit <= 0 || len(v) <= limit { return strings.TrimSpace(v) } return strings.TrimSpace(v[:limit]) }