- Заменить ручное копирование на rsync --partial --append-verify - Структура на диске: <mount>/<dest_folder>/<rel path from /media> - dest_folder настраивается (default: media) - Права на диске: --no-perms --chmod=ugo=rwx - rsync добавлен в Dockerfile - Режим "удалить": удаляет только dest_folder, а не весь диск Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
254 lines
5.6 KiB
Go
254 lines
5.6 KiB
Go
package copier
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"os"
|
||
"os/exec"
|
||
"path/filepath"
|
||
"sync"
|
||
|
||
"jukebox_maker/internal/config"
|
||
"jukebox_maker/internal/db"
|
||
"jukebox_maker/internal/disk"
|
||
"jukebox_maker/internal/task"
|
||
)
|
||
|
||
type Options struct {
|
||
DiskID string
|
||
MountPath string
|
||
MediaPath string
|
||
DestFolder string // subfolder on disk, default "media"
|
||
EnabledSources []string
|
||
ReserveFreeGB float64
|
||
OverwriteMode config.OverwriteMode
|
||
FileSelectMode config.FileSelectMode
|
||
}
|
||
|
||
type Copier struct {
|
||
tasks *task.Store
|
||
|
||
mu sync.Mutex
|
||
cancel context.CancelFunc
|
||
|
||
dbMu sync.RWMutex
|
||
db *db.DB
|
||
}
|
||
|
||
func New(tasks *task.Store) *Copier {
|
||
return &Copier{tasks: tasks}
|
||
}
|
||
|
||
func (c *Copier) SetDB(d *db.DB) {
|
||
c.dbMu.Lock()
|
||
c.db = d
|
||
c.dbMu.Unlock()
|
||
}
|
||
|
||
func (c *Copier) getDB() *db.DB {
|
||
c.dbMu.RLock()
|
||
defer c.dbMu.RUnlock()
|
||
return c.db
|
||
}
|
||
|
||
func (c *Copier) Start(ctx context.Context, opts Options) (string, error) {
|
||
c.mu.Lock()
|
||
defer c.mu.Unlock()
|
||
|
||
if _, active := c.tasks.ActiveTask(); active {
|
||
return "", errors.New("copy already running")
|
||
}
|
||
|
||
database := c.getDB()
|
||
if database == nil {
|
||
return "", errors.New("no disk database available")
|
||
}
|
||
|
||
if opts.DestFolder == "" {
|
||
opts.DestFolder = "media"
|
||
}
|
||
|
||
t := c.tasks.Create("copy")
|
||
copyCtx, cancel := context.WithCancel(ctx)
|
||
c.cancel = cancel
|
||
|
||
go c.run(copyCtx, t.ID, opts, database)
|
||
return t.ID, nil
|
||
}
|
||
|
||
func (c *Copier) Cancel() {
|
||
c.mu.Lock()
|
||
defer c.mu.Unlock()
|
||
if c.cancel != nil {
|
||
c.cancel()
|
||
}
|
||
}
|
||
|
||
func (c *Copier) run(ctx context.Context, taskID string, opts Options, database *db.DB) {
|
||
setStatus := func(s task.Status, msg string, prog int) {
|
||
c.tasks.Update(taskID, func(t *task.Task) {
|
||
t.Status = s
|
||
t.Message = msg
|
||
t.Progress = prog
|
||
})
|
||
}
|
||
fail := func(err error) {
|
||
c.tasks.Update(taskID, func(t *task.Task) {
|
||
t.Status = task.StatusFailed
|
||
t.Error = err.Error()
|
||
})
|
||
}
|
||
|
||
setStatus(task.StatusRunning, "Подготовка…", 0)
|
||
|
||
destRoot := filepath.Join(opts.MountPath, opts.DestFolder)
|
||
|
||
if opts.OverwriteMode == config.OverwriteDelete {
|
||
setStatus(task.StatusRunning, "Удаление данных с диска…", 0)
|
||
if err := os.RemoveAll(destRoot); err != nil {
|
||
fail(err)
|
||
return
|
||
}
|
||
}
|
||
|
||
var copiedPaths map[string]struct{}
|
||
if opts.FileSelectMode == config.SelectNew {
|
||
setStatus(task.StatusRunning, "Загрузка истории…", 0)
|
||
var err error
|
||
copiedPaths, err = database.CopiedPaths(opts.DiskID)
|
||
if err != nil {
|
||
fail(err)
|
||
return
|
||
}
|
||
}
|
||
|
||
setStatus(task.StatusRunning, "Сканирование источников…", 0)
|
||
files, err := buildFileList(opts.MediaPath, opts.EnabledSources, copiedPaths)
|
||
if err != nil {
|
||
fail(err)
|
||
return
|
||
}
|
||
if len(files) == 0 {
|
||
setStatus(task.StatusSuccess, "Нет новых файлов для копирования.", 100)
|
||
return
|
||
}
|
||
|
||
_, free, err := disk.DiskUsage(opts.MountPath)
|
||
if err != nil {
|
||
fail(err)
|
||
return
|
||
}
|
||
reserveBytes := int64(opts.ReserveFreeGB * 1e9)
|
||
available := free - reserveBytes
|
||
if available <= 0 {
|
||
setStatus(task.StatusSuccess, "Недостаточно свободного места на диске.", 100)
|
||
return
|
||
}
|
||
|
||
total := len(files)
|
||
copied := 0
|
||
for i, f := range files {
|
||
select {
|
||
case <-ctx.Done():
|
||
c.tasks.Update(taskID, func(t *task.Task) {
|
||
t.Status = task.StatusCanceled
|
||
t.Message = "Отменено"
|
||
})
|
||
return
|
||
default:
|
||
}
|
||
|
||
if f.size > available {
|
||
continue
|
||
}
|
||
|
||
msg := fmt.Sprintf("Копирование %s (%d/%d)", filepath.Base(f.srcAbs), i+1, total)
|
||
prog := int(float64(i+1) / float64(total) * 100)
|
||
setStatus(task.StatusRunning, msg, prog)
|
||
|
||
// destination mirrors source structure under destRoot
|
||
dstAbs := filepath.Join(destRoot, f.relPath)
|
||
|
||
if err := rsyncFile(ctx, f.srcAbs, dstAbs); err != nil {
|
||
if errors.Is(err, context.Canceled) {
|
||
c.tasks.Update(taskID, func(t *task.Task) {
|
||
t.Status = task.StatusCanceled
|
||
t.Message = "Отменено"
|
||
})
|
||
return
|
||
}
|
||
continue
|
||
}
|
||
|
||
available -= f.size
|
||
copied++
|
||
_ = database.RecordCopy(db.CopyRecord{
|
||
DiskID: opts.DiskID,
|
||
SourcePath: f.relPath,
|
||
FileSize: f.size,
|
||
})
|
||
}
|
||
|
||
setStatus(task.StatusSuccess, fmt.Sprintf("Готово. Скопировано файлов: %d.", copied), 100)
|
||
}
|
||
|
||
type fileEntry struct {
|
||
srcAbs string
|
||
relPath string // relative to /media
|
||
size int64
|
||
}
|
||
|
||
func buildFileList(mediaPath string, sources []string, skip map[string]struct{}) ([]fileEntry, error) {
|
||
var result []fileEntry
|
||
for _, src := range sources {
|
||
dir := filepath.Join(mediaPath, src)
|
||
err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
|
||
if err != nil || d.IsDir() {
|
||
return nil
|
||
}
|
||
rel, _ := filepath.Rel(mediaPath, path)
|
||
if _, skipped := skip[rel]; skipped {
|
||
return nil
|
||
}
|
||
info, err := d.Info()
|
||
if err != nil {
|
||
return nil
|
||
}
|
||
result = append(result, fileEntry{srcAbs: path, relPath: rel, size: info.Size()})
|
||
return nil
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
return result, nil
|
||
}
|
||
|
||
// rsyncFile copies src to dst using rsync with resume support.
|
||
// --partial keeps partial files on interruption.
|
||
// --append-verify resumes partial transfers and verifies checksums.
|
||
func rsyncFile(ctx context.Context, src, dst string) error {
|
||
if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
|
||
return err
|
||
}
|
||
cmd := exec.CommandContext(ctx, "rsync",
|
||
"--partial",
|
||
"--append-verify",
|
||
"--times",
|
||
"--no-perms",
|
||
"--no-owner",
|
||
"--no-group",
|
||
"--chmod=ugo=rwx",
|
||
src, dst,
|
||
)
|
||
out, err := cmd.CombinedOutput()
|
||
if err != nil {
|
||
if ctx.Err() != nil {
|
||
return ctx.Err()
|
||
}
|
||
return fmt.Errorf("rsync: %w: %s", err, out)
|
||
}
|
||
return nil
|
||
}
|