Files
PriceForge/internal/tasks/manager.go
2026-03-07 23:11:42 +03:00

193 lines
4.4 KiB
Go

package tasks
import (
	"context"
	"fmt"
	"log/slog"
	"slices"
	"sync"
	"time"

	"github.com/google/uuid"
)
// TaskFunc is the signature every background task must implement.
//
// ctx is the context the manager runs the task under. progressCb may be
// called any number of times to publish an intermediate progress value and a
// human-readable message (progress is presumably a 0-100 percentage, since the
// manager sets it to 100 when the task finishes — confirm against callers).
//
// The returned map is stored as the task's result on success; a non-nil error
// marks the task as failed.
type TaskFunc func(ctx context.Context, progressCb func(progress int, message string)) (map[string]any, error)
// Manager handles background task execution and lifecycle.
//
// Tasks are tracked only in memory. A Manager must not be copied after first
// use because it embeds a sync.RWMutex; obtain one from NewManager and pass it
// around by pointer.
type Manager struct {
// tasks maps a task ID to the tracked state of that task.
tasks map[string]*Task
// mu guards the tasks map; updateTask also holds it while mutating a stored Task.
mu sync.RWMutex
}
// NewManager builds an empty task manager and launches its background
// cleanup goroutine.
//
// NOTE(review): the cleanup goroutine is started here and nothing in this file
// stops it, so each Manager created keeps one goroutine alive for the rest of
// the process. Create a single long-lived Manager rather than one per request.
func NewManager() *Manager {
	mgr := new(Manager)
	mgr.tasks = make(map[string]*Task)

	// Periodically evict finished tasks so the map does not grow without bound.
	go mgr.cleanupLoop()

	return mgr
}
// Submit registers a new task of the given type, starts fn in its own
// goroutine, and immediately returns the generated task ID.
//
// The task is recorded as running with progress 0 and the message
// "Starting..." before fn begins. fn is invoked with context.Background(), so
// it cannot be cancelled through the manager. When fn returns, the task gets a
// completion timestamp and its progress is set to 100 regardless of outcome;
// a non-nil error marks the task as failed, otherwise the returned map is
// stored as the result. A panic inside fn is recovered and recorded as a
// failed task instead of crashing the process.
func (m *Manager) Submit(taskType TaskType, fn TaskFunc) string {
	taskID := uuid.New().String()

	entry := &Task{
		ID:        taskID,
		Type:      taskType,
		Status:    TaskStatusRunning,
		Progress:  0,
		Message:   "Starting...",
		CreatedAt: time.Now(),
	}

	m.mu.Lock()
	m.tasks[taskID] = entry
	m.mu.Unlock()

	slog.Info("Task submitted", "task_id", taskID, "type", taskType)

	// reportProgress is handed to fn so it can publish intermediate state.
	reportProgress := func(progress int, message string) {
		slog.Debug("Task progress", "task_id", taskID, "progress", progress, "message", message)
		m.updateTask(taskID, func(t *Task) {
			t.Progress = progress
			t.Message = message
		})
	}

	// markPanicked is deferred by the worker goroutine. recover is called
	// directly inside it, which is what makes the recovery effective.
	markPanicked := func() {
		r := recover()
		if r == nil {
			return
		}
		slog.Error("Task panicked", "task_id", taskID, "type", taskType, "panic", r)
		m.updateTask(taskID, func(t *Task) {
			t.Status = TaskStatusError
			t.Error = fmt.Sprintf("Task panicked: %v", r)
			finishedAt := time.Now()
			t.DoneAt = &finishedAt
		})
	}

	go func() {
		defer markPanicked()

		slog.Info("Task starting execution", "task_id", taskID, "type", taskType)
		result, err := fn(context.Background(), reportProgress)

		// Record the final outcome. The log calls below run while the
		// manager's write lock is held by updateTask.
		m.updateTask(taskID, func(t *Task) {
			finishedAt := time.Now()
			t.DoneAt = &finishedAt
			t.Progress = 100

			if err != nil {
				slog.Error("Task failed", "task_id", taskID, "type", taskType, "error", err)
				t.Status = TaskStatusError
				t.Error = err.Error()
				return
			}

			slog.Info("Task completed successfully", "task_id", taskID, "type", taskType, "result", result)
			t.Status = TaskStatusCompleted
			t.Result = result
			// Only replace the placeholder; keep any message the task set itself.
			if t.Message == "Starting..." {
				t.Message = "Completed"
			}
		})
	}()

	return taskID
}
// List returns running tasks plus tasks that finished within the last 30
// seconds, ordered by creation time with the newest first.
//
// The 30-second window is long enough for a client to show a completion
// notification. Finished tasks older than that are omitted here even if they
// have not yet been evicted from the map.
//
// Each element is a shallow copy taken under the read lock, not the manager's
// internal *Task. Returning the internal pointers would let callers read
// fields while a running task's goroutine writes them, which is a data race.
// The copy shares the DoneAt pointee and the Result map with the original; in
// this file both are only ever assigned, never modified in place. Call List
// again to observe later updates.
func (m *Manager) List() []*Task {
	m.mu.RLock()
	defer m.mu.RUnlock()

	cutoff := time.Now().Add(-30 * time.Second)

	tasks := make([]*Task, 0, len(m.tasks))
	for _, task := range m.tasks {
		if task.Status == TaskStatusRunning ||
			(task.DoneAt != nil && task.DoneAt.After(cutoff)) {
			snapshot := *task
			tasks = append(tasks, &snapshot)
		}
	}

	// Newest first: compare b against a to get descending CreatedAt order.
	slices.SortFunc(tasks, func(a, b *Task) int {
		return b.CreatedAt.Compare(a.CreatedAt)
	})

	return tasks
}
// Get returns the task with the given ID, or an error if the manager does not
// hold a task with that ID.
//
// The returned value is a shallow copy taken under the read lock, not the
// manager's internal *Task. Returning the internal pointer would let callers
// read fields while the task's goroutine writes them, which is a data race.
// The copy shares the DoneAt pointee and the Result map with the original; in
// this file both are only ever assigned, never modified in place. Call Get
// again to observe later updates.
func (m *Manager) Get(id string) (*Task, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	task, exists := m.tasks[id]
	if !exists {
		return nil, fmt.Errorf("task not found: %s", id)
	}

	snapshot := *task
	return &snapshot, nil
}
// HasRunning reports whether the manager currently tracks at least one task of
// the given type whose status is still running. Only in-memory state is
// consulted.
func (m *Manager) HasRunning(taskType TaskType) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()

	for _, t := range m.tasks {
		if t.Type != taskType || t.Status != TaskStatusRunning {
			continue
		}
		return true
	}

	return false
}
// updateTask applies fn to the task stored under id while holding the
// manager's write lock. It does nothing if no task has that ID.
//
// fn runs with the lock held, so it must not call back into any Manager method
// that takes the lock.
func (m *Manager) updateTask(id string, fn func(*Task)) {
	m.mu.Lock()
	defer m.mu.Unlock()

	target, found := m.tasks[id]
	if !found {
		return
	}
	fn(target)
}
// cleanupLoop evicts old finished tasks on a fixed two-minute schedule.
//
// It blocks forever: the ticker channel is never closed, so the loop has no
// exit condition. It is meant to be run in its own goroutine.
func (m *Manager) cleanupLoop() {
	const sweepInterval = 2 * time.Minute

	sweep := time.NewTicker(sweepInterval)
	defer sweep.Stop()

	for range sweep.C {
		m.cleanup()
	}
}
// cleanup deletes every task whose completion timestamp is more than one
// minute older than the moment of the call. Tasks without a completion
// timestamp are kept. The write lock is held for the whole sweep.
func (m *Manager) cleanup() {
	m.mu.Lock()
	defer m.mu.Unlock()

	threshold := time.Now().Add(-time.Minute)

	for id, t := range m.tasks {
		if t.DoneAt == nil {
			// No completion timestamp yet, so the task has not finished.
			continue
		}
		if t.DoneAt.Before(threshold) {
			delete(m.tasks, id)
		}
	}
}