Files
octopus/myproject/customers/services/import_export.py
Andrey Smakotin 95036ed285 Добавлен настраиваемый экспорт клиентов с выбором полей и форматов
Реализован полностью новый функционал экспорта клиентов с возможностью
выбора полей, формата файла (CSV/XLSX) и сохранением предпочтений.

Ключевые изменения:

1. CustomerExporter (import_export.py):
   - Полностью переписан класс с поддержкой динамического выбора полей
   - Добавлена конфигурация AVAILABLE_FIELDS с метаданными полей
   - Реализован метод get_available_fields() для фильтрации по ролям
   - Новый метод export_to_xlsx() с автоподстройкой ширины столбцов
   - Форматирование ContactChannel с переводами строк
   - Поддержка фильтрации queryset

2. CustomerExportForm (forms.py):
   - Динамическое создание checkbox полей на основе роли пользователя
   - Выбор формата файла (CSV/XLSX) через radio buttons
   - Валидация выбора хотя бы одного поля

3. View customer_export (views.py):
   - КРИТИЧНО: Изменён декоратор с @manager_or_owner_required на @owner_required
   - Обработка GET (редирект) и POST запросов
   - Применение фильтров CustomerFilter из списка клиентов
   - Оптимизация с prefetch_related('contact_channels')
   - Сохранение настроек экспорта в session

4. UI изменения:
   - Создан шаблон customer_export_modal.html с модальным окном
   - Обновлён customer_list.html: кнопка экспорта с проверкой роли
   - JavaScript для восстановления сохранённых настроек из session
   - Отображение количества экспортируемых клиентов
   - Бейдж "Только для владельца" на поле баланса кошелька

Безопасность:
- Экспорт доступен ТОЛЬКО владельцу тенанта (OWNER) и superuser
- Поле "Баланс кошелька" скрыто от менеджеров на уровне формы
- Двойная проверка роли при экспорте баланса
- Кнопка экспорта скрыта в UI для всех кроме owner/superuser

Функциональность:
- Выбор полей: ID, имя, email, телефон, заметки, каналы связи, баланс, дата создания
- Форматы: CSV (с BOM для Excel) и XLSX
- Учёт текущих фильтров и поиска из списка клиентов
- Сохранение предпочтений между экспортами в session
- Исключение системного клиента из экспорта

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-03 21:12:08 +03:00

