193 lines
4.4 KiB
Go
193 lines
4.4 KiB
Go
package tasks
|
|
|
|
import (
	"context"
	"fmt"
	"log/slog"
	"slices"
	"sync"
	"time"

	"github.com/google/uuid"
)
|
|
|
|
// TaskFunc is the signature a unit of background work must satisfy.
//
// ctx is the context the work runs under. progressCb may be invoked any number
// of times to publish a progress value together with a human-readable message.
// The function returns a result map on success or a non-nil error on failure.
type TaskFunc func(ctx context.Context, progressCb func(progress int, message string)) (map[string]any, error)
|
|
|
|
// Manager handles background task execution and lifecycle
|
|
type Manager struct {
|
|
tasks map[string]*Task
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewManager creates a new task manager with automatic cleanup
|
|
func NewManager() *Manager {
|
|
m := &Manager{
|
|
tasks: make(map[string]*Task),
|
|
}
|
|
|
|
// Start cleanup goroutine
|
|
go m.cleanupLoop()
|
|
|
|
return m
|
|
}
|
|
|
|
// Submit creates and starts a new background task
|
|
func (m *Manager) Submit(taskType TaskType, fn TaskFunc) string {
|
|
taskID := uuid.New().String()
|
|
|
|
task := &Task{
|
|
ID: taskID,
|
|
Type: taskType,
|
|
Status: TaskStatusRunning,
|
|
Progress: 0,
|
|
Message: "Starting...",
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
m.mu.Lock()
|
|
m.tasks[taskID] = task
|
|
m.mu.Unlock()
|
|
|
|
slog.Info("Task submitted", "task_id", taskID, "type", taskType)
|
|
|
|
// Start task in goroutine with panic recovery
|
|
go func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
slog.Error("Task panicked", "task_id", taskID, "type", taskType, "panic", r)
|
|
m.updateTask(taskID, func(t *Task) {
|
|
t.Status = TaskStatusError
|
|
t.Error = fmt.Sprintf("Task panicked: %v", r)
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
})
|
|
}
|
|
}()
|
|
|
|
// Create progress callback
|
|
progressCb := func(progress int, message string) {
|
|
slog.Debug("Task progress", "task_id", taskID, "progress", progress, "message", message)
|
|
m.updateTask(taskID, func(t *Task) {
|
|
t.Progress = progress
|
|
t.Message = message
|
|
})
|
|
}
|
|
|
|
// Execute task
|
|
slog.Info("Task starting execution", "task_id", taskID, "type", taskType)
|
|
ctx := context.Background()
|
|
result, err := fn(ctx, progressCb)
|
|
|
|
// Update final status
|
|
m.updateTask(taskID, func(t *Task) {
|
|
now := time.Now()
|
|
t.DoneAt = &now
|
|
t.Progress = 100
|
|
|
|
if err != nil {
|
|
slog.Error("Task failed", "task_id", taskID, "type", taskType, "error", err)
|
|
t.Status = TaskStatusError
|
|
t.Error = err.Error()
|
|
} else {
|
|
slog.Info("Task completed successfully", "task_id", taskID, "type", taskType, "result", result)
|
|
t.Status = TaskStatusCompleted
|
|
t.Result = result
|
|
if t.Message == "Starting..." {
|
|
t.Message = "Completed"
|
|
}
|
|
}
|
|
})
|
|
}()
|
|
|
|
return taskID
|
|
}
|
|
|
|
// List returns running tasks and recently completed tasks (last 30 seconds)
|
|
func (m *Manager) List() []*Task {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
// Keep completed tasks only for 30 seconds (enough to show toast notification)
|
|
cutoff := time.Now().Add(-30 * time.Second)
|
|
tasks := make([]*Task, 0, len(m.tasks))
|
|
|
|
for _, task := range m.tasks {
|
|
// Include running tasks or recently completed tasks (within 30 seconds)
|
|
if task.Status == TaskStatusRunning ||
|
|
(task.DoneAt != nil && task.DoneAt.After(cutoff)) {
|
|
tasks = append(tasks, task)
|
|
}
|
|
}
|
|
|
|
// Sort by creation time (newest first)
|
|
for i := 0; i < len(tasks)-1; i++ {
|
|
for j := i + 1; j < len(tasks); j++ {
|
|
if tasks[i].CreatedAt.Before(tasks[j].CreatedAt) {
|
|
tasks[i], tasks[j] = tasks[j], tasks[i]
|
|
}
|
|
}
|
|
}
|
|
|
|
return tasks
|
|
}
|
|
|
|
// Get returns a single task by ID
|
|
func (m *Manager) Get(id string) (*Task, error) {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
task, exists := m.tasks[id]
|
|
if !exists {
|
|
return nil, fmt.Errorf("task not found: %s", id)
|
|
}
|
|
|
|
return task, nil
|
|
}
|
|
|
|
// HasRunning reports whether there is an in-memory running task of the given type.
|
|
func (m *Manager) HasRunning(taskType TaskType) bool {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
for _, task := range m.tasks {
|
|
if task.Type == taskType && task.Status == TaskStatusRunning {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// updateTask safely updates a task using the provided function
|
|
func (m *Manager) updateTask(id string, fn func(*Task)) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
if task, exists := m.tasks[id]; exists {
|
|
fn(task)
|
|
}
|
|
}
|
|
|
|
// cleanupLoop runs periodically to remove old completed tasks
|
|
func (m *Manager) cleanupLoop() {
|
|
ticker := time.NewTicker(2 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
m.cleanup()
|
|
}
|
|
}
|
|
|
|
// cleanup removes tasks completed more than 1 minute ago
|
|
func (m *Manager) cleanup() {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
cutoff := time.Now().Add(-1 * time.Minute)
|
|
|
|
for id, task := range m.tasks {
|
|
if task.DoneAt != nil && task.DoneAt.Before(cutoff) {
|
|
delete(m.tasks, id)
|
|
}
|
|
}
|
|
}
|