Files
jukebox_maker/internal/copier/copier.go

659 lines
15 KiB
Go

package copier
import (
"context"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"io"
"math/rand/v2"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"jukebox_maker/internal/config"
"jukebox_maker/internal/db"
"jukebox_maker/internal/disk"
"jukebox_maker/internal/task"
)
type Options struct {
DiskID string
MountPath string
MediaPath string
DestFolder string // subfolder on disk, default "media"
SourceRules []config.SourceFolder
AllowedExtensions []string
ReserveFreeGB float64
OverwriteMode config.OverwriteMode
FileSelectMode config.FileSelectMode
}
type Copier struct {
tasks *task.Store
mu sync.Mutex
cancels map[string]context.CancelFunc
dbMu sync.RWMutex
dbs map[string]*db.DB
}
func New(tasks *task.Store) *Copier {
return &Copier{
tasks: tasks,
cancels: make(map[string]context.CancelFunc),
dbs: make(map[string]*db.DB),
}
}
func (c *Copier) SetDB(diskID string, d *db.DB) {
c.dbMu.Lock()
if d == nil {
delete(c.dbs, diskID)
} else {
c.dbs[diskID] = d
}
c.dbMu.Unlock()
}
func (c *Copier) getDB(diskID string) *db.DB {
c.dbMu.RLock()
defer c.dbMu.RUnlock()
return c.dbs[diskID]
}
func (c *Copier) LastCopiedAt(diskID string) (time.Time, bool, error) {
database := c.getDB(diskID)
if database == nil {
return time.Time{}, false, nil
}
return database.LastCopiedAt(diskID)
}
func (c *Copier) Start(ctx context.Context, opts Options) (string, error) {
return c.startTask(ctx, "", opts)
}
func (c *Copier) Resume(ctx context.Context, taskID string, opts Options) error {
_, err := c.startTask(ctx, taskID, opts)
return err
}
func (c *Copier) startTask(ctx context.Context, existingTaskID string, opts Options) (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, active := c.cancels[opts.DiskID]; active {
return "", errors.New("copy already running")
}
database := c.getDB(opts.DiskID)
if database == nil {
return "", errors.New("no disk database available")
}
if opts.DestFolder == "" {
opts.DestFolder = config.DefaultDestFolder
}
destFolder, err := config.NormalizeDestFolder(opts.DestFolder)
if err != nil {
destFolder = config.DefaultDestFolder
}
opts.DestFolder = destFolder
_, free, err := disk.DiskUsage(opts.MountPath)
if err != nil {
return "", err
}
reserveBytes := int64(opts.ReserveFreeGB * 1e9)
if free <= reserveBytes {
return "", errors.New("free space is below reserve threshold")
}
var taskID string
if existingTaskID == "" {
t := c.tasks.Create("copy", opts.DiskID)
payload, err := json.Marshal(opts)
if err != nil {
return "", err
}
if err := database.UpsertTask(*t, payload); err != nil {
return "", err
}
taskID = t.ID
} else {
taskID = existingTaskID
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusQueued
t.Phase = task.PhaseQueued
t.Message = "Resuming after restart..."
t.Error = ""
t.SpeedBPS = 0
t.ETASec = 0
})
if t, ok := c.tasks.Get(taskID); ok {
if err := database.UpdateTask(*t); err != nil {
return "", err
}
}
}
copyCtx, cancel := context.WithCancel(ctx)
c.cancels[opts.DiskID] = cancel
go c.run(copyCtx, taskID, opts, database)
return taskID, nil
}
func (c *Copier) Cancel(diskID string) {
c.mu.Lock()
defer c.mu.Unlock()
if cancel, ok := c.cancels[diskID]; ok {
cancel()
}
}
func (c *Copier) run(ctx context.Context, taskID string, opts Options, database *db.DB) {
defer func() {
c.mu.Lock()
delete(c.cancels, opts.DiskID)
c.mu.Unlock()
}()
setStatus := func(s task.Status, msg string, prog int) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = s
t.Message = msg
t.Progress = prog
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
}
fail := func(err error) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusFailed
t.Error = err.Error()
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
}
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhasePreparing
t.Message = "Preparing..."
t.Progress = 0
t.Error = ""
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
destRoot := filepath.Join(opts.MountPath, opts.DestFolder)
if opts.OverwriteMode == config.OverwriteDelete {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhaseReplacing
t.Message = "Replacing destination media..."
t.Progress = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
if err := os.RemoveAll(destRoot); err != nil {
fail(err)
return
}
}
var copiedPaths map[string]struct{}
if opts.FileSelectMode == config.SelectNew {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhaseLoadingHistory
t.Message = "Loading copy history..."
t.Progress = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
var err error
copiedPaths, err = database.CopiedPaths(opts.DiskID)
if err != nil {
fail(err)
return
}
}
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhaseScanning
t.Message = "Scanning sources..."
t.Progress = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
files, err := buildFileList(opts.MediaPath, opts.SourceRules, copiedPaths, opts.AllowedExtensions)
if err != nil {
fail(err)
return
}
if len(files) == 0 {
setStatus(task.StatusSuccess, "No files to copy.", 100)
return
}
// случайный порядок — выбираем что копировать до начала копирования
rand.Shuffle(len(files), func(i, j int) { files[i], files[j] = files[j], files[i] })
_, free, err := disk.DiskUsage(opts.MountPath)
if err != nil {
fail(err)
return
}
reserveBytes := int64(opts.ReserveFreeGB * 1e9)
available := free - reserveBytes
if available <= 0 {
setStatus(task.StatusFailed, "Free space is below the reserved threshold.", 100)
return
}
// суммарный объём для прогресса (всех файлов в списке)
var totalBytes int64
for _, f := range files {
totalBytes += f.size
}
total := len(files)
copied := 0
var doneBytes int64
startTime := time.Now()
for i, f := range files {
select {
case <-ctx.Done():
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusCanceled
t.Message = "Canceled"
t.SpeedBPS = 0
t.ETASec = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
return
default:
}
if f.size > available {
continue
}
elapsed := time.Since(startTime).Seconds()
var speedBPS, etaSec int64
if elapsed > 0 && doneBytes > 0 {
speedBPS = int64(float64(doneBytes) / elapsed)
remaining := totalBytes - doneBytes
if speedBPS > 0 {
etaSec = remaining / speedBPS
}
}
prog := int(float64(doneBytes) / float64(totalBytes) * 100)
msg := fmt.Sprintf("Copying %s (%d/%d)", filepath.Base(f.srcAbs), i+1, total)
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhaseCopying
t.Message = msg
t.Progress = prog
t.SpeedBPS = speedBPS
t.ETASec = int(etaSec)
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
dstAbs := filepath.Join(destRoot, f.relPath)
if err := copyFile(ctx, f.srcAbs, dstAbs); err != nil {
if errors.Is(err, context.Canceled) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusCanceled
t.Message = "Canceled"
t.SpeedBPS = 0
t.ETASec = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
return
}
continue
}
available -= f.size
doneBytes += f.size
copied++
_ = database.RecordCopy(db.CopyRecord{
DiskID: opts.DiskID,
SourcePath: f.relPath,
FileSize: f.size,
})
}
setStatus(task.StatusSuccess, fmt.Sprintf("Done. Copied %d files.", copied), 100)
}
type fileEntry struct {
srcAbs string
relPath string // relative to /media
size int64
}
func buildFileList(mediaPath string, rules []config.SourceFolder, skip map[string]struct{}, allowedExtensions []string) ([]fileEntry, error) {
_ = mediaPath
roots, selectedRoots, ruleMap := normalizeSourceRules(rules)
aliases := sourceAliases(roots)
allowedExts := makeAllowedExtensionSet(allowedExtensions)
var result []fileEntry
for _, src := range selectedRoots {
root := owningRoot(src, roots)
if root == "" {
root = src
}
alias := aliases[root]
if alias == "" {
alias = filepath.Base(root)
if alias == "." || alias == "" || alias == string(filepath.Separator) {
alias = "source-" + shortHash(root)
}
}
dir := src
err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil || d.IsDir() {
if err != nil {
return nil
}
if path == dir {
return nil
}
rel, relErr := filepath.Rel(root, path)
if relErr != nil {
return nil
}
rel = filepath.ToSlash(rel)
if !isPathEnabled(path, ruleMap) && !hasEnabledDescendant(path, ruleMap) {
return filepath.SkipDir
}
return nil
}
if !isPathEnabled(path, ruleMap) {
return nil
}
if !isExtensionAllowed(path, allowedExts) {
return nil
}
rel, _ := filepath.Rel(root, path)
rel = filepath.ToSlash(rel)
destRel := filepath.ToSlash(filepath.Join(alias, rel))
if _, skipped := skip[rel]; skipped {
return nil
}
if _, skipped := skip[destRel]; skipped {
return nil
}
info, err := d.Info()
if err != nil {
return nil
}
result = append(result, fileEntry{srcAbs: path, relPath: destRel, size: info.Size()})
return nil
})
if err != nil {
return nil, err
}
}
return result, nil
}
func makeAllowedExtensionSet(items []string) map[string]struct{} {
normalized := config.NormalizeExtensions(items)
if len(normalized) == 0 {
normalized = config.DefaultAllowedExtensions()
}
result := make(map[string]struct{}, len(normalized))
for _, item := range normalized {
result[item] = struct{}{}
}
return result
}
func isExtensionAllowed(path string, allowed map[string]struct{}) bool {
ext := strings.ToLower(filepath.Ext(path))
if ext == "" {
return false
}
_, ok := allowed[ext]
return ok
}
func normalizeSourceRules(rules []config.SourceFolder) ([]string, []string, map[string]bool) {
ruleMap := make(map[string]bool, len(rules))
rootSet := make(map[string]struct{})
for _, rule := range rules {
src := filepath.Clean(strings.TrimSpace(rule.Path))
if src == "" || src == "." {
continue
}
ruleMap[src] = rule.Enabled
if rule.Root {
rootSet[src] = struct{}{}
}
}
var roots []string
for src := range rootSet {
roots = append(roots, src)
}
sort.Strings(roots)
var selectedRoots []string
for src, enabled := range ruleMap {
if !enabled || hasEnabledAncestor(src, ruleMap) {
continue
}
selectedRoots = append(selectedRoots, src)
}
sort.Strings(selectedRoots)
return roots, selectedRoots, ruleMap
}
func hasEnabledAncestor(path string, ruleMap map[string]bool) bool {
for parent := parentSourcePath(path); parent != ""; parent = parentSourcePath(parent) {
if ruleMap[parent] {
return true
}
}
return false
}
func hasEnabledDescendant(path string, ruleMap map[string]bool) bool {
for other, enabled := range ruleMap {
if enabled && isPathInside(path, other) && other != path {
return true
}
}
return false
}
func isPathEnabled(path string, ruleMap map[string]bool) bool {
for current := path; current != ""; current = parentSourcePath(current) {
if enabled, ok := ruleMap[current]; ok {
return enabled
}
}
return false
}
func parentSourcePath(path string) string {
parent := filepath.Dir(path)
if parent == "." || parent == path {
return ""
}
return parent
}
func owningRoot(path string, roots []string) string {
var best string
for _, root := range roots {
if isPathInside(root, path) {
if len(root) > len(best) {
best = root
}
}
}
return best
}
func sourceAliases(roots []string) map[string]string {
counts := make(map[string]int, len(roots))
for _, root := range roots {
counts[strings.ToLower(filepath.Base(root))]++
}
aliases := make(map[string]string, len(roots))
for _, root := range roots {
base := filepath.Base(root)
if base == "." || base == string(filepath.Separator) || base == "" {
base = "source"
}
key := strings.ToLower(base)
if counts[key] > 1 {
base = fmt.Sprintf("%s-%s", base, shortHash(root))
}
aliases[root] = base
}
return aliases
}
func shortHash(value string) string {
h := fnv.New32a()
_, _ = h.Write([]byte(value))
return fmt.Sprintf("%08x", h.Sum32())[:6]
}
func isPathInside(base, candidate string) bool {
if candidate == base {
return true
}
rel, err := filepath.Rel(base, candidate)
if err != nil {
return false
}
return rel != "." && rel != ".." && !strings.HasPrefix(rel, ".."+string(os.PathSeparator))
}
func copyFile(ctx context.Context, src, dst string) error {
if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
return err
}
srcFile, err := os.Open(src)
if err != nil {
return err
}
defer srcFile.Close()
srcInfo, err := srcFile.Stat()
if err != nil {
return err
}
offset := int64(0)
if dstInfo, err := os.Stat(dst); err == nil {
switch {
case dstInfo.Size() < srcInfo.Size():
offset = dstInfo.Size()
case dstInfo.Size() == srcInfo.Size():
return os.Chtimes(dst, srcInfo.ModTime(), srcInfo.ModTime())
default:
if err := os.Remove(dst); err != nil {
return err
}
}
} else if !errors.Is(err, os.ErrNotExist) {
return err
}
if offset > 0 {
if _, err := srcFile.Seek(offset, io.SeekStart); err != nil {
return err
}
}
dstFile, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return err
}
defer dstFile.Close()
if offset > 0 {
if _, err := dstFile.Seek(offset, io.SeekStart); err != nil {
return err
}
} else if err := dstFile.Truncate(0); err != nil {
return err
}
buf := make([]byte, 1024*1024)
for {
if err := ctx.Err(); err != nil {
return err
}
nr, readErr := srcFile.Read(buf)
if nr > 0 {
nw, writeErr := dstFile.Write(buf[:nr])
if writeErr != nil {
return writeErr
}
if nw != nr {
return io.ErrShortWrite
}
}
if readErr != nil {
if errors.Is(readErr, io.EOF) {
break
}
return readErr
}
}
if err := dstFile.Sync(); err != nil {
return err
}
if err := os.Chtimes(dst, srcInfo.ModTime(), srcInfo.ModTime()); err != nil {
return err
}
dstInfo, err := os.Stat(dst)
if err != nil {
return err
}
if dstInfo.Size() != srcInfo.Size() {
return fmt.Errorf("copied size mismatch for %s", filepath.Base(src))
}
return nil
}