- Реализован универсальный поиск по имени, email и телефону в одной строке - Добавлен счетчик общего количества клиентов - Поиск работает по нажатию Enter или кнопке 'Поиск' - Удалены неиспользуемые фильтры django-filter - Упрощен интерфейс списка клиентов - Добавлена кнопка 'Очистить' для сброса поиска
159 lines
7.2 KiB
Python
159 lines
7.2 KiB
Python
"""
|
||
Management-команда для тестового импорта клиентов из XLSX/CSV файлов.
|
||
|
||
Использование:
|
||
python manage.py test_import путь/к/файлу.xlsx --schema=anatol [--update] [--export-errors]
|
||
|
||
Примеры:
|
||
python manage.py test_import ../customers_mixflowers.by_2025-12-14_20-35-36.xlsx --schema=anatol
|
||
python manage.py test_import ../customers.csv --schema=anatol --update
|
||
python manage.py test_import ../file.xlsx --schema=anatol --export-errors
|
||
"""
|
||
from django.core.management.base import BaseCommand
|
||
from django.core.files import File
|
||
from django_tenants.utils import schema_context
|
||
from customers.services.import_export import CustomerImporter
|
||
import os
|
||
|
||
try:
|
||
from openpyxl import Workbook
|
||
from openpyxl.styles import Font, PatternFill, Alignment
|
||
except ImportError:
|
||
Workbook = None
|
||
|
||
|
||
class Command(BaseCommand):
|
||
help = 'Тестовый импорт клиентов из XLSX/CSV файла'
|
||
|
||
def add_arguments(self, parser):
|
||
parser.add_argument('file_path', type=str, help='Путь к файлу для импорта')
|
||
parser.add_argument(
|
||
'--schema',
|
||
type=str,
|
||
required=True,
|
||
help='Имя схемы БД тенанта (пример: anatol)'
|
||
)
|
||
parser.add_argument(
|
||
'--update',
|
||
action='store_true',
|
||
help='Обновлять существующих клиентов (по email/телефону)',
|
||
)
|
||
parser.add_argument(
|
||
'--export-errors',
|
||
action='store_true',
|
||
help='Экспортировать все проблемные строки в отдельный XLSX файл',
|
||
)
|
||
|
||
def handle(self, *args, **options):
|
||
file_path = options['file_path']
|
||
schema_name = options['schema']
|
||
update_existing = options['update']
|
||
export_errors = options.get('export_errors', False)
|
||
|
||
if not os.path.exists(file_path):
|
||
self.stdout.write(self.style.ERROR(f'Файл не найден: {file_path}'))
|
||
return
|
||
|
||
self.stdout.write(f'Импорт из файла: {file_path}')
|
||
self.stdout.write(f'Схема тенанта: {schema_name}')
|
||
self.stdout.write(f'Режим обновления: {"ВКЛ" if update_existing else "ВЫКЛ"}')
|
||
self.stdout.write('-' * 60)
|
||
|
||
# Выполняем импорт в контексте схемы тенанта
|
||
with schema_context(schema_name):
|
||
importer = CustomerImporter()
|
||
|
||
with open(file_path, 'rb') as f:
|
||
# Создаём простой объект-обёртку для файла
|
||
class FakeUploadedFile:
|
||
def __init__(self, file_obj, name):
|
||
self.file = file_obj
|
||
self.name = name
|
||
|
||
def __getattr__(self, attr):
|
||
# Делегируем все остальные методы внутреннему файловому объекту
|
||
return getattr(self.file, attr)
|
||
|
||
fake_file = FakeUploadedFile(f, os.path.basename(file_path))
|
||
result = importer.import_from_file(fake_file, update_existing=update_existing)
|
||
|
||
self.stdout.write(self.style.SUCCESS(f"\n{result['message']}"))
|
||
self.stdout.write('-' * 60)
|
||
self.stdout.write(f"Создано: {result['created']}")
|
||
self.stdout.write(f"Обновлено: {result['updated']}")
|
||
self.stdout.write(f"Пропущено: {result['skipped']}")
|
||
self.stdout.write(f"Ошибок: {len(result['errors'])}")
|
||
|
||
if result['errors']:
|
||
self.stdout.write('\n' + self.style.WARNING('ОШИБКИ:'))
|
||
# Показываем первые 20 ошибок, остальные — просто счётчик
|
||
for idx, error in enumerate(result['errors'][:20], 1):
|
||
row = error.get('row', '?')
|
||
email = error.get('email', '')
|
||
phone = error.get('phone', '')
|
||
reason = error.get('reason', '')
|
||
self.stdout.write(
|
||
f" [{idx}] Строка {row}: {email or phone or '(пусто)'} - {reason}"
|
||
)
|
||
|
||
if len(result['errors']) > 20:
|
||
self.stdout.write(f" ... и ещё {len(result['errors']) - 20} ошибок")
|
||
|
||
# Экспорт ошибок в XLSX
|
||
if export_errors:
|
||
self._export_errors_to_xlsx(file_path, result['real_errors'])
|
||
|
||
def _export_errors_to_xlsx(self, original_file_path, errors):
|
||
"""
|
||
Экспортирует все проблемные строки в отдельный XLSX файл.
|
||
"""
|
||
if Workbook is None:
|
||
self.stdout.write(self.style.ERROR('\nНевозможно экспортировать ошибки: openpyxl не установлен'))
|
||
return
|
||
|
||
# Формируем имя файла для ошибок
|
||
base_name = os.path.splitext(os.path.basename(original_file_path))[0]
|
||
error_file = f"{base_name}_ERRORS.xlsx"
|
||
error_path = os.path.join(os.path.dirname(original_file_path) or '.', error_file)
|
||
|
||
# Создаём новую книгу Excel
|
||
wb = Workbook()
|
||
ws = wb.active
|
||
ws.title = "Ошибки импорта"
|
||
|
||
# Заголовки с форматированием
|
||
headers = ['Строка', 'Email', 'Телефон', 'Причина ошибки']
|
||
for col_num, header in enumerate(headers, 1):
|
||
cell = ws.cell(row=1, column=col_num, value=header)
|
||
cell.font = Font(bold=True, color="FFFFFF")
|
||
cell.fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
|
||
cell.alignment = Alignment(horizontal="center", vertical="center")
|
||
|
||
# Данные об ошибках
|
||
for idx, error in enumerate(errors, 2):
|
||
ws.cell(row=idx, column=1, value=error.get('row', ''))
|
||
ws.cell(row=idx, column=2, value=error.get('email', ''))
|
||
ws.cell(row=idx, column=3, value=error.get('phone', ''))
|
||
ws.cell(row=idx, column=4, value=error.get('reason', ''))
|
||
|
||
# Автоподбор ширины колонок
|
||
for column in ws.columns:
|
||
max_length = 0
|
||
column_letter = column[0].column_letter
|
||
for cell in column:
|
||
try:
|
||
if cell.value:
|
||
max_length = max(max_length, len(str(cell.value)))
|
||
except:
|
||
pass
|
||
adjusted_width = min(max_length + 2, 80)
|
||
ws.column_dimensions[column_letter].width = adjusted_width
|
||
|
||
# Сохраняем файл
|
||
try:
|
||
wb.save(error_path)
|
||
self.stdout.write(self.style.SUCCESS(f"\n✓ Файл с ошибками сохранён: {error_path}"))
|
||
self.stdout.write(f" Всего строк с ошибками: {len(errors)}")
|
||
except Exception as e:
|
||
self.stdout.write(self.style.ERROR(f"\n✗ Ошибка при сохранении файла: {e}"))
|