""" Сервис для импорта и экспорта клиентов. Этот модуль содержит логику импорта/экспорта клиентов в различных форматах (CSV, Excel). Разделение на отдельный модуль улучшает организацию кода и следует принципам SRP. """ import csv import io from django.http import HttpResponse from django.utils import timezone from ..models import Customer try: # Для чтения .xlsx файлов from openpyxl import load_workbook except ImportError: load_workbook = None try: import phonenumbers from phonenumbers import NumberParseException except ImportError: phonenumbers = None NumberParseException = Exception import re class CustomerExporter: """ Класс для экспорта клиентов в различные форматы (CSV/XLSX). Поддерживает выбор полей и фильтрацию по ролям. """ # Конфигурация доступных полей с метаданными AVAILABLE_FIELDS = { 'id': {'label': 'ID', 'owner_only': False}, 'name': {'label': 'Имя', 'owner_only': False}, 'email': {'label': 'Email', 'owner_only': False}, 'phone': {'label': 'Телефон', 'owner_only': False}, 'notes': {'label': 'Заметки', 'owner_only': False}, 'contact_channels': {'label': 'Каналы связи', 'owner_only': False}, 'wallet_balance': {'label': 'Баланс кошелька', 'owner_only': True}, 'created_at': {'label': 'Дата создания', 'owner_only': False}, } DEFAULT_FIELDS = ['id', 'name', 'email', 'phone'] @classmethod def get_available_fields(cls, user): """ Получить поля доступные для пользователя на основе роли. Args: user: Объект пользователя Returns: dict: Словарь доступных полей с метаданными """ fields = {} is_owner = user.is_superuser or user.is_owner for field_key, field_info in cls.AVAILABLE_FIELDS.items(): if field_info['owner_only'] and not is_owner: continue fields[field_key] = field_info return fields def __init__(self, queryset, selected_fields, user): """ Инициализация экспортера. Args: queryset: QuerySet клиентов (уже отфильтрованный) selected_fields: Список ключей полей для экспорта user: Текущий пользователь (для проверки прав) """ self.queryset = queryset self.selected_fields = selected_fields self.user = user def _get_headers(self): """Генерация заголовков на основе выбранных полей""" return [ self.AVAILABLE_FIELDS[field]['label'] for field in self.selected_fields ] def _get_field_value(self, customer, field_key): """ Получить отформатированное значение для конкретного поля. Args: customer: Объект Customer field_key: Ключ поля Returns: str: Форматированное значение """ if field_key == 'id': return customer.id elif field_key == 'name': return customer.name or '' elif field_key == 'email': return customer.email or '' elif field_key == 'phone': return str(customer.phone) if customer.phone else '' elif field_key == 'notes': return customer.notes or '' elif field_key == 'contact_channels': return self._get_contact_channels_display(customer) elif field_key == 'wallet_balance': # Двойная защита: проверка роли if not (self.user.is_superuser or self.user.is_owner): return 'N/A' return str(customer.wallet_balance) elif field_key == 'created_at': return customer.created_at.strftime('%Y-%m-%d %H:%M:%S') return '' def _get_contact_channels_display(self, customer): """ Форматирование каналов связи для экспорта. Объединяет все каналы в одну строку с переводами строк. Args: customer: Объект Customer Returns: str: Форматированная строка каналов связи """ channels = customer.contact_channels.all() if not channels: return '' from ..models import ContactChannel lines = [] for channel in channels: channel_name = dict(ContactChannel.CHANNEL_TYPES).get( channel.channel_type, channel.channel_type ) lines.append(f"{channel_name}: {channel.value}") return '\n'.join(lines) def export_to_csv(self): """ Экспорт в CSV с выбранными полями. Returns: HttpResponse: CSV файл для скачивания """ response = HttpResponse(content_type='text/csv; charset=utf-8') timestamp = timezone.now().strftime("%Y%m%d_%H%M%S") response['Content-Disposition'] = f'attachment; filename="customers_export_{timestamp}.csv"' # BOM для корректного открытия в Excel response.write('\ufeff') writer = csv.writer(response) # Динамические заголовки writer.writerow(self._get_headers()) # Данные for customer in self.queryset: row = [ self._get_field_value(customer, field) for field in self.selected_fields ] writer.writerow(row) return response def export_to_xlsx(self): """ Экспорт в XLSX используя openpyxl. Returns: HttpResponse: XLSX файл для скачивания """ try: from openpyxl import Workbook except ImportError: # Fallback to CSV если openpyxl не установлен return self.export_to_csv() wb = Workbook() ws = wb.active ws.title = "Клиенты" # Заголовки ws.append(self._get_headers()) # Данные for customer in self.queryset: row = [ self._get_field_value(customer, field) for field in self.selected_fields ] ws.append(row) # Автоподстройка ширины столбцов for column in ws.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(cell.value) except: pass adjusted_width = min(max_length + 2, 50) # Максимум 50 ws.column_dimensions[column_letter].width = adjusted_width # Сохранение в BytesIO output = io.BytesIO() wb.save(output) output.seek(0) # Создание response response = HttpResponse( output.read(), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) timestamp = timezone.now().strftime("%Y%m%d_%H%M%S") response['Content-Disposition'] = f'attachment; filename="customers_export_{timestamp}.xlsx"' return response class CustomerImporter: """ Простой универсальный импорт клиентов из CSV/XLSX. Поддерживаемые форматы: - CSV (UTF-8, заголовок в первой строке) - XLSX (первая строка — заголовки) Алгоритм: - Читаем файл → получаем headers и list[dict] строк - По headers строим маппинг на поля Customer: name/email/phone/notes - Для каждой строки: - пропускаем полностью пустые строки - ищем клиента по email, потом по телефону - если update_existing=False и клиент найден → пропуск - если update_existing=True → обновляем найденного - если не найден → создаём нового - Вся валидация/нормализация делается через Customer.full_clean() """ FIELD_ALIASES = { "name": ["name", "имя", "фио", "клиент", "фамилияимя"], "email": ["email", "e-mail", "e_mail", "почта", "элпочта", "электроннаяпочта"], "phone": ["phone", "телефон", "тел", "моб", "мобильный", "номер"], "notes": ["notes", "заметки", "комментарий", "comment", "примечание"], } def __init__(self): self.errors = [] self.success_count = 0 self.update_count = 0 self.enriched_count = 0 # Дополнены пустые поля self.conflicts_resolved = 0 # Альтернативные контакты через ContactChannel self.skip_count = 0 # Отслеживание уже обработанных email/phone для дедупликации внутри файла self.processed_emails = set() self.processed_phones = set() # Отдельный список для реальных ошибок (не дублей из БД) self.real_errors = [] # Сохраняем исходные данные для генерации error-файла self.original_headers = [] self.original_rows = [] self.file_format = None def import_from_file(self, file, update_existing: bool = False) -> dict: """ Импорт клиентов из загруженного файла. Args: file: UploadedFile update_existing: обновлять ли существующих клиентов (по email/телефону) Returns: dict: результат импорта """ file_format = self._detect_format(file) if file_format is None: return { "success": False, "message": "Неподдерживаемый формат файла. Ожидается CSV или XLSX.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "Unsupported file type"}], } if file_format == "xlsx" and load_workbook is None: return { "success": False, "message": "Для импорта XLSX необходим пакет openpyxl. Установите его и повторите попытку.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "openpyxl is not installed"}], } # Сохраняем формат файла self.file_format = file_format try: if file_format == "csv": headers, rows = self._read_csv(file) else: headers, rows = self._read_xlsx(file) # Сохраняем исходные данные self.original_headers = headers self.original_rows = rows except Exception as exc: return { "success": False, "message": f"Ошибка чтения файла: {exc}", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": str(exc)}], } if not headers: return { "success": False, "message": "В файле не найдены заголовки.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "Empty header row"}], } mapping = self._build_mapping(headers) if not any(field in mapping for field in ("name", "email", "phone")): return { "success": False, "message": "Не удалось сопоставить обязательные поля (имя, email или телефон).", "created": 0, "updated": 0, "skipped": len(rows), "errors": [ { "row": None, "reason": "No required fields (name/email/phone) mapped from headers", } ], } for index, row in enumerate(rows, start=2): # первая строка — заголовки self._process_row(index, row, mapping, update_existing) total_errors = len(self.errors) duplicate_count = len([e for e in self.errors if e.get('is_duplicate', False)]) real_error_count = len(self.real_errors) success = (self.success_count + self.update_count) > 0 if success and total_errors == 0: message = "Импорт завершён успешно." elif success and total_errors > 0: message = "Импорт завершён с ошибками." else: message = "Не удалось импортировать данные." return { "success": success, "message": message, "created": self.success_count, "updated": self.update_count, "enriched": self.enriched_count, "conflicts_resolved": self.conflicts_resolved, "skipped": self.skip_count, "errors": self.errors, "real_errors": self.real_errors, # Только невалидные данные, без дублей из БД "duplicate_count": duplicate_count, "real_error_count": real_error_count, } def _detect_format(self, file) -> str | None: name = (getattr(file, "name", None) or "").lower() if name.endswith(".csv"): return "csv" if name.endswith(".xlsx") or name.endswith(".xls"): return "xlsx" return None def _read_csv(self, file): file.seek(0) raw = file.read() if isinstance(raw, bytes): text = raw.decode("utf-8-sig") else: text = raw f = io.StringIO(text) reader = csv.DictReader(f) headers = reader.fieldnames or [] rows = list(reader) return headers, rows def _read_xlsx(self, file): file.seek(0) wb = load_workbook(file, read_only=True, data_only=True) ws = wb.active headers = [] rows = [] first_row = True for row in ws.iter_rows(values_only=True): if first_row: headers = [str(v).strip() if v is not None else "" for v in row] first_row = False continue if not any(row): continue row_dict = {} for idx, value in enumerate(row): if idx < len(headers): header = headers[idx] or f"col_{idx}" row_dict[header] = value rows.append(row_dict) return headers, rows def _normalize_header(self, header: str) -> str: if header is None: return "" cleaned = "".join(ch for ch in str(header).strip().lower() if ch.isalnum()) return cleaned def _build_mapping(self, headers): mapping = {} normalized_aliases = { field: {self._normalize_header(a) for a in aliases} for field, aliases in self.FIELD_ALIASES.items() } for header in headers: norm = self._normalize_header(header) if not norm: continue for field, alias_set in normalized_aliases.items(): if norm in alias_set and field not in mapping: mapping[field] = header break return mapping def _clean_value(self, value): if value is None: return "" return str(value).strip() def _normalize_phone(self, raw_phone: str) -> str | None: """ Умная нормализация телефона с попыткой различных регионов. Стратегии: 1. Если начинается с '+' — парсим как международный 2. Если начинается с '8' и 11 цифр — пробуем BY, потом RU 3. Пробуем распространённые регионы: BY, RU, PL, DE, US 4. Если всё не удалось — возвращаем None Returns: Нормализованный телефон в E.164 формате или None """ if not raw_phone or not phonenumbers: return None # Убираем все символы кроме цифр и + cleaned = re.sub(r'[^\d+]', '', str(raw_phone)) if not cleaned: return None # Проверка длины if len(cleaned) < 10 or len(cleaned) > 15: return None # Умное добавление кода страны if not cleaned.startswith('+'): # Белорусские номера (375...) if cleaned.startswith('375'): cleaned = '+' + cleaned # Российские номера (7XXXXXXXXXX) elif cleaned.startswith('7') and len(cleaned) == 11: cleaned = '+' + cleaned # Украинские номера (380...) elif cleaned.startswith('380'): cleaned = '+' + cleaned # Старый формат 8XXXXXXXXXX -> +7XXXXXXXXXX elif cleaned.startswith('8') and len(cleaned) == 11: cleaned = '+7' + cleaned[1:] # 9 цифр - предполагаем Беларусь elif len(cleaned) == 9: cleaned = '+375' + cleaned # Стратегия 1: Международный формат (начинается с +) if cleaned.startswith('+'): try: parsed = phonenumbers.parse(cleaned, None) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: pass # Стратегия 2: Пробуем распространённые регионы for region in ['BY', 'RU', 'UA', 'PL', 'DE']: try: test_number = cleaned if cleaned.startswith('+') else f'+{cleaned}' parsed = phonenumbers.parse(test_number, region) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: continue try: parsed = phonenumbers.parse(cleaned, region) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: continue return None def _preprocess_email(self, email: str) -> str | None: """ Предобработка email перед валидацией. Исправляет типичные ошибки и опечатки. Args: email: Исходный email Returns: str | None: Очищенный email или None если пустой """ if not email or not isinstance(email, str): return None email = email.strip() if not email: return None # Удаляем лишний текст в конце (фото, комментарии) email = re.sub(r'\s+фото.*$', '', email, flags=re.IGNORECASE) email = re.sub(r'\s+ф\d+$', '', email) # Убираем пробелы внутри email email = email.replace(' ', '') # Двойные @@ -> одинарный @ email = email.replace('@@', '@') # Исправляем пробелы вокруг @ email = re.sub(r'\s*@\s*', '@', email) # Исправляем типичные опечатки в доменах email = email.replace('.ry', '.ru') email = email.replace('mail ru', 'mail.ru') email = email.replace('ya ru', 'ya.ru') email = email.replace('tut by', 'tut.by') email = email.replace('gmail.co.m', 'gmail.com') email = email.replace('/com', '.com') # Если нет @, но есть gmail/mail/etc - пытаемся добавить if '@' not in email: if 'gmail' in email.lower(): email = re.sub(r'gmail', '@gmail', email, flags=re.IGNORECASE) elif 'mail.ru' in email.lower(): email = re.sub(r'mail\.ru', '@mail.ru', email, flags=re.IGNORECASE) elif 'yandex' in email.lower(): email = re.sub(r'yandex', '@yandex', email, flags=re.IGNORECASE) # Исправляем домены без точки if '@' in email: parts = email.split('@') if len(parts) == 2: local, domain = parts domain_lower = domain.lower() # Если домен слишком короткий - невалидный if len(domain) < 4: return None # Если нет точки в домене - пытаемся исправить if '.' not in domain: domain_map = { 'gmail': 'gmail.com', 'mailru': 'mail.ru', 'yaru': 'ya.ru', 'tutby': 'tut.by', 'yandexru': 'yandex.ru', } domain = domain_map.get(domain_lower, None) if not domain: # Неизвестный домен без точки return None email = f"{local}@{domain}" return email.lower() if email else None def _is_valid_email(self, email: str) -> bool: """ Проверка валидности email через Django EmailValidator. Args: email: Email для проверки Returns: bool: True если email валиден """ from django.core.validators import validate_email from django.core.exceptions import ValidationError if not email: return False try: validate_email(email) return True except ValidationError: return False def _get_better_name(self, existing_name: str, new_name: str) -> str: """ Выбирает более полное/информативное имя. Приоритеты: 1. Если одно пустое - возвращаем непустое 2. Если новое длиннее И содержит старое - берём новое 3. Если в новом есть пробелы (ФИО), а в старом нет - берём новое 4. Иначе оставляем старое Returns: str: Лучшее имя и флаг конфликта """ if not existing_name: return new_name if not new_name: return existing_name # Если новое имя длиннее и содержит старое - используем новое if len(new_name) > len(existing_name) and existing_name.lower() in new_name.lower(): return new_name # Если в новом есть пробелы (ФИО), а в старом нет - берём новое if ' ' in new_name and ' ' not in existing_name: return new_name # Иначе оставляем существующее return existing_name def _create_alternative_contact(self, customer, channel_type: str, value: str, source: str = 'Импорт'): """ Создаёт альтернативный контакт через ContactChannel. Args: customer: Объект Customer channel_type: Тип канала ('email' или 'phone') value: Значение контакта source: Источник (для notes) Returns: bool: True если создан, False если уже существует """ from ..models import ContactChannel # Проверяем, не существует ли уже такой контакт exists = ContactChannel.objects.filter( channel_type=channel_type, value=value ).exists() if exists: return False try: ContactChannel.objects.create( customer=customer, channel_type=channel_type, value=value, is_primary=False, notes=f'Из {source}' ) return True except Exception: # Если не удалось создать - пропускаем return False def _process_row(self, row_number: int, row: dict, mapping: dict, update_existing: bool): name = self._clean_value(row.get(mapping.get("name", ""), "")) email = self._clean_value(row.get(mapping.get("email", ""), "")) phone_raw = self._clean_value(row.get(mapping.get("phone", ""), "")) notes = self._clean_value(row.get(mapping.get("notes", ""), "")) if not any([name, email, phone_raw, notes]): self.skip_count += 1 return # Нормализуем email (с предобработкой) if email: email = self._preprocess_email(email) # Проверка на дубли внутри файла if email and email in self.processed_emails: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email, "phone": phone_raw or None, "reason": "Дубль email внутри файла (уже обработан в предыдущей строке).", "is_duplicate": True, # Дубликат внутри файла } ) return # Умная нормализация телефона phone = None phone_normalization_failed = False if phone_raw: phone = self._normalize_phone(phone_raw) if not phone: phone_normalization_failed = True else: # Проверка на дубли внутри файла if phone in self.processed_phones: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email or None, "phone": phone_raw, "reason": "Дубль телефона внутри файла (уже обработан в предыдущей строке).", "is_duplicate": True, # Дубликат внутри файла } ) return # Если телефон не удалось нормализовать, сохраняем в notes if phone_normalization_failed: note_addition = f"Исходный телефон из импорта (невалидный): {phone_raw}" if notes: notes = f"{notes}\n{note_addition}" else: notes = note_addition # Проверка: есть ли хотя бы ОДИН валидный контакт has_valid_email = self._is_valid_email(email) has_valid_phone = phone is not None # phone уже нормализован или None if not has_valid_email and not has_valid_phone: # Нет валидных контактов - в ошибки self.skip_count += 1 error_record = { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": "Требуется хотя бы один: email или телефон", "is_duplicate": False, } self.errors.append(error_record) self.real_errors.append(error_record) return # Пытаемся найти существующего клиента existing = None if email: existing = Customer.objects.filter(email=email).first() if existing is None and phone: existing = Customer.objects.filter(phone=phone).first() if existing and not update_existing: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": "Клиент с таким email/телефоном уже существует, обновление отключено.", "is_duplicate": True, # Помечаем как дубликат из БД } ) return if existing and update_existing: # Умное слияние (smart merge) was_enriched = False # Флаг дополнения пустых полей # 1. Имя - выбираем лучшее if name: better_name = self._get_better_name(existing.name or '', name) if not existing.name: was_enriched = True existing.name = better_name elif better_name != existing.name: # Конфликт имен - добавляем в notes if name != existing.name and name.lower() not in existing.name.lower(): alt_name_note = f"Также известен как: {name}" if existing.notes: if alt_name_note not in existing.notes: existing.notes = f"{existing.notes}\n{alt_name_note}" else: existing.notes = alt_name_note existing.name = better_name # 2. Email - дополняем или создаём ContactChannel if email: if not existing.email: # Пустое поле - дополняем was_enriched = True existing.email = email elif existing.email != email: # Конфликт - создаём альтернативный контакт if self._create_alternative_contact(existing, 'email', email): self.conflicts_resolved += 1 # 3. Телефон - дополняем или создаём ContactChannel if phone: if not existing.phone: # Пустое поле - дополняем was_enriched = True existing.phone = phone elif str(existing.phone) != phone: # Конфликт - создаём альтернативный контакт if self._create_alternative_contact(existing, 'phone', phone): self.conflicts_resolved += 1 # 4. Заметки - ВСЕГДА дописываем if notes: if existing.notes: existing.notes = f"{existing.notes}\n{notes}" else: existing.notes = notes try: existing.full_clean() existing.save() # Счётчики if was_enriched: self.enriched_count += 1 else: self.update_count += 1 if email: self.processed_emails.add(email) if phone: self.processed_phones.add(phone) except Exception as exc: self.skip_count += 1 error_record = { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": str(exc), "is_duplicate": False, } self.errors.append(error_record) self.real_errors.append(error_record) # Реальная ошибка валидации return # Создание нового клиента customer = Customer( name=name or "", email=email or None, phone=phone or None, # Если не удалось нормализовать — будет None notes=notes or "", ) try: customer.full_clean() customer.save() self.success_count += 1 if email: self.processed_emails.add(email) if phone: self.processed_phones.add(phone) except Exception as exc: self.skip_count += 1 error_record = { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": str(exc), "is_duplicate": False, } self.errors.append(error_record) self.real_errors.append(error_record) # Реальная ошибка валидации def generate_error_file(self) -> tuple[bytes, str] | None: """ Генерирует файл с ошибочными строками (только real_errors). Возвращает тот же формат, что был загружен (CSV или XLSX). Добавляет колонку "Ошибка" с описанием проблемы. Returns: tuple[bytes, str]: (file_content, filename) или None если нет ошибок """ if not self.real_errors or not self.original_headers: return None # Создаём mapping row_number -> error error_map = {err['row']: err for err in self.real_errors if err.get('row')} if not error_map: return None # Собираем ошибочные строки error_rows = [] for index, row in enumerate(self.original_rows, start=2): # start=2 т.к. первая строка - заголовки if index in error_map: error_info = error_map[index] # Добавляем исходную строку + колонку с ошибкой row_with_error = dict(row) # копируем row_with_error['Ошибка'] = error_info['reason'] error_rows.append(row_with_error) if not error_rows: return None # Заголовки + колонка "Ошибка" headers_with_error = list(self.original_headers) + ['Ошибка'] # Генерируем файл в зависимости от формата timestamp = timezone.now().strftime("%Y%m%d_%H%M%S") if self.file_format == 'csv': return self._generate_csv_error_file(headers_with_error, error_rows, timestamp) else: return self._generate_xlsx_error_file(headers_with_error, error_rows, timestamp) def _generate_csv_error_file(self, headers: list, rows: list[dict], timestamp: str) -> tuple[bytes, str]: """ Генерирует CSV файл с ошибками (с BOM для Excel). """ output = io.StringIO() # BOM для корректного открытия в Excel output.write('\ufeff') writer = csv.DictWriter(output, fieldnames=headers, extrasaction='ignore') writer.writeheader() writer.writerows(rows) content = output.getvalue().encode('utf-8') filename = f'customer_import_errors_{timestamp}.csv' return content, filename def _generate_xlsx_error_file(self, headers: list, rows: list[dict], timestamp: str) -> tuple[bytes, str] | None: """ Генерирует XLSX файл с ошибками. """ if load_workbook is None: # Fallback to CSV если openpyxl не установлен return self._generate_csv_error_file(headers, rows, timestamp) try: from openpyxl import Workbook wb = Workbook() ws = wb.active ws.title = "Ошибки импорта" # Заголовки ws.append(headers) # Данные for row_dict in rows: row_data = [row_dict.get(h, '') for h in headers] ws.append(row_data) # Сохраняем в BytesIO output = io.BytesIO() wb.save(output) output.seek(0) content = output.read() filename = f'customer_import_errors_{timestamp}.xlsx' return content, filename except Exception: # Fallback to CSV при любой ошибке return self._generate_csv_error_file(headers, rows, timestamp)