Files
octopus/myproject/customers/services/import_export.py
Andrey Smakotin 56725e8092 Добавлена система фильтрации клиентов с универсальным поиском
- Реализован универсальный поиск по имени, email и телефону в одной строке
- Добавлен счетчик общего количества клиентов
- Поиск работает по нажатию Enter или кнопке 'Поиск'
- Удалены неиспользуемые фильтры django-filter
- Упрощен интерфейс списка клиентов
- Добавлена кнопка 'Очистить' для сброса поиска
2025-12-14 22:39:32 +03:00

496 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Сервис для импорта и экспорта клиентов.
Этот модуль содержит логику импорта/экспорта клиентов в различных форматах (CSV, Excel).
Разделение на отдельный модуль улучшает организацию кода и следует принципам SRP.
"""
import csv
import io
from django.http import HttpResponse
from django.utils import timezone
from ..models import Customer
try:
# Для чтения .xlsx файлов
from openpyxl import load_workbook
except ImportError:
load_workbook = None
try:
import phonenumbers
from phonenumbers import NumberParseException
except ImportError:
phonenumbers = None
NumberParseException = Exception
class CustomerExporter:
"""
Класс для экспорта клиентов в различные форматы.
"""
@staticmethod
def export_to_csv():
"""
Экспортирует всех клиентов (кроме системного) в CSV файл.
Поля экспорта:
- ID
- Имя
- Email
- Телефон
- Дата создания
Примечание: Баланс кошелька НЕ экспортируется (требование безопасности).
Returns:
HttpResponse: HTTP ответ с CSV файлом для скачивания
"""
# Создаём HTTP ответ с CSV файлом
response = HttpResponse(content_type='text/csv; charset=utf-8')
timestamp = timezone.now().strftime("%Y%m%d_%H%M%S")
response['Content-Disposition'] = f'attachment; filename="customers_export_{timestamp}.csv"'
# Добавляем BOM для корректного открытия в Excel
response.write('\ufeff')
writer = csv.writer(response)
# Заголовки
writer.writerow([
'ID',
'Имя',
'Email',
'Телефон',
'Дата создания',
])
# Данные (исключаем системного клиента)
customers = Customer.objects.filter(is_system_customer=False).order_by('-created_at')
for customer in customers:
writer.writerow([
customer.id,
customer.name or '',
customer.email or '',
str(customer.phone) if customer.phone else '',
customer.created_at.strftime('%Y-%m-%d %H:%M:%S'),
])
return response
class CustomerImporter:
"""
Простой универсальный импорт клиентов из CSV/XLSX.
Поддерживаемые форматы:
- CSV (UTF-8, заголовок в первой строке)
- XLSX (первая строка — заголовки)
Алгоритм:
- Читаем файл → получаем headers и list[dict] строк
- По headers строим маппинг на поля Customer: name/email/phone/notes
- Для каждой строки:
- пропускаем полностью пустые строки
- ищем клиента по email, потом по телефону
- если update_existing=False и клиент найден → пропуск
- если update_existing=True → обновляем найденного
- если не найден → создаём нового
- Вся валидация/нормализация делается через Customer.full_clean()
"""
FIELD_ALIASES = {
"name": ["name", "имя", "фио", "клиент", "фамилияимя"],
"email": ["email", "e-mail", "e_mail", "почта", "элпочта", "электроннаяпочта"],
"phone": ["phone", "телефон", "тел", "моб", "мобильный", "номер"],
"notes": ["notes", "заметки", "комментарий", "comment", "примечание"],
}
def __init__(self):
self.errors = []
self.success_count = 0
self.update_count = 0
self.skip_count = 0
# Отслеживание уже обработанных email/phone для дедупликации внутри файла
self.processed_emails = set()
self.processed_phones = set()
# Отдельный список для реальных ошибок (не дублей из БД)
self.real_errors = []
def import_from_file(self, file, update_existing: bool = False) -> dict:
"""
Импорт клиентов из загруженного файла.
Args:
file: UploadedFile
update_existing: обновлять ли существующих клиентов (по email/телефону)
Returns:
dict: результат импорта
"""
file_format = self._detect_format(file)
if file_format is None:
return {
"success": False,
"message": "Неподдерживаемый формат файла. Ожидается CSV или XLSX.",
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [{"row": None, "reason": "Unsupported file type"}],
}
if file_format == "xlsx" and load_workbook is None:
return {
"success": False,
"message": "Для импорта XLSX необходим пакет openpyxl. Установите его и повторите попытку.",
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [{"row": None, "reason": "openpyxl is not installed"}],
}
try:
if file_format == "csv":
headers, rows = self._read_csv(file)
else:
headers, rows = self._read_xlsx(file)
except Exception as exc:
return {
"success": False,
"message": f"Ошибка чтения файла: {exc}",
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [{"row": None, "reason": str(exc)}],
}
if not headers:
return {
"success": False,
"message": "В файле не найдены заголовки.",
"created": 0,
"updated": 0,
"skipped": 0,
"errors": [{"row": None, "reason": "Empty header row"}],
}
mapping = self._build_mapping(headers)
if not any(field in mapping for field in ("name", "email", "phone")):
return {
"success": False,
"message": "Не удалось сопоставить обязательные поля (имя, email или телефон).",
"created": 0,
"updated": 0,
"skipped": len(rows),
"errors": [
{
"row": None,
"reason": "No required fields (name/email/phone) mapped from headers",
}
],
}
for index, row in enumerate(rows, start=2): # первая строка — заголовки
self._process_row(index, row, mapping, update_existing)
total_errors = len(self.errors)
success = (self.success_count + self.update_count) > 0
if success and total_errors == 0:
message = "Импорт завершён успешно."
elif success and total_errors > 0:
message = "Импорт завершён с ошибками."
else:
message = "Не удалось импортировать данные."
return {
"success": success,
"message": message,
"created": self.success_count,
"updated": self.update_count,
"skipped": self.skip_count,
"errors": self.errors,
"real_errors": self.real_errors, # Только невалидные данные, без дублей из БД
}
def _detect_format(self, file) -> str | None:
name = (getattr(file, "name", None) or "").lower()
if name.endswith(".csv"):
return "csv"
if name.endswith(".xlsx") or name.endswith(".xls"):
return "xlsx"
return None
def _read_csv(self, file):
file.seek(0)
raw = file.read()
if isinstance(raw, bytes):
text = raw.decode("utf-8-sig")
else:
text = raw
f = io.StringIO(text)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
rows = list(reader)
return headers, rows
def _read_xlsx(self, file):
file.seek(0)
wb = load_workbook(file, read_only=True, data_only=True)
ws = wb.active
headers = []
rows = []
first_row = True
for row in ws.iter_rows(values_only=True):
if first_row:
headers = [str(v).strip() if v is not None else "" for v in row]
first_row = False
continue
if not any(row):
continue
row_dict = {}
for idx, value in enumerate(row):
if idx < len(headers):
header = headers[idx] or f"col_{idx}"
row_dict[header] = value
rows.append(row_dict)
return headers, rows
def _normalize_header(self, header: str) -> str:
if header is None:
return ""
cleaned = "".join(ch for ch in str(header).strip().lower() if ch.isalnum())
return cleaned
def _build_mapping(self, headers):
mapping = {}
normalized_aliases = {
field: {self._normalize_header(a) for a in aliases}
for field, aliases in self.FIELD_ALIASES.items()
}
for header in headers:
norm = self._normalize_header(header)
if not norm:
continue
for field, alias_set in normalized_aliases.items():
if norm in alias_set and field not in mapping:
mapping[field] = header
break
return mapping
def _clean_value(self, value):
if value is None:
return ""
return str(value).strip()
def _normalize_phone(self, raw_phone: str) -> str | None:
"""
Умная нормализация телефона с попыткой различных регионов.
Стратегии:
1. Если начинается с '+' — парсим как международный
2. Если начинается с '8' и 11 цифр — пробуем BY, потом RU
3. Пробуем распространённые регионы: BY, RU, PL, DE, US
4. Если всё не удалось — возвращаем None
Returns:
Нормализованный телефон в E.164 формате или None
"""
if not raw_phone or not phonenumbers:
return None
# Убираем все символы кроме цифр и +
cleaned = ''.join(c for c in raw_phone if c.isdigit() or c == '+')
if not cleaned:
return None
# Стратегия 1: Международный формат (начинается с +)
if cleaned.startswith('+'):
try:
parsed = phonenumbers.parse(cleaned, None)
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
except NumberParseException:
pass
# Стратегия 2: Формат '8XXXXXXXXXX' (11 цифр)
if cleaned.startswith('8') and len(cleaned) == 11:
for region in ['BY', 'RU']:
try:
parsed = phonenumbers.parse(cleaned, region)
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
except NumberParseException:
continue
# Стратегия 3: Пробуем распространённые регионы
for region in ['BY', 'RU', 'PL', 'DE', 'US']:
try:
# Добавляем '+' если его нет
test_number = cleaned if cleaned.startswith('+') else f'+{cleaned}'
parsed = phonenumbers.parse(test_number, region)
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
except NumberParseException:
continue
# Пробуем без '+'
try:
parsed = phonenumbers.parse(cleaned, region)
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
except NumberParseException:
continue
return None
def _process_row(self, row_number: int, row: dict, mapping: dict, update_existing: bool):
name = self._clean_value(row.get(mapping.get("name", ""), ""))
email = self._clean_value(row.get(mapping.get("email", ""), ""))
phone_raw = self._clean_value(row.get(mapping.get("phone", ""), ""))
notes = self._clean_value(row.get(mapping.get("notes", ""), ""))
if not any([name, email, phone_raw, notes]):
self.skip_count += 1
return
# Нормализуем email
if email:
email = email.lower().strip()
# Проверка на дубли внутри файла
if email in self.processed_emails:
self.skip_count += 1
self.errors.append(
{
"row": row_number,
"email": email,
"phone": phone_raw or None,
"reason": "Дубль email внутри файла (уже обработан в предыдущей строке).",
"is_duplicate": True, # Дубликат внутри файла
}
)
return
# Умная нормализация телефона
phone = None
phone_normalization_failed = False
if phone_raw:
phone = self._normalize_phone(phone_raw)
if not phone:
phone_normalization_failed = True
else:
# Проверка на дубли внутри файла
if phone in self.processed_phones:
self.skip_count += 1
self.errors.append(
{
"row": row_number,
"email": email or None,
"phone": phone_raw,
"reason": "Дубль телефона внутри файла (уже обработан в предыдущей строке).",
"is_duplicate": True, # Дубликат внутри файла
}
)
return
# Если телефон не удалось нормализовать, сохраняем в notes
if phone_normalization_failed:
note_addition = f"Исходный телефон из импорта (невалидный): {phone_raw}"
if notes:
notes = f"{notes}\n{note_addition}"
else:
notes = note_addition
# Пытаемся найти существующего клиента
existing = None
if email:
existing = Customer.objects.filter(email=email).first()
if existing is None and phone:
existing = Customer.objects.filter(phone=phone).first()
if existing and not update_existing:
self.skip_count += 1
self.errors.append(
{
"row": row_number,
"email": email or None,
"phone": phone_raw or None,
"reason": "Клиент с таким email/телефоном уже существует, обновление отключено.",
"is_duplicate": True, # Помечаем как дубликат из БД
}
)
return
if existing and update_existing:
if name:
existing.name = name
if email:
existing.email = email
if phone:
existing.phone = phone
if notes:
if existing.notes:
existing.notes = f"{existing.notes}\n{notes}"
else:
existing.notes = notes
try:
existing.full_clean()
existing.save()
self.update_count += 1
if email:
self.processed_emails.add(email)
if phone:
self.processed_phones.add(phone)
except Exception as exc:
self.skip_count += 1
error_record = {
"row": row_number,
"email": email or None,
"phone": phone_raw or None,
"reason": str(exc),
"is_duplicate": False,
}
self.errors.append(error_record)
self.real_errors.append(error_record) # Реальная ошибка валидации
return
# Создание нового клиента
customer = Customer(
name=name or "",
email=email or None,
phone=phone or None, # Если не удалось нормализовать — будет None
notes=notes or "",
)
try:
customer.full_clean()
customer.save()
self.success_count += 1
if email:
self.processed_emails.add(email)
if phone:
self.processed_phones.add(phone)
except Exception as exc:
self.skip_count += 1
error_record = {
"row": row_number,
"email": email or None,
"phone": phone_raw or None,
"reason": str(exc),
"is_duplicate": False,
}
self.errors.append(error_record)
self.real_errors.append(error_record) # Реальная ошибка валидации