244 lines
6.2 KiB
Go
244 lines
6.2 KiB
Go
package api
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"errors"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"reanimator/internal/ingest"
|
|
)
|
|
|
|
type IngestDependencies struct {
|
|
Service *ingest.Service
|
|
}
|
|
|
|
// maxLogBundleSize is the largest request body, in bytes, that the ingest
// handlers will read (5 MiB). Despite its name it is applied to both the
// log-bundle and the hardware endpoints.
const maxLogBundleSize = 5 * 1024 * 1024
|
|
|
|
type ingestHandlers struct {
|
|
deps IngestDependencies
|
|
}
|
|
|
|
func RegisterIngestRoutes(mux *http.ServeMux, deps IngestDependencies) {
|
|
h := ingestHandlers{deps: deps}
|
|
mux.HandleFunc("/ingest/logbundle", h.handleLogBundle)
|
|
mux.HandleFunc("/ingest/hardware", h.handleHardware)
|
|
}
|
|
|
|
func (h ingestHandlers) handleLogBundle(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
r.Body = http.MaxBytesReader(w, r.Body, maxLogBundleSize)
|
|
body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
var maxErr *http.MaxBytesError
|
|
if errors.As(err, &maxErr) {
|
|
writeError(w, http.StatusRequestEntityTooLarge, "payload too large")
|
|
return
|
|
}
|
|
writeError(w, http.StatusBadRequest, "invalid body")
|
|
return
|
|
}
|
|
|
|
var req struct {
|
|
AssetID string `json:"asset_id"`
|
|
CollectedAt string `json:"collected_at"`
|
|
Source *string `json:"source"`
|
|
Components []struct {
|
|
VendorSerial string `json:"vendor_serial"`
|
|
Vendor *string `json:"vendor"`
|
|
Model *string `json:"model"`
|
|
Firmware *string `json:"firmware_version"`
|
|
} `json:"components"`
|
|
}
|
|
|
|
if err := decodeJSONBytes(body, &req); err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
|
|
if strings.TrimSpace(req.AssetID) == "" {
|
|
writeError(w, http.StatusBadRequest, "asset_id is required")
|
|
return
|
|
}
|
|
if strings.TrimSpace(req.CollectedAt) == "" {
|
|
writeError(w, http.StatusBadRequest, "collected_at is required")
|
|
return
|
|
}
|
|
collectedAt, err := time.Parse(time.RFC3339, req.CollectedAt)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, "collected_at must be RFC3339")
|
|
return
|
|
}
|
|
if len(req.Components) == 0 {
|
|
writeError(w, http.StatusBadRequest, "components is required")
|
|
return
|
|
}
|
|
|
|
observations := make([]ingest.ObservationInput, 0, len(req.Components))
|
|
for _, component := range req.Components {
|
|
if strings.TrimSpace(component.VendorSerial) == "" {
|
|
writeError(w, http.StatusBadRequest, "components.vendor_serial is required")
|
|
return
|
|
}
|
|
observations = append(observations, ingest.ObservationInput{
|
|
VendorSerial: strings.TrimSpace(component.VendorSerial),
|
|
Vendor: component.Vendor,
|
|
Model: component.Model,
|
|
Firmware: component.Firmware,
|
|
})
|
|
}
|
|
|
|
canonicalPayload, err := json.Marshal(req)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid body")
|
|
return
|
|
}
|
|
|
|
input := ingest.LogBundleInput{
|
|
AssetID: req.AssetID,
|
|
CollectedAt: collectedAt,
|
|
ContentHash: ingest.HashPayload(canonicalPayload),
|
|
Payload: canonicalPayload,
|
|
Source: req.Source,
|
|
Observations: observations,
|
|
}
|
|
|
|
result, err := h.deps.Service.IngestLogBundle(r.Context(), input)
|
|
if err != nil {
|
|
switch err {
|
|
case ingest.ErrConflict:
|
|
writeError(w, http.StatusConflict, "ingest conflict")
|
|
default:
|
|
writeError(w, http.StatusInternalServerError, "ingest failed")
|
|
}
|
|
return
|
|
}
|
|
|
|
status := http.StatusCreated
|
|
if result.Duplicate {
|
|
status = http.StatusOK
|
|
}
|
|
writeJSON(w, status, result)
|
|
}
|
|
|
|
func (h ingestHandlers) handleHardware(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
r.Body = http.MaxBytesReader(w, r.Body, maxLogBundleSize)
|
|
body, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
var maxErr *http.MaxBytesError
|
|
if errors.As(err, &maxErr) {
|
|
writeError(w, http.StatusRequestEntityTooLarge, "payload too large")
|
|
return
|
|
}
|
|
writeError(w, http.StatusBadRequest, "invalid body")
|
|
return
|
|
}
|
|
|
|
var req ingest.HardwareIngestRequest
|
|
if err := decodeJSONBytes(body, &req); err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
|
|
targetHost := strings.TrimSpace(req.TargetHost)
|
|
if strings.TrimSpace(req.CollectedAt) == "" {
|
|
writeValidationError(w, "collected_at", "collected_at is required")
|
|
return
|
|
}
|
|
collectedAt, err := time.Parse(time.RFC3339, req.CollectedAt)
|
|
if err != nil {
|
|
writeValidationError(w, "collected_at", "collected_at must be RFC3339")
|
|
return
|
|
}
|
|
boardSerial := strings.TrimSpace(req.Hardware.Board.SerialNumber)
|
|
if boardSerial == "" {
|
|
writeValidationError(w, "hardware.board.serial_number", "serial_number is required")
|
|
return
|
|
}
|
|
|
|
components, firmware := ingest.FlattenHardwareComponents(req.Hardware)
|
|
canonicalPayload, err := json.Marshal(req)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid body")
|
|
return
|
|
}
|
|
|
|
input := ingest.HardwareInput{
|
|
Filename: req.Filename,
|
|
SourceType: req.SourceType,
|
|
Protocol: req.Protocol,
|
|
TargetHost: targetHost,
|
|
CollectedAt: collectedAt,
|
|
ContentHash: ingest.HashPayload(canonicalPayload),
|
|
Payload: canonicalPayload,
|
|
Board: req.Hardware.Board,
|
|
Components: components,
|
|
Firmware: firmware,
|
|
}
|
|
|
|
result, err := h.deps.Service.IngestHardware(r.Context(), input)
|
|
if err != nil {
|
|
switch err {
|
|
case ingest.ErrConflict:
|
|
writeError(w, http.StatusConflict, "ingest conflict")
|
|
default:
|
|
writeError(w, http.StatusInternalServerError, "hardware ingest failed")
|
|
}
|
|
return
|
|
}
|
|
|
|
status := http.StatusCreated
|
|
if result.Duplicate {
|
|
status = http.StatusOK
|
|
}
|
|
response := map[string]any{
|
|
"status": "success",
|
|
"bundle_id": result.BundleID,
|
|
"asset_id": result.AssetID,
|
|
"collected_at": result.CollectedAt.Format(time.RFC3339),
|
|
"duplicate": result.Duplicate,
|
|
}
|
|
if result.Summary != nil {
|
|
response["summary"] = result.Summary
|
|
}
|
|
if result.Message != nil {
|
|
response["message"] = *result.Message
|
|
}
|
|
|
|
writeJSON(w, status, response)
|
|
}
|
|
|
|
// decodeJSONBytes decodes payload into dest and requires that payload holds
// exactly one JSON value, optionally surrounded by whitespace.
//
// It returns the decoder's error when the first value cannot be decoded
// (io.EOF for an empty payload), the decoder's error when the bytes after the
// first value are not valid JSON, and a descriptive error when a second,
// well-formed JSON value follows the first.
func decodeJSONBytes(payload []byte, dest any) error {
	decoder := json.NewDecoder(bytes.NewReader(payload))
	if err := decoder.Decode(dest); err != nil {
		return err
	}

	// Probe for a second value. Only io.EOF means the payload ended cleanly;
	// a successful decode means extra data followed the first value and must
	// be rejected rather than silently ignored.
	var trailing json.RawMessage
	err := decoder.Decode(&trailing)
	switch {
	case errors.Is(err, io.EOF):
		return nil
	case err != nil:
		return err
	default:
		return errors.New("unexpected trailing data after JSON value")
	}
}
|
|
|
|
func writeValidationError(w http.ResponseWriter, field, message string) {
|
|
writeJSON(w, http.StatusBadRequest, map[string]any{
|
|
"status": "error",
|
|
"error": "validation_failed",
|
|
"details": map[string]string{
|
|
"field": field,
|
|
"message": message,
|
|
},
|
|
})
|
|
}
|