Files
jukebox_maker/internal/copier/copier.go

302 lines
6.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package copier
import (
"context"
"errors"
"fmt"
"math/rand/v2"
"os"
"os/exec"
"path/filepath"
"sync"
"time"
"jukebox_maker/internal/config"
"jukebox_maker/internal/db"
"jukebox_maker/internal/disk"
"jukebox_maker/internal/task"
)
type Options struct {
DiskID string
MountPath string
MediaPath string
DestFolder string // subfolder on disk, default "media"
EnabledSources []string
ReserveFreeGB float64
OverwriteMode config.OverwriteMode
FileSelectMode config.FileSelectMode
}
type Copier struct {
tasks *task.Store
mu sync.Mutex
cancels map[string]context.CancelFunc
dbMu sync.RWMutex
dbs map[string]*db.DB
}
func New(tasks *task.Store) *Copier {
return &Copier{
tasks: tasks,
cancels: make(map[string]context.CancelFunc),
dbs: make(map[string]*db.DB),
}
}
func (c *Copier) SetDB(diskID string, d *db.DB) {
c.dbMu.Lock()
if d == nil {
delete(c.dbs, diskID)
} else {
c.dbs[diskID] = d
}
c.dbMu.Unlock()
}
func (c *Copier) getDB(diskID string) *db.DB {
c.dbMu.RLock()
defer c.dbMu.RUnlock()
return c.dbs[diskID]
}
func (c *Copier) Start(ctx context.Context, opts Options) (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, active := c.cancels[opts.DiskID]; active {
return "", errors.New("copy already running")
}
database := c.getDB(opts.DiskID)
if database == nil {
return "", errors.New("no disk database available")
}
if opts.DestFolder == "" {
opts.DestFolder = "media"
}
t := c.tasks.Create("copy", opts.DiskID)
copyCtx, cancel := context.WithCancel(ctx)
c.cancels[opts.DiskID] = cancel
go c.run(copyCtx, t.ID, opts, database)
return t.ID, nil
}
func (c *Copier) Cancel(diskID string) {
c.mu.Lock()
defer c.mu.Unlock()
if cancel, ok := c.cancels[diskID]; ok {
cancel()
}
}
func (c *Copier) run(ctx context.Context, taskID string, opts Options, database *db.DB) {
defer func() {
c.mu.Lock()
delete(c.cancels, opts.DiskID)
c.mu.Unlock()
}()
setStatus := func(s task.Status, msg string, prog int) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = s
t.Message = msg
t.Progress = prog
})
}
fail := func(err error) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusFailed
t.Error = err.Error()
})
}
setStatus(task.StatusRunning, "Подготовка…", 0)
destRoot := filepath.Join(opts.MountPath, opts.DestFolder)
if opts.OverwriteMode == config.OverwriteDelete {
setStatus(task.StatusRunning, "Удаление данных с диска…", 0)
if err := os.RemoveAll(destRoot); err != nil {
fail(err)
return
}
}
var copiedPaths map[string]struct{}
if opts.FileSelectMode == config.SelectNew {
setStatus(task.StatusRunning, "Загрузка истории…", 0)
var err error
copiedPaths, err = database.CopiedPaths(opts.DiskID)
if err != nil {
fail(err)
return
}
}
setStatus(task.StatusRunning, "Сканирование источников…", 0)
files, err := buildFileList(opts.MediaPath, opts.EnabledSources, copiedPaths)
if err != nil {
fail(err)
return
}
if len(files) == 0 {
setStatus(task.StatusSuccess, "Нет новых файлов для копирования.", 100)
return
}
// случайный порядок — выбираем что копировать до начала копирования
rand.Shuffle(len(files), func(i, j int) { files[i], files[j] = files[j], files[i] })
_, free, err := disk.DiskUsage(opts.MountPath)
if err != nil {
fail(err)
return
}
reserveBytes := int64(opts.ReserveFreeGB * 1e9)
available := free - reserveBytes
if available <= 0 {
setStatus(task.StatusSuccess, "Недостаточно свободного места на диске.", 100)
return
}
// суммарный объём для прогресса (всех файлов в списке)
var totalBytes int64
for _, f := range files {
totalBytes += f.size
}
total := len(files)
copied := 0
var doneBytes int64
startTime := time.Now()
for i, f := range files {
select {
case <-ctx.Done():
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusCanceled
t.Message = "Отменено"
t.SpeedBPS = 0
t.ETASec = 0
})
return
default:
}
if f.size > available {
continue
}
elapsed := time.Since(startTime).Seconds()
var speedBPS, etaSec int64
if elapsed > 0 && doneBytes > 0 {
speedBPS = int64(float64(doneBytes) / elapsed)
remaining := totalBytes - doneBytes
if speedBPS > 0 {
etaSec = remaining / speedBPS
}
}
prog := int(float64(doneBytes) / float64(totalBytes) * 100)
msg := fmt.Sprintf("Копирование %s (%d/%d)", filepath.Base(f.srcAbs), i+1, total)
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Message = msg
t.Progress = prog
t.SpeedBPS = speedBPS
t.ETASec = int(etaSec)
})
dstAbs := filepath.Join(destRoot, f.relPath)
if err := rsyncFile(ctx, f.srcAbs, dstAbs); err != nil {
if errors.Is(err, context.Canceled) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusCanceled
t.Message = "Отменено"
t.SpeedBPS = 0
t.ETASec = 0
})
return
}
continue
}
available -= f.size
doneBytes += f.size
copied++
_ = database.RecordCopy(db.CopyRecord{
DiskID: opts.DiskID,
SourcePath: f.relPath,
FileSize: f.size,
})
}
setStatus(task.StatusSuccess, fmt.Sprintf("Готово. Скопировано файлов: %d.", copied), 100)
}
type fileEntry struct {
srcAbs string
relPath string // relative to /media
size int64
}
func buildFileList(mediaPath string, sources []string, skip map[string]struct{}) ([]fileEntry, error) {
var result []fileEntry
for _, src := range sources {
dir := filepath.Join(mediaPath, src)
err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil || d.IsDir() {
return nil
}
rel, _ := filepath.Rel(mediaPath, path)
if _, skipped := skip[rel]; skipped {
return nil
}
info, err := d.Info()
if err != nil {
return nil
}
result = append(result, fileEntry{srcAbs: path, relPath: rel, size: info.Size()})
return nil
})
if err != nil {
return nil, err
}
}
return result, nil
}
// rsyncFile copies src to dst using rsync with resume support.
// --partial keeps partial files on interruption.
// --append-verify resumes partial transfers and verifies checksums.
func rsyncFile(ctx context.Context, src, dst string) error {
if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
return err
}
cmd := exec.CommandContext(ctx, "rsync",
"--partial",
"--append-verify",
"--times",
"--no-perms",
"--no-owner",
"--no-group",
"--chmod=ugo=rwx",
src, dst,
)
out, err := cmd.CombinedOutput()
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("rsync: %w: %s", err, out)
}
return nil
}