feat(backend): add in-memory collect job manager and mock executor

This commit is contained in:
Mikhail Chusavitin
2026-02-04 10:01:51 +03:00
parent aa3c82d9ba
commit d38d0c9d30
7 changed files with 555 additions and 47 deletions

View File

@@ -0,0 +1,178 @@
package server
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)
func newCollectTestServer() (*Server, *httptest.Server) {
s := &Server{
jobManager: NewJobManager(),
}
mux := http.NewServeMux()
mux.HandleFunc("POST /api/collect", s.handleCollectStart)
mux.HandleFunc("GET /api/collect/{id}", s.handleCollectStatus)
mux.HandleFunc("POST /api/collect/{id}/cancel", s.handleCollectCancel)
return s, httptest.NewServer(mux)
}
func TestCollectLifecycleToTerminal(t *testing.T) {
_, ts := newCollectTestServer()
defer ts.Close()
body := `{"host":"bmc01.local","protocol":"redfish","port":443,"username":"admin","auth_type":"password","password":"secret","tls_mode":"strict"}`
resp, err := http.Post(ts.URL+"/api/collect", "application/json", bytes.NewBufferString(body))
if err != nil {
t.Fatalf("post collect failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("expected 202, got %d", resp.StatusCode)
}
var created CollectJobResponse
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil {
t.Fatalf("decode create response: %v", err)
}
if created.JobID == "" {
t.Fatalf("expected job id")
}
status := waitForTerminalStatus(t, ts.URL, created.JobID, 4*time.Second)
if status.Status != CollectStatusSuccess {
t.Fatalf("expected success, got %q (error=%q)", status.Status, status.Error)
}
if status.Progress == nil || *status.Progress != 100 {
t.Fatalf("expected progress 100, got %#v", status.Progress)
}
if len(status.Logs) < 4 {
t.Fatalf("expected detailed logs, got %v", status.Logs)
}
}
func TestCollectCancel(t *testing.T) {
_, ts := newCollectTestServer()
defer ts.Close()
body := `{"host":"bmc02.local","protocol":"ipmi","port":623,"username":"operator","auth_type":"token","token":"keep-me-secret","tls_mode":"insecure"}`
resp, err := http.Post(ts.URL+"/api/collect", "application/json", bytes.NewBufferString(body))
if err != nil {
t.Fatalf("post collect failed: %v", err)
}
defer resp.Body.Close()
var created CollectJobResponse
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil {
t.Fatalf("decode create response: %v", err)
}
cancelResp, err := http.Post(ts.URL+"/api/collect/"+created.JobID+"/cancel", "application/json", nil)
if err != nil {
t.Fatalf("cancel collect failed: %v", err)
}
defer cancelResp.Body.Close()
if cancelResp.StatusCode != http.StatusOK {
t.Fatalf("expected 200 cancel, got %d", cancelResp.StatusCode)
}
var canceled CollectJobStatusResponse
if err := json.NewDecoder(cancelResp.Body).Decode(&canceled); err != nil {
t.Fatalf("decode cancel response: %v", err)
}
if canceled.Status != CollectStatusCanceled {
t.Fatalf("expected canceled, got %q", canceled.Status)
}
time.Sleep(500 * time.Millisecond)
final := getCollectStatus(t, ts.URL, created.JobID, http.StatusOK)
if final.Status != CollectStatusCanceled {
t.Fatalf("expected canceled to stay terminal, got %q", final.Status)
}
}
func TestCollectNotFoundAndSecretLeak(t *testing.T) {
_, ts := newCollectTestServer()
defer ts.Close()
notFound := getCollectStatus(t, ts.URL, "job_notfound123", http.StatusNotFound)
if notFound.JobID != "" || notFound.Status != "" {
t.Fatalf("unexpected body for not found: %+v", notFound)
}
cancelResp, err := http.Post(ts.URL+"/api/collect/job_notfound123/cancel", "application/json", nil)
if err != nil {
t.Fatalf("cancel not found request failed: %v", err)
}
cancelResp.Body.Close()
if cancelResp.StatusCode != http.StatusNotFound {
t.Fatalf("expected 404 for cancel not found, got %d", cancelResp.StatusCode)
}
body := `{"host":"need-fail.local","protocol":"redfish","port":443,"username":"admin","auth_type":"password","password":"ultra-secret","tls_mode":"strict"}`
resp, err := http.Post(ts.URL+"/api/collect", "application/json", bytes.NewBufferString(body))
if err != nil {
t.Fatalf("post collect failed: %v", err)
}
defer resp.Body.Close()
var created CollectJobResponse
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil {
t.Fatalf("decode create response: %v", err)
}
status := waitForTerminalStatus(t, ts.URL, created.JobID, 4*time.Second)
if status.Status != CollectStatusFailed {
t.Fatalf("expected failed by host toggle, got %q", status.Status)
}
raw, err := json.Marshal(status)
if err != nil {
t.Fatalf("marshal status: %v", err)
}
if strings.Contains(string(raw), "ultra-secret") || strings.Contains(strings.Join(status.Logs, " "), "ultra-secret") {
t.Fatalf("secret leaked into API response or logs")
}
}
func waitForTerminalStatus(t *testing.T, baseURL, jobID string, timeout time.Duration) CollectJobStatusResponse {
t.Helper()
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
status := getCollectStatus(t, baseURL, jobID, http.StatusOK)
if status.Status == CollectStatusSuccess || status.Status == CollectStatusFailed || status.Status == CollectStatusCanceled {
return status
}
time.Sleep(100 * time.Millisecond)
}
t.Fatalf("job %s did not reach terminal status before timeout", jobID)
return CollectJobStatusResponse{}
}
func getCollectStatus(t *testing.T, baseURL, jobID string, expectedCode int) CollectJobStatusResponse {
t.Helper()
resp, err := http.Get(baseURL + "/api/collect/" + jobID)
if err != nil {
t.Fatalf("get collect status failed: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != expectedCode {
t.Fatalf("expected status %d, got %d", expectedCode, resp.StatusCode)
}
if expectedCode != http.StatusOK {
return CollectJobStatusResponse{}
}
var status CollectJobStatusResponse
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
t.Fatalf("decode collect status: %v", err)
}
return status
}

