""" Сервис для импорта и экспорта клиентов. Этот модуль содержит логику импорта/экспорта клиентов в различных форматах (CSV, Excel). Разделение на отдельный модуль улучшает организацию кода и следует принципам SRP. """ import csv import io from django.http import HttpResponse from django.utils import timezone from ..models import Customer try: # Для чтения .xlsx файлов from openpyxl import load_workbook except ImportError: load_workbook = None try: import phonenumbers from phonenumbers import NumberParseException except ImportError: phonenumbers = None NumberParseException = Exception class CustomerExporter: """ Класс для экспорта клиентов в различные форматы. """ @staticmethod def export_to_csv(): """ Экспортирует всех клиентов (кроме системного) в CSV файл. Поля экспорта: - ID - Имя - Email - Телефон - Дата создания Примечание: Баланс кошелька НЕ экспортируется (требование безопасности). Returns: HttpResponse: HTTP ответ с CSV файлом для скачивания """ # Создаём HTTP ответ с CSV файлом response = HttpResponse(content_type='text/csv; charset=utf-8') timestamp = timezone.now().strftime("%Y%m%d_%H%M%S") response['Content-Disposition'] = f'attachment; filename="customers_export_{timestamp}.csv"' # Добавляем BOM для корректного открытия в Excel response.write('\ufeff') writer = csv.writer(response) # Заголовки writer.writerow([ 'ID', 'Имя', 'Email', 'Телефон', 'Дата создания', ]) # Данные (исключаем системного клиента) customers = Customer.objects.filter(is_system_customer=False).order_by('-created_at') for customer in customers: writer.writerow([ customer.id, customer.name or '', customer.email or '', str(customer.phone) if customer.phone else '', customer.created_at.strftime('%Y-%m-%d %H:%M:%S'), ]) return response class CustomerImporter: """ Простой универсальный импорт клиентов из CSV/XLSX. Поддерживаемые форматы: - CSV (UTF-8, заголовок в первой строке) - XLSX (первая строка — заголовки) Алгоритм: - Читаем файл → получаем headers и list[dict] строк - По headers строим маппинг на поля Customer: name/email/phone/notes - Для каждой строки: - пропускаем полностью пустые строки - ищем клиента по email, потом по телефону - если update_existing=False и клиент найден → пропуск - если update_existing=True → обновляем найденного - если не найден → создаём нового - Вся валидация/нормализация делается через Customer.full_clean() """ FIELD_ALIASES = { "name": ["name", "имя", "фио", "клиент", "фамилияимя"], "email": ["email", "e-mail", "e_mail", "почта", "элпочта", "электроннаяпочта"], "phone": ["phone", "телефон", "тел", "моб", "мобильный", "номер"], "notes": ["notes", "заметки", "комментарий", "comment", "примечание"], } def __init__(self): self.errors = [] self.success_count = 0 self.update_count = 0 self.enriched_count = 0 # Дополнены пустые поля self.conflicts_resolved = 0 # Альтернативные контакты через ContactChannel self.skip_count = 0 # Отслеживание уже обработанных email/phone для дедупликации внутри файла self.processed_emails = set() self.processed_phones = set() # Отдельный список для реальных ошибок (не дублей из БД) self.real_errors = [] # Сохраняем исходные данные для генерации error-файла self.original_headers = [] self.original_rows = [] self.file_format = None def import_from_file(self, file, update_existing: bool = False) -> dict: """ Импорт клиентов из загруженного файла. Args: file: UploadedFile update_existing: обновлять ли существующих клиентов (по email/телефону) Returns: dict: результат импорта """ file_format = self._detect_format(file) if file_format is None: return { "success": False, "message": "Неподдерживаемый формат файла. Ожидается CSV или XLSX.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "Unsupported file type"}], } if file_format == "xlsx" and load_workbook is None: return { "success": False, "message": "Для импорта XLSX необходим пакет openpyxl. Установите его и повторите попытку.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "openpyxl is not installed"}], } # Сохраняем формат файла self.file_format = file_format try: if file_format == "csv": headers, rows = self._read_csv(file) else: headers, rows = self._read_xlsx(file) # Сохраняем исходные данные self.original_headers = headers self.original_rows = rows except Exception as exc: return { "success": False, "message": f"Ошибка чтения файла: {exc}", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": str(exc)}], } if not headers: return { "success": False, "message": "В файле не найдены заголовки.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "Empty header row"}], } mapping = self._build_mapping(headers) if not any(field in mapping for field in ("name", "email", "phone")): return { "success": False, "message": "Не удалось сопоставить обязательные поля (имя, email или телефон).", "created": 0, "updated": 0, "skipped": len(rows), "errors": [ { "row": None, "reason": "No required fields (name/email/phone) mapped from headers", } ], } for index, row in enumerate(rows, start=2): # первая строка — заголовки self._process_row(index, row, mapping, update_existing) total_errors = len(self.errors) duplicate_count = len([e for e in self.errors if e.get('is_duplicate', False)]) real_error_count = len(self.real_errors) success = (self.success_count + self.update_count) > 0 if success and total_errors == 0: message = "Импорт завершён успешно." elif success and total_errors > 0: message = "Импорт завершён с ошибками." else: message = "Не удалось импортировать данные." return { "success": success, "message": message, "created": self.success_count, "updated": self.update_count, "enriched": self.enriched_count, "conflicts_resolved": self.conflicts_resolved, "skipped": self.skip_count, "errors": self.errors, "real_errors": self.real_errors, # Только невалидные данные, без дублей из БД "duplicate_count": duplicate_count, "real_error_count": real_error_count, } def _detect_format(self, file) -> str | None: name = (getattr(file, "name", None) or "").lower() if name.endswith(".csv"): return "csv" if name.endswith(".xlsx") or name.endswith(".xls"): return "xlsx" return None def _read_csv(self, file): file.seek(0) raw = file.read() if isinstance(raw, bytes): text = raw.decode("utf-8-sig") else: text = raw f = io.StringIO(text) reader = csv.DictReader(f) headers = reader.fieldnames or [] rows = list(reader) return headers, rows def _read_xlsx(self, file): file.seek(0) wb = load_workbook(file, read_only=True, data_only=True) ws = wb.active headers = [] rows = [] first_row = True for row in ws.iter_rows(values_only=True): if first_row: headers = [str(v).strip() if v is not None else "" for v in row] first_row = False continue if not any(row): continue row_dict = {} for idx, value in enumerate(row): if idx < len(headers): header = headers[idx] or f"col_{idx}" row_dict[header] = value rows.append(row_dict) return headers, rows def _normalize_header(self, header: str) -> str: if header is None: return "" cleaned = "".join(ch for ch in str(header).strip().lower() if ch.isalnum()) return cleaned def _build_mapping(self, headers): mapping = {} normalized_aliases = { field: {self._normalize_header(a) for a in aliases} for field, aliases in self.FIELD_ALIASES.items() } for header in headers: norm = self._normalize_header(header) if not norm: continue for field, alias_set in normalized_aliases.items(): if norm in alias_set and field not in mapping: mapping[field] = header break return mapping def _clean_value(self, value): if value is None: return "" return str(value).strip() def _normalize_phone(self, raw_phone: str) -> str | None: """ Умная нормализация телефона с попыткой различных регионов. Стратегии: 1. Если начинается с '+' — парсим как международный 2. Если начинается с '8' и 11 цифр — пробуем BY, потом RU 3. Пробуем распространённые регионы: BY, RU, PL, DE, US 4. Если всё не удалось — возвращаем None Returns: Нормализованный телефон в E.164 формате или None """ if not raw_phone or not phonenumbers: return None # Убираем все символы кроме цифр и + cleaned = ''.join(c for c in raw_phone if c.isdigit() or c == '+') if not cleaned: return None # Стратегия 1: Международный формат (начинается с +) if cleaned.startswith('+'): try: parsed = phonenumbers.parse(cleaned, None) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: pass # Стратегия 2: Формат '8XXXXXXXXXX' (11 цифр) if cleaned.startswith('8') and len(cleaned) == 11: for region in ['BY', 'RU']: try: parsed = phonenumbers.parse(cleaned, region) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: continue # Стратегия 3: Пробуем распространённые регионы for region in ['BY', 'RU', 'PL', 'DE', 'US']: try: # Добавляем '+' если его нет test_number = cleaned if cleaned.startswith('+') else f'+{cleaned}' parsed = phonenumbers.parse(test_number, region) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: continue # Пробуем без '+' try: parsed = phonenumbers.parse(cleaned, region) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: continue return None def _is_valid_email(self, email: str) -> bool: """ Проверка валидности email через Django EmailValidator. Args: email: Email для проверки Returns: bool: True если email валиден """ from django.core.validators import validate_email from django.core.exceptions import ValidationError if not email: return False try: validate_email(email) return True except ValidationError: return False def _get_better_name(self, existing_name: str, new_name: str) -> str: """ Выбирает более полное/информативное имя. Приоритеты: 1. Если одно пустое - возвращаем непустое 2. Если новое длиннее И содержит старое - берём новое 3. Если в новом есть пробелы (ФИО), а в старом нет - берём новое 4. Иначе оставляем старое Returns: str: Лучшее имя и флаг конфликта """ if not existing_name: return new_name if not new_name: return existing_name # Если новое имя длиннее и содержит старое - используем новое if len(new_name) > len(existing_name) and existing_name.lower() in new_name.lower(): return new_name # Если в новом есть пробелы (ФИО), а в старом нет - берём новое if ' ' in new_name and ' ' not in existing_name: return new_name # Иначе оставляем существующее return existing_name def _create_alternative_contact(self, customer, channel_type: str, value: str, source: str = 'Импорт'): """ Создаёт альтернативный контакт через ContactChannel. Args: customer: Объект Customer channel_type: Тип канала ('email' или 'phone') value: Значение контакта source: Источник (для notes) Returns: bool: True если создан, False если уже существует """ from ..models import ContactChannel # Проверяем, не существует ли уже такой контакт exists = ContactChannel.objects.filter( channel_type=channel_type, value=value ).exists() if exists: return False try: ContactChannel.objects.create( customer=customer, channel_type=channel_type, value=value, is_primary=False, notes=f'Из {source}' ) return True except Exception: # Если не удалось создать - пропускаем return False def _process_row(self, row_number: int, row: dict, mapping: dict, update_existing: bool): name = self._clean_value(row.get(mapping.get("name", ""), "")) email = self._clean_value(row.get(mapping.get("email", ""), "")) phone_raw = self._clean_value(row.get(mapping.get("phone", ""), "")) notes = self._clean_value(row.get(mapping.get("notes", ""), "")) if not any([name, email, phone_raw, notes]): self.skip_count += 1 return # Нормализуем email if email: email = email.lower().strip() # Проверка на дубли внутри файла if email in self.processed_emails: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email, "phone": phone_raw or None, "reason": "Дубль email внутри файла (уже обработан в предыдущей строке).", "is_duplicate": True, # Дубликат внутри файла } ) return # Умная нормализация телефона phone = None phone_normalization_failed = False if phone_raw: phone = self._normalize_phone(phone_raw) if not phone: phone_normalization_failed = True else: # Проверка на дубли внутри файла if phone in self.processed_phones: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email or None, "phone": phone_raw, "reason": "Дубль телефона внутри файла (уже обработан в предыдущей строке).", "is_duplicate": True, # Дубликат внутри файла } ) return # Если телефон не удалось нормализовать, сохраняем в notes if phone_normalization_failed: note_addition = f"Исходный телефон из импорта (невалидный): {phone_raw}" if notes: notes = f"{notes}\n{note_addition}" else: notes = note_addition # Проверка: есть ли хотя бы ОДИН валидный контакт has_valid_email = self._is_valid_email(email) has_valid_phone = phone is not None # phone уже нормализован или None if not has_valid_email and not has_valid_phone: # Нет валидных контактов - в ошибки self.skip_count += 1 error_record = { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": "Требуется хотя бы один: email или телефон", "is_duplicate": False, } self.errors.append(error_record) self.real_errors.append(error_record) return # Пытаемся найти существующего клиента existing = None if email: existing = Customer.objects.filter(email=email).first() if existing is None and phone: existing = Customer.objects.filter(phone=phone).first() if existing and not update_existing: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": "Клиент с таким email/телефоном уже существует, обновление отключено.", "is_duplicate": True, # Помечаем как дубликат из БД } ) return if existing and update_existing: # Умное слияние (smart merge) was_enriched = False # Флаг дополнения пустых полей # 1. Имя - выбираем лучшее if name: better_name = self._get_better_name(existing.name or '', name) if not existing.name: was_enriched = True existing.name = better_name elif better_name != existing.name: # Конфликт имен - добавляем в notes if name != existing.name and name.lower() not in existing.name.lower(): alt_name_note = f"Также известен как: {name}" if existing.notes: if alt_name_note not in existing.notes: existing.notes = f"{existing.notes}\n{alt_name_note}" else: existing.notes = alt_name_note existing.name = better_name # 2. Email - дополняем или создаём ContactChannel if email: if not existing.email: # Пустое поле - дополняем was_enriched = True existing.email = email elif existing.email != email: # Конфликт - создаём альтернативный контакт if self._create_alternative_contact(existing, 'email', email): self.conflicts_resolved += 1 # 3. Телефон - дополняем или создаём ContactChannel if phone: if not existing.phone: # Пустое поле - дополняем was_enriched = True existing.phone = phone elif str(existing.phone) != phone: # Конфликт - создаём альтернативный контакт if self._create_alternative_contact(existing, 'phone', phone): self.conflicts_resolved += 1 # 4. Заметки - ВСЕГДА дописываем if notes: if existing.notes: existing.notes = f"{existing.notes}\n{notes}" else: existing.notes = notes try: existing.full_clean() existing.save() # Счётчики if was_enriched: self.enriched_count += 1 else: self.update_count += 1 if email: self.processed_emails.add(email) if phone: self.processed_phones.add(phone) except Exception as exc: self.skip_count += 1 error_record = { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": str(exc), "is_duplicate": False, } self.errors.append(error_record) self.real_errors.append(error_record) # Реальная ошибка валидации return # Создание нового клиента customer = Customer( name=name or "", email=email or None, phone=phone or None, # Если не удалось нормализовать — будет None notes=notes or "", ) try: customer.full_clean() customer.save() self.success_count += 1 if email: self.processed_emails.add(email) if phone: self.processed_phones.add(phone) except Exception as exc: self.skip_count += 1 error_record = { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": str(exc), "is_duplicate": False, } self.errors.append(error_record) self.real_errors.append(error_record) # Реальная ошибка валидации def generate_error_file(self) -> tuple[bytes, str] | None: """ Генерирует файл с ошибочными строками (только real_errors). Возвращает тот же формат, что был загружен (CSV или XLSX). Добавляет колонку "Ошибка" с описанием проблемы. Returns: tuple[bytes, str]: (file_content, filename) или None если нет ошибок """ if not self.real_errors or not self.original_headers: return None # Создаём mapping row_number -> error error_map = {err['row']: err for err in self.real_errors if err.get('row')} if not error_map: return None # Собираем ошибочные строки error_rows = [] for index, row in enumerate(self.original_rows, start=2): # start=2 т.к. первая строка - заголовки if index in error_map: error_info = error_map[index] # Добавляем исходную строку + колонку с ошибкой row_with_error = dict(row) # копируем row_with_error['Ошибка'] = error_info['reason'] error_rows.append(row_with_error) if not error_rows: return None # Заголовки + колонка "Ошибка" headers_with_error = list(self.original_headers) + ['Ошибка'] # Генерируем файл в зависимости от формата timestamp = timezone.now().strftime("%Y%m%d_%H%M%S") if self.file_format == 'csv': return self._generate_csv_error_file(headers_with_error, error_rows, timestamp) else: return self._generate_xlsx_error_file(headers_with_error, error_rows, timestamp) def _generate_csv_error_file(self, headers: list, rows: list[dict], timestamp: str) -> tuple[bytes, str]: """ Генерирует CSV файл с ошибками (с BOM для Excel). """ output = io.StringIO() # BOM для корректного открытия в Excel output.write('\ufeff') writer = csv.DictWriter(output, fieldnames=headers, extrasaction='ignore') writer.writeheader() writer.writerows(rows) content = output.getvalue().encode('utf-8') filename = f'customer_import_errors_{timestamp}.csv' return content, filename def _generate_xlsx_error_file(self, headers: list, rows: list[dict], timestamp: str) -> tuple[bytes, str] | None: """ Генерирует XLSX файл с ошибками. """ if load_workbook is None: # Fallback to CSV если openpyxl не установлен return self._generate_csv_error_file(headers, rows, timestamp) try: from openpyxl import Workbook wb = Workbook() ws = wb.active ws.title = "Ошибки импорта" # Заголовки ws.append(headers) # Данные for row_dict in rows: row_data = [row_dict.get(h, '') for h in headers] ws.append(row_data) # Сохраняем в BytesIO output = io.BytesIO() wb.save(output) output.seek(0) content = output.read() filename = f'customer_import_errors_{timestamp}.xlsx' return content, filename except Exception: # Fallback to CSV при любой ошибке return self._generate_csv_error_file(headers, rows, timestamp)