package api import ( "bytes" "database/sql" "encoding/json" "fmt" "net/http" "net/http/httptest" "os" "sync/atomic" "testing" "time" "reanimator/internal/ingest" "reanimator/internal/repository" "reanimator/internal/repository/migrate" ) func TestIngestLogBundleIdempotent(t *testing.T) { dsn := os.Getenv("DATABASE_DSN") if dsn == "" { t.Skip("DATABASE_DSN not set") } db, err := repository.Open(dsn) if err != nil { t.Fatalf("open db: %v", err) } defer db.Close() if err := applyMigrations(db); err != nil { t.Fatalf("apply migrations: %v", err) } if err := cleanupRegistry(db); err != nil { t.Fatalf("cleanup: %v", err) } projectID := insertProject(t, db, "Core") assetID := insertAsset(t, db, projectID, "server-01", "ASSET-01") mux := http.NewServeMux() RegisterIngestRoutes(mux, IngestDependencies{Service: ingest.NewService(db)}) server := httptest.NewServer(mux) defer server.Close() collectedAt := time.Now().UTC().Format(time.RFC3339) payload := map[string]any{ "asset_id": assetID, "collected_at": collectedAt, "components": []map[string]any{ {"vendor_serial": "VSN-001"}, {"vendor_serial": "VSN-002"}, }, } body, err := json.Marshal(payload) if err != nil { t.Fatalf("marshal payload: %v", err) } resp, err := http.Post(server.URL+"/ingest/logbundle", "application/json", bytes.NewReader(body)) if err != nil { t.Fatalf("post: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusCreated { t.Fatalf("expected 201, got %d", resp.StatusCode) } resp, err = http.Post(server.URL+"/ingest/logbundle", "application/json", bytes.NewReader(body)) if err != nil { t.Fatalf("post duplicate: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { t.Fatalf("expected 200 on duplicate, got %d", resp.StatusCode) } assertCount(t, db, "log_bundles", 1) assertCount(t, db, "observations", 2) assertCountQuery(t, db, "SELECT COUNT(*) FROM installations WHERE removed_at IS NULL", 2) assertCountQuery(t, db, "SELECT COUNT(*) FROM parts WHERE first_seen_at IS NOT NULL", 2) } func applyMigrations(db *sql.DB) error { return migrate.EnsureSchema(db, "migrations") } func cleanupRegistry(db *sql.DB) error { statements := []string{ "DELETE FROM failure_events", "DELETE FROM machine_firmware_states", "DELETE FROM timeline_events", "DELETE FROM observations", "DELETE FROM log_bundles", "DELETE FROM installations", "DELETE FROM parts", "DELETE FROM machines", "DELETE FROM projects", "DELETE FROM id_sequences", "DELETE FROM schema_migrations", } for _, stmt := range statements { if _, err := db.Exec(stmt); err != nil { return err } } return nil } var testIDSeq uint64 func insertProject(t *testing.T, db *sql.DB, name string) string { t.Helper() id := nextTestID("PJ") if _, err := db.Exec(`INSERT INTO projects (id, name) VALUES (?, ?)`, id, name); err != nil { t.Fatalf("insert project: %v", err) } return id } func insertAsset(t *testing.T, db *sql.DB, projectID string, name, serial string) string { t.Helper() id := nextTestID("ME") if _, err := db.Exec(`INSERT INTO machines (id, project_id, name, vendor_serial) VALUES (?, ?, ?, ?)`, id, projectID, name, serial); err != nil { t.Fatalf("insert asset: %v", err) } return id } func nextTestID(prefix string) string { next := atomic.AddUint64(&testIDSeq, 1) return fmt.Sprintf("%s-%07d", prefix, next) } func assertCount(t *testing.T, db *sql.DB, table string, expected int) { t.Helper() query := "SELECT COUNT(*) FROM " + table assertCountQuery(t, db, query, expected) } func assertCountQuery(t *testing.T, db *sql.DB, query string, expected int) { t.Helper() var count int if err := db.QueryRow(query).Scan(&count); err != nil { t.Fatalf("count query: %v", err) } if count != expected { t.Fatalf("expected %d, got %d for query %q", expected, count, query) } }