diff --git a/cmd/jukebox/main.go b/cmd/jukebox/main.go index f501653..be2299e 100644 --- a/cmd/jukebox/main.go +++ b/cmd/jukebox/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "flag" "log" "net/http" @@ -39,6 +40,33 @@ func main() { activeDBs := make(map[string]*db.DB) mountToDiskID := make(map[string]string) + resumeDiskTask := func(info disk.DiskInfo, database *db.DB) { + rec, ok, err := database.ActiveTask() + if err != nil { + log.Printf("load active task for %s: %v", info.DiskID, err) + return + } + if !ok || rec.Task.Type != "copy" { + return + } + + var opts copier.Options + if err := json.Unmarshal(rec.Payload, &opts); err != nil { + log.Printf("decode task payload for %s: %v", info.DiskID, err) + return + } + opts.DiskID = info.DiskID + opts.MountPath = info.MountPath + if rec.Task.Phase != task.PhaseQueued && rec.Task.Phase != task.PhasePreparing && rec.Task.Phase != task.PhaseReplacing && opts.OverwriteMode == config.OverwriteDelete { + opts.OverwriteMode = config.OverwriteSkip + } + + taskStore.Upsert(rec.Task) + if err := cp.Resume(context.Background(), rec.Task.ID, opts); err != nil { + log.Printf("resume task %s for %s: %v", rec.Task.ID, info.DiskID, err) + } + } + openDiskDB := func(info disk.DiskInfo) { if info.DiskID == "" { return @@ -65,6 +93,7 @@ func main() { activeDBs[info.DiskID] = d cp.SetDB(info.DiskID, d) log.Printf("disk DB opened for %s", info.DiskID) + resumeDiskTask(info, d) } closeDiskDB := func(info disk.DiskInfo) { diff --git a/internal/copier/copier.go b/internal/copier/copier.go index b130a34..441e356 100644 --- a/internal/copier/copier.go +++ b/internal/copier/copier.go @@ -2,6 +2,7 @@ package copier import ( "context" + "encoding/json" "errors" "fmt" "math/rand/v2" @@ -73,6 +74,15 @@ func (c *Copier) LastCopiedAt(diskID string) (time.Time, bool, error) { } func (c *Copier) Start(ctx context.Context, opts Options) (string, error) { + return c.startTask(ctx, "", opts) +} + +func (c *Copier) Resume(ctx context.Context, taskID string, opts Options) error { + _, err := c.startTask(ctx, taskID, opts) + return err +} + +func (c *Copier) startTask(ctx context.Context, existingTaskID string, opts Options) (string, error) { c.mu.Lock() defer c.mu.Unlock() @@ -86,8 +96,13 @@ func (c *Copier) Start(ctx context.Context, opts Options) (string, error) { } if opts.DestFolder == "" { - opts.DestFolder = "media" + opts.DestFolder = config.DefaultDestFolder } + destFolder, err := config.NormalizeDestFolder(opts.DestFolder) + if err != nil { + destFolder = config.DefaultDestFolder + } + opts.DestFolder = destFolder _, free, err := disk.DiskUsage(opts.MountPath) if err != nil { @@ -98,12 +113,39 @@ func (c *Copier) Start(ctx context.Context, opts Options) (string, error) { return "", errors.New("free space is below reserve threshold") } - t := c.tasks.Create("copy", opts.DiskID) + var taskID string + if existingTaskID == "" { + t := c.tasks.Create("copy", opts.DiskID) + payload, err := json.Marshal(opts) + if err != nil { + return "", err + } + if err := database.UpsertTask(*t, payload); err != nil { + return "", err + } + taskID = t.ID + } else { + taskID = existingTaskID + c.tasks.Update(taskID, func(t *task.Task) { + t.Status = task.StatusQueued + t.Phase = task.PhaseQueued + t.Message = "Resuming after restart..." + t.Error = "" + t.SpeedBPS = 0 + t.ETASec = 0 + }) + if t, ok := c.tasks.Get(taskID); ok { + if err := database.UpdateTask(*t); err != nil { + return "", err + } + } + } + copyCtx, cancel := context.WithCancel(ctx) c.cancels[opts.DiskID] = cancel - go c.run(copyCtx, t.ID, opts, database) - return t.ID, nil + go c.run(copyCtx, taskID, opts, database) + return taskID, nil } func (c *Copier) Cancel(diskID string) { @@ -127,20 +169,43 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database t.Message = msg t.Progress = prog }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } } fail := func(err error) { c.tasks.Update(taskID, func(t *task.Task) { t.Status = task.StatusFailed t.Error = err.Error() }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } } - setStatus(task.StatusRunning, "Preparing...", 0) + c.tasks.Update(taskID, func(t *task.Task) { + t.Status = task.StatusRunning + t.Phase = task.PhasePreparing + t.Message = "Preparing..." + t.Progress = 0 + t.Error = "" + }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } destRoot := filepath.Join(opts.MountPath, opts.DestFolder) if opts.OverwriteMode == config.OverwriteDelete { - setStatus(task.StatusRunning, "Replacing destination media...", 0) + c.tasks.Update(taskID, func(t *task.Task) { + t.Status = task.StatusRunning + t.Phase = task.PhaseReplacing + t.Message = "Replacing destination media..." + t.Progress = 0 + }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } if err := os.RemoveAll(destRoot); err != nil { fail(err) return @@ -149,7 +214,15 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database var copiedPaths map[string]struct{} if opts.FileSelectMode == config.SelectNew { - setStatus(task.StatusRunning, "Loading copy history...", 0) + c.tasks.Update(taskID, func(t *task.Task) { + t.Status = task.StatusRunning + t.Phase = task.PhaseLoadingHistory + t.Message = "Loading copy history..." + t.Progress = 0 + }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } var err error copiedPaths, err = database.CopiedPaths(opts.DiskID) if err != nil { @@ -158,7 +231,15 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database } } - setStatus(task.StatusRunning, "Scanning sources...", 0) + c.tasks.Update(taskID, func(t *task.Task) { + t.Status = task.StatusRunning + t.Phase = task.PhaseScanning + t.Message = "Scanning sources..." + t.Progress = 0 + }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } files, err := buildFileList(opts.MediaPath, opts.SourceRules, copiedPaths) if err != nil { fail(err) @@ -204,6 +285,9 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database t.SpeedBPS = 0 t.ETASec = 0 }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } return default: } @@ -227,11 +311,15 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database c.tasks.Update(taskID, func(t *task.Task) { t.Status = task.StatusRunning + t.Phase = task.PhaseCopying t.Message = msg t.Progress = prog t.SpeedBPS = speedBPS t.ETASec = int(etaSec) }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } dstAbs := filepath.Join(destRoot, f.relPath) if err := rsyncFile(ctx, f.srcAbs, dstAbs); err != nil { @@ -242,6 +330,9 @@ func (c *Copier) run(ctx context.Context, taskID string, opts Options, database t.SpeedBPS = 0 t.ETASec = 0 }) + if t, ok := c.tasks.Get(taskID); ok { + _ = database.UpdateTask(*t) + } return } continue diff --git a/internal/db/db.go b/internal/db/db.go index ac29317..db05721 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -2,8 +2,11 @@ package db import ( "database/sql" + "encoding/json" "time" + "jukebox_maker/internal/task" + _ "modernc.org/sqlite" ) @@ -18,6 +21,11 @@ type CopyRecord struct { CopiedAt time.Time } +type TaskRecord struct { + Task task.Task + Payload json.RawMessage +} + func Open(path string) (*DB, error) { conn, err := sql.Open("sqlite", path+"?_journal=WAL&_timeout=5000") if err != nil { @@ -51,6 +59,22 @@ func (d *DB) migrate() error { disk_id TEXT PRIMARY KEY, last_copied_at DATETIME NOT NULL ); + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + disk_id TEXT NOT NULL, + type TEXT NOT NULL, + status TEXT NOT NULL, + phase TEXT NOT NULL DEFAULT 'queued', + progress INTEGER NOT NULL DEFAULT 0, + message TEXT NOT NULL DEFAULT '', + speed_bps INTEGER NOT NULL DEFAULT 0, + eta_sec INTEGER NOT NULL DEFAULT 0, + error TEXT NOT NULL DEFAULT '', + payload TEXT NOT NULL DEFAULT '{}', + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_tasks_status_updated ON tasks (status, updated_at); `) return err } @@ -129,3 +153,94 @@ func (d *DB) LastCopiedAt(diskID string) (time.Time, bool, error) { } return t, true, nil } + +func (d *DB) UpsertTask(t task.Task, payload json.RawMessage) error { + if payload == nil { + payload = json.RawMessage(`{}`) + } + _, err := d.sql.Exec( + `INSERT INTO tasks (id, disk_id, type, status, phase, progress, message, speed_bps, eta_sec, error, payload, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + status=excluded.status, + phase=excluded.phase, + progress=excluded.progress, + message=excluded.message, + speed_bps=excluded.speed_bps, + eta_sec=excluded.eta_sec, + error=excluded.error, + payload=excluded.payload, + updated_at=excluded.updated_at`, + t.ID, t.DiskID, t.Type, t.Status, t.Phase, t.Progress, t.Message, t.SpeedBPS, t.ETASec, t.Error, + string(payload), t.CreatedAt.Format(time.RFC3339), t.UpdatedAt.Format(time.RFC3339), + ) + return err +} + +func (d *DB) UpdateTask(t task.Task) error { + _, err := d.sql.Exec( + `UPDATE tasks + SET status=?, phase=?, progress=?, message=?, speed_bps=?, eta_sec=?, error=?, updated_at=? + WHERE id=?`, + t.Status, t.Phase, t.Progress, t.Message, t.SpeedBPS, t.ETASec, t.Error, t.UpdatedAt.Format(time.RFC3339), t.ID, + ) + return err +} + +func (d *DB) ActiveTask() (*TaskRecord, bool, error) { + row := d.sql.QueryRow( + `SELECT id, disk_id, type, status, phase, progress, message, speed_bps, eta_sec, error, payload, created_at, updated_at + FROM tasks + WHERE status IN ('queued','running') + ORDER BY updated_at DESC + LIMIT 1`, + ) + rec, err := scanTaskRecord(row) + if err == sql.ErrNoRows { + return nil, false, nil + } + if err != nil { + return nil, false, err + } + return rec, true, nil +} + +type scanner interface { + Scan(dest ...any) error +} + +func scanTaskRecord(s scanner) (*TaskRecord, error) { + var rec TaskRecord + var payloadRaw, createdAtRaw, updatedAtRaw string + err := s.Scan( + &rec.Task.ID, + &rec.Task.DiskID, + &rec.Task.Type, + &rec.Task.Status, + &rec.Task.Phase, + &rec.Task.Progress, + &rec.Task.Message, + &rec.Task.SpeedBPS, + &rec.Task.ETASec, + &rec.Task.Error, + &payloadRaw, + &createdAtRaw, + &updatedAtRaw, + ) + if err != nil { + return nil, err + } + + createdAt, err := time.Parse(time.RFC3339, createdAtRaw) + if err != nil { + return nil, err + } + updatedAt, err := time.Parse(time.RFC3339, updatedAtRaw) + if err != nil { + return nil, err + } + rec.Task.CreatedAt = createdAt + rec.Task.UpdatedAt = updatedAt + rec.Payload = json.RawMessage(payloadRaw) + return &rec, nil +} diff --git a/internal/task/task.go b/internal/task/task.go index 593583c..9089e87 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -17,11 +17,21 @@ const ( StatusCanceled Status = "canceled" ) +const ( + PhaseQueued = "queued" + PhasePreparing = "preparing" + PhaseReplacing = "replacing" + PhaseLoadingHistory = "loading_history" + PhaseScanning = "scanning" + PhaseCopying = "copying" +) + type Task struct { ID string `json:"id"` DiskID string `json:"disk_id"` Type string `json:"type"` Status Status `json:"status"` + Phase string `json:"phase,omitempty"` Progress int `json:"progress"` Message string `json:"message"` SpeedBPS int64 `json:"speed_bps"` @@ -45,13 +55,15 @@ func NewStore() *Store { } func (s *Store) Create(taskType, diskID string) *Task { + now := time.Now().UTC() t := &Task{ ID: uuid.New().String(), DiskID: diskID, Type: taskType, Status: StatusQueued, - CreatedAt: time.Now().UTC(), - UpdatedAt: time.Now().UTC(), + Phase: PhaseQueued, + CreatedAt: now, + UpdatedAt: now, } s.mu.Lock() s.tasks[t.ID] = t @@ -59,6 +71,13 @@ func (s *Store) Create(taskType, diskID string) *Task { return t } +func (s *Store) Upsert(t Task) { + copy := t + s.mu.Lock() + s.tasks[t.ID] = © + s.mu.Unlock() +} + func (s *Store) Get(id string) (*Task, bool) { s.mu.RLock() defer s.mu.RUnlock()