Files
jukebox_maker/internal/copier/copier.go

403 lines
9.0 KiB
Go

package copier
import (
"context"
"errors"
"fmt"
"math/rand/v2"
"os"
"os/exec"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"jukebox_maker/internal/config"
"jukebox_maker/internal/db"
"jukebox_maker/internal/disk"
"jukebox_maker/internal/task"
)
type Options struct {
DiskID string
MountPath string
MediaPath string
DestFolder string // subfolder on disk, default "media"
SourceRules []config.SourceFolder
ReserveFreeGB float64
OverwriteMode config.OverwriteMode
FileSelectMode config.FileSelectMode
}
type Copier struct {
tasks *task.Store
mu sync.Mutex
cancels map[string]context.CancelFunc
dbMu sync.RWMutex
dbs map[string]*db.DB
}
func New(tasks *task.Store) *Copier {
return &Copier{
tasks: tasks,
cancels: make(map[string]context.CancelFunc),
dbs: make(map[string]*db.DB),
}
}
func (c *Copier) SetDB(diskID string, d *db.DB) {
c.dbMu.Lock()
if d == nil {
delete(c.dbs, diskID)
} else {
c.dbs[diskID] = d
}
c.dbMu.Unlock()
}
func (c *Copier) getDB(diskID string) *db.DB {
c.dbMu.RLock()
defer c.dbMu.RUnlock()
return c.dbs[diskID]
}
func (c *Copier) LastCopiedAt(diskID string) (time.Time, bool, error) {
database := c.getDB(diskID)
if database == nil {
return time.Time{}, false, nil
}
return database.LastCopiedAt(diskID)
}
func (c *Copier) Start(ctx context.Context, opts Options) (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, active := c.cancels[opts.DiskID]; active {
return "", errors.New("copy already running")
}
database := c.getDB(opts.DiskID)
if database == nil {
return "", errors.New("no disk database available")
}
if opts.DestFolder == "" {
opts.DestFolder = "media"
}
_, free, err := disk.DiskUsage(opts.MountPath)
if err != nil {
return "", err
}
reserveBytes := int64(opts.ReserveFreeGB * 1e9)
if free <= reserveBytes {
return "", errors.New("free space is below reserve threshold")
}
t := c.tasks.Create("copy", opts.DiskID)
copyCtx, cancel := context.WithCancel(ctx)
c.cancels[opts.DiskID] = cancel
go c.run(copyCtx, t.ID, opts, database)
return t.ID, nil
}
func (c *Copier) Cancel(diskID string) {
c.mu.Lock()
defer c.mu.Unlock()
if cancel, ok := c.cancels[diskID]; ok {
cancel()
}
}
func (c *Copier) run(ctx context.Context, taskID string, opts Options, database *db.DB) {
defer func() {
c.mu.Lock()
delete(c.cancels, opts.DiskID)
c.mu.Unlock()
}()
setStatus := func(s task.Status, msg string, prog int) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = s
t.Message = msg
t.Progress = prog
})
}
fail := func(err error) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusFailed
t.Error = err.Error()
})
}
setStatus(task.StatusRunning, "Preparing...", 0)
destRoot := filepath.Join(opts.MountPath, opts.DestFolder)
if opts.OverwriteMode == config.OverwriteDelete {
setStatus(task.StatusRunning, "Replacing destination media...", 0)
if err := os.RemoveAll(destRoot); err != nil {
fail(err)
return
}
}
var copiedPaths map[string]struct{}
if opts.FileSelectMode == config.SelectNew {
setStatus(task.StatusRunning, "Loading copy history...", 0)
var err error
copiedPaths, err = database.CopiedPaths(opts.DiskID)
if err != nil {
fail(err)
return
}
}
setStatus(task.StatusRunning, "Scanning sources...", 0)
files, err := buildFileList(opts.MediaPath, opts.SourceRules, copiedPaths)
if err != nil {
fail(err)
return
}
if len(files) == 0 {
setStatus(task.StatusSuccess, "No files to copy.", 100)
return
}
// случайный порядок — выбираем что копировать до начала копирования
rand.Shuffle(len(files), func(i, j int) { files[i], files[j] = files[j], files[i] })
_, free, err := disk.DiskUsage(opts.MountPath)
if err != nil {
fail(err)
return
}
reserveBytes := int64(opts.ReserveFreeGB * 1e9)
available := free - reserveBytes
if available <= 0 {
setStatus(task.StatusFailed, "Free space is below the reserved threshold.", 100)
return
}
// суммарный объём для прогресса (всех файлов в списке)
var totalBytes int64
for _, f := range files {
totalBytes += f.size
}
total := len(files)
copied := 0
var doneBytes int64
startTime := time.Now()
for i, f := range files {
select {
case <-ctx.Done():
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusCanceled
t.Message = "Canceled"
t.SpeedBPS = 0
t.ETASec = 0
})
return
default:
}
if f.size > available {
continue
}
elapsed := time.Since(startTime).Seconds()
var speedBPS, etaSec int64
if elapsed > 0 && doneBytes > 0 {
speedBPS = int64(float64(doneBytes) / elapsed)
remaining := totalBytes - doneBytes
if speedBPS > 0 {
etaSec = remaining / speedBPS
}
}
prog := int(float64(doneBytes) / float64(totalBytes) * 100)
msg := fmt.Sprintf("Copying %s (%d/%d)", filepath.Base(f.srcAbs), i+1, total)
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Message = msg
t.Progress = prog
t.SpeedBPS = speedBPS
t.ETASec = int(etaSec)
})
dstAbs := filepath.Join(destRoot, f.relPath)
if err := rsyncFile(ctx, f.srcAbs, dstAbs); err != nil {
if errors.Is(err, context.Canceled) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusCanceled
t.Message = "Canceled"
t.SpeedBPS = 0
t.ETASec = 0
})
return
}
continue
}
available -= f.size
doneBytes += f.size
copied++
_ = database.RecordCopy(db.CopyRecord{
DiskID: opts.DiskID,
SourcePath: f.relPath,
FileSize: f.size,
})
}
setStatus(task.StatusSuccess, fmt.Sprintf("Done. Copied %d files.", copied), 100)
}
type fileEntry struct {
srcAbs string
relPath string // relative to /media
size int64
}
func buildFileList(mediaPath string, rules []config.SourceFolder, skip map[string]struct{}) ([]fileEntry, error) {
roots, ruleMap := normalizeSourceRules(rules)
var result []fileEntry
for _, src := range roots {
dir := filepath.Join(mediaPath, src)
err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil || d.IsDir() {
if err != nil {
return nil
}
if path == dir {
return nil
}
rel, relErr := filepath.Rel(mediaPath, path)
if relErr != nil {
return nil
}
rel = filepath.ToSlash(rel)
if !isPathEnabled(rel, ruleMap) && !hasEnabledDescendant(rel, ruleMap) {
return filepath.SkipDir
}
return nil
}
rel, _ := filepath.Rel(mediaPath, path)
rel = filepath.ToSlash(rel)
if !isPathEnabled(rel, ruleMap) {
return nil
}
if _, skipped := skip[rel]; skipped {
return nil
}
info, err := d.Info()
if err != nil {
return nil
}
result = append(result, fileEntry{srcAbs: path, relPath: rel, size: info.Size()})
return nil
})
if err != nil {
return nil, err
}
}
return result, nil
}
func normalizeSourceRules(rules []config.SourceFolder) ([]string, map[string]bool) {
ruleMap := make(map[string]bool, len(rules))
for _, rule := range rules {
src := filepath.ToSlash(filepath.Clean(strings.TrimSpace(rule.Path)))
src = strings.TrimPrefix(src, "./")
src = strings.TrimPrefix(src, "/")
if src == "" || src == "." {
continue
}
if src == ".." || strings.HasPrefix(src, "../") {
continue
}
ruleMap[src] = rule.Enabled
}
var roots []string
for src, enabled := range ruleMap {
if !enabled || hasEnabledAncestor(src, ruleMap) {
continue
}
roots = append(roots, src)
}
sort.Strings(roots)
return roots, ruleMap
}
func hasEnabledAncestor(path string, ruleMap map[string]bool) bool {
for parent := parentSourcePath(path); parent != ""; parent = parentSourcePath(parent) {
if ruleMap[parent] {
return true
}
}
return false
}
func hasEnabledDescendant(path string, ruleMap map[string]bool) bool {
prefix := path + "/"
for other, enabled := range ruleMap {
if enabled && strings.HasPrefix(other, prefix) {
return true
}
}
return false
}
func isPathEnabled(path string, ruleMap map[string]bool) bool {
for current := path; current != ""; current = parentSourcePath(current) {
if enabled, ok := ruleMap[current]; ok {
return enabled
}
}
return false
}
func parentSourcePath(path string) string {
idx := strings.LastIndex(path, "/")
if idx < 0 {
return ""
}
return path[:idx]
}
// rsyncFile copies src to dst using rsync with resume support.
// --partial keeps partial files on interruption.
// --append-verify resumes partial transfers and verifies checksums.
func rsyncFile(ctx context.Context, src, dst string) error {
if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
return err
}
cmd := exec.CommandContext(ctx, "rsync",
"--partial",
"--append-verify",
"--times",
"--no-perms",
"--no-owner",
"--no-group",
"--chmod=ugo=rwx",
src, dst,
)
out, err := cmd.CombinedOutput()
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("rsync: %w: %s", err, out)
}
return nil
}