Files
jukebox_maker/internal/copier/copier.go
Michael Chus 839ff494a4 Switch dashboard to watcher-based multi-disk view, fix transcoding FPS display
- dashboard.html: remove standalone "Mounted Disk" input panel; show all disks
  from GET /api/disks (watcher), auto-refresh every 5s
- detect.go: use avg_frame_rate when r_frame_rate is unrealistic (>120 fps or 0),
  fixes MJPEG/mjpeg showing 90000fps
- transcoder.go: parse fps= from ffmpeg progress output and expose in Progress struct
- copier.go: update task message with real-time encoding fps (@ 45.3 fps),
  clear speed_bps/eta during transcoding to avoid showing stale copy speed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-21 21:51:39 +03:00

779 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package copier
import (
"context"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"io"
"math/rand/v2"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"jukebox_maker/internal/config"
"jukebox_maker/internal/db"
"jukebox_maker/internal/disk"
"jukebox_maker/internal/task"
"jukebox_maker/internal/transcoder"
)
type Options struct {
DiskID string
MountPath string
MediaPath string
DestFolder string // subfolder on disk, default "media"
SourceRules []config.SourceFolder
AllowedExtensions []string
ReserveFreeGB float64
OverwriteMode config.OverwriteMode
FileSelectMode config.FileSelectMode
Transcode *disk.TranscodeProfile // nil = не транскодировать
// ShuffleDepth: -1=выкл, 0=файлы вразнобой, 1+=группировка по папке на глубине N
ShuffleDepth int
}
type Copier struct {
tasks *task.Store
mu sync.Mutex
cancels map[string]context.CancelFunc
dbMu sync.RWMutex
dbs map[string]*db.DB
}
func New(tasks *task.Store) *Copier {
return &Copier{
tasks: tasks,
cancels: make(map[string]context.CancelFunc),
dbs: make(map[string]*db.DB),
}
}
func (c *Copier) SetDB(diskID string, d *db.DB) {
c.dbMu.Lock()
if d == nil {
delete(c.dbs, diskID)
} else {
c.dbs[diskID] = d
}
c.dbMu.Unlock()
}
func (c *Copier) getDB(diskID string) *db.DB {
c.dbMu.RLock()
defer c.dbMu.RUnlock()
return c.dbs[diskID]
}
func (c *Copier) LastCopiedAt(diskID string) (time.Time, bool, error) {
database := c.getDB(diskID)
if database == nil {
return time.Time{}, false, nil
}
return database.LastCopiedAt(diskID)
}
func (c *Copier) Start(ctx context.Context, opts Options) (string, error) {
return c.startTask(ctx, "", opts)
}
func (c *Copier) Resume(ctx context.Context, taskID string, opts Options) error {
_, err := c.startTask(ctx, taskID, opts)
return err
}
func (c *Copier) startTask(ctx context.Context, existingTaskID string, opts Options) (string, error) {
c.mu.Lock()
defer c.mu.Unlock()
if _, active := c.cancels[opts.DiskID]; active {
return "", errors.New("copy already running")
}
database := c.getDB(opts.DiskID)
if database == nil {
return "", errors.New("no disk database available")
}
if opts.DestFolder == "" {
opts.DestFolder = config.DefaultDestFolder
}
destFolder, err := config.NormalizeDestFolder(opts.DestFolder)
if err != nil {
destFolder = config.DefaultDestFolder
}
opts.DestFolder = destFolder
_, free, err := disk.DiskUsage(opts.MountPath)
if err != nil {
return "", err
}
reserveBytes := int64(opts.ReserveFreeGB * 1e9)
if free <= reserveBytes {
return "", errors.New("free space is below reserve threshold")
}
var taskID string
if existingTaskID == "" {
t := c.tasks.Create("copy", opts.DiskID)
payload, err := json.Marshal(opts)
if err != nil {
return "", err
}
if err := database.UpsertTask(*t, payload); err != nil {
return "", err
}
taskID = t.ID
} else {
taskID = existingTaskID
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusQueued
t.Phase = task.PhaseQueued
t.Message = "Resuming after restart..."
t.Error = ""
t.SpeedBPS = 0
t.ETASec = 0
})
if t, ok := c.tasks.Get(taskID); ok {
if err := database.UpdateTask(*t); err != nil {
return "", err
}
}
}
copyCtx, cancel := context.WithCancel(ctx)
c.cancels[opts.DiskID] = cancel
go c.run(copyCtx, taskID, opts, database)
return taskID, nil
}
func (c *Copier) Cancel(diskID string) {
c.mu.Lock()
defer c.mu.Unlock()
if cancel, ok := c.cancels[diskID]; ok {
cancel()
}
}
func (c *Copier) run(ctx context.Context, taskID string, opts Options, database *db.DB) {
defer func() {
c.mu.Lock()
delete(c.cancels, opts.DiskID)
c.mu.Unlock()
}()
setStatus := func(s task.Status, msg string, prog int) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = s
t.Message = msg
t.Progress = prog
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
}
fail := func(err error) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusFailed
t.Error = err.Error()
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
}
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhasePreparing
t.Message = "Preparing..."
t.Progress = 0
t.Error = ""
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
destRoot := filepath.Join(opts.MountPath, opts.DestFolder)
if opts.OverwriteMode == config.OverwriteDelete {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhaseReplacing
t.Message = "Replacing destination media..."
t.Progress = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
if err := os.RemoveAll(destRoot); err != nil {
fail(err)
return
}
}
var copiedPaths map[string]struct{}
if opts.FileSelectMode == config.SelectNew {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhaseLoadingHistory
t.Message = "Loading copy history..."
t.Progress = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
var err error
copiedPaths, err = database.CopiedPaths(opts.DiskID)
if err != nil {
fail(err)
return
}
}
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhaseScanning
t.Message = "Scanning sources..."
t.Progress = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
files, err := buildFileList(opts.MediaPath, opts.SourceRules, copiedPaths, opts.AllowedExtensions)
if err != nil {
fail(err)
return
}
if len(files) == 0 {
setStatus(task.StatusSuccess, "No files to copy.", 100)
return
}
files = applyShuffleDepth(files, opts.ShuffleDepth)
_, free, err := disk.DiskUsage(opts.MountPath)
if err != nil {
fail(err)
return
}
reserveBytes := int64(opts.ReserveFreeGB * 1e9)
available := free - reserveBytes
if available <= 0 {
setStatus(task.StatusFailed, "Free space is below the reserved threshold.", 100)
return
}
// суммарный объём для прогресса (всех файлов в списке)
var totalBytes int64
for _, f := range files {
totalBytes += f.size
}
total := len(files)
copied := 0
var doneBytes int64
startTime := time.Now()
for i, f := range files {
select {
case <-ctx.Done():
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusCanceled
t.Message = "Canceled"
t.SpeedBPS = 0
t.ETASec = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
return
default:
}
if f.size > available {
continue
}
elapsed := time.Since(startTime).Seconds()
var speedBPS, etaSec int64
if elapsed > 0 && doneBytes > 0 {
speedBPS = int64(float64(doneBytes) / elapsed)
remaining := totalBytes - doneBytes
if speedBPS > 0 {
etaSec = remaining / speedBPS
}
}
prog := int(float64(doneBytes) / float64(totalBytes) * 100)
msg := fmt.Sprintf("Copying %s (%d/%d)", filepath.Base(f.srcAbs), i+1, total)
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusRunning
t.Phase = task.PhaseCopying
t.Message = msg
t.Progress = prog
t.SpeedBPS = speedBPS
t.ETASec = int(etaSec)
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
dstAbs := filepath.Join(destRoot, f.relPath)
var fileErr error
if opts.Transcode != nil && isVideoFile(f.srcAbs) {
fileErr = c.processVideo(ctx, taskID, database, opts.Transcode, f.srcAbs, dstAbs)
} else {
fileErr = copyFile(ctx, f.srcAbs, dstAbs)
}
if fileErr != nil {
if errors.Is(fileErr, context.Canceled) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Status = task.StatusCanceled
t.Message = "Canceled"
t.SpeedBPS = 0
t.ETASec = 0
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
return
}
continue
}
available -= f.size
doneBytes += f.size
copied++
_ = database.RecordCopy(db.CopyRecord{
DiskID: opts.DiskID,
SourcePath: f.relPath,
FileSize: f.size,
})
}
setStatus(task.StatusSuccess, fmt.Sprintf("Done. Copied %d files.", copied), 100)
}
// videoExtensions — расширения видеофайлов из встроенного справочника.
var videoExtensions = func() map[string]struct{} {
exts := config.BuiltInMediaTypeExtensions()[config.MediaTypeVideo]
set := make(map[string]struct{}, len(exts))
for _, e := range exts {
set[e] = struct{}{}
}
return set
}()
func isVideoFile(path string) bool {
_, ok := videoExtensions[strings.ToLower(filepath.Ext(path))]
return ok
}
// processVideo определяет: транскодировать или скопировать файл.
func (c *Copier) processVideo(ctx context.Context, taskID string, database *db.DB, profile *disk.TranscodeProfile, src, dst string) error {
info, err := transcoder.ProbeVideo(src)
if err != nil {
// Не смогли зондировать — просто копируем
return copyFile(ctx, src, dst)
}
if !transcoder.NeedsTranscode(info, profile) {
return copyFile(ctx, src, dst)
}
// Меняем расширение выходного файла под формат контейнера
ext := transcoder.OutputExt(profile.OutputFormat)
dstTranscoded := strings.TrimSuffix(dst, filepath.Ext(dst)) + ext
srcInfo := fmt.Sprintf("%s/%dch/%.0ffps", info.Codec, info.AudioChannels, info.FPS)
dstInfo := fmt.Sprintf("%s/%s/%dfps %s", profile.VideoCodec, profile.AudioCodec, profile.MaxFPS, profile.OutputFormat)
baseMsg := fmt.Sprintf("Transcoding %s (%s → %s)", filepath.Base(src), srcInfo, dstInfo)
c.tasks.Update(taskID, func(t *task.Task) {
t.Phase = task.PhaseTranscoding
t.Message = baseMsg
})
if t, ok := c.tasks.Get(taskID); ok {
_ = database.UpdateTask(*t)
}
progressFn := func(p transcoder.Progress) {
c.tasks.Update(taskID, func(t *task.Task) {
t.Progress = int(p.Pct * 100)
t.SpeedBPS = 0
t.ETASec = 0
if p.EncodeFPS > 0 {
t.Message = fmt.Sprintf("%s @ %.1f fps", baseMsg, p.EncodeFPS)
}
})
}
return transcoder.Transcode(ctx, transcoder.Options{
Input: src,
Output: dstTranscoded,
Profile: profile,
SourceInfo: info,
}, progressFn)
}
type fileEntry struct {
srcAbs string
relPath string // relative to /media
size int64
}
func buildFileList(mediaPath string, rules []config.SourceFolder, skip map[string]struct{}, allowedExtensions []string) ([]fileEntry, error) {
_ = mediaPath
roots, selectedRoots, ruleMap := normalizeSourceRules(rules)
aliases := sourceAliases(roots)
allowedExts := makeAllowedExtensionSet(allowedExtensions)
var result []fileEntry
for _, src := range selectedRoots {
root := owningRoot(src, roots)
if root == "" {
root = src
}
alias := aliases[root]
if alias == "" {
alias = filepath.Base(root)
if alias == "." || alias == "" || alias == string(filepath.Separator) {
alias = "source-" + shortHash(root)
}
}
dir := src
err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil || d.IsDir() {
if err != nil {
return nil
}
if path == dir {
return nil
}
rel, relErr := filepath.Rel(root, path)
if relErr != nil {
return nil
}
rel = filepath.ToSlash(rel)
if !isPathEnabled(path, ruleMap) && !hasEnabledDescendant(path, ruleMap) {
return filepath.SkipDir
}
return nil
}
if !isPathEnabled(path, ruleMap) {
return nil
}
if !isExtensionAllowed(path, allowedExts) {
return nil
}
rel, _ := filepath.Rel(root, path)
rel = filepath.ToSlash(rel)
destRel := filepath.ToSlash(filepath.Join(alias, rel))
if _, skipped := skip[rel]; skipped {
return nil
}
if _, skipped := skip[destRel]; skipped {
return nil
}
info, err := d.Info()
if err != nil {
return nil
}
result = append(result, fileEntry{srcAbs: path, relPath: destRel, size: info.Size()})
return nil
})
if err != nil {
return nil, err
}
}
return result, nil
}
func makeAllowedExtensionSet(items []string) map[string]struct{} {
normalized := config.NormalizeExtensions(items)
if len(normalized) == 0 {
normalized = config.DefaultAllowedExtensions()
}
result := make(map[string]struct{}, len(normalized))
for _, item := range normalized {
result[item] = struct{}{}
}
return result
}
func isExtensionAllowed(path string, allowed map[string]struct{}) bool {
ext := strings.ToLower(filepath.Ext(path))
if ext == "" {
return false
}
_, ok := allowed[ext]
return ok
}
func normalizeSourceRules(rules []config.SourceFolder) ([]string, []string, map[string]bool) {
ruleMap := make(map[string]bool, len(rules))
rootSet := make(map[string]struct{})
for _, rule := range rules {
src := filepath.Clean(strings.TrimSpace(rule.Path))
if src == "" || src == "." {
continue
}
ruleMap[src] = rule.Enabled
if rule.Root {
rootSet[src] = struct{}{}
}
}
var roots []string
for src := range rootSet {
roots = append(roots, src)
}
sort.Strings(roots)
var selectedRoots []string
for src, enabled := range ruleMap {
if !enabled || hasEnabledAncestor(src, ruleMap) {
continue
}
selectedRoots = append(selectedRoots, src)
}
sort.Strings(selectedRoots)
return roots, selectedRoots, ruleMap
}
func hasEnabledAncestor(path string, ruleMap map[string]bool) bool {
for parent := parentSourcePath(path); parent != ""; parent = parentSourcePath(parent) {
if ruleMap[parent] {
return true
}
}
return false
}
func hasEnabledDescendant(path string, ruleMap map[string]bool) bool {
for other, enabled := range ruleMap {
if enabled && isPathInside(path, other) && other != path {
return true
}
}
return false
}
func isPathEnabled(path string, ruleMap map[string]bool) bool {
for current := path; current != ""; current = parentSourcePath(current) {
if enabled, ok := ruleMap[current]; ok {
return enabled
}
}
return false
}
func parentSourcePath(path string) string {
parent := filepath.Dir(path)
if parent == "." || parent == path {
return ""
}
return parent
}
func owningRoot(path string, roots []string) string {
var best string
for _, root := range roots {
if isPathInside(root, path) {
if len(root) > len(best) {
best = root
}
}
}
return best
}
func sourceAliases(roots []string) map[string]string {
counts := make(map[string]int, len(roots))
for _, root := range roots {
counts[strings.ToLower(filepath.Base(root))]++
}
aliases := make(map[string]string, len(roots))
for _, root := range roots {
base := filepath.Base(root)
if base == "." || base == string(filepath.Separator) || base == "" {
base = "source"
}
key := strings.ToLower(base)
if counts[key] > 1 {
base = fmt.Sprintf("%s-%s", base, shortHash(root))
}
aliases[root] = base
}
return aliases
}
func shortHash(value string) string {
h := fnv.New32a()
_, _ = h.Write([]byte(value))
return fmt.Sprintf("%08x", h.Sum32())[:6]
}
func isPathInside(base, candidate string) bool {
if candidate == base {
return true
}
rel, err := filepath.Rel(base, candidate)
if err != nil {
return false
}
return rel != "." && rel != ".." && !strings.HasPrefix(rel, ".."+string(os.PathSeparator))
}
func copyFile(ctx context.Context, src, dst string) error {
if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
return err
}
srcFile, err := os.Open(src)
if err != nil {
return err
}
defer srcFile.Close()
srcInfo, err := srcFile.Stat()
if err != nil {
return err
}
offset := int64(0)
if dstInfo, err := os.Stat(dst); err == nil {
switch {
case dstInfo.Size() < srcInfo.Size():
offset = dstInfo.Size()
case dstInfo.Size() == srcInfo.Size():
return os.Chtimes(dst, srcInfo.ModTime(), srcInfo.ModTime())
default:
if err := os.Remove(dst); err != nil {
return err
}
}
} else if !errors.Is(err, os.ErrNotExist) {
return err
}
if offset > 0 {
if _, err := srcFile.Seek(offset, io.SeekStart); err != nil {
return err
}
}
dstFile, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return err
}
defer dstFile.Close()
if offset > 0 {
if _, err := dstFile.Seek(offset, io.SeekStart); err != nil {
return err
}
} else if err := dstFile.Truncate(0); err != nil {
return err
}
buf := make([]byte, 1024*1024)
for {
if err := ctx.Err(); err != nil {
return err
}
nr, readErr := srcFile.Read(buf)
if nr > 0 {
nw, writeErr := dstFile.Write(buf[:nr])
if writeErr != nil {
return writeErr
}
if nw != nr {
return io.ErrShortWrite
}
}
if readErr != nil {
if errors.Is(readErr, io.EOF) {
break
}
return readErr
}
}
if err := dstFile.Sync(); err != nil {
return err
}
if err := os.Chtimes(dst, srcInfo.ModTime(), srcInfo.ModTime()); err != nil {
return err
}
dstInfo, err := os.Stat(dst)
if err != nil {
return err
}
if dstInfo.Size() != srcInfo.Size() {
return fmt.Errorf("copied size mismatch for %s", filepath.Base(src))
}
return nil
}
// applyShuffleDepth упорядочивает файлы по заданной глубине шафлера.
// depth < 0 → оригинальный порядок (без шафла)
// depth == 0 → все файлы перемешиваются случайно
// depth >= 1 → файлы группируются по папке на уровне depth от корня /media,
// группы перемешиваются, внутри каждой группы порядок сохраняется.
func applyShuffleDepth(files []fileEntry, depth int) []fileEntry {
if depth < 0 || len(files) == 0 {
return files
}
if depth == 0 {
rand.Shuffle(len(files), func(i, j int) { files[i], files[j] = files[j], files[i] })
return files
}
type group struct {
key string
files []fileEntry
}
groupMap := make(map[string]*group, 64)
var order []string
for _, f := range files {
key := folderKeyAtDepth(f.relPath, depth)
if _, ok := groupMap[key]; !ok {
groupMap[key] = &group{key: key}
order = append(order, key)
}
groupMap[key].files = append(groupMap[key].files, f)
}
rand.Shuffle(len(order), func(i, j int) { order[i], order[j] = order[j], order[i] })
result := make([]fileEntry, 0, len(files))
for _, key := range order {
result = append(result, groupMap[key].files...)
}
return result
}
// folderKeyAtDepth возвращает путь к папке глубины depth из relPath.
// relPath вида "anime/Naruto/Season1/ep01.mkv", depth=2 → "anime/Naruto"
func folderKeyAtDepth(relPath string, depth int) string {
relPath = filepath.ToSlash(relPath)
parts := strings.Split(relPath, "/")
maxDepth := len(parts) - 1 // последний элемент — имя файла
if depth >= maxDepth {
depth = maxDepth
}
return strings.Join(parts[:depth], "/")
}