View File

@@ -2,6 +2,14 @@ package server
import "time"
const (
CollectStatusQueued = "queued"
CollectStatusRunning = "running"
CollectStatusSuccess = "success"
CollectStatusFailed = "failed"
CollectStatusCanceled = "canceled"
)
type CollectRequest struct {
Host string `json:"host"`
Protocol string `json:"protocol"`
@@ -26,5 +34,50 @@ type CollectJobStatusResponse struct {
Progress *int `json:"progress,omitempty"`
Logs []string `json:"logs,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at"`
}
type CollectRequestMeta struct {
Host string `json:"host"`
Protocol string `json:"protocol"`
Port int `json:"port"`
Username string `json:"username"`
AuthType string `json:"auth_type"`
TLSMode string `json:"tls_mode"`
}
type Job struct {
ID string
Status string
Progress int
Logs []string
Error string
CreatedAt time.Time
UpdatedAt time.Time
RequestMeta CollectRequestMeta
cancel func()
}
func (j *Job) toStatusResponse() CollectJobStatusResponse {
progress := j.Progress
resp := CollectJobStatusResponse{
JobID: j.ID,
Status: j.Status,
Progress: &progress,
Logs: append([]string(nil), j.Logs...),
Error: j.Error,
CreatedAt: j.CreatedAt,
UpdatedAt: j.UpdatedAt,
}
return resp
}
func (j *Job) toJobResponse(message string) CollectJobResponse {
return CollectJobResponse{
JobID: j.ID,
Status: j.Status,
Message: message,
CreatedAt: j.CreatedAt,
}
}

View File

@@ -1,6 +1,7 @@
package server
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
@@ -572,28 +573,12 @@ func (s *Server) handleCollectStart(w http.ResponseWriter, r *http.Request) {
return
}
jobID := generateJobID()
now := time.Now().UTC()
progress := 0
s.collectMu.Lock()
s.collectJobs[jobID] = &CollectJobStatusResponse{
JobID: jobID,
Status: "queued",
Progress: &progress,
Logs: []string{"Job queued"},
UpdatedAt: now,
}
s.collectMu.Unlock()
job := s.jobManager.CreateJob(req)
s.startMockCollectionJob(job.ID, req)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(CollectJobResponse{
JobID: jobID,
Status: "queued",
Message: "Collection job accepted",
CreatedAt: now,
})
_ = json.NewEncoder(w).Encode(job.toJobResponse("Collection job accepted"))
}
func (s *Server) handleCollectStatus(w http.ResponseWriter, r *http.Request) {
@@ -603,17 +588,13 @@ func (s *Server) handleCollectStatus(w http.ResponseWriter, r *http.Request) {
return
}
s.collectMu.RLock()
job, ok := s.collectJobs[jobID]
if !ok || job == nil {
s.collectMu.RUnlock()
job, ok := s.jobManager.GetJob(jobID)
if !ok {
jsonError(w, "Collect job not found", http.StatusNotFound)
return
}
resp := *job
s.collectMu.RUnlock()
jsonResponse(w, resp)
jsonResponse(w, job.toStatusResponse())
}
func (s *Server) handleCollectCancel(w http.ResponseWriter, r *http.Request) {
@@ -623,25 +604,76 @@ func (s *Server) handleCollectCancel(w http.ResponseWriter, r *http.Request) {
return
}
s.collectMu.Lock()
job, ok := s.collectJobs[jobID]
if !ok || job == nil {
s.collectMu.Unlock()
job, ok := s.jobManager.CancelJob(jobID)
if !ok {
jsonError(w, "Collect job not found", http.StatusNotFound)
return
}
now := time.Now().UTC()
progress := 0
job.Status = "canceled"
job.Progress = &progress
job.Logs = append(job.Logs, "Job canceled by user")
job.Error = ""
job.UpdatedAt = now
resp := *job
s.collectMu.Unlock()
jsonResponse(w, job.toStatusResponse())
}
jsonResponse(w, resp)
func (s *Server) startMockCollectionJob(jobID string, req CollectRequest) {
ctx, cancel := context.WithCancel(context.Background())
if attached := s.jobManager.AttachJobCancel(jobID, cancel); !attached {
cancel()
return
}
go func() {
steps := []struct {
delay time.Duration
status string
progress int
log string
}{
{delay: 250 * time.Millisecond, status: CollectStatusRunning, progress: 20, log: "Подключение..."},
{delay: 250 * time.Millisecond, status: CollectStatusRunning, progress: 50, log: "Сбор инвентаря..."},
{delay: 250 * time.Millisecond, status: CollectStatusRunning, progress: 80, log: "Нормализация..."},
}
for _, step := range steps {
if !waitWithCancel(ctx, step.delay) {
return
}
if job, ok := s.jobManager.GetJob(jobID); !ok || isTerminalCollectStatus(job.Status) {
return
}
s.jobManager.UpdateJobStatus(jobID, step.status, step.progress, "")
s.jobManager.AppendJobLog(jobID, step.log)
}
if !waitWithCancel(ctx, 250*time.Millisecond) {
return
}
if job, ok := s.jobManager.GetJob(jobID); !ok || isTerminalCollectStatus(job.Status) {
return
}
if strings.Contains(strings.ToLower(req.Host), "fail") {
s.jobManager.UpdateJobStatus(jobID, CollectStatusFailed, 100, "Mock: не удалось завершить сбор")
s.jobManager.AppendJobLog(jobID, "Сбор завершен с ошибкой")
return
}
s.jobManager.UpdateJobStatus(jobID, CollectStatusSuccess, 100, "")
s.jobManager.AppendJobLog(jobID, "Сбор завершен")
}()
}
func waitWithCancel(ctx context.Context, d time.Duration) bool {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}
func validateCollectRequest(req CollectRequest) error {

View File

@@ -0,0 +1,168 @@
package server
import (
"context"
"sync"
"time"
)
type JobManager struct {
mu sync.RWMutex
jobs map[string]*Job
}
func NewJobManager() *JobManager {
return &JobManager{
jobs: make(map[string]*Job),
}
}
func (m *JobManager) CreateJob(req CollectRequest) *Job {
now := time.Now().UTC()
job := &Job{
ID: generateJobID(),
Status: CollectStatusQueued,
Progress: 0,
Logs: []string{"Задача поставлена в очередь"},
CreatedAt: now,
UpdatedAt: now,
RequestMeta: CollectRequestMeta{
Host: req.Host,
Protocol: req.Protocol,
Port: req.Port,
Username: req.Username,
AuthType: req.AuthType,
TLSMode: req.TLSMode,
},
}
m.mu.Lock()
m.jobs[job.ID] = job
m.mu.Unlock()
return cloneJob(job)
}
func (m *JobManager) GetJob(id string) (*Job, bool) {
m.mu.RLock()
job, ok := m.jobs[id]
m.mu.RUnlock()
if !ok || job == nil {
return nil, false
}
return cloneJob(job), true
}
func (m *JobManager) CancelJob(id string) (*Job, bool) {
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
if !isTerminalCollectStatus(job.Status) {
job.Status = CollectStatusCanceled
job.Error = ""
job.UpdatedAt = time.Now().UTC()
job.Logs = append(job.Logs, "Сбор отменен пользователем")
}
cancelFn := job.cancel
job.cancel = nil
cloned := cloneJob(job)
m.mu.Unlock()
if cancelFn != nil {
cancelFn()
}
return cloned, true
}
func (m *JobManager) UpdateJobStatus(id, status string, progress int, errMsg string) (*Job, bool) {
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
if isTerminalCollectStatus(job.Status) {
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
job.Status = status
job.Progress = normalizeProgress(progress)
job.Error = errMsg
job.UpdatedAt = time.Now().UTC()
if isTerminalCollectStatus(status) {
job.cancel = nil
}
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
func (m *JobManager) AppendJobLog(id, message string) (*Job, bool) {
if message == "" {
return m.GetJob(id)
}
m.mu.Lock()
job, ok := m.jobs[id]
if !ok || job == nil {
m.mu.Unlock()
return nil, false
}
job.Logs = append(job.Logs, message)
job.UpdatedAt = time.Now().UTC()
cloned := cloneJob(job)
m.mu.Unlock()
return cloned, true
}
func (m *JobManager) AttachJobCancel(id string, cancelFn context.CancelFunc) bool {
m.mu.Lock()
defer m.mu.Unlock()
job, ok := m.jobs[id]
if !ok || job == nil || isTerminalCollectStatus(job.Status) {
return false
}
job.cancel = cancelFn
return true
}
func isTerminalCollectStatus(status string) bool {
switch status {
case CollectStatusSuccess, CollectStatusFailed, CollectStatusCanceled:
return true
default:
return false
}
}
func normalizeProgress(progress int) int {
if progress < 0 {
return 0
}
if progress > 100 {
return 100
}
return progress
}
func cloneJob(job *Job) *Job {
if job == nil {
return nil
}
cloned := *job
cloned.Logs = append([]string(nil), job.Logs...)
cloned.cancel = nil
return &cloned
}

View File

@@ -0,0 +1,77 @@
package server
import (
"strings"
"testing"
)
func TestJobManagerCreateGetUpdateCancel(t *testing.T) {
manager := NewJobManager()
req := CollectRequest{
Host: "bmc01.local",
Protocol: "redfish",
Port: 443,
Username: "admin",
AuthType: "password",
Password: "top-secret",
TLSMode: "strict",
}
job := manager.CreateJob(req)
if job == nil {
t.Fatalf("expected created job")
}
if job.Status != CollectStatusQueued {
t.Fatalf("expected queued status, got %q", job.Status)
}
if job.Progress != 0 {
t.Fatalf("expected progress 0, got %d", job.Progress)
}
if job.RequestMeta.Host != req.Host {
t.Fatalf("expected host in request meta")
}
if strings.Contains(strings.Join(job.Logs, " "), req.Password) {
t.Fatalf("password leaked in logs")
}
got, ok := manager.GetJob(job.ID)
if !ok {
t.Fatalf("expected job to exist")
}
if got.ID != job.ID {
t.Fatalf("wrong job id")
}
updated, ok := manager.UpdateJobStatus(job.ID, CollectStatusRunning, 42, "")
if !ok {
t.Fatalf("expected update to succeed")
}
if updated.Status != CollectStatusRunning || updated.Progress != 42 {
t.Fatalf("unexpected update snapshot: %+v", updated)
}
withLog, ok := manager.AppendJobLog(job.ID, "Сбор инвентаря...")
if !ok {
t.Fatalf("expected append to succeed")
}
if len(withLog.Logs) < 2 {
t.Fatalf("expected additional log, got %v", withLog.Logs)
}
canceled, ok := manager.CancelJob(job.ID)
if !ok {
t.Fatalf("expected cancel to succeed")
}
if canceled.Status != CollectStatusCanceled {
t.Fatalf("expected canceled status, got %q", canceled.Status)
}
canceledAgain, ok := manager.CancelJob(job.ID)
if !ok {
t.Fatalf("expected repeated cancel to succeed")
}
if canceledAgain.Status != CollectStatusCanceled {
t.Fatalf("expected canceled status after repeated cancel")
}
}

View File

@@ -29,15 +29,14 @@ type Server struct {
result *models.AnalysisResult
detectedVendor string
collectMu sync.RWMutex
collectJobs map[string]*CollectJobStatusResponse
jobManager *JobManager
}
func New(cfg Config) *Server {
s := &Server{
config: cfg,
mux: http.NewServeMux(),
collectJobs: make(map[string]*CollectJobStatusResponse),
config: cfg,
mux: http.NewServeMux(),
jobManager: NewJobManager(),
}
s.setupRoutes()
return s