Files
PriceForge/internal/scheduler/scheduler.go
2026-03-07 21:10:20 +03:00

266 lines
6.3 KiB
Go

package scheduler
import (
"context"
"database/sql"
"errors"
"fmt"
"log/slog"
"strings"
"time"
"git.mchus.pro/mchus/priceforge/internal/config"
"git.mchus.pro/mchus/priceforge/internal/models"
"git.mchus.pro/mchus/priceforge/internal/repository"
alertsvc "git.mchus.pro/mchus/priceforge/internal/services/alerts"
pricingsvc "git.mchus.pro/mchus/priceforge/internal/services/pricing"
"gorm.io/gorm"
)
type Scheduler struct {
db *gorm.DB
alertService *alertsvc.Service
pricingService *pricingsvc.Service
statsRepo *repository.StatsRepository
cfg config.SchedulerConfig
logger *slog.Logger
}
type job struct {
Name string
Interval time.Duration
Run func(context.Context) error
}
func New(
db *gorm.DB,
alertService *alertsvc.Service,
pricingService *pricingsvc.Service,
statsRepo *repository.StatsRepository,
cfg config.SchedulerConfig,
) *Scheduler {
return &Scheduler{
db: db,
alertService: alertService,
pricingService: pricingService,
statsRepo: statsRepo,
cfg: cfg,
logger: slog.Default(),
}
}
func (s *Scheduler) Start(ctx context.Context) {
if s == nil || s.db == nil || !s.cfg.Enabled {
return
}
pollInterval := s.cfg.PollInterval
if pollInterval <= 0 {
pollInterval = time.Minute
}
s.logger.Info("starting embedded scheduler", "poll_interval", pollInterval)
s.runDueJobs(ctx)
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
s.logger.Info("embedded scheduler stopped")
return
case <-ticker.C:
s.runDueJobs(ctx)
}
}
}
func (s *Scheduler) runDueJobs(ctx context.Context) {
now := time.Now()
for _, job := range s.jobs() {
if job.Interval <= 0 {
continue
}
if err := s.tryRunJob(ctx, job, now); err != nil {
s.logger.Warn("scheduler job failed", "job", job.Name, "err", err)
}
}
}
func (s *Scheduler) jobs() []job {
return []job{
{
Name: "alerts",
Interval: s.cfg.AlertsInterval,
Run: func(ctx context.Context) error {
if s.alertService == nil {
return nil
}
return s.alertService.CheckAndGenerateAlerts()
},
},
{
Name: "update_prices",
Interval: s.cfg.UpdatePricesInterval,
Run: func(ctx context.Context) error {
if s.pricingService == nil {
return nil
}
updated, errs := s.pricingService.RecalculateAllPrices()
if errs > 0 {
return fmt.Errorf("recalculation finished with %d errors (updated=%d)", errs, updated)
}
return nil
},
},
{
Name: "update_popularity",
Interval: s.cfg.UpdatePopularityInterval,
Run: func(ctx context.Context) error {
if s.statsRepo == nil {
return nil
}
return s.statsRepo.UpdatePopularityScores()
},
},
{
Name: "reset_weekly_counters",
Interval: s.cfg.ResetWeeklyCountersInterval,
Run: func(ctx context.Context) error {
if s.statsRepo == nil {
return nil
}
return s.statsRepo.ResetWeeklyCounters()
},
},
{
Name: "reset_monthly_counters",
Interval: s.cfg.ResetMonthlyCountersInterval,
Run: func(ctx context.Context) error {
if s.statsRepo == nil {
return nil
}
return s.statsRepo.ResetMonthlyCounters()
},
},
}
}
func (s *Scheduler) tryRunJob(ctx context.Context, job job, now time.Time) error {
acquired, release, err := acquireDBLock(ctx, s.db, "priceforge:scheduler:"+job.Name)
if err != nil {
return fmt.Errorf("acquire db lock: %w", err)
}
if !acquired {
return nil
}
defer release()
state, err := s.getRunState(job.Name)
if err != nil {
return fmt.Errorf("load run state: %w", err)
}
if !jobDue(state.LastFinishedAt, now, job.Interval) {
return nil
}
if err := s.markJobStarted(job.Name, now); err != nil {
return fmt.Errorf("mark started: %w", err)
}
runErr := job.Run(ctx)
finishedAt := time.Now()
if err := s.markJobFinished(job.Name, finishedAt, runErr); err != nil {
return fmt.Errorf("mark finished: %w", err)
}
if runErr != nil {
return runErr
}
s.logger.Info("scheduler job completed", "job", job.Name, "finished_at", finishedAt)
return nil
}
func (s *Scheduler) getRunState(jobName string) (*models.SchedulerRun, error) {
var run models.SchedulerRun
err := s.db.Where("job_name = ?", jobName).First(&run).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return &models.SchedulerRun{JobName: jobName}, nil
}
return nil, err
}
return &run, nil
}
func (s *Scheduler) markJobStarted(jobName string, startedAt time.Time) error {
run := &models.SchedulerRun{
JobName: jobName,
LastStartedAt: &startedAt,
LastStatus: "running",
LastError: "",
}
return s.db.
Where("job_name = ?", jobName).
Assign(run).
FirstOrCreate(run).
Error
}
func (s *Scheduler) markJobFinished(jobName string, finishedAt time.Time, runErr error) error {
status := "success"
lastError := ""
if runErr != nil {
status = "failed"
lastError = truncateText(runErr.Error(), 2000)
}
return s.db.Model(&models.SchedulerRun{}).
Where("job_name = ?", jobName).
Updates(map[string]any{
"last_finished_at": finishedAt,
"last_status": status,
"last_error": lastError,
}).Error
}
func acquireDBLock(ctx context.Context, db *gorm.DB, key string) (bool, func(), error) {
sqlDB, err := db.DB()
if err != nil {
return false, nil, err
}
conn, err := sqlDB.Conn(ctx)
if err != nil {
return false, nil, err
}
var acquired sql.NullInt64
if err := conn.QueryRowContext(ctx, "SELECT GET_LOCK(?, 0)", key).Scan(&acquired); err != nil {
_ = conn.Close()
return false, nil, err
}
if !acquired.Valid || acquired.Int64 != 1 {
_ = conn.Close()
return false, nil, nil
}
release := func() {
var released sql.NullInt64
if err := conn.QueryRowContext(context.Background(), "SELECT RELEASE_LOCK(?)", key).Scan(&released); err != nil {
slog.Warn("failed to release scheduler db lock", "key", key, "err", err)
}
_ = conn.Close()
}
return true, release, nil
}
func jobDue(lastFinishedAt *time.Time, now time.Time, interval time.Duration) bool {
if interval <= 0 {
return false
}
if lastFinishedAt == nil || lastFinishedAt.IsZero() {
return true
}
return !lastFinishedAt.Add(interval).After(now)
}
func truncateText(v string, limit int) string {
if limit <= 0 || len(v) <= limit {
return strings.TrimSpace(v)
}
return strings.TrimSpace(v[:limit])
}