996 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Сервис для импорта и экспорта клиентов.
Этот модуль содержит логику импорта/экспорта клиентов в различных форматах (CSV, Excel).
Разделение на отдельный модуль улучшает организацию кода и следует принципам SRP.
"""
import csv
import io
from django.http import HttpResponse
from django.utils import timezone
from ..models import Customer
try:
# Для чтения .xlsx файлов
from openpyxl import load_workbook
except ImportError:
load_workbook = None
try:
import phonenumbers
from phonenumbers import NumberParseException
except ImportError:
phonenumbers = None
NumberParseException = Exception
import re
class CustomerExporter:
"""
Класс для экспорта клиентов в различные форматы (CSV/XLSX).
Поддерживает выбор полей и фильтрацию по ролям.
"""
# Конфигурация доступных полей с метаданными
AVAILABLE_FIELDS = {
'id': {'label': 'ID', 'owner_only': False},
'name': {'label': 'Имя', 'owner_only': False},
'email': {'label': 'Email', 'owner_only': False},
'phone': {'label': 'Телефон', 'owner_only': False},
'notes': {'label': 'Заметки', 'owner_only': False},
'contact_channels': {'label': 'Каналы связи', 'owner_only': False},
'wallet_balance': {'label': 'Баланс кошелька', 'owner_only': True},
'created_at': {'label': 'Дата создания', 'owner_only': False},
}
DEFAULT_FIELDS = ['id', 'name', 'email', 'phone']
@classmethod
def get_available_fields(cls, user):
"""
Получить поля доступные для пользователя на основе роли.
Args:
user: Объект пользователя
Returns:
dict: Словарь доступных полей с метаданными
"""
fields = {}
is_owner = user.is_superuser or user.is_owner
for field_key, field_info in cls.AVAILABLE_FIELDS.items():
if field_info['owner_only'] and not is_owner:
continue
fields[field_key] = field_info
return fields
def __init__(self, queryset, selected_fields, user):
"""
Инициализация экспортера.
Args:
queryset: QuerySet клиентов (уже отфильтрованный)
selected_fields: Список ключей полей для экспорта
user: Текущий пользователь (для проверки прав)
"""
self.queryset = queryset
self.selected_fields = selected_fields
self.user = user
def _get_headers(self):
"""Генерация заголовков на основе выбранных полей"""
return [
self.AVAILABLE_FIELDS[field]['label']
for field in self.selected_fields
]
def _get_field_value(self, customer, field_key):
"""
Получить отформатированное значение для конкретного поля.
Args:
customer: Объект Customer
field_key: Ключ поля
Returns:
str: Форматированное значение
"""
if field_key == 'id':
return customer.id
elif field_key == 'name':
return customer.name or ''
elif field_key == 'email':
return customer.email or ''
elif field_key == 'phone':
return str(customer.phone) if customer.phone else ''
elif field_key == 'notes':
return customer.notes or ''
elif field_key == 'contact_channels':
return self._get_contact_channels_display(customer)
elif field_key == 'wallet_balance':
# Двойная защита: проверка роли
if not (self.user.is_superuser or self.user.is_owner):
return 'N/A'
return str(customer.wallet_balance)
elif field_key == 'created_at':
return customer.created_at.strftime('%Y-%m-%d %H:%M:%S')
return ''
def _get_contact_channels_display(self, customer):
"""
Форматирование каналов связи для экспорта.
Объединяет все каналы в одну строку с переводами строк.
Args:
customer: Объект Customer
Returns:
str: Форматированная строка каналов связи
"""
channels = customer.contact_channels.all()
if not channels:
return ''
from ..models import ContactChannel
lines = []
for channel in channels:
channel_name = dict(ContactChannel.CHANNEL_TYPES).get(
channel.channel_type,
channel.channel_type
)
lines.append(f"{channel_name}: {channel.value}")
return '\n'.join(lines)
def export_to_csv(self):
"""
Экспорт в CSV с выбранными полями.
Returns:
HttpResponse: CSV файл для скачивания
"""
response = HttpResponse(content_type='text/csv; charset=utf-8')
timestamp = timezone.now().strftime("%Y%m%d_%H%M%S")
response['Content-Disposition'] = f'attachment; filename="customers_export_{timestamp}.csv"'
# BOM для корректного открытия в Excel
response.write('\ufeff')
writer = csv.writer(response)
# Динамические заголовки
writer.writerow(self._get_headers())
# Данные
for customer in self.queryset:
row = [
self._get_field_value(customer, field)
for field in self.selected_fields
]
writer.writerow(row)
return response
def export_to_xlsx(self):
"""
Экспорт в XLSX используя openpyxl.
Returns:
HttpResponse: XLSX файл для скачивания
"""
try:
from openpyxl import Workbook
except ImportError:
# Fallback to CSV если openpyxl не установлен
return self.export_to_csv()
wb = Workbook()
ws = wb.active
ws.title = "Клиенты"
# Заголовки
ws.append(self._get_headers())
# Данные
for customer in self.queryset:
row = [
self._get_field_value(customer, field)
for field in self.selected_fields
]
ws.append(row)
# Автоподстройка ширины столбцов
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if len(str(cell.value)) > max_length:
max_length = len(cell.value)
except:
pass
adjusted_width = min(max_length + 2, 50) # Максимум 50
ws.column_dimensions[column_letter].width = adjusted_width
# Сохранение в BytesIO
output = io.BytesIO()
wb.save(output)
output.seek(0)
# Создание response
response = HttpResponse(
output.read(),
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
)
timestamp = timezone.now().strftime("%Y%m%d_%H%M%S")
response['Content-Disposition'] = f'attachment; filename="customers_export_{timestamp}.xlsx"'
return response
class CustomerImporter:
"""
Простой универсальный импорт клиентов из CSV/XLSX.
Поддерживаемые форматы:
- CSV (UTF-8, заголовок в первой строке)
- XLSX (первая строка — заголовки)
Алгоритм:
- Читаем файл → получаем headers и list[dict] строк
- По headers строим маппинг на поля Customer: name/email/phone/notes
- Для каждой строки:
- пропускаем полностью пустые строки
- ищем клиента по email, потом по телефону
- если update_existing=False и клиент найден → пропуск
- если update_existing=True → обновляем найденного
- если не найден → создаём нового
- Вся валидация/нормализация делается через Customer.full_clean()
"""
FIELD_ALIASES = {
"name": ["name", "имя", "фио", "клиент", "фамилияимя"],
"email": ["email", "e-mail", "e_mail", "почта", "элпочта", "электроннаяпочта"],
"phone": ["phone", "телефон", "тел", "моб", "мобильный", "номер"],
"notes": ["notes", "заметки", "комментарий", "comment", "примечание"],
}
def __init__(self):
self.errors = []
self.success_count = 0
self.update_count = 0
self.enriched_count = 0 # Дополнены пустые поля
self.conflicts_resolved = 0 # Альтернативные контакты через ContactChannel
self.skip_count = 0
# Отслеживание уже обработанных email/phone для дедупликации внутри файла
self.processed_emails = set()
self.processed_phones = set()
# Отдельный список для реальных ошибок (не дублей из БД)
self.real_errors = []
# Сохраняем исходные данные для генерации error-файла
self.original_headers = []
self.original_rows = []
self.file_format = None
def import_from_file(self, file, update_existing: bool = False) -> dict:
"""
Импорт клиентов из загруженного файла.
Args:
file: UploadedFile
update_existing: обновлять ли существующих клиентов (по email/телефону)
Returns:
dict: результат импорта
"""
file_format = self._detect_format(file)
if file_format is None:
return {
"success": False,
"message": "Неподдерживаемый формат файла. Ожидается CSV или XLSX.",
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [{"row": None, "reason": "Unsupported file type"}],
}
if file_format == "xlsx" and load_workbook is None:
return {
"success": False,
"message": "Для импорта XLSX необходим пакет openpyxl. Установите его и повторите попытку.",
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [{"row": None, "reason": "openpyxl is not installed"}],
}
# Сохраняем формат файла
self.file_format = file_format
try:
if file_format == "csv":
headers, rows = self._read_csv(file)
else:
headers, rows = self._read_xlsx(file)
# Сохраняем исходные данные
self.original_headers = headers
self.original_rows = rows
except Exception as exc:
return {
"success": False,
"message": f"Ошибка чтения файла: {exc}",
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [{"row": None, "reason": str(exc)}],
}
if not headers:
return {
"success": False,
"message": "В файле не найдены заголовки.",
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [{"row": None, "reason": "Empty header row"}],
}
mapping = self._build_mapping(headers)
if not any(field in mapping for field in ("name", "email", "phone")):
return {
"success": False,
"message": "Не удалось сопоставить обязательные поля (имя, email или телефон).",
"created": 0,
"updated": 0,
"skipped": len(rows),
"errors": [
{
"row": None,
"reason": "No required fields (name/email/phone) mapped from headers",
}
],
}
for index, row in enumerate(rows, start=2): # первая строка — заголовки
self._process_row(index, row, mapping, update_existing)
total_errors = len(self.errors)
duplicate_count = len([e for e in self.errors if e.get('is_duplicate', False)])
real_error_count = len(self.real_errors)
success = (self.success_count + self.update_count) > 0
if success and total_errors == 0:
message = "Импорт завершён успешно."
elif success and total_errors > 0:
message = "Импорт завершён с ошибками."
else:
message = "Не удалось импортировать данные."
return {
"success": success,
"message": message,
"created": self.success_count,
"updated": self.update_count,
"enriched": self.enriched_count,
"conflicts_resolved": self.conflicts_resolved,
"skipped": self.skip_count,
"errors": self.errors,
"real_errors": self.real_errors, # Только невалидные данные, без дублей из БД
"duplicate_count": duplicate_count,
"real_error_count": real_error_count,
}
def _detect_format(self, file) -> str | None:
name = (getattr(file, "name", None) or "").lower()
if name.endswith(".csv"):
return "csv"
if name.endswith(".xlsx") or name.endswith(".xls"):
return "xlsx"
return None
def _read_csv(self, file):
file.seek(0)
raw = file.read()
if isinstance(raw, bytes):
text = raw.decode("utf-8-sig")
else:
text = raw
f = io.StringIO(text)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
rows = list(reader)
return headers, rows
def _read_xlsx(self, file):
file.seek(0)
wb = load_workbook(file, read_only=True, data_only=True)
ws = wb.active
headers = []
rows = []
first_row = True
for row in ws.iter_rows(values_only=True):
if first_row:
headers = [str(v).strip() if v is not None else "" for v in row]
first_row = False
continue
if not any(row):
continue
row_dict = {}
for idx, value in enumerate(row):
if idx < len(headers):
header = headers[idx] or f"col_{idx}"
row_dict[header] = value
rows.append(row_dict)
return headers, rows
def _normalize_header(self, header: str) -> str:
if header is None:
return ""
cleaned = "".join(ch for ch in str(header).strip().lower() if ch.isalnum())
return cleaned
def _build_mapping(self, headers):
mapping = {}
normalized_aliases = {
field: {self._normalize_header(a) for a in aliases}
for field, aliases in self.FIELD_ALIASES.items()
}
for header in headers:
norm = self._normalize_header(header)
if not norm:
continue
for field, alias_set in normalized_aliases.items():
if norm in alias_set and field not in mapping:
mapping[field] = header
break
return mapping
def _clean_value(self, value):
if value is None:
return ""
return str(value).strip()
def _normalize_phone(self, raw_phone: str) -> str | None:
"""
Умная нормализация телефона с попыткой различных регионов.
Стратегии:
1. Если начинается с '+' — парсим как международный
2. Если начинается с '8' и 11 цифр — пробуем BY, потом RU
3. Пробуем распространённые регионы: BY, RU, PL, DE, US
4. Если всё не удалось — возвращаем None
Returns:
Нормализованный телефон в E.164 формате или None
"""
if not raw_phone or not phonenumbers:
return None
# Убираем все символы кроме цифр и +
cleaned = re.sub(r'[^\d+]', '', str(raw_phone))
if not cleaned:
return None
# Проверка длины
if len(cleaned) < 10 or len(cleaned) > 15:
return None
# Умное добавление кода страны
if not cleaned.startswith('+'):
# Белорусские номера (375...)
if cleaned.startswith('375'):
cleaned = '+' + cleaned
# Российские номера (7XXXXXXXXXX)
elif cleaned.startswith('7') and len(cleaned) == 11:
cleaned = '+' + cleaned
# Украинские номера (380...)
elif cleaned.startswith('380'):
cleaned = '+' + cleaned
# Старый формат 8XXXXXXXXXX -> +7XXXXXXXXXX
elif cleaned.startswith('8') and len(cleaned) == 11:
cleaned = '+7' + cleaned[1:]
# 9 цифр - предполагаем Беларусь
elif len(cleaned) == 9:
cleaned = '+375' + cleaned
# Стратегия 1: Международный формат (начинается с +)
if cleaned.startswith('+'):
try:
parsed = phonenumbers.parse(cleaned, None)
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
except NumberParseException:
pass
# Стратегия 2: Пробуем распространённые регионы
for region in ['BY', 'RU', 'UA', 'PL', 'DE']:
try:
test_number = cleaned if cleaned.startswith('+') else f'+{cleaned}'
parsed = phonenumbers.parse(test_number, region)
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
except NumberParseException:
continue
try:
parsed = phonenumbers.parse(cleaned, region)
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
except NumberParseException:
continue
return None
def _preprocess_email(self, email: str) -> str | None:
"""
Предобработка email перед валидацией.
Исправляет типичные ошибки и опечатки.
Args:
email: Исходный email
Returns:
str | None: Очищенный email или None если пустой
"""
if not email or not isinstance(email, str):
return None
email = email.strip()
if not email:
return None
# Удаляем лишний текст в конце (фото, комментарии)
email = re.sub(r'\s+фото.*$', '', email, flags=re.IGNORECASE)
email = re.sub(r'\s+ф\d+$', '', email)
# Убираем пробелы внутри email
email = email.replace(' ', '')
# Двойные @@ -> одинарный @
email = email.replace('@@', '@')
# Исправляем пробелы вокруг @
email = re.sub(r'\s*@\s*', '@', email)
# Исправляем типичные опечатки в доменах
email = email.replace('.ry', '.ru')
email = email.replace('mail ru', 'mail.ru')
email = email.replace('ya ru', 'ya.ru')
email = email.replace('tut by', 'tut.by')
email = email.replace('gmail.co.m', 'gmail.com')
email = email.replace('/com', '.com')
# Если нет @, но есть gmail/mail/etc - пытаемся добавить
if '@' not in email:
if 'gmail' in email.lower():
email = re.sub(r'gmail', '@gmail', email, flags=re.IGNORECASE)
elif 'mail.ru' in email.lower():
email = re.sub(r'mail\.ru', '@mail.ru', email, flags=re.IGNORECASE)
elif 'yandex' in email.lower():
email = re.sub(r'yandex', '@yandex', email, flags=re.IGNORECASE)
# Исправляем домены без точки
if '@' in email:
parts = email.split('@')
if len(parts) == 2:
local, domain = parts
domain_lower = domain.lower()
# Если домен слишком короткий - невалидный
if len(domain) < 4:
return None
# Если нет точки в домене - пытаемся исправить
if '.' not in domain:
domain_map = {
'gmail': 'gmail.com',
'mailru': 'mail.ru',
'yaru': 'ya.ru',
'tutby': 'tut.by',
'yandexru': 'yandex.ru',
}
domain = domain_map.get(domain_lower, None)
if not domain:
# Неизвестный домен без точки
return None
email = f"{local}@{domain}"
return email.lower() if email else None
def _is_valid_email(self, email: str) -> bool:
"""
Проверка валидности email через Django EmailValidator.
Args:
email: Email для проверки
Returns:
bool: True если email валиден
"""
from django.core.validators import validate_email
from django.core.exceptions import ValidationError
if not email:
return False
try:
validate_email(email)
return True
except ValidationError:
return False
def _get_better_name(self, existing_name: str, new_name: str) -> str:
"""
Выбирает более полное/информативное имя.
Приоритеты:
1. Если одно пустое - возвращаем непустое
2. Если новое длиннее И содержит старое - берём новое
3. Если в новом есть пробелы (ФИО), а в старом нет - берём новое
4. Иначе оставляем старое
Returns:
str: Лучшее имя и флаг конфликта
"""
if not existing_name:
return new_name
if not new_name:
return existing_name
# Если новое имя длиннее и содержит старое - используем новое
if len(new_name) > len(existing_name) and existing_name.lower() in new_name.lower():
return new_name
# Если в новом есть пробелы (ФИО), а в старом нет - берём новое
if ' ' in new_name and ' ' not in existing_name:
return new_name
# Иначе оставляем существующее
return existing_name
def _create_alternative_contact(self, customer, channel_type: str, value: str, source: str = 'Импорт'):
"""
Создаёт альтернативный контакт через ContactChannel.
Args:
customer: Объект Customer
channel_type: Тип канала ('email' или 'phone')
value: Значение контакта
source: Источник (для notes)
Returns:
bool: True если создан, False если уже существует
"""
from ..models import ContactChannel
# Проверяем, не существует ли уже такой контакт
exists = ContactChannel.objects.filter(
channel_type=channel_type,
value=value
).exists()
if exists:
return False
try:
ContactChannel.objects.create(
customer=customer,
channel_type=channel_type,
value=value,
is_primary=False,
notes=f'Из {source}'
)
return True
except Exception:
# Если не удалось создать - пропускаем
return False
def _process_row(self, row_number: int, row: dict, mapping: dict, update_existing: bool):
name = self._clean_value(row.get(mapping.get("name", ""), ""))
email = self._clean_value(row.get(mapping.get("email", ""), ""))
phone_raw = self._clean_value(row.get(mapping.get("phone", ""), ""))
notes = self._clean_value(row.get(mapping.get("notes", ""), ""))
if not any([name, email, phone_raw, notes]):
self.skip_count += 1
return
# Нормализуем email (с предобработкой)
if email:
email = self._preprocess_email(email)
# Проверка на дубли внутри файла
if email and email in self.processed_emails:
self.skip_count += 1
self.errors.append(
{
"row": row_number,
"email": email,
"phone": phone_raw or None,
"reason": "Дубль email внутри файла (уже обработан в предыдущей строке).",
"is_duplicate": True, # Дубликат внутри файла
}
)
return
# Умная нормализация телефона
phone = None
phone_normalization_failed = False
if phone_raw:
phone = self._normalize_phone(phone_raw)
if not phone:
phone_normalization_failed = True
else:
# Проверка на дубли внутри файла
if phone in self.processed_phones:
self.skip_count += 1
self.errors.append(
{
"row": row_number,
"email": email or None,
"phone": phone_raw,
"reason": "Дубль телефона внутри файла (уже обработан в предыдущей строке).",
"is_duplicate": True, # Дубликат внутри файла
}
)
return
# Если телефон не удалось нормализовать, сохраняем в notes
if phone_normalization_failed:
note_addition = f"Исходный телефон из импорта (невалидный): {phone_raw}"
if notes:
notes = f"{notes}\n{note_addition}"
else:
notes = note_addition
# Проверка: есть ли хотя бы ОДИН валидный контакт
has_valid_email = self._is_valid_email(email)
has_valid_phone = phone is not None # phone уже нормализован или None
if not has_valid_email and not has_valid_phone:
# Нет валидных контактов - в ошибки
self.skip_count += 1
error_record = {
"row": row_number,
"email": email or None,
"phone": phone_raw or None,
"reason": "Требуется хотя бы один: email или телефон",
"is_duplicate": False,
}
self.errors.append(error_record)
self.real_errors.append(error_record)
return
# Пытаемся найти существующего клиента
existing = None
if email:
existing = Customer.objects.filter(email=email).first()
if existing is None and phone:
existing = Customer.objects.filter(phone=phone).first()
if existing and not update_existing:
self.skip_count += 1
self.errors.append(
{
"row": row_number,
"email": email or None,
"phone": phone_raw or None,
"reason": "Клиент с таким email/телефоном уже существует, обновление отключено.",
"is_duplicate": True, # Помечаем как дубликат из БД
}
)
return
if existing and update_existing:
# Умное слияние (smart merge)
was_enriched = False # Флаг дополнения пустых полей
# 1. Имя - выбираем лучшее
if name:
better_name = self._get_better_name(existing.name or '', name)
if not existing.name:
was_enriched = True
existing.name = better_name
elif better_name != existing.name:
# Конфликт имен - добавляем в notes
if name != existing.name and name.lower() not in existing.name.lower():
alt_name_note = f"Также известен как: {name}"
if existing.notes:
if alt_name_note not in existing.notes:
existing.notes = f"{existing.notes}\n{alt_name_note}"
else:
existing.notes = alt_name_note
existing.name = better_name
# 2. Email - дополняем или создаём ContactChannel
if email:
if not existing.email:
# Пустое поле - дополняем
was_enriched = True
existing.email = email
elif existing.email != email:
# Конфликт - создаём альтернативный контакт
if self._create_alternative_contact(existing, 'email', email):
self.conflicts_resolved += 1
# 3. Телефон - дополняем или создаём ContactChannel
if phone:
if not existing.phone:
# Пустое поле - дополняем
was_enriched = True
existing.phone = phone
elif str(existing.phone) != phone:
# Конфликт - создаём альтернативный контакт
if self._create_alternative_contact(existing, 'phone', phone):
self.conflicts_resolved += 1
# 4. Заметки - ВСЕГДА дописываем
if notes:
if existing.notes:
existing.notes = f"{existing.notes}\n{notes}"
else:
existing.notes = notes
try:
existing.full_clean()
existing.save()
# Счётчики
if was_enriched:
self.enriched_count += 1
else:
self.update_count += 1
if email:
self.processed_emails.add(email)
if phone:
self.processed_phones.add(phone)
except Exception as exc:
self.skip_count += 1
error_record = {
"row": row_number,
"email": email or None,
"phone": phone_raw or None,
"reason": str(exc),
"is_duplicate": False,
}
self.errors.append(error_record)
self.real_errors.append(error_record) # Реальная ошибка валидации
return
# Создание нового клиента
customer = Customer(
name=name or "",
email=email or None,
phone=phone or None, # Если не удалось нормализовать — будет None
notes=notes or "",
)
try:
customer.full_clean()
customer.save()
self.success_count += 1
if email:
self.processed_emails.add(email)
if phone:
self.processed_phones.add(phone)
except Exception as exc:
self.skip_count += 1
error_record = {
"row": row_number,
"email": email or None,
"phone": phone_raw or None,
"reason": str(exc),
"is_duplicate": False,
}
self.errors.append(error_record)
self.real_errors.append(error_record) # Реальная ошибка валидации
def generate_error_file(self) -> tuple[bytes, str] | None:
"""
Генерирует файл с ошибочными строками (только real_errors).
Возвращает тот же формат, что был загружен (CSV или XLSX).
Добавляет колонку "Ошибка" с описанием проблемы.
Returns:
tuple[bytes, str]: (file_content, filename) или None если нет ошибок
"""
if not self.real_errors or not self.original_headers:
return None
# Создаём mapping row_number -> error
error_map = {err['row']: err for err in self.real_errors if err.get('row')}
if not error_map:
return None
# Собираем ошибочные строки
error_rows = []
for index, row in enumerate(self.original_rows, start=2): # start=2 т.к. первая строка - заголовки
if index in error_map:
error_info = error_map[index]
# Добавляем исходную строку + колонку с ошибкой
row_with_error = dict(row) # копируем
row_with_error['Ошибка'] = error_info['reason']
error_rows.append(row_with_error)
if not error_rows:
return None
# Заголовки + колонка "Ошибка"
headers_with_error = list(self.original_headers) + ['Ошибка']
# Генерируем файл в зависимости от формата
timestamp = timezone.now().strftime("%Y%m%d_%H%M%S")
if self.file_format == 'csv':
return self._generate_csv_error_file(headers_with_error, error_rows, timestamp)
else:
return self._generate_xlsx_error_file(headers_with_error, error_rows, timestamp)
def _generate_csv_error_file(self, headers: list, rows: list[dict], timestamp: str) -> tuple[bytes, str]:
"""
Генерирует CSV файл с ошибками (с BOM для Excel).
"""
output = io.StringIO()
# BOM для корректного открытия в Excel
output.write('\ufeff')
writer = csv.DictWriter(output, fieldnames=headers, extrasaction='ignore')
writer.writeheader()
writer.writerows(rows)
content = output.getvalue().encode('utf-8')
filename = f'customer_import_errors_{timestamp}.csv'
return content, filename
def _generate_xlsx_error_file(self, headers: list, rows: list[dict], timestamp: str) -> tuple[bytes, str] | None:
"""
Генерирует XLSX файл с ошибками.
"""
if load_workbook is None:
# Fallback to CSV если openpyxl не установлен
return self._generate_csv_error_file(headers, rows, timestamp)
try:
from openpyxl import Workbook
wb = Workbook()
ws = wb.active
ws.title = "Ошибки импорта"
# Заголовки
ws.append(headers)
# Данные
for row_dict in rows:
row_data = [row_dict.get(h, '') for h in headers]
ws.append(row_data)
# Сохраняем в BytesIO
output = io.BytesIO()
wb.save(output)
output.seek(0)
content = output.read()
filename = f'customer_import_errors_{timestamp}.xlsx'
return content, filename
except Exception:
# Fallback to CSV при любой ошибке
return self._generate_csv_error_file(headers, rows, timestamp)