""" Сервис для импорта и экспорта клиентов. Этот модуль содержит логику импорта/экспорта клиентов в различных форматах (CSV, Excel). Разделение на отдельный модуль улучшает организацию кода и следует принципам SRP. """ import csv import io from django.http import HttpResponse from django.utils import timezone from ..models import Customer try: # Для чтения .xlsx файлов from openpyxl import load_workbook except ImportError: load_workbook = None try: import phonenumbers from phonenumbers import NumberParseException except ImportError: phonenumbers = None NumberParseException = Exception class CustomerExporter: """ Класс для экспорта клиентов в различные форматы. """ @staticmethod def export_to_csv(): """ Экспортирует всех клиентов (кроме системного) в CSV файл. Поля экспорта: - ID - Имя - Email - Телефон - Дата создания Примечание: Баланс кошелька НЕ экспортируется (требование безопасности). Returns: HttpResponse: HTTP ответ с CSV файлом для скачивания """ # Создаём HTTP ответ с CSV файлом response = HttpResponse(content_type='text/csv; charset=utf-8') timestamp = timezone.now().strftime("%Y%m%d_%H%M%S") response['Content-Disposition'] = f'attachment; filename="customers_export_{timestamp}.csv"' # Добавляем BOM для корректного открытия в Excel response.write('\ufeff') writer = csv.writer(response) # Заголовки writer.writerow([ 'ID', 'Имя', 'Email', 'Телефон', 'Дата создания', ]) # Данные (исключаем системного клиента) customers = Customer.objects.filter(is_system_customer=False).order_by('-created_at') for customer in customers: writer.writerow([ customer.id, customer.name or '', customer.email or '', str(customer.phone) if customer.phone else '', customer.created_at.strftime('%Y-%m-%d %H:%M:%S'), ]) return response class CustomerImporter: """ Простой универсальный импорт клиентов из CSV/XLSX. Поддерживаемые форматы: - CSV (UTF-8, заголовок в первой строке) - XLSX (первая строка — заголовки) Алгоритм: - Читаем файл → получаем headers и list[dict] строк - По headers строим маппинг на поля Customer: name/email/phone/notes - Для каждой строки: - пропускаем полностью пустые строки - ищем клиента по email, потом по телефону - если update_existing=False и клиент найден → пропуск - если update_existing=True → обновляем найденного - если не найден → создаём нового - Вся валидация/нормализация делается через Customer.full_clean() """ FIELD_ALIASES = { "name": ["name", "имя", "фио", "клиент", "фамилияимя"], "email": ["email", "e-mail", "e_mail", "почта", "элпочта", "электроннаяпочта"], "phone": ["phone", "телефон", "тел", "моб", "мобильный", "номер"], "notes": ["notes", "заметки", "комментарий", "comment", "примечание"], } def __init__(self): self.errors = [] self.success_count = 0 self.update_count = 0 self.skip_count = 0 # Отслеживание уже обработанных email/phone для дедупликации внутри файла self.processed_emails = set() self.processed_phones = set() # Отдельный список для реальных ошибок (не дублей из БД) self.real_errors = [] def import_from_file(self, file, update_existing: bool = False) -> dict: """ Импорт клиентов из загруженного файла. Args: file: UploadedFile update_existing: обновлять ли существующих клиентов (по email/телефону) Returns: dict: результат импорта """ file_format = self._detect_format(file) if file_format is None: return { "success": False, "message": "Неподдерживаемый формат файла. Ожидается CSV или XLSX.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "Unsupported file type"}], } if file_format == "xlsx" and load_workbook is None: return { "success": False, "message": "Для импорта XLSX необходим пакет openpyxl. Установите его и повторите попытку.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "openpyxl is not installed"}], } try: if file_format == "csv": headers, rows = self._read_csv(file) else: headers, rows = self._read_xlsx(file) except Exception as exc: return { "success": False, "message": f"Ошибка чтения файла: {exc}", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": str(exc)}], } if not headers: return { "success": False, "message": "В файле не найдены заголовки.", "created": 0, "updated": 0, "skipped": 0, "errors": [{"row": None, "reason": "Empty header row"}], } mapping = self._build_mapping(headers) if not any(field in mapping for field in ("name", "email", "phone")): return { "success": False, "message": "Не удалось сопоставить обязательные поля (имя, email или телефон).", "created": 0, "updated": 0, "skipped": len(rows), "errors": [ { "row": None, "reason": "No required fields (name/email/phone) mapped from headers", } ], } for index, row in enumerate(rows, start=2): # первая строка — заголовки self._process_row(index, row, mapping, update_existing) total_errors = len(self.errors) success = (self.success_count + self.update_count) > 0 if success and total_errors == 0: message = "Импорт завершён успешно." elif success and total_errors > 0: message = "Импорт завершён с ошибками." else: message = "Не удалось импортировать данные." return { "success": success, "message": message, "created": self.success_count, "updated": self.update_count, "skipped": self.skip_count, "errors": self.errors, "real_errors": self.real_errors, # Только невалидные данные, без дублей из БД } def _detect_format(self, file) -> str | None: name = (getattr(file, "name", None) or "").lower() if name.endswith(".csv"): return "csv" if name.endswith(".xlsx") or name.endswith(".xls"): return "xlsx" return None def _read_csv(self, file): file.seek(0) raw = file.read() if isinstance(raw, bytes): text = raw.decode("utf-8-sig") else: text = raw f = io.StringIO(text) reader = csv.DictReader(f) headers = reader.fieldnames or [] rows = list(reader) return headers, rows def _read_xlsx(self, file): file.seek(0) wb = load_workbook(file, read_only=True, data_only=True) ws = wb.active headers = [] rows = [] first_row = True for row in ws.iter_rows(values_only=True): if first_row: headers = [str(v).strip() if v is not None else "" for v in row] first_row = False continue if not any(row): continue row_dict = {} for idx, value in enumerate(row): if idx < len(headers): header = headers[idx] or f"col_{idx}" row_dict[header] = value rows.append(row_dict) return headers, rows def _normalize_header(self, header: str) -> str: if header is None: return "" cleaned = "".join(ch for ch in str(header).strip().lower() if ch.isalnum()) return cleaned def _build_mapping(self, headers): mapping = {} normalized_aliases = { field: {self._normalize_header(a) for a in aliases} for field, aliases in self.FIELD_ALIASES.items() } for header in headers: norm = self._normalize_header(header) if not norm: continue for field, alias_set in normalized_aliases.items(): if norm in alias_set and field not in mapping: mapping[field] = header break return mapping def _clean_value(self, value): if value is None: return "" return str(value).strip() def _normalize_phone(self, raw_phone: str) -> str | None: """ Умная нормализация телефона с попыткой различных регионов. Стратегии: 1. Если начинается с '+' — парсим как международный 2. Если начинается с '8' и 11 цифр — пробуем BY, потом RU 3. Пробуем распространённые регионы: BY, RU, PL, DE, US 4. Если всё не удалось — возвращаем None Returns: Нормализованный телефон в E.164 формате или None """ if not raw_phone or not phonenumbers: return None # Убираем все символы кроме цифр и + cleaned = ''.join(c for c in raw_phone if c.isdigit() or c == '+') if not cleaned: return None # Стратегия 1: Международный формат (начинается с +) if cleaned.startswith('+'): try: parsed = phonenumbers.parse(cleaned, None) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: pass # Стратегия 2: Формат '8XXXXXXXXXX' (11 цифр) if cleaned.startswith('8') and len(cleaned) == 11: for region in ['BY', 'RU']: try: parsed = phonenumbers.parse(cleaned, region) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: continue # Стратегия 3: Пробуем распространённые регионы for region in ['BY', 'RU', 'PL', 'DE', 'US']: try: # Добавляем '+' если его нет test_number = cleaned if cleaned.startswith('+') else f'+{cleaned}' parsed = phonenumbers.parse(test_number, region) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: continue # Пробуем без '+' try: parsed = phonenumbers.parse(cleaned, region) if phonenumbers.is_valid_number(parsed): return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164) except NumberParseException: continue return None def _process_row(self, row_number: int, row: dict, mapping: dict, update_existing: bool): name = self._clean_value(row.get(mapping.get("name", ""), "")) email = self._clean_value(row.get(mapping.get("email", ""), "")) phone_raw = self._clean_value(row.get(mapping.get("phone", ""), "")) notes = self._clean_value(row.get(mapping.get("notes", ""), "")) if not any([name, email, phone_raw, notes]): self.skip_count += 1 return # Нормализуем email if email: email = email.lower().strip() # Проверка на дубли внутри файла if email in self.processed_emails: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email, "phone": phone_raw or None, "reason": "Дубль email внутри файла (уже обработан в предыдущей строке).", "is_duplicate": True, # Дубликат внутри файла } ) return # Умная нормализация телефона phone = None phone_normalization_failed = False if phone_raw: phone = self._normalize_phone(phone_raw) if not phone: phone_normalization_failed = True else: # Проверка на дубли внутри файла if phone in self.processed_phones: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email or None, "phone": phone_raw, "reason": "Дубль телефона внутри файла (уже обработан в предыдущей строке).", "is_duplicate": True, # Дубликат внутри файла } ) return # Если телефон не удалось нормализовать, сохраняем в notes if phone_normalization_failed: note_addition = f"Исходный телефон из импорта (невалидный): {phone_raw}" if notes: notes = f"{notes}\n{note_addition}" else: notes = note_addition # Пытаемся найти существующего клиента existing = None if email: existing = Customer.objects.filter(email=email).first() if existing is None and phone: existing = Customer.objects.filter(phone=phone).first() if existing and not update_existing: self.skip_count += 1 self.errors.append( { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": "Клиент с таким email/телефоном уже существует, обновление отключено.", "is_duplicate": True, # Помечаем как дубликат из БД } ) return if existing and update_existing: if name: existing.name = name if email: existing.email = email if phone: existing.phone = phone if notes: if existing.notes: existing.notes = f"{existing.notes}\n{notes}" else: existing.notes = notes try: existing.full_clean() existing.save() self.update_count += 1 if email: self.processed_emails.add(email) if phone: self.processed_phones.add(phone) except Exception as exc: self.skip_count += 1 error_record = { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": str(exc), "is_duplicate": False, } self.errors.append(error_record) self.real_errors.append(error_record) # Реальная ошибка валидации return # Создание нового клиента customer = Customer( name=name or "", email=email or None, phone=phone or None, # Если не удалось нормализовать — будет None notes=notes or "", ) try: customer.full_clean() customer.save() self.success_count += 1 if email: self.processed_emails.add(email) if phone: self.processed_phones.add(phone) except Exception as exc: self.skip_count += 1 error_record = { "row": row_number, "email": email or None, "phone": phone_raw or None, "reason": str(exc), "is_duplicate": False, } self.errors.append(error_record) self.real_errors.append(error_record) # Реальная ошибка валидации