Files
PriceForge/internal/tasks/manager.go
Michael Chus f64c4fd6b2 feat: optimize background tasks and fix warehouse pricelist workflow
Optimize task retention from 5 minutes to 30 seconds to reduce polling overhead since toast notifications are shown only once. Add conditional warehouse pricelist creation via checkbox. Fix category storage in warehouse pricelists to properly load from lot table. Replace SSE with task polling for all long operations. Add comprehensive logging for debugging while minimizing noise from polling endpoints.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-16 11:08:10 +03:00

180 lines
4.1 KiB
Go

package tasks
import (
	"context"
	"fmt"
	"log/slog"
	"slices"
	"sync"
	"time"

	"github.com/google/uuid"
)
// TaskFunc is the body of a background task.
//
// It receives the context the manager runs the task with and a progressCb
// callback it may invoke to report a progress value together with a status
// message. It returns an arbitrary result map on success, or an error when
// the task fails.
type TaskFunc func(ctx context.Context, progressCb func(progress int, message string)) (map[string]any, error)
// Manager keeps track of background tasks from submission until they are
// removed after completion.
type Manager struct {
	// mu guards tasks.
	mu sync.RWMutex
	// tasks holds every known task, keyed by task ID.
	tasks map[string]*Task
}
// NewManager returns a ready-to-use Manager with an empty task table.
//
// It also launches the cleanup goroutine that periodically drops finished
// tasks; that goroutine has no stop mechanism and runs for as long as the
// process does.
func NewManager() *Manager {
	mgr := new(Manager)
	mgr.tasks = map[string]*Task{}

	// Background janitor for finished tasks.
	go mgr.cleanupLoop()

	return mgr
}
// Submit registers a new task of the given type, starts fn in its own
// goroutine and immediately returns the generated task ID.
//
// The task is stored in TaskStatusRunning with progress 0 and the message
// "Starting...". fn is called with context.Background() (the manager never
// cancels it) and a progress callback that records the latest progress value
// and message on the task. When fn returns, DoneAt is set, Progress is forced
// to 100 and the task becomes TaskStatusCompleted (with fn's result) or
// TaskStatusError (with the error text). A panic inside fn is recovered and
// recorded as a task error instead of crashing the process.
func (m *Manager) Submit(taskType TaskType, fn TaskFunc) string {
	taskID := uuid.New().String()

	task := &Task{
		ID:        taskID,
		Type:      taskType,
		Status:    TaskStatusRunning,
		Progress:  0,
		Message:   "Starting...",
		CreatedAt: time.Now(),
	}

	m.mu.Lock()
	m.tasks[taskID] = task
	m.mu.Unlock()

	slog.Info("Task submitted", "task_id", taskID, "type", taskType)

	// Run the task in its own goroutine with panic recovery.
	go func() {
		defer func() {
			if r := recover(); r != nil {
				slog.Error("Task panicked", "task_id", taskID, "type", taskType, "panic", r)
				m.updateTask(taskID, func(t *Task) {
					t.Status = TaskStatusError
					t.Error = fmt.Sprintf("Task panicked: %v", r)
					now := time.Now()
					t.DoneAt = &now
				})
			}
		}()

		// Progress callback handed to fn; logging happens outside the lock.
		progressCb := func(progress int, message string) {
			slog.Debug("Task progress", "task_id", taskID, "progress", progress, "message", message)
			m.updateTask(taskID, func(t *Task) {
				t.Progress = progress
				t.Message = message
			})
		}

		slog.Info("Task starting execution", "task_id", taskID, "type", taskType)
		result, err := fn(context.Background(), progressCb)

		// Log the outcome before taking the manager lock: log I/O under the
		// exclusive lock would stall every concurrent List/Get caller.
		if err != nil {
			slog.Error("Task failed", "task_id", taskID, "type", taskType, "error", err)
		} else {
			slog.Info("Task completed successfully", "task_id", taskID, "type", taskType, "result", result)
		}

		// Record the final status.
		m.updateTask(taskID, func(t *Task) {
			now := time.Now()
			t.DoneAt = &now
			t.Progress = 100

			if err != nil {
				t.Status = TaskStatusError
				t.Error = err.Error()
				return
			}

			t.Status = TaskStatusCompleted
			t.Result = result
			// Only replace the placeholder; keep any message fn reported.
			if t.Message == "Starting..." {
				t.Message = "Completed"
			}
		})
	}()

	return taskID
}
// List returns all running tasks plus tasks that finished within the last
// 30 seconds, ordered by creation time with the newest first.
//
// The returned tasks are snapshot copies taken under the manager's read lock,
// so callers can read or serialize them while the task goroutines keep
// updating the originals. The copies are shallow: Result and DoneAt are shared
// with the stored task. The returned slice is never nil.
func (m *Manager) List() []*Task {
	m.mu.RLock()
	defer m.mu.RUnlock()

	// Finished tasks stay visible for 30 seconds only (enough to show the
	// toast notification once).
	cutoff := time.Now().Add(-30 * time.Second)

	tasks := make([]*Task, 0, len(m.tasks))
	for _, task := range m.tasks {
		if task.Status == TaskStatusRunning ||
			(task.DoneAt != nil && task.DoneAt.After(cutoff)) {
			// Copy while holding the lock; handing out the stored pointer
			// would let callers race with updateTask.
			snapshot := *task
			tasks = append(tasks, &snapshot)
		}
	}

	// Newest first.
	slices.SortFunc(tasks, func(a, b *Task) int {
		return b.CreatedAt.Compare(a.CreatedAt)
	})

	return tasks
}
// Get returns the task with the given ID, or an error if no such task is
// known (for example because it was already removed by cleanup).
//
// The returned task is a snapshot copy taken under the manager's read lock,
// so callers can read or serialize it while the task goroutine keeps updating
// the original. The copy is shallow: Result and DoneAt are shared with the
// stored task.
func (m *Manager) Get(id string) (*Task, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	task, exists := m.tasks[id]
	if !exists {
		return nil, fmt.Errorf("task not found: %s", id)
	}

	// Copy while holding the lock; handing out the stored pointer would let
	// callers race with updateTask.
	snapshot := *task
	return &snapshot, nil
}
// updateTask applies fn to the task stored under id while holding the
// manager's write lock. Unknown IDs are ignored and fn is not called.
func (m *Manager) updateTask(id string, fn func(*Task)) {
	m.mu.Lock()
	defer m.mu.Unlock()

	target, found := m.tasks[id]
	if !found {
		return
	}
	fn(target)
}
// cleanupLoop calls cleanup once every two minutes. It never returns, so it
// is meant to run in its own goroutine.
func (m *Manager) cleanupLoop() {
	tick := time.NewTicker(2 * time.Minute)
	defer tick.Stop()

	for {
		<-tick.C
		m.cleanup()
	}
}
// cleanup drops every task whose DoneAt lies more than one minute in the
// past. Tasks without a DoneAt (still running) are always kept.
func (m *Manager) cleanup() {
	m.mu.Lock()
	defer m.mu.Unlock()

	expiry := time.Now().Add(-time.Minute)
	for key, entry := range m.tasks {
		if entry.DoneAt == nil || !entry.DoneAt.Before(expiry) {
			continue
		}
		delete(m.tasks, key)
	}
}