Store unfinished tasks on disks
This commit is contained in:
@@ -2,6 +2,7 @@ package copier
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
@@ -73,6 +74,15 @@ func (c *Copier) LastCopiedAt(diskID string) (time.Time, bool, error) {
|
||||
}
|
||||
|
||||
func (c *Copier) Start(ctx context.Context, opts Options) (string, error) {
|
||||
return c.startTask(ctx, "", opts)
|
||||
}
|
||||
|
||||
func (c *Copier) Resume(ctx context.Context, taskID string, opts Options) error {
|
||||
_, err := c.startTask(ctx, taskID, opts)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Copier) startTask(ctx context.Context, existingTaskID string, opts Options) (string, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
@@ -86,8 +96,13 @@ func (c *Copier) Start(ctx context.Context, opts Options) (string, error) {
|
||||
}
|
||||
|
||||
if opts.DestFolder == "" {
|
||||
opts.DestFolder = "media"
|
||||
opts.DestFolder = config.DefaultDestFolder
|
||||
}
|
||||
destFolder, err := config.NormalizeDestFolder(opts.DestFolder)
|
||||
if err != nil {
|
||||
destFolder = config.DefaultDestFolder
|
||||
}
|
||||
opts.DestFolder = destFolder
|
||||
|
||||
_, free, err := disk.DiskUsage(opts.MountPath)
|
||||
if err != nil {
|
||||
@@ -98,12 +113,39 @@ func (c *Copier) Start(ctx context.Context, opts Options) (string, error) {
|
||||
return "", errors.New("free space is below reserve threshold")
|
||||
}
|
||||
|
||||
t := c.tasks.Create("copy", opts.DiskID)
|
||||
var taskID string
|
||||
if existingTaskID == "" {
|
||||
t := c.tasks.Create("copy", opts.DiskID)
|
||||
payload, err := json.Marshal(opts)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if err := database.UpsertTask(*t, payload); err != nil {
|
||||
return "", err
|
||||
}
|
||||
taskID = t.ID
|
||||
} else {
|
||||
taskID = existingTaskID
|
||||
c.tasks.Update(taskID, func(t *task.Task) {
|
||||
t.Status = task.StatusQueued
|
||||
t.Phase = task.PhaseQueued
|
||||
t.Message = "Resuming after restart..."
|
||||
t.Error = ""
|
||||
t.SpeedBPS = 0
|
||||
t.ETASec = 0
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
if err := database.UpdateTask(*t); err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
copyCtx, cancel := context.WithCancel(ctx)
|
||||
c.cancels[opts.DiskID] = cancel
|
||||
|
||||
go c.run(copyCtx, t.ID, opts, database)
|
||||
return t.ID, nil
|
||||
go c.run(copyCtx, taskID, opts, database)
|
||||
return taskID, nil
|
||||
}
|
||||
|
||||
func (c *Copier) Cancel(diskID string) {
|
||||
@@ -127,20 +169,43 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database
|
||||
t.Message = msg
|
||||
t.Progress = prog
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
}
|
||||
fail := func(err error) {
|
||||
c.tasks.Update(taskID, func(t *task.Task) {
|
||||
t.Status = task.StatusFailed
|
||||
t.Error = err.Error()
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
}
|
||||
|
||||
setStatus(task.StatusRunning, "Preparing...", 0)
|
||||
c.tasks.Update(taskID, func(t *task.Task) {
|
||||
t.Status = task.StatusRunning
|
||||
t.Phase = task.PhasePreparing
|
||||
t.Message = "Preparing..."
|
||||
t.Progress = 0
|
||||
t.Error = ""
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
|
||||
destRoot := filepath.Join(opts.MountPath, opts.DestFolder)
|
||||
|
||||
if opts.OverwriteMode == config.OverwriteDelete {
|
||||
setStatus(task.StatusRunning, "Replacing destination media...", 0)
|
||||
c.tasks.Update(taskID, func(t *task.Task) {
|
||||
t.Status = task.StatusRunning
|
||||
t.Phase = task.PhaseReplacing
|
||||
t.Message = "Replacing destination media..."
|
||||
t.Progress = 0
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
if err := os.RemoveAll(destRoot); err != nil {
|
||||
fail(err)
|
||||
return
|
||||
@@ -149,7 +214,15 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database
|
||||
|
||||
var copiedPaths map[string]struct{}
|
||||
if opts.FileSelectMode == config.SelectNew {
|
||||
setStatus(task.StatusRunning, "Loading copy history...", 0)
|
||||
c.tasks.Update(taskID, func(t *task.Task) {
|
||||
t.Status = task.StatusRunning
|
||||
t.Phase = task.PhaseLoadingHistory
|
||||
t.Message = "Loading copy history..."
|
||||
t.Progress = 0
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
var err error
|
||||
copiedPaths, err = database.CopiedPaths(opts.DiskID)
|
||||
if err != nil {
|
||||
@@ -158,7 +231,15 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database
|
||||
}
|
||||
}
|
||||
|
||||
setStatus(task.StatusRunning, "Scanning sources...", 0)
|
||||
c.tasks.Update(taskID, func(t *task.Task) {
|
||||
t.Status = task.StatusRunning
|
||||
t.Phase = task.PhaseScanning
|
||||
t.Message = "Scanning sources..."
|
||||
t.Progress = 0
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
files, err := buildFileList(opts.MediaPath, opts.SourceRules, copiedPaths)
|
||||
if err != nil {
|
||||
fail(err)
|
||||
@@ -204,6 +285,9 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database
|
||||
t.SpeedBPS = 0
|
||||
t.ETASec = 0
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
return
|
||||
default:
|
||||
}
|
||||
@@ -227,11 +311,15 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database
|
||||
|
||||
c.tasks.Update(taskID, func(t *task.Task) {
|
||||
t.Status = task.StatusRunning
|
||||
t.Phase = task.PhaseCopying
|
||||
t.Message = msg
|
||||
t.Progress = prog
|
||||
t.SpeedBPS = speedBPS
|
||||
t.ETASec = int(etaSec)
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
|
||||
dstAbs := filepath.Join(destRoot, f.relPath)
|
||||
if err := rsyncFile(ctx, f.srcAbs, dstAbs); err != nil {
|
||||
@@ -242,6 +330,9 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database
|
||||
t.SpeedBPS = 0
|
||||
t.ETASec = 0
|
||||
})
|
||||
if t, ok := c.tasks.Get(taskID); ok {
|
||||
_ = database.UpdateTask(*t)
|
||||
}
|
||||
return
|
||||
}
|
||||
continue
|
||||
|
||||
@@ -2,8 +2,11 @@ package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"jukebox_maker/internal/task"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
@@ -18,6 +21,11 @@ type CopyRecord struct {
|
||||
CopiedAt time.Time
|
||||
}
|
||||
|
||||
type TaskRecord struct {
|
||||
Task task.Task
|
||||
Payload json.RawMessage
|
||||
}
|
||||
|
||||
func Open(path string) (*DB, error) {
|
||||
conn, err := sql.Open("sqlite", path+"?_journal=WAL&_timeout=5000")
|
||||
if err != nil {
|
||||
@@ -51,6 +59,22 @@ func (d *DB) migrate() error {
|
||||
disk_id TEXT PRIMARY KEY,
|
||||
last_copied_at DATETIME NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS tasks (
|
||||
id TEXT PRIMARY KEY,
|
||||
disk_id TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
phase TEXT NOT NULL DEFAULT 'queued',
|
||||
progress INTEGER NOT NULL DEFAULT 0,
|
||||
message TEXT NOT NULL DEFAULT '',
|
||||
speed_bps INTEGER NOT NULL DEFAULT 0,
|
||||
eta_sec INTEGER NOT NULL DEFAULT 0,
|
||||
error TEXT NOT NULL DEFAULT '',
|
||||
payload TEXT NOT NULL DEFAULT '{}',
|
||||
created_at DATETIME NOT NULL,
|
||||
updated_at DATETIME NOT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_tasks_status_updated ON tasks (status, updated_at);
|
||||
`)
|
||||
return err
|
||||
}
|
||||
@@ -129,3 +153,94 @@ func (d *DB) LastCopiedAt(diskID string) (time.Time, bool, error) {
|
||||
}
|
||||
return t, true, nil
|
||||
}
|
||||
|
||||
func (d *DB) UpsertTask(t task.Task, payload json.RawMessage) error {
|
||||
if payload == nil {
|
||||
payload = json.RawMessage(`{}`)
|
||||
}
|
||||
_, err := d.sql.Exec(
|
||||
`INSERT INTO tasks (id, disk_id, type, status, phase, progress, message, speed_bps, eta_sec, error, payload, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(id) DO UPDATE SET
|
||||
status=excluded.status,
|
||||
phase=excluded.phase,
|
||||
progress=excluded.progress,
|
||||
message=excluded.message,
|
||||
speed_bps=excluded.speed_bps,
|
||||
eta_sec=excluded.eta_sec,
|
||||
error=excluded.error,
|
||||
payload=excluded.payload,
|
||||
updated_at=excluded.updated_at`,
|
||||
t.ID, t.DiskID, t.Type, t.Status, t.Phase, t.Progress, t.Message, t.SpeedBPS, t.ETASec, t.Error,
|
||||
string(payload), t.CreatedAt.Format(time.RFC3339), t.UpdatedAt.Format(time.RFC3339),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *DB) UpdateTask(t task.Task) error {
|
||||
_, err := d.sql.Exec(
|
||||
`UPDATE tasks
|
||||
SET status=?, phase=?, progress=?, message=?, speed_bps=?, eta_sec=?, error=?, updated_at=?
|
||||
WHERE id=?`,
|
||||
t.Status, t.Phase, t.Progress, t.Message, t.SpeedBPS, t.ETASec, t.Error, t.UpdatedAt.Format(time.RFC3339), t.ID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *DB) ActiveTask() (*TaskRecord, bool, error) {
|
||||
row := d.sql.QueryRow(
|
||||
`SELECT id, disk_id, type, status, phase, progress, message, speed_bps, eta_sec, error, payload, created_at, updated_at
|
||||
FROM tasks
|
||||
WHERE status IN ('queued','running')
|
||||
ORDER BY updated_at DESC
|
||||
LIMIT 1`,
|
||||
)
|
||||
rec, err := scanTaskRecord(row)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
return rec, true, nil
|
||||
}
|
||||
|
||||
type scanner interface {
|
||||
Scan(dest ...any) error
|
||||
}
|
||||
|
||||
func scanTaskRecord(s scanner) (*TaskRecord, error) {
|
||||
var rec TaskRecord
|
||||
var payloadRaw, createdAtRaw, updatedAtRaw string
|
||||
err := s.Scan(
|
||||
&rec.Task.ID,
|
||||
&rec.Task.DiskID,
|
||||
&rec.Task.Type,
|
||||
&rec.Task.Status,
|
||||
&rec.Task.Phase,
|
||||
&rec.Task.Progress,
|
||||
&rec.Task.Message,
|
||||
&rec.Task.SpeedBPS,
|
||||
&rec.Task.ETASec,
|
||||
&rec.Task.Error,
|
||||
&payloadRaw,
|
||||
&createdAtRaw,
|
||||
&updatedAtRaw,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
createdAt, err := time.Parse(time.RFC3339, createdAtRaw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
updatedAt, err := time.Parse(time.RFC3339, updatedAtRaw)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rec.Task.CreatedAt = createdAt
|
||||
rec.Task.UpdatedAt = updatedAt
|
||||
rec.Payload = json.RawMessage(payloadRaw)
|
||||
return &rec, nil
|
||||
}
|
||||
|
||||
@@ -17,11 +17,21 @@ const (
|
||||
StatusCanceled Status = "canceled"
|
||||
)
|
||||
|
||||
const (
|
||||
PhaseQueued = "queued"
|
||||
PhasePreparing = "preparing"
|
||||
PhaseReplacing = "replacing"
|
||||
PhaseLoadingHistory = "loading_history"
|
||||
PhaseScanning = "scanning"
|
||||
PhaseCopying = "copying"
|
||||
)
|
||||
|
||||
type Task struct {
|
||||
ID string `json:"id"`
|
||||
DiskID string `json:"disk_id"`
|
||||
Type string `json:"type"`
|
||||
Status Status `json:"status"`
|
||||
Phase string `json:"phase,omitempty"`
|
||||
Progress int `json:"progress"`
|
||||
Message string `json:"message"`
|
||||
SpeedBPS int64 `json:"speed_bps"`
|
||||
@@ -45,13 +55,15 @@ func NewStore() *Store {
|
||||
}
|
||||
|
||||
func (s *Store) Create(taskType, diskID string) *Task {
|
||||
now := time.Now().UTC()
|
||||
t := &Task{
|
||||
ID: uuid.New().String(),
|
||||
DiskID: diskID,
|
||||
Type: taskType,
|
||||
Status: StatusQueued,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
UpdatedAt: time.Now().UTC(),
|
||||
Phase: PhaseQueued,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.tasks[t.ID] = t
|
||||
@@ -59,6 +71,13 @@ func (s *Store) Create(taskType, diskID string) *Task {
|
||||
return t
|
||||
}
|
||||
|
||||
func (s *Store) Upsert(t Task) {
|
||||
copy := t
|
||||
s.mu.Lock()
|
||||
s.tasks[t.ID] = ©
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Store) Get(id string) (*Task, bool) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
Reference in New Issue
Block a user