Add pluggable live collectors and simplify API connect form

This commit is contained in:
Mikhail Chusavitin
2026-02-04 19:00:03 +03:00
parent 60c52b18b1
commit c89ee0118f
15 changed files with 939 additions and 212 deletions

View File

@@ -13,6 +13,7 @@ import (
"strings"
"time"
"git.mchus.pro/mchus/logpile/internal/collector"
"git.mchus.pro/mchus/logpile/internal/exporter"
"git.mchus.pro/mchus/logpile/internal/models"
"git.mchus.pro/mchus/logpile/internal/parser"
@@ -592,7 +593,7 @@ func (s *Server) handleCollectStart(w http.ResponseWriter, r *http.Request) {
}
job := s.jobManager.CreateJob(req)
s.startMockCollectionJob(job.ID, req)
s.startCollectionJob(job.ID, req)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
@@ -631,7 +632,7 @@ func (s *Server) handleCollectCancel(w http.ResponseWriter, r *http.Request) {
jsonResponse(w, job.toStatusResponse())
}
func (s *Server) startMockCollectionJob(jobID string, req CollectRequest) {
func (s *Server) startCollectionJob(jobID string, req CollectRequest) {
ctx, cancel := context.WithCancel(context.Background())
if attached := s.jobManager.AttachJobCancel(jobID, cancel); !attached {
cancel()
@@ -639,31 +640,37 @@ func (s *Server) startMockCollectionJob(jobID string, req CollectRequest) {
}
go func() {
steps := []struct {
delay time.Duration
status string
progress int
log string
}{
{delay: 250 * time.Millisecond, status: CollectStatusRunning, progress: 20, log: "Подключение..."},
{delay: 250 * time.Millisecond, status: CollectStatusRunning, progress: 50, log: "Сбор инвентаря..."},
{delay: 250 * time.Millisecond, status: CollectStatusRunning, progress: 80, log: "Нормализация..."},
connector, ok := s.getCollector(req.Protocol)
if !ok {
s.jobManager.UpdateJobStatus(jobID, CollectStatusFailed, 100, "Коннектор для протокола не зарегистрирован")
s.jobManager.AppendJobLog(jobID, "Сбор завершен с ошибкой")
return
}
for _, step := range steps {
if !waitWithCancel(ctx, step.delay) {
return
}
emitProgress := func(update collector.Progress) {
if job, ok := s.jobManager.GetJob(jobID); !ok || isTerminalCollectStatus(job.Status) {
return
}
s.jobManager.UpdateJobStatus(jobID, step.status, step.progress, "")
s.jobManager.AppendJobLog(jobID, step.log)
status := update.Status
if status == "" {
status = CollectStatusRunning
}
s.jobManager.UpdateJobStatus(jobID, status, update.Progress, "")
if update.Message != "" {
s.jobManager.AppendJobLog(jobID, update.Message)
}
}
if !waitWithCancel(ctx, 250*time.Millisecond) {
result, err := connector.Collect(ctx, toCollectorRequest(req), emitProgress)
if err != nil {
if ctx.Err() != nil {
return
}
if job, ok := s.jobManager.GetJob(jobID); !ok || isTerminalCollectStatus(job.Status) {
return
}
s.jobManager.UpdateJobStatus(jobID, CollectStatusFailed, 100, err.Error())
s.jobManager.AppendJobLog(jobID, "Сбор завершен с ошибкой")
return
}
@@ -671,31 +678,14 @@ func (s *Server) startMockCollectionJob(jobID string, req CollectRequest) {
return
}
if strings.Contains(strings.ToLower(req.Host), "fail") {
s.jobManager.UpdateJobStatus(jobID, CollectStatusFailed, 100, "Mock: не удалось завершить сбор")
s.jobManager.AppendJobLog(jobID, "Сбор завершен с ошибкой")
return
}
applyCollectSourceMetadata(result, req)
s.jobManager.UpdateJobStatus(jobID, CollectStatusSuccess, 100, "")
s.jobManager.AppendJobLog(jobID, "Сбор завершен")
s.SetResult(newAPIResult(req))
s.SetResult(result)
s.SetDetectedVendor("")
}()
}
func waitWithCancel(ctx context.Context, d time.Duration) bool {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return false
case <-timer.C:
return true
}
}
func validateCollectRequest(req CollectRequest) error {
if strings.TrimSpace(req.Host) == "" {
return fmt.Errorf("field 'host' is required")
@@ -756,16 +746,34 @@ func applyArchiveSourceMetadata(result *models.AnalysisResult) {
result.CollectedAt = time.Now().UTC()
}
func newAPIResult(req CollectRequest) *models.AnalysisResult {
return &models.AnalysisResult{
SourceType: models.SourceTypeAPI,
Protocol: req.Protocol,
TargetHost: req.Host,
CollectedAt: time.Now().UTC(),
Events: make([]models.Event, 0),
FRU: make([]models.FRUInfo, 0),
Sensors: make([]models.SensorReading, 0),
func applyCollectSourceMetadata(result *models.AnalysisResult, req CollectRequest) {
if result == nil {
return
}
result.SourceType = models.SourceTypeAPI
result.Protocol = req.Protocol
result.TargetHost = req.Host
result.CollectedAt = time.Now().UTC()
}
func toCollectorRequest(req CollectRequest) collector.Request {
return collector.Request{
Host: req.Host,
Protocol: req.Protocol,
Port: req.Port,
Username: req.Username,
AuthType: req.AuthType,
Password: req.Password,
Token: req.Token,
TLSMode: req.TLSMode,
}
}
func (s *Server) getCollector(protocol string) (collector.Connector, bool) {
if s.collectors == nil {
s.collectors = collector.NewDefaultRegistry()
}
return s.collectors.Get(protocol)
}
func jsonResponse(w http.ResponseWriter, data interface{}) {