Add convert mode batch workflow with full progress

This commit is contained in:
2026-02-28 21:44:36 +03:00
parent bb4505a249
commit 25e3b8bb42
5 changed files with 823 additions and 84 deletions

View File

@@ -1,6 +1,7 @@
package server
import (
"archive/zip"
"bytes"
"context"
"crypto/rand"
@@ -47,7 +48,8 @@ func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
if err := r.ParseMultipartForm(uploadMultipartMaxBytes()); err != nil {
r.Body = http.MaxBytesReader(w, r.Body, uploadMultipartMaxBytes())
if err := r.ParseMultipartForm(uploadMultipartFormMemoryBytes()); err != nil {
jsonError(w, "File too large", http.StatusBadRequest)
return
}
@@ -70,61 +72,16 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
vendor string
)
if rawPkg, ok, err := parseRawExportBundle(payload); err != nil {
jsonError(w, "Failed to parse raw export bundle: "+err.Error(), http.StatusBadRequest)
result, vendor, rawPkg, err := s.analyzeUploadedFile(header.Filename, header.Header.Get("Content-Type"), payload)
if err != nil {
jsonError(w, "Failed to parse uploaded file: "+err.Error(), http.StatusBadRequest)
return
} else if ok {
replayed, replayVendor, replayErr := s.reanalyzeRawExportPackage(rawPkg)
if replayErr != nil {
jsonError(w, "Failed to reanalyze raw export package: "+replayErr.Error(), http.StatusBadRequest)
return
}
result = replayed
vendor = replayVendor
if strings.TrimSpace(vendor) == "" {
vendor = "snapshot"
}
}
if strings.TrimSpace(vendor) == "" {
vendor = "snapshot"
}
if rawPkg != nil {
s.SetRawExport(rawPkg)
} else if looksLikeJSONSnapshot(header.Filename, payload) {
if rawPkg, ok, err := parseRawExportPackage(payload); err != nil {
jsonError(w, "Failed to parse raw export package: "+err.Error(), http.StatusBadRequest)
return
} else if ok {
replayed, replayVendor, replayErr := s.reanalyzeRawExportPackage(rawPkg)
if replayErr != nil {
jsonError(w, "Failed to reanalyze raw export package: "+replayErr.Error(), http.StatusBadRequest)
return
}
result = replayed
vendor = replayVendor
if strings.TrimSpace(vendor) == "" {
vendor = "snapshot"
}
s.SetRawExport(rawPkg)
} else {
snapshotResult, snapshotErr := parseUploadedSnapshot(payload)
if snapshotErr != nil {
jsonError(w, "Failed to parse snapshot: "+snapshotErr.Error(), http.StatusBadRequest)
return
}
result = snapshotResult
vendor = strings.TrimSpace(snapshotResult.Protocol)
if vendor == "" {
vendor = "snapshot"
}
s.SetRawExport(newRawExportFromUploadedFile(header.Filename, header.Header.Get("Content-Type"), payload, result))
}
} else {
// Parse archive
p := parser.NewBMCParser()
if err := p.ParseFromReader(bytes.NewReader(payload), header.Filename); err != nil {
jsonError(w, "Failed to parse archive: "+err.Error(), http.StatusBadRequest)
return
}
result = p.Result()
applyArchiveSourceMetadata(result)
vendor = p.DetectedVendor()
s.SetRawExport(newRawExportFromUploadedFile(header.Filename, header.Header.Get("Content-Type"), payload, result))
}
s.SetResult(result)
@@ -143,13 +100,60 @@ func (s *Server) handleUpload(w http.ResponseWriter, r *http.Request) {
})
}
func (s *Server) analyzeUploadedFile(filename, mimeType string, payload []byte) (*models.AnalysisResult, string, *RawExportPackage, error) {
if rawPkg, ok, err := parseRawExportBundle(payload); err != nil {
return nil, "", nil, err
} else if ok {
result, vendor, err := s.reanalyzeRawExportPackage(rawPkg)
if err != nil {
return nil, "", nil, err
}
if strings.TrimSpace(vendor) == "" {
vendor = "snapshot"
}
return result, vendor, rawPkg, nil
}
if looksLikeJSONSnapshot(filename, payload) {
if rawPkg, ok, err := parseRawExportPackage(payload); err != nil {
return nil, "", nil, err
} else if ok {
result, vendor, err := s.reanalyzeRawExportPackage(rawPkg)
if err != nil {
return nil, "", nil, err
}
if strings.TrimSpace(vendor) == "" {
vendor = "snapshot"
}
return result, vendor, rawPkg, nil
}
snapshotResult, err := parseUploadedSnapshot(payload)
if err != nil {
return nil, "", nil, err
}
vendor := strings.TrimSpace(snapshotResult.Protocol)
if vendor == "" {
vendor = "snapshot"
}
return snapshotResult, vendor, newRawExportFromUploadedFile(filename, mimeType, payload, snapshotResult), nil
}
p := parser.NewBMCParser()
if err := p.ParseFromReader(bytes.NewReader(payload), filename); err != nil {
return nil, "", nil, err
}
result := p.Result()
applyArchiveSourceMetadata(result)
return result, p.DetectedVendor(), newRawExportFromUploadedFile(filename, mimeType, payload, result), nil
}
func uploadMultipartMaxBytes() int64 {
// Large Redfish raw bundles can easily exceed 100 MiB once raw trees and logs
// are embedded. Keep the default high but bounded for a normal workstation.
// Limit for incoming multipart request body.
const (
defMB = 512
defMB = 2048
minMB = 100
maxMB = 2048
maxMB = 8192
)
mb := defMB
if v := strings.TrimSpace(os.Getenv("LOGPILE_UPLOAD_MAX_MB")); v != "" {
@@ -166,6 +170,12 @@ func uploadMultipartMaxBytes() int64 {
return int64(mb) << 20
}
func uploadMultipartFormMemoryBytes() int64 {
// Keep a small in-memory threshold; file parts spill to temp files.
const formMemoryMB = 32
return int64(formMemoryMB) << 20
}
func (s *Server) reanalyzeRawExportPackage(pkg *RawExportPackage) (*models.AnalysisResult, string, error) {
if pkg == nil {
return nil, "", fmt.Errorf("empty package")
@@ -1076,10 +1086,320 @@ func (s *Server) handleExportReanimator(w http.ResponseWriter, r *http.Request)
}
}
func (s *Server) handleConvertReanimatorBatch(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, uploadMultipartMaxBytes())
if err := r.ParseMultipartForm(uploadMultipartFormMemoryBytes()); err != nil {
jsonError(w, "File too large", http.StatusBadRequest)
return
}
form := r.MultipartForm
if form == nil {
jsonError(w, "No files provided", http.StatusBadRequest)
return
}
files := form.File["files[]"]
if len(files) == 0 {
files = form.File["files"]
}
if len(files) == 0 {
jsonError(w, "No files provided", http.StatusBadRequest)
return
}
tempDir, err := os.MkdirTemp("", "logpile-convert-input-*")
if err != nil {
jsonError(w, "Не удалось создать временную директорию", http.StatusInternalServerError)
return
}
inputFiles := make([]convertInputFile, 0, len(files))
var skipped int
for _, fh := range files {
if fh == nil {
continue
}
if !isSupportedConvertFileName(fh.Filename) {
skipped++
continue
}
tmpFile, err := os.CreateTemp(tempDir, "input-*")
if err != nil {
continue
}
src, err := fh.Open()
if err != nil {
_ = tmpFile.Close()
_ = os.Remove(tmpFile.Name())
continue
}
_, err = io.Copy(tmpFile, src)
_ = src.Close()
_ = tmpFile.Close()
if err != nil {
_ = os.Remove(tmpFile.Name())
continue
}
mimeType := ""
if fh.Header != nil {
mimeType = fh.Header.Get("Content-Type")
}
inputFiles = append(inputFiles, convertInputFile{
Name: fh.Filename,
Path: tmpFile.Name(),
MIMEType: mimeType,
})
}
if len(inputFiles) == 0 {
_ = os.RemoveAll(tempDir)
jsonError(w, "Нет файлов поддерживаемого типа для конвертации", http.StatusBadRequest)
return
}
job := s.jobManager.CreateJob(CollectRequest{
Host: "convert.local",
Protocol: "convert",
Port: 0,
Username: "convert",
AuthType: "password",
TLSMode: "insecure",
})
s.markConvertJob(job.ID)
s.jobManager.AppendJobLog(job.ID, fmt.Sprintf("Запущена пакетная конвертация: %d файлов", len(inputFiles)))
if skipped > 0 {
s.jobManager.AppendJobLog(job.ID, fmt.Sprintf("Пропущено неподдерживаемых файлов: %d", skipped))
}
s.jobManager.UpdateJobStatus(job.ID, CollectStatusRunning, 0, "")
go s.runConvertJob(job.ID, tempDir, inputFiles, skipped, len(files))
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
_ = json.NewEncoder(w).Encode(map[string]any{
"job_id": job.ID,
"status": CollectStatusRunning,
"accepted": len(inputFiles),
"skipped": skipped,
"total_files": len(files),
})
}
type convertInputFile struct {
Name string
Path string
MIMEType string
}
func (s *Server) runConvertJob(jobID, tempDir string, inputFiles []convertInputFile, skipped, total int) {
defer os.RemoveAll(tempDir)
resultFile, err := os.CreateTemp("", "logpile-convert-result-*.zip")
if err != nil {
s.jobManager.UpdateJobStatus(jobID, CollectStatusFailed, 100, "не удалось создать zip")
return
}
resultPath := resultFile.Name()
defer resultFile.Close()
zw := zip.NewWriter(resultFile)
failures := make([]string, 0)
success := 0
totalProcess := len(inputFiles)
for i, in := range inputFiles {
s.jobManager.AppendJobLog(jobID, fmt.Sprintf("Обработка %s", in.Name))
payload, err := os.ReadFile(in.Path)
if err != nil {
failures = append(failures, fmt.Sprintf("%s: %v", in.Name, err))
continue
}
result, _, _, err := s.analyzeUploadedFile(in.Name, in.MIMEType, payload)
if err != nil {
failures = append(failures, fmt.Sprintf("%s: %v", in.Name, err))
progress := ((i + 1) * 100) / totalProcess
s.jobManager.UpdateJobStatus(jobID, CollectStatusRunning, progress, "")
continue
}
if result == nil || result.Hardware == nil {
failures = append(failures, fmt.Sprintf("%s: no hardware data", in.Name))
progress := ((i + 1) * 100) / totalProcess
s.jobManager.UpdateJobStatus(jobID, CollectStatusRunning, progress, "")
continue
}
reanimatorData, err := exporter.ConvertToReanimator(result)
if err != nil {
failures = append(failures, fmt.Sprintf("%s: %v", in.Name, err))
progress := ((i + 1) * 100) / totalProcess
s.jobManager.UpdateJobStatus(jobID, CollectStatusRunning, progress, "")
continue
}
entryPath := sanitizeZipPath(in.Name)
entry, err := zw.Create(entryPath + ".reanimator.json")
if err != nil {
failures = append(failures, fmt.Sprintf("%s: %v", in.Name, err))
progress := ((i + 1) * 100) / totalProcess
s.jobManager.UpdateJobStatus(jobID, CollectStatusRunning, progress, "")
continue
}
encoder := json.NewEncoder(entry)
encoder.SetIndent("", " ")
if err := encoder.Encode(reanimatorData); err != nil {
failures = append(failures, fmt.Sprintf("%s: %v", in.Name, err))
} else {
success++
}
progress := ((i + 1) * 100) / totalProcess
s.jobManager.UpdateJobStatus(jobID, CollectStatusRunning, progress, "")
}
if success == 0 {
_ = zw.Close()
_ = os.Remove(resultPath)
s.jobManager.UpdateJobStatus(jobID, CollectStatusFailed, 100, "Не удалось конвертировать ни один файл")
return
}
summaryLines := []string{fmt.Sprintf("Конвертировано %d из %d файлов", success, total)}
if skipped > 0 {
summaryLines = append(summaryLines, fmt.Sprintf("Пропущено неподдерживаемых: %d", skipped))
}
summaryLines = append(summaryLines, failures...)
if entry, err := zw.Create("convert-summary.txt"); err == nil {
_, _ = io.WriteString(entry, strings.Join(summaryLines, "\n"))
}
if err := zw.Close(); err != nil {
_ = os.Remove(resultPath)
s.jobManager.UpdateJobStatus(jobID, CollectStatusFailed, 100, "Не удалось упаковать результаты")
return
}
s.setConvertArtifact(jobID, ConvertArtifact{
Path: resultPath,
Summary: summaryLines[0],
})
s.jobManager.UpdateJobStatus(jobID, CollectStatusSuccess, 100, "")
}
func (s *Server) handleConvertStatus(w http.ResponseWriter, r *http.Request) {
jobID := strings.TrimSpace(r.PathValue("id"))
if !isValidCollectJobID(jobID) {
jsonError(w, "Invalid convert job id", http.StatusBadRequest)
return
}
if !s.isConvertJob(jobID) {
jsonError(w, "Convert job not found", http.StatusNotFound)
return
}
job, ok := s.jobManager.GetJob(jobID)
if !ok {
jsonError(w, "Convert job not found", http.StatusNotFound)
return
}
jsonResponse(w, job.toStatusResponse())
}
func (s *Server) handleConvertDownload(w http.ResponseWriter, r *http.Request) {
jobID := strings.TrimSpace(r.PathValue("id"))
if !isValidCollectJobID(jobID) {
jsonError(w, "Invalid convert job id", http.StatusBadRequest)
return
}
if !s.isConvertJob(jobID) {
jsonError(w, "Convert job not found", http.StatusNotFound)
return
}
job, ok := s.jobManager.GetJob(jobID)
if !ok {
jsonError(w, "Convert job not found", http.StatusNotFound)
return
}
if job.Status != CollectStatusSuccess {
jsonError(w, "Convert job is not finished yet", http.StatusConflict)
return
}
artifact, ok := s.getConvertArtifact(jobID)
if !ok || strings.TrimSpace(artifact.Path) == "" {
jsonError(w, "Convert result not found", http.StatusNotFound)
return
}
file, err := os.Open(artifact.Path)
if err != nil {
jsonError(w, "Convert result not found", http.StatusNotFound)
return
}
defer file.Close()
defer func() {
_ = os.Remove(artifact.Path)
s.clearConvertArtifact(jobID)
}()
stat, err := file.Stat()
if err != nil {
jsonError(w, "Convert result not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/zip")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", "logpile-convert.zip"))
if artifact.Summary != "" {
w.Header().Set("X-Convert-Summary", artifact.Summary)
}
http.ServeContent(w, r, "logpile-convert.zip", stat.ModTime(), file)
}
func isSupportedConvertFileName(filename string) bool {
name := strings.ToLower(strings.TrimSpace(filename))
if name == "" {
return false
}
return strings.HasSuffix(name, ".zip") ||
strings.HasSuffix(name, ".tar") ||
strings.HasSuffix(name, ".tar.gz") ||
strings.HasSuffix(name, ".tgz") ||
strings.HasSuffix(name, ".json") ||
strings.HasSuffix(name, ".txt") ||
strings.HasSuffix(name, ".log")
}
func sanitizeZipPath(filename string) string {
path := filepath.Clean(filename)
if path == "." || path == "/" {
path = filepath.Base(filename)
}
path = strings.TrimPrefix(path, string(filepath.Separator))
if strings.HasPrefix(path, "..") {
path = filepath.Base(path)
}
path = filepath.ToSlash(path)
if path == "" {
path = filepath.Base(filename)
}
return path
}
func (s *Server) handleClear(w http.ResponseWriter, r *http.Request) {
s.SetResult(nil)
s.SetDetectedVendor("")
s.SetRawExport(nil)
for _, artifact := range s.clearAllConvertArtifacts() {
if strings.TrimSpace(artifact.Path) != "" {
_ = os.Remove(artifact.Path)
}
}
jsonResponse(w, map[string]string{
"status": "ok",
"message": "Data cleared",

View File

@@ -32,17 +32,26 @@ type Server struct {
result *models.AnalysisResult
detectedVendor string
rawExport *RawExportPackage
convertJobs map[string]struct{}
convertOutput map[string]ConvertArtifact
jobManager *JobManager
collectors *collector.Registry
}
type ConvertArtifact struct {
Path string
Summary string
}
func New(cfg Config) *Server {
s := &Server{
config: cfg,
mux: http.NewServeMux(),
jobManager: NewJobManager(),
collectors: collector.NewDefaultRegistry(),
config: cfg,
mux: http.NewServeMux(),
jobManager: NewJobManager(),
collectors: collector.NewDefaultRegistry(),
convertJobs: make(map[string]struct{}),
convertOutput: make(map[string]ConvertArtifact),
}
s.setupRoutes()
return s
@@ -72,6 +81,9 @@ func (s *Server) setupRoutes() {
s.mux.HandleFunc("GET /api/export/csv", s.handleExportCSV)
s.mux.HandleFunc("GET /api/export/json", s.handleExportJSON)
s.mux.HandleFunc("GET /api/export/reanimator", s.handleExportReanimator)
s.mux.HandleFunc("POST /api/convert", s.handleConvertReanimatorBatch)
s.mux.HandleFunc("GET /api/convert/{id}", s.handleConvertStatus)
s.mux.HandleFunc("GET /api/convert/{id}/download", s.handleConvertDownload)
s.mux.HandleFunc("DELETE /api/clear", s.handleClear)
s.mux.HandleFunc("POST /api/shutdown", s.handleShutdown)
s.mux.HandleFunc("POST /api/collect", s.handleCollectStart)
@@ -154,3 +166,47 @@ func (s *Server) GetDetectedVendor() string {
defer s.mu.RUnlock()
return s.detectedVendor
}
func (s *Server) markConvertJob(id string) {
s.mu.Lock()
defer s.mu.Unlock()
s.convertJobs[id] = struct{}{}
}
func (s *Server) isConvertJob(id string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
_, ok := s.convertJobs[id]
return ok
}
func (s *Server) setConvertArtifact(id string, artifact ConvertArtifact) {
s.mu.Lock()
defer s.mu.Unlock()
s.convertOutput[id] = artifact
}
func (s *Server) getConvertArtifact(id string) (ConvertArtifact, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
artifact, ok := s.convertOutput[id]
return artifact, ok
}
func (s *Server) clearConvertArtifact(id string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.convertOutput, id)
}
func (s *Server) clearAllConvertArtifacts() []ConvertArtifact {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]ConvertArtifact, 0, len(s.convertOutput))
for _, artifact := range s.convertOutput {
out = append(out, artifact)
}
s.convertOutput = make(map[string]ConvertArtifact)
s.convertJobs = make(map[string]struct{})
return out
}