Files
core/internal/api/ingest.go

143 lines
3.5 KiB
Go

package api
import (
"bytes"
"encoding/json"
"errors"
"io"
"net/http"
"strings"
"time"
"reanimator/internal/ingest"
)
// IngestDependencies groups the collaborators that the ingest HTTP handlers
// need. It is passed to RegisterIngestRoutes when the routes are wired up.
type IngestDependencies struct {
// Service is the ingest service whose IngestLogBundle method the log bundle
// handler calls. The handler uses it without a nil check, so it presumably
// must be non-nil — confirm against the callers of RegisterIngestRoutes.
Service *ingest.Service
}
// maxLogBundleSize is the largest request body, in bytes, that the log bundle
// handler reads before rejecting the request (5 MiB).
const maxLogBundleSize = 5 * 1024 * 1024
// ingestHandlers is the receiver for the ingest HTTP handler methods. It
// carries the dependencies supplied to RegisterIngestRoutes.
type ingestHandlers struct {
// deps holds the services the handler methods call.
deps IngestDependencies
}
// RegisterIngestRoutes attaches the ingest endpoints to mux. The handlers it
// registers use the services carried in deps.
//
// Routes:
//   - /ingest/logbundle is served by handleLogBundle.
func RegisterIngestRoutes(mux *http.ServeMux, deps IngestDependencies) {
	handlers := ingestHandlers{deps: deps}
	mux.Handle("/ingest/logbundle", http.HandlerFunc(handlers.handleLogBundle))
}
// handleLogBundle serves the /ingest/logbundle endpoint.
//
// The request body must be a single JSON object of at most maxLogBundleSize
// bytes containing:
//   - asset_id: a positive integer.
//   - collected_at: an RFC 3339 timestamp.
//   - source: optional string.
//   - components: a non-empty array whose entries each carry a non-blank
//     vendor_serial plus optional vendor, model, lot_id and firmware_version.
//
// Responses:
//   - 405 (with an Allow header) for any method other than POST.
//   - 413 when the body exceeds maxLogBundleSize.
//   - 400 when the body cannot be read, cannot be decoded, or fails validation.
//   - 409 when the service reports ingest.ErrConflict, including when that
//     error is wrapped.
//   - 500 for any other service error.
//   - 200 when the service result is flagged as a duplicate, 201 otherwise;
//     in both cases the service result is written as the JSON response body.
func (h ingestHandlers) handleLogBundle(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		// A 405 response must advertise the methods the resource supports.
		w.Header().Set("Allow", http.MethodPost)
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Cap the body size so an oversized upload is rejected rather than
	// buffered in full by io.ReadAll.
	r.Body = http.MaxBytesReader(w, r.Body, maxLogBundleSize)
	body, err := io.ReadAll(r.Body)
	if err != nil {
		var maxErr *http.MaxBytesError
		if errors.As(err, &maxErr) {
			writeError(w, http.StatusRequestEntityTooLarge, "payload too large")
			return
		}
		writeError(w, http.StatusBadRequest, "invalid body")
		return
	}

	var req struct {
		AssetID     int64   `json:"asset_id"`
		CollectedAt string  `json:"collected_at"`
		Source      *string `json:"source"`
		Components  []struct {
			VendorSerial string  `json:"vendor_serial"`
			Vendor       *string `json:"vendor"`
			Model        *string `json:"model"`
			LotID        *int64  `json:"lot_id"`
			Firmware     *string `json:"firmware_version"`
		} `json:"components"`
	}
	if err := decodeJSONBytes(body, &req); err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Validate the top-level fields before touching the components.
	if req.AssetID <= 0 {
		writeError(w, http.StatusBadRequest, "asset_id is required")
		return
	}
	if strings.TrimSpace(req.CollectedAt) == "" {
		writeError(w, http.StatusBadRequest, "collected_at is required")
		return
	}
	collectedAt, err := time.Parse(time.RFC3339, req.CollectedAt)
	if err != nil {
		writeError(w, http.StatusBadRequest, "collected_at must be RFC3339")
		return
	}
	if len(req.Components) == 0 {
		writeError(w, http.StatusBadRequest, "components is required")
		return
	}

	// Convert each component into an observation; the whole request is
	// rejected as soon as one component lacks a vendor serial.
	observations := make([]ingest.ObservationInput, 0, len(req.Components))
	for _, component := range req.Components {
		serial := strings.TrimSpace(component.VendorSerial)
		if serial == "" {
			writeError(w, http.StatusBadRequest, "components.vendor_serial is required")
			return
		}
		observations = append(observations, ingest.ObservationInput{
			VendorSerial: serial,
			Vendor:       component.Vendor,
			Model:        component.Model,
			LotID:        component.LotID,
			Firmware:     component.Firmware,
		})
	}

	// Re-encode the decoded request so the stored payload and the value
	// passed to ingest.HashPayload do not depend on the whitespace or key
	// order of the raw body.
	// NOTE(review): the re-encoded payload keeps vendor_serial untrimmed,
	// while the observations use the trimmed value — confirm this is intended.
	canonicalPayload, err := json.Marshal(req)
	if err != nil {
		writeError(w, http.StatusBadRequest, "invalid body")
		return
	}

	input := ingest.LogBundleInput{
		AssetID:      req.AssetID,
		CollectedAt:  collectedAt,
		ContentHash:  ingest.HashPayload(canonicalPayload),
		Payload:      canonicalPayload,
		Source:       req.Source,
		Observations: observations,
	}
	result, err := h.deps.Service.IngestLogBundle(r.Context(), input)
	if err != nil {
		// errors.Is also recognises a conflict that the service wrapped with
		// extra context; a plain == comparison would report it as a 500.
		switch {
		case errors.Is(err, ingest.ErrConflict):
			writeError(w, http.StatusConflict, "ingest conflict")
		default:
			writeError(w, http.StatusInternalServerError, "ingest failed")
		}
		return
	}

	status := http.StatusCreated
	if result.Duplicate {
		status = http.StatusOK
	}
	writeJSON(w, status, result)
}
// decodeJSONBytes decodes payload into dest and requires that payload holds
// exactly one JSON value.
//
// Object fields that have no counterpart in dest are rejected. Anything other
// than whitespace after the first value is an error, including a second
// well-formed JSON value such as "{}" or "null".
//
// It returns the decoder's error when payload is malformed or has unknown
// fields, and a descriptive error when extra data follows the first value.
func decodeJSONBytes(payload []byte, dest any) error {
	decoder := json.NewDecoder(bytes.NewReader(payload))
	decoder.DisallowUnknownFields()
	if err := decoder.Decode(dest); err != nil {
		return err
	}

	// Probe for trailing content: only io.EOF proves the input is exhausted.
	// A nil error here means a second JSON value was decoded successfully,
	// which must be reported rather than treated as success.
	err := decoder.Decode(&struct{}{})
	if errors.Is(err, io.EOF) {
		return nil
	}
	if err == nil {
		return errors.New("body must contain a single JSON value")
	}
	return err
}