Implement async manual CSV ingest, unified UI pagination/filters, and serial placeholder strategy
This commit is contained in:
298
internal/ingest/parser_manual_csv.go
Normal file
298
internal/ingest/parser_manual_csv.go
Normal file
@@ -0,0 +1,298 @@
|
||||
package ingest
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ManualCSVRow struct {
|
||||
Row int
|
||||
CollectedAt time.Time
|
||||
ServerSerial string
|
||||
Vendor string
|
||||
DevicePN string
|
||||
DeviceSN string
|
||||
LocationInServer *string
|
||||
FirmwareVersion *string
|
||||
EquipmentStatus *string
|
||||
}
|
||||
|
||||
type ManualCSVRowError struct {
|
||||
Row int `json:"row"`
|
||||
Field string `json:"field"`
|
||||
Message string `json:"message"`
|
||||
RawValue string `json:"raw_value,omitempty"`
|
||||
}
|
||||
|
||||
type ManualCSVParseResult struct {
|
||||
Rows []ManualCSVRow `json:"-"`
|
||||
Errors []ManualCSVRowError `json:"errors,omitempty"`
|
||||
TotalRows int `json:"total_rows"`
|
||||
}
|
||||
|
||||
func ParseManualCSV(reader io.Reader) (ManualCSVParseResult, error) {
|
||||
data, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return ManualCSVParseResult{}, err
|
||||
}
|
||||
delimiter := detectManualCSVDelimiter(data)
|
||||
|
||||
r := csv.NewReader(bytes.NewReader(data))
|
||||
r.Comma = delimiter
|
||||
r.FieldsPerRecord = -1
|
||||
r.TrimLeadingSpace = true
|
||||
|
||||
header, err := r.Read()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return ManualCSVParseResult{}, fmt.Errorf("csv is empty")
|
||||
}
|
||||
return ManualCSVParseResult{}, err
|
||||
}
|
||||
|
||||
collectedAtIdx := -1
|
||||
serverSerialIdx := -1
|
||||
vendorIdx := -1
|
||||
devicePNIdx := -1
|
||||
deviceSNIdx := -1
|
||||
locationIdx := -1
|
||||
firmwareIdx := -1
|
||||
statusIdx := -1
|
||||
|
||||
for i, raw := range header {
|
||||
switch normalizeManualCSVHeader(raw) {
|
||||
case "collected_at":
|
||||
collectedAtIdx = i
|
||||
case "server_serial":
|
||||
serverSerialIdx = i
|
||||
case "vendor":
|
||||
vendorIdx = i
|
||||
case "device_pn":
|
||||
devicePNIdx = i
|
||||
case "device_sn":
|
||||
deviceSNIdx = i
|
||||
case "location_in_server":
|
||||
locationIdx = i
|
||||
case "firmware_version":
|
||||
firmwareIdx = i
|
||||
case "equipment_status":
|
||||
statusIdx = i
|
||||
}
|
||||
}
|
||||
|
||||
if collectedAtIdx < 0 {
|
||||
return ManualCSVParseResult{}, fmt.Errorf("required column is missing: дата_осмотра/collected_at")
|
||||
}
|
||||
if serverSerialIdx < 0 {
|
||||
return ManualCSVParseResult{}, fmt.Errorf("required column is missing: серийный_номер_сервера/server_serial")
|
||||
}
|
||||
if vendorIdx < 0 {
|
||||
return ManualCSVParseResult{}, fmt.Errorf("required column is missing: вендор/vendor")
|
||||
}
|
||||
if devicePNIdx < 0 {
|
||||
return ManualCSVParseResult{}, fmt.Errorf("required column is missing: p/n_устройства/device_pn")
|
||||
}
|
||||
if deviceSNIdx < 0 {
|
||||
return ManualCSVParseResult{}, fmt.Errorf("required column is missing: s/n_устройства/device_sn")
|
||||
}
|
||||
|
||||
result := ManualCSVParseResult{
|
||||
Rows: make([]ManualCSVRow, 0),
|
||||
Errors: make([]ManualCSVRowError, 0),
|
||||
}
|
||||
|
||||
rowNumber := 1
|
||||
for {
|
||||
record, err := r.Read()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return ManualCSVParseResult{}, err
|
||||
}
|
||||
rowNumber++
|
||||
|
||||
if isManualCSVRowEmpty(record) {
|
||||
continue
|
||||
}
|
||||
result.TotalRows++
|
||||
|
||||
rowErrorsStart := len(result.Errors)
|
||||
|
||||
collectedRaw := csvField(record, collectedAtIdx)
|
||||
serverSerial := strings.TrimSpace(csvField(record, serverSerialIdx))
|
||||
vendor := strings.TrimSpace(csvField(record, vendorIdx))
|
||||
devicePN := strings.TrimSpace(csvField(record, devicePNIdx))
|
||||
deviceSN := strings.TrimSpace(csvField(record, deviceSNIdx))
|
||||
|
||||
var collectedAt time.Time
|
||||
if strings.TrimSpace(collectedRaw) == "" {
|
||||
result.Errors = append(result.Errors, ManualCSVRowError{
|
||||
Row: rowNumber,
|
||||
Field: "collected_at",
|
||||
Message: "collected_at is required",
|
||||
})
|
||||
} else {
|
||||
parsed, parseErr := parseManualCSVTime(collectedRaw)
|
||||
if parseErr != nil {
|
||||
result.Errors = append(result.Errors, ManualCSVRowError{
|
||||
Row: rowNumber,
|
||||
Field: "collected_at",
|
||||
Message: "collected_at must be RFC3339",
|
||||
RawValue: collectedRaw,
|
||||
})
|
||||
} else {
|
||||
collectedAt = parsed
|
||||
}
|
||||
}
|
||||
|
||||
if serverSerial == "" {
|
||||
result.Errors = append(result.Errors, ManualCSVRowError{
|
||||
Row: rowNumber,
|
||||
Field: "server_serial",
|
||||
Message: "server_serial is required",
|
||||
})
|
||||
}
|
||||
if vendor == "" {
|
||||
result.Errors = append(result.Errors, ManualCSVRowError{
|
||||
Row: rowNumber,
|
||||
Field: "vendor",
|
||||
Message: "vendor is required",
|
||||
})
|
||||
}
|
||||
if devicePN == "" {
|
||||
result.Errors = append(result.Errors, ManualCSVRowError{
|
||||
Row: rowNumber,
|
||||
Field: "device_pn",
|
||||
Message: "device_pn is required",
|
||||
})
|
||||
}
|
||||
if len(result.Errors) > rowErrorsStart {
|
||||
continue
|
||||
}
|
||||
|
||||
row := ManualCSVRow{
|
||||
Row: rowNumber,
|
||||
CollectedAt: collectedAt.UTC(),
|
||||
ServerSerial: serverSerial,
|
||||
Vendor: vendor,
|
||||
DevicePN: devicePN,
|
||||
DeviceSN: deviceSN,
|
||||
LocationInServer: normalizeStringPtr(csvField(record, locationIdx)),
|
||||
FirmwareVersion: normalizeStringPtr(csvField(record, firmwareIdx)),
|
||||
EquipmentStatus: normalizeStringPtr(csvField(record, statusIdx)),
|
||||
}
|
||||
if row.EquipmentStatus != nil && !isValidManualEquipmentStatus(*row.EquipmentStatus) {
|
||||
result.Errors = append(result.Errors, ManualCSVRowError{
|
||||
Row: rowNumber,
|
||||
Field: "equipment_status",
|
||||
Message: "equipment_status must be one of: Рабочий, Не рабочий, Не проверял",
|
||||
RawValue: *row.EquipmentStatus,
|
||||
})
|
||||
continue
|
||||
}
|
||||
result.Rows = append(result.Rows, row)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func detectManualCSVDelimiter(data []byte) rune {
|
||||
scanner := bufio.NewScanner(bytes.NewReader(data))
|
||||
if scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if strings.Count(line, ";") > strings.Count(line, ",") {
|
||||
return ';'
|
||||
}
|
||||
}
|
||||
return ','
|
||||
}
|
||||
|
||||
func normalizeManualCSVHeader(value string) string {
|
||||
trimmed := strings.TrimSpace(strings.TrimPrefix(value, "\uFEFF"))
|
||||
normalized := strings.ToLower(trimmed)
|
||||
replacer := strings.NewReplacer(" ", "_", "-", "_", "/", "_", "\\", "_", ".", "")
|
||||
normalized = replacer.Replace(normalized)
|
||||
for strings.Contains(normalized, "__") {
|
||||
normalized = strings.ReplaceAll(normalized, "__", "_")
|
||||
}
|
||||
|
||||
switch normalized {
|
||||
case "дата_осмотра", "collected_at":
|
||||
return "collected_at"
|
||||
case "серийный_номер_сервера", "серийныйномерсервера", "server_serial":
|
||||
return "server_serial"
|
||||
case "вендор", "vendor":
|
||||
return "vendor"
|
||||
case "p_n_устройства", "pn_устройства", "device_pn", "device_model":
|
||||
return "device_pn"
|
||||
case "s_n_устройства", "sn_устройства", "device_sn":
|
||||
return "device_sn"
|
||||
case "локейшн_в_сервере", "location_in_server", "location", "slot":
|
||||
return "location_in_server"
|
||||
case "версия_прошивки", "firmware_version", "firmware":
|
||||
return "firmware_version"
|
||||
case "состояние_оборудования", "состояние", "equipment_status":
|
||||
return "equipment_status"
|
||||
default:
|
||||
return normalized
|
||||
}
|
||||
}
|
||||
|
||||
func csvField(record []string, index int) string {
|
||||
if index < 0 || index >= len(record) {
|
||||
return ""
|
||||
}
|
||||
return record[index]
|
||||
}
|
||||
|
||||
func isManualCSVRowEmpty(record []string) bool {
|
||||
for _, value := range record {
|
||||
if strings.TrimSpace(value) != "" {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func parseManualCSVTime(value string) (time.Time, error) {
|
||||
trimmed := strings.TrimSpace(value)
|
||||
parsed, err := time.Parse(time.RFC3339, trimmed)
|
||||
if err == nil {
|
||||
return parsed.UTC(), nil
|
||||
}
|
||||
parsed, err = time.Parse(time.RFC3339Nano, trimmed)
|
||||
if err == nil {
|
||||
return parsed.UTC(), nil
|
||||
}
|
||||
return time.Time{}, err
|
||||
}
|
||||
|
||||
func normalizeStringPtr(value string) *string {
|
||||
trimmed := strings.TrimSpace(value)
|
||||
if trimmed == "" {
|
||||
return nil
|
||||
}
|
||||
return &trimmed
|
||||
}
|
||||
|
||||
func isValidManualEquipmentStatus(value string) bool {
|
||||
normalized := normalizeManualEquipmentStatus(value)
|
||||
switch normalized {
|
||||
case "рабочий", "нерабочий", "непроверял", "working", "notworking", "notchecked", "ok", "critical", "unknown":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeManualEquipmentStatus(value string) string {
|
||||
normalized := strings.ToLower(strings.TrimSpace(value))
|
||||
replacer := strings.NewReplacer("-", "", "_", "", " ", "")
|
||||
return replacer.Replace(normalized)
|
||||
}
|
||||
Reference in New Issue
Block a user