Files
turborfq/src/DataService.php
Michael Chus 2189805015 feat: улучшена информативность ошибок при импорте CSV
- В сообщениях об ошибках теперь отображается содержимое проблемной строки
- Добавлена специальная обработка ошибки Data truncated (обрезание данных)
- Показывается конкретное значение поля, вызвавшего ошибку
- Добавлена информация о типе и размере столбца при truncation
- Улучшено форматирование вывода ошибок (JSON для больших данных)
- Номер строки теперь соответствует номеру в исходном CSV файле (+2 для учета заголовка)
2026-01-21 04:29:56 +03:00

720 lines
27 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
<?php
namespace App;
use PDO;
class DataService
{
public function __construct(private PDO $pdo) {}
public function fetchData(
string $schema,
string $table,
array $columns,
int $page,
int $pageSize,
array $filters,
?array $sort
): array {
$offset = ($page - 1) * $pageSize;
$colNames = array_map(fn($c) => $c['COLUMN_NAME'], $columns);
$quotedColumns = array_map(fn($name) => "`$name`", $colNames);
$selectList = implode(', ', $quotedColumns);
$whereParts = [];
$params = [];
foreach ($filters as $i => $f) {
$field = $f['field'] ?? null;
$value = $f['value'] ?? null;
if (!$field || !in_array($field, $colNames, true) || $value === null || $value === '') {
continue;
}
$param = ":f{$i}";
$whereParts[] = "`$field` LIKE $param";
$params[$param] = '%' . $value . '%';
}
$whereSql = $whereParts ? 'WHERE ' . implode(' AND ', $whereParts) : '';
$orderSql = '';
if ($sort && !empty($sort['field']) && in_array($sort['field'], $colNames, true)) {
$dir = strtoupper($sort['dir'] ?? 'ASC');
if (!in_array($dir, ['ASC', 'DESC'], true)) {
$dir = 'ASC';
}
$orderSql = "ORDER BY `{$sort['field']}` $dir";
}
$sql = "SELECT $selectList
FROM `{$schema}`.`{$table}`
$whereSql
$orderSql
LIMIT :limit OFFSET :offset";
$stmt = $this->pdo->prepare($sql);
foreach ($params as $k => $v) {
$stmt->bindValue($k, $v);
}
$stmt->bindValue(':limit', $pageSize, PDO::PARAM_INT);
$stmt->bindValue(':offset', $offset, PDO::PARAM_INT);
$stmt->execute();
$rows = $stmt->fetchAll();
$countSql = "SELECT COUNT(*) AS cnt
FROM `{$schema}`.`{$table}` $whereSql";
$countStmt = $this->pdo->prepare($countSql);
foreach ($params as $k => $v) {
$countStmt->bindValue($k, $v);
}
$countStmt->execute();
$total = (int)$countStmt->fetchColumn();
$lastPage = max(1, (int)ceil($total / $pageSize));
return [
'data' => $rows,
'last_page' => $lastPage,
'total' => $total,
'current_page' => $page
];
}
public function insertRow(string $schema, string $table, array $row, array $columns): array
{
$insertCols = [];
$placeholders = [];
$params = [];
foreach ($columns as $c) {
$name = $c['COLUMN_NAME'];
$extra = $c['EXTRA'] ?? '';
if (str_contains($extra, 'auto_increment')) {
continue;
}
if (array_key_exists($name, $row)) {
$insertCols[] = "`$name`";
$placeholders[] = ":$name";
$params[":$name"] = $row[$name];
}
}
if (empty($insertCols)) {
$sql = "INSERT INTO `{$schema}`.`{$table}` () VALUES ()";
$stmt = $this->pdo->prepare($sql);
try {
$stmt->execute();
return ['inserted' => true, 'id' => $this->pdo->lastInsertId()];
} catch (\PDOException $e) {
throw new \RuntimeException($this->formatPDOError($e, $schema, $table));
}
}
$sql = sprintf(
"INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
$schema, $table,
implode(',', $insertCols),
implode(',', $placeholders)
);
$stmt = $this->pdo->prepare($sql);
try {
$stmt->execute($params);
return ['inserted' => true, 'id' => $this->pdo->lastInsertId()];
} catch (\PDOException $e) {
throw new \RuntimeException($this->formatPDOError($e, $schema, $table, $params));
}
}
private function formatPDOError(\PDOException $e, string $schema, string $table, array $params = []): string
{
$code = $e->getCode();
$message = $e->getMessage();
// Data truncated (обрезание данных)
if ($code === '01000' && str_contains($message, 'Data truncated')) {
if (preg_match('/column \'([^\']+)\'/', $message, $matches)) {
$field = $matches[1];
$value = 'N/A';
// Пытаемся найти значение в параметрах
foreach ($params as $paramName => $paramValue) {
if (str_contains($paramName, $field)) {
$value = $paramValue;
break;
}
}
if (is_numeric($value)) {
return "Ошибка: поле '{$field}' - значение '{$value}' слишком большое или имеет неверный формат.";
}
return "Ошибка: поле '{$field}' - данные обрезаны для значения '{$value}'.";
}
return "Ошибка: данные обрезаны при вставке.";
}
if ($code === '23000' && str_contains($message, 'foreign key constraint')) {
if (preg_match('/FOREIGN KEY $$`([^`]+)`$$/', $message, $matches)) {
$field = $matches[1];
return "Ошибка: поле '{$field}' должно содержать существующее значение из связанной таблицы.";
}
return "Ошибка: нарушена связь с другой таблицей. Проверьте правильность заполнения полей-ссылок.";
}
if ($code === '23000' && str_contains($message, 'Duplicate entry')) {
if (preg_match('/Duplicate entry \'([^\']+)\' for key \'([^\']+)\'/', $message, $matches)) {
$value = $matches[1];
$key = $matches[2];
return "Ошибка: значение '{$value}' уже существует (ключ '{$key}').";
}
return "Ошибка: запись с таким значением уже существует.";
}
if (str_contains($message, "cannot be null") || str_contains($message, "doesn't have a default value")) {
if (preg_match('/Column \'([^\']+)\'/', $message, $matches)) {
$field = $matches[1];
return "Ошибка: поле '{$field}' обязательно для заполнения.";
}
return "Ошибка: не заполнены обязательные поля.";
}
return "Ошибка БД: " . $message;
}
public function updateRow(string $schema, string $table, array $row, array $columns, array $pk): array
{
if (empty($pk)) {
throw new \RuntimeException('No primary key — update disabled');
}
$sets = [];
$params = [];
foreach ($columns as $c) {
$name = $c['COLUMN_NAME'];
if (in_array($name, $pk, true)) {
continue;
}
if (array_key_exists($name, $row)) {
$sets[] = "`$name` = :v_$name";
$params[":v_$name"] = $row[$name];
}
}
if (empty($sets)) {
return ['updated' => 0, 'message' => 'No changes'];
}
$whereParts = [];
foreach ($pk as $name) {
if (!array_key_exists($name, $row)) {
throw new \RuntimeException("Missing PK value: $name");
}
$whereParts[] = "`$name` = :pk_$name";
$params[":pk_$name"] = $row[$name];
}
$sql = sprintf(
"UPDATE `%s`.`%s` SET %s WHERE %s",
$schema, $table,
implode(', ', $sets),
implode(' AND ', $whereParts)
);
$stmt = $this->pdo->prepare($sql);
$stmt->execute($params);
return ['updated' => $stmt->rowCount()];
}
public function deleteRow(string $schema, string $table, array $row, array $pk): array
{
if (empty($pk)) {
throw new \RuntimeException('No primary key — delete disabled');
}
$whereParts = [];
$params = [];
foreach ($pk as $name) {
if (!array_key_exists($name, $row)) {
throw new \RuntimeException("Missing PK value: $name");
}
$whereParts[] = "`$name` = :pk_$name";
$params[":pk_$name"] = $row[$name];
}
$sql = sprintf(
"DELETE FROM `%s`.`%s` WHERE %s",
$schema, $table,
implode(' AND ', $whereParts)
);
$stmt = $this->pdo->prepare($sql);
$stmt->execute($params);
return ['deleted' => $stmt->rowCount()];
}
public function insertMultipleRows(string $schema, string $table, array $rows, array $columns): array
{
error_log("=== ИМПОРТ CSV: Schema=$schema, Table=$table, Строк=" . count($rows) . " ===");
if (empty($rows)) {
return ['inserted' => 0, 'errors' => 0, 'message' => 'No rows provided'];
}
$inserted = 0;
$errors = 0;
$errorMessages = [];
// Собираем информацию о всех столбцах таблицы
$validColumns = [];
$columnTypes = []; // Для детальной информации об ошибках
// Предварительная проверка данных
$warnings = [];
foreach ($rows as $index => $row) {
foreach ($row as $field => $value) {
if (isset($columnTypes[$field]) && is_numeric($value)) {
$type = $columnTypes[$field]['data_type'];
// Проверка для INT типов
if (str_contains($type, 'int')) {
if ($value > 2147483647) { // INT max value
$warnings[] = "Строка CSV #" . ($index + 2) . ": поле '$field' = '$value' превышает максимальное значение для типа INT";
}
}
// Проверка для DECIMAL/FLOAT
if (str_contains($type, 'decimal') || str_contains($type, 'float')) {
if (strlen((string)$value) > 15) {
$warnings[] = "Строка CSV #" . ($index + 2) . ": поле '$field' = '$value' может быть слишком большим";
}
}
}
}
}
if (!empty($warnings) && count($warnings) <= 20) {
error_log("Предупреждения перед импортом:\n" . implode("\n", $warnings));
}
foreach ($columns as $c) {
$name = $c['COLUMN_NAME'];
$extra = $c['EXTRA'] ?? '';
$dataType = strtolower($c['DATA_TYPE'] ?? '');
$isNullable = ($c['IS_NULLABLE'] === true || $c['IS_NULLABLE'] === 'YES');
// Сохраняем информацию о типе столбца
$columnTypes[$name] = [
'type' => $c['COLUMN_TYPE'] ?? $dataType,
'data_type' => $dataType
];
// Пропускаем только auto_increment поля
if (!str_contains($extra, 'auto_increment')) {
$validColumns[$name] = [
'nullable' => $isNullable,
'has_default' => !empty($c['COLUMN_DEFAULT']) || $c['COLUMN_DEFAULT'] === '0',
'default' => $c['COLUMN_DEFAULT'] ?? null,
'data_type' => $dataType
];
}
}
$this->pdo->beginTransaction();
try {
foreach ($rows as $index => $row) {
try {
$insertCols = [];
$placeholders = [];
$params = [];
// Перебираем ВСЕ столбцы таблицы (кроме auto_increment)
foreach ($validColumns as $name => $info) {
$value = null;
// Если столбец есть в CSV строке
if (array_key_exists($name, $row)) {
$value = $row[$name];
// Обрабатываем пустые значения как NULL
if ($value === null || $value === '' || in_array($value, ['NULL', 'null'], true)) {
$value = null;
} else {
// Конвертируем форматы дат для date/datetime полей
if (in_array($info['data_type'], ['date', 'datetime', 'timestamp'])) {
$value = $this->convertDateFormat($value);
}
}
} else {
// Столбца нет в CSV
if ($info['has_default']) {
continue;
}
if ($info['nullable']) {
$value = null;
} else {
throw new \PDOException("Missing required field: $name");
}
}
$insertCols[] = "`$name`";
$placeholders[] = ":$name";
$params[":$name"] = $value;
}
if (empty($insertCols)) {
continue;
}
$sql = sprintf(
"INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
$schema, $table,
implode(',', $insertCols),
implode(',', $placeholders)
);
$stmt = $this->pdo->prepare($sql);
$stmt->execute($params);
$inserted++;
} catch (\PDOException $e) {
$errors++;
// ✅ Формируем детальное сообщение об ошибке с данными строки
$csvLineNumber = $index + 2; // +2 потому что: +1 для человеческой нумерации, +1 для строки заголовка
$errorMsg = $this->formatImportError($e, $csvLineNumber, $row, $columnTypes);
$errorMessages[] = $errorMsg;
// Логируем только первые 10 ошибок
if ($errors <= 10) {
error_log("Ошибка импорта строки CSV #$csvLineNumber: " . $e->getMessage());
error_log("Данные строки: " . json_encode($row, JSON_UNESCAPED_UNICODE));
}
}
}
$this->pdo->commit();
error_log("=== ИМПОРТ ЗАВЕРШЁН: вставлено=$inserted, ошибок=$errors ===");
} catch (\Exception $e) {
$this->pdo->rollBack();
error_log("=== КРИТИЧЕСКАЯ ОШИБКА ИМПОРТА: " . $e->getMessage() . " ===");
throw new \RuntimeException('Import failed: ' . $e->getMessage());
}
return [
'inserted' => $inserted,
'errors' => $errors,
'errorMessages' => $errorMessages
];
}
/**
* Форматирует ошибку импорта с детальной информацией
*/
private function formatImportError(\PDOException $e, int $lineNumber, array $rowData, array $columnTypes): string
{
$code = $e->getCode();
$message = $e->getMessage();
$baseMsg = "Строка CSV #{$lineNumber}: ";
// Обработка Data truncated (обрезание данных)
if ($code === '01000' && str_contains($message, 'Data truncated')) {
if (preg_match('/column \'([^\']+)\'/', $message, $matches)) {
$field = $matches[1];
$value = $rowData[$field] ?? 'N/A';
$columnType = $columnTypes[$field]['type'] ?? 'unknown';
// Проверяем, не слишком ли большое число
if (is_numeric($value)) {
$valueLength = strlen((string)$value);
return $baseMsg . "Поле '{$field}' ({$columnType}): значение '{$value}' (длина: {$valueLength}) слишком большое или имеет неверный формат. " .
"Данные строки: " . $this->formatRowData($rowData);
}
return $baseMsg . "Поле '{$field}' ({$columnType}): данные обрезаны для значения '{$value}'. " .
"Данные строки: " . $this->formatRowData($rowData);
}
return $baseMsg . "Данные обрезаны. Строка: " . $this->formatRowData($rowData);
}
// Foreign Key constraint
if ($code === '23000' && str_contains($message, 'foreign key constraint')) {
if (preg_match('/FOREIGN KEY $$`([^`]+)`$$/', $message, $matches)) {
$field = $matches[1];
$value = $rowData[$field] ?? 'N/A';
return $baseMsg . "Поле '{$field}' = '{$value}': значение не существует в связанной таблице. " .
"Данные: " . $this->formatRowData($rowData);
}
return $baseMsg . "Нарушена связь с другой таблицей. Данные: " . $this->formatRowData($rowData);
}
// Duplicate key
if ($code === '23000' && str_contains($message, 'Duplicate entry')) {
if (preg_match('/Duplicate entry \'([^\']+)\' for key \'([^\']+)\'/', $message, $matches)) {
$value = $matches[1];
$key = $matches[2];
return $baseMsg . "Дубликат: значение '{$value}' (ключ '{$key}') уже существует. " .
"Данные: " . $this->formatRowData($rowData);
}
return $baseMsg . "Дубликат записи. Данные: " . $this->formatRowData($rowData);
}
// NOT NULL constraint
if (str_contains($message, "cannot be null") || str_contains($message, "doesn't have a default value")) {
if (preg_match('/Column \'([^\']+)\'/', $message, $matches)) {
$field = $matches[1];
return $baseMsg . "Поле '{$field}' обязательно для заполнения. " .
"Данные: " . $this->formatRowData($rowData);
}
return $baseMsg . "Не заполнены обязательные поля. Данные: " . $this->formatRowData($rowData);
}
// Общая ошибка БД
return $baseMsg . "Ошибка БД: " . $message . ". Данные: " . $this->formatRowData($rowData);
}
/**
* Форматирует данные строки для вывода (сокращает длинные значения)
*/
private function formatRowData(array $rowData): string
{
$formatted = [];
foreach ($rowData as $key => $value) {
if ($value === null) {
$formatted[] = "$key=NULL";
} else {
// Ограничиваем длину значения для удобства чтения
$strValue = (string)$value;
if (strlen($strValue) > 50) {
$strValue = substr($strValue, 0, 50) . '...';
}
$formatted[] = "$key='$strValue'";
}
}
return implode(', ', $formatted);
}
/**
* Конвертирует различные форматы дат в формат MySQL (YYYY-MM-DD)
*/
private function convertDateFormat(string $date): string
{
$date = trim($date);
// Уже в нужном формате YYYY-MM-DD
if (preg_match('/^\d{4}-\d{2}-\d{2}/', $date)) {
return substr($date, 0, 10);
}
// Формат DD.MM.YYYY или DD/MM/YYYY или DD-MM-YYYY
if (preg_match('/^(\d{1,2})[\.\/-](\d{1,2})[\.\/-](\d{4})/', $date, $matches)) {
$day = str_pad($matches[1], 2, '0', STR_PAD_LEFT);
$month = str_pad($matches[2], 2, '0', STR_PAD_LEFT);
$year = $matches[3];
return "$year-$month-$day";
}
// Пробуем распарсить через strtotime
$timestamp = strtotime($date);
if ($timestamp !== false) {
return date('Y-m-d', $timestamp);
}
// Если не удалось распарсить - возвращаем как есть
return $date;
}
public function exportCSV(
string $schema,
string $table,
array $columns,
array $filters,
?array $sort
): array {
$colNames = array_map(fn($c) => $c['COLUMN_NAME'], $columns);
$quotedColumns = array_map(fn($name) => "`$name`", $colNames);
$selectList = implode(', ', $quotedColumns);
$whereParts = [];
$params = [];
foreach ($filters as $i => $f) {
$field = $f['field'] ?? null;
$value = $f['value'] ?? null;
if (!$field || !in_array($field, $colNames, true) || $value === null) {
continue;
}
$param = ":f{$i}";
$whereParts[] = "`$field` LIKE $param";
$params[$param] = '%' . $value . '%';
}
$whereSql = $whereParts ? 'WHERE ' . implode(' AND ', $whereParts) : '';
$orderSql = '';
if ($sort && !empty($sort['field']) && in_array($sort['field'], $colNames, true)) {
$dir = strtoupper($sort['dir'] ?? 'ASC');
if (!in_array($dir, ['ASC', 'DESC'], true)) {
$dir = 'ASC';
}
$orderSql = "ORDER BY `{$sort['field']}` $dir";
}
$sql = "SELECT $selectList
FROM `{$schema}`.`{$table}`
$whereSql
$orderSql";
$stmt = $this->pdo->prepare($sql);
foreach ($params as $k => $v) {
$stmt->bindValue($k, $v);
}
$stmt->execute();
$rows = $stmt->fetchAll();
$csv = $this->arrayToCSV($colNames, $rows);
return [
'csv' => $csv,
'rowCount' => count($rows)
];
}
/**
* Массовое удаление строк (оптимизированное)
*/
public function deleteMultipleRows(string $schema, string $table, array $rows, array $pk): array
{
if (empty($rows)) {
return ['deleted' => 0, 'errors' => 0, 'message' => 'No rows provided'];
}
if (empty($pk)) {
throw new \RuntimeException('No primary key — delete disabled');
}
error_log("=== МАССОВОЕ УДАЛЕНИЕ: Schema=$schema, Table=$table, Строк=" . count($rows) . " ===");
$deleted = 0;
$errors = 0;
$errorMessages = [];
$this->pdo->beginTransaction();
try {
// Если PK состоит из одного поля - используем WHERE IN (самый быстрый способ)
if (count($pk) === 1) {
$pkField = $pk[0];
// Группируем по батчам (по 500 строк) для избежания превышения лимитов MySQL
$batchSize = 500;
$batches = array_chunk($rows, $batchSize);
foreach ($batches as $batchIndex => $batch) {
$pkValues = [];
foreach ($batch as $row) {
if (!array_key_exists($pkField, $row)) {
$errors++;
$errorMessages[] = "Строка не содержит PK поле: $pkField";
continue;
}
$pkValues[] = $row[$pkField];
}
if (empty($pkValues)) {
continue;
}
// Удаляем батч за один запрос
$placeholders = implode(',', array_fill(0, count($pkValues), '?'));
$sql = "DELETE FROM `{$schema}`.`{$table}` WHERE `{$pkField}` IN ($placeholders)";
$stmt = $this->pdo->prepare($sql);
$stmt->execute($pkValues);
$deletedInBatch = $stmt->rowCount();
$deleted += $deletedInBatch;
error_log("Батч " . ($batchIndex + 1) . ": удалено $deletedInBatch строк");
}
} else {
// Составной PK - удаляем по одной строке (но в одной транзакции)
foreach ($rows as $index => $row) {
try {
$whereParts = [];
$params = [];
foreach ($pk as $name) {
if (!array_key_exists($name, $row)) {
throw new \RuntimeException("Missing PK value: $name");
}
$whereParts[] = "`$name` = :pk_$name";
$params[":pk_$name"] = $row[$name];
}
$sql = sprintf(
"DELETE FROM `%s`.`%s` WHERE %s",
$schema, $table,
implode(' AND ', $whereParts)
);
$stmt = $this->pdo->prepare($sql);
$stmt->execute($params);
$deleted += $stmt->rowCount();
} catch (\PDOException $e) {
$errors++;
$errorMessages[] = "Строка " . ($index + 1) . ": " . $e->getMessage();
if ($errors <= 10) {
error_log("Ошибка удаления строки " . ($index + 1) . ": " . $e->getMessage());
}
}
}
}
$this->pdo->commit();
error_log("=== УДАЛЕНИЕ ЗАВЕРШЕНО: удалено=$deleted, ошибок=$errors ===");
} catch (\Exception $e) {
$this->pdo->rollBack();
error_log("=== КРИТИЧЕСКАЯ ОШИБКА УДАЛЕНИЯ: " . $e->getMessage() . " ===");
throw new \RuntimeException('Delete failed: ' . $e->getMessage());
}
return [
'deleted' => $deleted,
'errors' => $errors,
'errorMessages' => $errorMessages
];
}
private function arrayToCSV(array $headers, array $rows): string
{
$output = fopen('php://temp', 'r+');
fputcsv($output, $headers, ';');
foreach ($rows as $row) {
$rowData = [];
foreach ($headers as $header) {
$rowData[] = $row[$header] ?? '';
}
fputcsv($output, $rowData, ';');
}
rewind($output);
$csv = stream_get_contents($output);
fclose($output);
return $csv;
}
}