Files
jukebox_maker/internal/db/db.go

247 lines
5.9 KiB
Go

package db
import (
"database/sql"
"encoding/json"
"time"
"jukebox_maker/internal/task"
_ "modernc.org/sqlite"
)
type DB struct {
sql *sql.DB
}
type CopyRecord struct {
DiskID string
SourcePath string
FileSize int64
CopiedAt time.Time
}
type TaskRecord struct {
Task task.Task
Payload json.RawMessage
}
func Open(path string) (*DB, error) {
conn, err := sql.Open("sqlite", path+"?_journal=WAL&_timeout=5000")
if err != nil {
return nil, err
}
conn.SetMaxOpenConns(1)
d := &DB{sql: conn}
if err := d.migrate(); err != nil {
conn.Close()
return nil, err
}
return d, nil
}
func (d *DB) Close() error {
return d.sql.Close()
}
func (d *DB) migrate() error {
_, err := d.sql.Exec(`
CREATE TABLE IF NOT EXISTS copy_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
disk_id TEXT NOT NULL,
source_path TEXT NOT NULL,
file_size INTEGER NOT NULL DEFAULT 0,
copied_at DATETIME NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_copy_history_disk_path
ON copy_history (disk_id, source_path);
CREATE TABLE IF NOT EXISTS disk_stats (
disk_id TEXT PRIMARY KEY,
last_copied_at DATETIME NOT NULL
);
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
disk_id TEXT NOT NULL,
type TEXT NOT NULL,
status TEXT NOT NULL,
phase TEXT NOT NULL DEFAULT 'queued',
progress INTEGER NOT NULL DEFAULT 0,
message TEXT NOT NULL DEFAULT '',
speed_bps INTEGER NOT NULL DEFAULT 0,
eta_sec INTEGER NOT NULL DEFAULT 0,
error TEXT NOT NULL DEFAULT '',
payload TEXT NOT NULL DEFAULT '{}',
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_status_updated ON tasks (status, updated_at);
`)
return err
}
func (d *DB) WasCopied(diskID, sourcePath string) (bool, error) {
var n int
err := d.sql.QueryRow(
`SELECT COUNT(*) FROM copy_history WHERE disk_id=? AND source_path=?`,
diskID, sourcePath,
).Scan(&n)
return n > 0, err
}
func (d *DB) RecordCopy(rec CopyRecord) error {
t := rec.CopiedAt
if t.IsZero() {
t = time.Now().UTC()
}
tx, err := d.sql.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if _, err := tx.Exec(
`INSERT OR IGNORE INTO copy_history (disk_id, source_path, file_size, copied_at) VALUES (?,?,?,?)`,
rec.DiskID, rec.SourcePath, rec.FileSize, t.Format(time.RFC3339),
); err != nil {
return err
}
if _, err := tx.Exec(
`INSERT INTO disk_stats (disk_id, last_copied_at) VALUES (?, ?)
ON CONFLICT(disk_id) DO UPDATE SET last_copied_at=excluded.last_copied_at`,
rec.DiskID, t.Format(time.RFC3339),
); err != nil {
return err
}
return tx.Commit()
}
func (d *DB) CopiedPaths(diskID string) (map[string]struct{}, error) {
rows, err := d.sql.Query(
`SELECT source_path FROM copy_history WHERE disk_id=?`, diskID,
)
if err != nil {
return nil, err
}
defer rows.Close()
m := make(map[string]struct{})
for rows.Next() {
var p string
if err := rows.Scan(&p); err != nil {
return nil, err
}
m[p] = struct{}{}
}
return m, rows.Err()
}
func (d *DB) LastCopiedAt(diskID string) (time.Time, bool, error) {
var raw string
err := d.sql.QueryRow(
`SELECT last_copied_at FROM disk_stats WHERE disk_id=?`,
diskID,
).Scan(&raw)
if err == sql.ErrNoRows {
return time.Time{}, false, nil
}
if err != nil {
return time.Time{}, false, err
}
t, err := time.Parse(time.RFC3339, raw)
if err != nil {
return time.Time{}, false, err
}
return t, true, nil
}
func (d *DB) UpsertTask(t task.Task, payload json.RawMessage) error {
if payload == nil {
payload = json.RawMessage(`{}`)
}
_, err := d.sql.Exec(
`INSERT INTO tasks (id, disk_id, type, status, phase, progress, message, speed_bps, eta_sec, error, payload, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
status=excluded.status,
phase=excluded.phase,
progress=excluded.progress,
message=excluded.message,
speed_bps=excluded.speed_bps,
eta_sec=excluded.eta_sec,
error=excluded.error,
payload=excluded.payload,
updated_at=excluded.updated_at`,
t.ID, t.DiskID, t.Type, t.Status, t.Phase, t.Progress, t.Message, t.SpeedBPS, t.ETASec, t.Error,
string(payload), t.CreatedAt.Format(time.RFC3339), t.UpdatedAt.Format(time.RFC3339),
)
return err
}
func (d *DB) UpdateTask(t task.Task) error {
_, err := d.sql.Exec(
`UPDATE tasks
SET status=?, phase=?, progress=?, message=?, speed_bps=?, eta_sec=?, error=?, updated_at=?
WHERE id=?`,
t.Status, t.Phase, t.Progress, t.Message, t.SpeedBPS, t.ETASec, t.Error, t.UpdatedAt.Format(time.RFC3339), t.ID,
)
return err
}
func (d *DB) ActiveTask() (*TaskRecord, bool, error) {
row := d.sql.QueryRow(
`SELECT id, disk_id, type, status, phase, progress, message, speed_bps, eta_sec, error, payload, created_at, updated_at
FROM tasks
WHERE status IN ('queued','running')
ORDER BY updated_at DESC
LIMIT 1`,
)
rec, err := scanTaskRecord(row)
if err == sql.ErrNoRows {
return nil, false, nil
}
if err != nil {
return nil, false, err
}
return rec, true, nil
}
type scanner interface {
Scan(dest ...any) error
}
func scanTaskRecord(s scanner) (*TaskRecord, error) {
var rec TaskRecord
var payloadRaw, createdAtRaw, updatedAtRaw string
err := s.Scan(
&rec.Task.ID,
&rec.Task.DiskID,
&rec.Task.Type,
&rec.Task.Status,
&rec.Task.Phase,
&rec.Task.Progress,
&rec.Task.Message,
&rec.Task.SpeedBPS,
&rec.Task.ETASec,
&rec.Task.Error,
&payloadRaw,
&createdAtRaw,
&updatedAtRaw,
)
if err != nil {
return nil, err
}
createdAt, err := time.Parse(time.RFC3339, createdAtRaw)
if err != nil {
return nil, err
}
updatedAt, err := time.Parse(time.RFC3339, updatedAtRaw)
if err != nil {
return nil, err
}
rec.Task.CreatedAt = createdAt
rec.Task.UpdatedAt = updatedAt
rec.Payload = json.RawMessage(payloadRaw)
return &rec, nil
}