- Added background task manager with goroutine execution and panic recovery
- Replaced SSE streaming with background task execution for:
  * Price recalculation (RecalculateAll)
  * Stock import (ImportStockLog)
  * Pricelist creation (CreateWithProgress)
- Implemented unified polling for task status and DB connection in frontend
- Added task indicator in top bar showing running tasks count
- Added toast notifications for task completion/error
- Tasks automatically cleaned up after 10 minutes
- Tasks show progress (0-100%) with descriptive messages
- Updated handler constructors to receive task manager
- Added API endpoints for task status (/api/tasks, /api/tasks/:id)

Fixes issue with SSE disconnection on slow connections during long-running operations
171 lines
3.5 KiB
Go
171 lines
3.5 KiB
Go
package tasks
|
|
|
|
import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/google/uuid"
)
|
|
|
|
// TaskFunc is the function signature for background tasks
|
|
// TaskFunc is the signature of a unit of work that Manager.Submit runs in its
// own goroutine.
//
// ctx is the context handed to the task; Submit currently passes
// context.Background(), so it carries no deadline or cancellation.
// progressCb records a progress value and a human-readable message on the
// task; it may be called any number of times while the task runs
// (progress is presumably a 0-100 percentage; Submit sets it to 100 when the
// task finishes).
//
// On success the returned map is stored as the task's Result; a non-nil error
// marks the task as failed and its text is stored as the task's Error.
type TaskFunc func(ctx context.Context, progressCb func(progress int, message string)) (map[string]interface{}, error)
|
|
|
|
// Manager handles background task execution and lifecycle
|
|
type Manager struct {
|
|
tasks map[string]*Task
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewManager creates a new task manager with automatic cleanup
|
|
func NewManager() *Manager {
|
|
m := &Manager{
|
|
tasks: make(map[string]*Task),
|
|
}
|
|
|
|
// Start cleanup goroutine
|
|
go m.cleanupLoop()
|
|
|
|
return m
|
|
}
|
|
|
|
// Submit creates and starts a new background task
|
|
func (m *Manager) Submit(taskType TaskType, fn TaskFunc) string {
|
|
taskID := uuid.New().String()
|
|
|
|
task := &Task{
|
|
ID: taskID,
|
|
Type: taskType,
|
|
Status: TaskStatusRunning,
|
|
Progress: 0,
|
|
Message: "Starting...",
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
m.mu.Lock()
|
|
m.tasks[taskID] = task
|
|
m.mu.Unlock()
|
|
|
|
// Start task in goroutine with panic recovery
|
|
go func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
m.updateTask(taskID, func(t *Task) {
|
|
t.Status = TaskStatusError
|
|
t.Error = fmt.Sprintf("Task panicked: %v", r)
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
})
|
|
}
|
|
}()
|
|
|
|
// Create progress callback
|
|
progressCb := func(progress int, message string) {
|
|
m.updateTask(taskID, func(t *Task) {
|
|
t.Progress = progress
|
|
t.Message = message
|
|
})
|
|
}
|
|
|
|
// Execute task
|
|
ctx := context.Background()
|
|
result, err := fn(ctx, progressCb)
|
|
|
|
// Update final status
|
|
m.updateTask(taskID, func(t *Task) {
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
t.Progress = 100
|
|
|
|
if err != nil {
|
|
t.Status = TaskStatusError
|
|
t.Error = err.Error()
|
|
} else {
|
|
t.Status = TaskStatusCompleted
|
|
t.Result = result
|
|
if t.Message == "Starting..." {
|
|
t.Message = "Completed"
|
|
}
|
|
}
|
|
})
|
|
}()
|
|
|
|
return taskID
|
|
}
|
|
|
|
// List returns running tasks and completed tasks from the last 5 minutes
|
|
func (m *Manager) List() []*Task {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
cutoff := time.Now().Add(-5 * time.Minute)
|
|
tasks := make([]*Task, 0, len(m.tasks))
|
|
|
|
for _, task := range m.tasks {
|
|
// Include running tasks or recently completed tasks
|
|
if task.Status == TaskStatusRunning ||
|
|
(task.DoneAt != nil && task.DoneAt.After(cutoff)) {
|
|
tasks = append(tasks, task)
|
|
}
|
|
}
|
|
|
|
// Sort by creation time (newest first)
|
|
for i := 0; i < len(tasks)-1; i++ {
|
|
for j := i + 1; j < len(tasks); j++ {
|
|
if tasks[i].CreatedAt.Before(tasks[j].CreatedAt) {
|
|
tasks[i], tasks[j] = tasks[j], tasks[i]
|
|
}
|
|
}
|
|
}
|
|
|
|
return tasks
|
|
}
|
|
|
|
// Get returns a single task by ID
|
|
func (m *Manager) Get(id string) (*Task, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
task, exists := m.tasks[id]
|
|
if !exists {
|
|
return nil, fmt.Errorf("task not found: %s", id)
|
|
}
|
|
|
|
return task, nil
|
|
}
|
|
|
|
// updateTask safely updates a task using the provided function
|
|
func (m *Manager) updateTask(id string, fn func(*Task)) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if task, exists := m.tasks[id]; exists {
|
|
fn(task)
|
|
}
|
|
}
|
|
|
|
// cleanupLoop runs periodically to remove old completed tasks
|
|
func (m *Manager) cleanupLoop() {
|
|
ticker := time.NewTicker(2 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
m.cleanup()
|
|
}
|
|
}
|
|
|
|
// cleanup removes tasks completed more than 10 minutes ago
|
|
func (m *Manager) cleanup() {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
cutoff := time.Now().Add(-10 * time.Minute)
|
|
|
|
for id, task := range m.tasks {
|
|
if task.DoneAt != nil && task.DoneAt.Before(cutoff) {
|
|
delete(m.tasks, id)
|
|
}
|
|
}
|
|
}
|