Files
octopus/myproject/customers/views.py
Andrey Smakotin 5ded404346 Добавлены фильтры для списка клиентов через django-filter
- Создан CustomerFilter с тремя фильтрами:
  * Есть заметки (has_notes)
  * Нет телефона (no_phone)
  * Нет email (no_email)

- Обновлен views.py для использования фильтров
- Добавлены чекбоксы фильтров в шаблон списка клиентов
- Фильтры работают совместно с поиском
- Кнопка Очистить отображается при активных фильтрах или поиске
2026-01-03 14:50:24 +03:00

898 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from django.shortcuts import render, get_object_or_404, redirect
from django.contrib import messages
from django.core.paginator import Paginator
from django.core.exceptions import ValidationError, PermissionDenied
from django.db.models import Q, Sum, F, Value, DecimalField
from django.db.models.functions import Greatest, Coalesce
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from django.contrib.auth.decorators import login_required
from user_roles.decorators import manager_or_owner_required
import phonenumbers
import json
from decimal import Decimal
from .models import Customer, ContactChannel
from .forms import CustomerForm, ContactChannelForm
from .filters import CustomerFilter
def normalize_query_phone(q):
"""Normalize phone number for search"""
try:
parsed = phonenumbers.parse(q, "BY")
if phonenumbers.is_valid_number(parsed):
return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
return q
except:
return q
def customer_list(request):
"""Список всех клиентов"""
query = request.GET.get('q', '').strip()
# Исключаем системного клиента из списка
customers = Customer.objects.filter(is_system_customer=False)
# Применяем фильтры django-filter
customer_filter = CustomerFilter(request.GET, queryset=customers)
customers = customer_filter.qs
if query:
# Нормализуем номер телефона
phone_normalized = normalize_query_phone(query)
# Определяем стратегию поиска
strategy, search_value = determine_search_strategy(query)
# Строим Q-объект для поиска
q_objects = build_customer_search_query(query, strategy, search_value)
# Добавляем поиск по телефону
if phone_normalized:
q_objects |= Q(phone__icontains=phone_normalized)
# Поиск по цифрам телефона
query_digits = ''.join(c for c in query if c.isdigit())
should_search_by_phone_digits = is_query_phone_only(query) and len(query_digits) >= 3
if should_search_by_phone_digits:
customers_by_phone = Customer.objects.filter(
phone__isnull=False,
phone__icontains=query_digits
)
if customers_by_phone.exists():
q_objects |= Q(pk__in=customers_by_phone.values_list('pk', flat=True))
# Поиск по каналам связи (Instagram, Telegram и т.д.)
channel_matches = ContactChannel.objects.filter(
value__icontains=query
).values_list('customer_id', flat=True)
if channel_matches:
q_objects |= Q(pk__in=channel_matches)
customers = customers.filter(q_objects)
customers = customers.order_by('-created_at')
# Пагинация
paginator = Paginator(customers, 25)
page_number = request.GET.get('page')
page_obj = paginator.get_page(page_number)
context = {
'page_obj': page_obj,
'query': query,
'total_customers': paginator.count, # Используем count из paginator, чтобы избежать дублирования SQL запроса
'filter': customer_filter, # Добавляем фильтр в контекст
}
return render(request, 'customers/customer_list.html', context)
def customer_detail(request, pk):
"""Детали клиента"""
customer = get_object_or_404(Customer, pk=pk)
# Для системного клиента показываем специальную заглушку
if customer.is_system_customer:
return render(request, 'customers/customer_system.html')
# Рассчитываем общий долг по заказам на стороне БД
# Долг = все заказы КРОМЕ отмененных и полностью оплаченных
# ВКЛЮЧАЕТ завершенные заказы с неполной оплатой!
total_debt_result = customer.orders.exclude(
Q(status__is_negative_end=True) | # Отмененные → учитываются в refund_amount
Q(payment_status='paid') # Полностью оплаченные
).aggregate(
total_debt=Coalesce(
Sum(Greatest(F('total_amount') - F('amount_paid'), Value(0), output_field=DecimalField())),
Value(0),
output_field=DecimalField()
)
)
total_debt = total_debt_result['total_debt'] or Decimal('0')
# Количество заказов с долгом (с той же логикой)
active_orders_count = customer.orders.exclude(
Q(status__is_negative_end=True) |
Q(payment_status='paid')
).count()
# Сумма к возврату (отмененные заказы с оплатой)
refund_amount_result = customer.orders.filter(
status__is_negative_end=True, # Отмененные
amount_paid__gt=0 # С оплатой
).aggregate(
total_refund=Coalesce(
Sum('amount_paid'),
Value(0),
output_field=DecimalField()
)
)
refund_amount = refund_amount_result['total_refund'] or Decimal('0')
# Сумма всех успешных заказов
total_orders_sum = customer.get_successful_orders_total()
# Сумма успешных заказов за последний год
last_year_orders_sum = customer.get_last_year_orders_total()
# История транзакций кошелька (последние 20)
from .models import WalletTransaction
wallet_transactions = WalletTransaction.objects.filter(
customer=customer
).select_related('order', 'created_by').order_by('-created_at')[:20]
# История заказов с пагинацией и оптимизацией запросов
orders_list = customer.orders.select_related('status', 'delivery').order_by('-created_at')
paginator = Paginator(orders_list, 10) # 10 заказов на страницу
page_number = request.GET.get('page')
orders_page = paginator.get_page(page_number)
# Каналы связи клиента
contact_channels = customer.contact_channels.all()
context = {
'customer': customer,
'total_debt': total_debt,
'active_orders_count': active_orders_count,
'refund_amount': refund_amount,
'wallet_transactions': wallet_transactions,
'orders_page': orders_page,
'total_orders_sum': total_orders_sum,
'last_year_orders_sum': last_year_orders_sum,
'contact_channels': contact_channels,
}
return render(request, 'customers/customer_detail.html', context)
def customer_create(request):
"""Создание нового клиента"""
if request.method == 'POST':
form = CustomerForm(request.POST)
if form.is_valid():
customer = form.save()
messages.success(request, f'Клиент {customer.full_name} успешно создан.')
return redirect('customers:customer-detail', pk=customer.pk)
else:
form = CustomerForm()
return render(request, 'customers/customer_form.html', {'form': form, 'is_creating': True})
def customer_delete(request, pk):
"""Удаление клиента"""
customer = get_object_or_404(Customer, pk=pk)
# Проверяем, не системный ли это клиент
if customer.is_system_customer:
messages.error(request, 'Невозможно удалить системного клиента. Он необходим для корректной работы системы.')
return redirect('customers:customer-detail', pk=pk)
if request.method == 'POST':
customer_name = customer.full_name
try:
customer.delete()
messages.success(request, f'Клиент {customer_name} успешно удален.')
return redirect('customers:customer-list')
except ValidationError as e:
messages.error(request, str(e))
return redirect('customers:customer-detail', pk=pk)
context = {
'customer': customer
}
return render(request, 'customers/customer_confirm_delete.html', context)
# === CONTACT CHANNELS ===
@require_http_methods(["POST"])
def add_contact_channel(request, customer_pk):
"""Добавить канал связи клиенту"""
customer = get_object_or_404(Customer, pk=customer_pk)
if customer.is_system_customer:
messages.error(request, 'Нельзя добавлять каналы связи системному клиенту.')
return redirect('customers:customer-detail', pk=customer_pk)
form = ContactChannelForm(request.POST)
if form.is_valid():
channel = form.save(commit=False)
channel.customer = customer
channel.save()
messages.success(request, f'Канал "{channel.get_channel_type_display()}" добавлен')
else:
for field, errors in form.errors.items():
for error in errors:
messages.error(request, error)
return redirect('customers:customer-detail', pk=customer_pk)
@require_http_methods(["POST"])
def delete_contact_channel(request, pk):
"""Удалить канал связи"""
channel = get_object_or_404(ContactChannel, pk=pk)
customer_pk = channel.customer.pk
if channel.customer.is_system_customer:
messages.error(request, 'Нельзя удалять каналы связи системного клиента.')
return redirect('customers:customer-detail', pk=customer_pk)
channel_name = channel.get_channel_type_display()
channel.delete()
messages.success(request, f'Канал "{channel_name}" удалён')
return redirect('customers:customer-detail', pk=customer_pk)
# === AJAX API ENDPOINTS ===
def determine_search_strategy(query):
"""
Определяет стратегию поиска на основе содержимого query.
На основе наличия и позиции символа @ определяет, по каким полям и как искать.
Возвращает tuple (strategy, search_value):
- 'name_only': поиск только по имени (для очень коротких запросов)
- 'email_prefix': поиск по началу email (query заканчивается на @)
- 'email_domain': поиск по домену (query начинается с @)
- 'email_full': полный поиск по email (query содержит обе части)
- 'universal': поиск везде (имя + email, для средних запросов)
Примеры:
- 'team_x3m@' → ('email_prefix', 'team_x3m')
- '@bk' → ('email_domain', 'bk')
- 'test@bk.ru' → ('email_full', 'test@bk.ru')
- 'natul' → ('universal', 'natul')
- 'te' → ('name_only', 'te')
"""
if '@' in query:
local, _, domain = query.partition('@')
if not local:
# Начинается с @: ищем по домену
return ('email_domain', domain)
if not domain:
# Заканчивается на @: ищем по префиксу email
return ('email_prefix', local)
# Есть и локальная часть, и домен: полный поиск
return ('email_full', query)
else:
# Нет @: определяем по длине
if len(query) < 2:
return ('name_only', query)
if len(query) >= 3:
return ('universal', query)
# 2 символа: только имя (слишком мало для поиска по email)
return ('name_only', query)
def is_query_phone_only(query):
"""
Проверяет, содержит ли query только символы телефонного номера.
Возвращает True, если query состоит ТОЛЬКО из:
- цифр: 0-9
- телефонных символов: +, -, (, ), пробелов, точек
И ОБЯЗАТЕЛЬНО содержит хотя бы одну цифру.
Возвращает False, если:
- есть буквы или другие символы (означает, что это поиск по имени/email)
- query пустой или состоит только из пробелов
Примеры:
- '295' → True (только цифры)
- '+375291234567' → True (цифры и телефонные символы)
- '(029) 123-45' → True (цифры и телефонные символы)
- 'x3m' → False (содержит буквы)
- 'team_x3m' → False (содержит буквы)
- 'Иван' → False (содержит буквы)
- ' ' → False (только пробелы, нет цифр)
- '' → False (пустая строка)
"""
if not query or not query.strip():
return False
# Проверяем, что query содержит ТОЛЬКО цифры и телефонные символы
phone_chars = set('0123456789+- ().')
if not all(c in phone_chars for c in query):
return False
# Проверяем, что есть хотя бы одна цифра
return any(c.isdigit() for c in query)
def build_customer_search_query(query, strategy, search_value):
"""
Строит Q-объект для поиска клиентов на основе стратегии.
Используется в customer_list() и api_search_customers() для единообразия.
Args:
query: Исходный поисковый запрос (для fallback)
strategy: Стратегия поиска (из determine_search_strategy)
search_value: Значение для поиска (из determine_search_strategy)
Returns:
Q-объект для фильтрации Customer.objects
"""
if strategy == 'name_only':
return Q(name__icontains=search_value)
elif strategy == 'email_prefix':
# Query вида "team_x3m@" — ищем email, начинающиеся с "team_x3m"
return Q(email__istartswith=search_value)
elif strategy == 'email_domain':
# Query вида "@bk" — ищем все email с доменом "@bk.*"
return Q(email__icontains=f'@{search_value}')
elif strategy == 'email_full':
# Query вида "test@bk.ru" — полный поиск по email
return Q(email__icontains=search_value)
elif strategy == 'universal':
# Query вида "natul" (3+ символов) — ищем везде
return Q(name__icontains=search_value) | Q(email__icontains=search_value)
else:
# На случай неизвестной стратегии (не должно быть)
return Q(name__icontains=query)
@require_http_methods(["GET"])
def api_search_customers(request):
"""
AJAX endpoint для поиска клиента по имени, телефону или email.
Параметры GET:
- q: поисковая строка
Возвращает JSON с результатами поиска:
{
"results": [
{"id": 1, "text": "Иван Петров (+375291234567)", "name": "Иван Петров", "phone": "+375291234567", "email": "ivan@example.com"},
...
],
"pagination": {"more": false}
}
Если ничего не найдено и заданы параметры поиска, возвращает:
{
"results": [
{"id": null, "text": "Создать клиента: 'Поиск'", "is_create_option": true, "search_text": "Поиск"}
],
"pagination": {"more": false}
}
"""
query = request.GET.get('q', '').strip()
if not query or len(query) < 1:
return JsonResponse({
'results': [],
'pagination': {'more': False}
})
# Пытаемся нормализовать номер телефона для поиска
phone_normalized = normalize_query_phone(query)
# Для поиска по телефону: извлекаем только цифры и ищем по ним
# Это позволит найти клиента независимо от формата ввода
query_digits = ''.join(c for c in query if c.isdigit())
# Ищем по имени, email или телефону
# Используем Q-объекты для OR условий
# Определяем стратегию поиска на основе содержимого query
strategy, search_value = determine_search_strategy(query)
# Строим Q-объект для поиска (единая функция, используется везде)
q_objects = build_customer_search_query(query, strategy, search_value)
# Для поиска по номерам телефонов: применяем умную логику
# Ищем по телефону ТОЛЬКО если query состоит из ТОЛЬКО цифр и телефонных символов
# (никаких букв — потому что если есть буквы, это явно поиск по имени/email, не по телефону)
if phone_normalized:
q_objects |= Q(phone__icontains=phone_normalized)
# Проверяем, похож ли query на номер телефона (только цифры/+/-/() и минимум 3 цифры)
should_search_by_phone_digits = is_query_phone_only(query) and len(query_digits) >= 3
if should_search_by_phone_digits:
# Ищем клиентов, чьи телефоны содержат введенные цифры
# Используем LIKE запрос вместо Python loop для оптимизации при большом количестве клиентов
# Номер телефона в E.164 формате: +375291234567
# Мы ищем по цифрам, поэтому можно просто использовать LIKE с цифрами
customers_by_phone = Customer.objects.filter(
phone__isnull=False,
phone__icontains=query_digits # Простой поиск по цифрам в phone строке
)
if customers_by_phone.exists():
q_objects |= Q(pk__in=customers_by_phone.values_list('pk', flat=True))
# Поиск по каналам связи (Instagram, Telegram и т.д.)
channel_matches = ContactChannel.objects.filter(
value__icontains=query
).values_list('customer_id', flat=True)
if channel_matches:
q_objects |= Q(pk__in=channel_matches)
# Исключаем системного клиента из результатов поиска
customers = Customer.objects.filter(q_objects).filter(is_system_customer=False).distinct().order_by('name')[:20]
results = []
# Добавляем найденные клиентов
for customer in customers:
phone_display = str(customer.phone) if customer.phone else ''
text = customer.name
if phone_display:
text += f' ({phone_display})'
results.append({
'id': customer.pk,
'text': text,
'name': customer.name,
'phone': phone_display,
'email': customer.email,
'wallet_balance': float(customer.wallet_balance),
})
# Если ничего не найдено, предлагаем создать нового клиента
if not results:
results.append({
'id': '__create_new__', # Специальный ID для опции создания
'text': f'Создать клиента: "{query}"',
'is_create_option': True,
'search_text': query,
})
return JsonResponse({
'results': results,
'pagination': {'more': False}
})
@require_http_methods(["POST"])
def api_update_customer(request, pk):
"""
AJAX endpoint для обновления отдельного поля клиента (inline-редактирование).
Принимает POST JSON:
{
"field": "name",
"value": "Новое имя"
}
Возвращает JSON:
{
"success": true,
"value": "Новое имя"
}
При ошибке:
{
"success": false,
"error": "Текст ошибки"
}
"""
customer = get_object_or_404(Customer, pk=pk)
# Защита системного клиента
if customer.is_system_customer:
return JsonResponse({
'success': False,
'error': 'Системный клиент не может быть изменён'
}, status=403)
try:
data = json.loads(request.body)
field = data.get('field')
value = data.get('value', '').strip()
# Разрешённые поля для редактирования
allowed_fields = ['name', 'phone', 'email', 'notes']
if field not in allowed_fields:
return JsonResponse({
'success': False,
'error': f'Поле "{field}" недоступно для редактирования'
}, status=400)
# Валидация через форму
form_data = {field: value if value else None}
form = CustomerForm(form_data, instance=customer)
# Проверяем только нужное поле
if field in form.fields:
form.fields[field].required = False
field_value = form.fields[field].clean(value if value else None)
# Обновляем поле
setattr(customer, field, field_value)
customer.save(update_fields=[field, 'updated_at'])
# Возвращаем отформатированное значение
display_value = getattr(customer, field)
if display_value is None:
display_value = ''
elif field == 'phone' and display_value:
display_value = str(display_value)
else:
display_value = str(display_value)
return JsonResponse({
'success': True,
'value': display_value
})
return JsonResponse({
'success': False,
'error': 'Неизвестное поле'
}, status=400)
except json.JSONDecodeError:
return JsonResponse({
'success': False,
'error': 'Некорректный JSON'
}, status=400)
except ValidationError as e:
error_msg = e.message if hasattr(e, 'message') else str(e)
return JsonResponse({
'success': False,
'error': error_msg
}, status=400)
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=400)
@require_http_methods(["POST"])
def api_create_customer(request):
"""
AJAX endpoint для создания нового клиента.
Принимает POST JSON:
{
"name": "Иван Петров",
"phone": "+375291234567",
"email": "ivan@example.com"
}
Возвращает JSON:
{
"success": true,
"id": 123,
"name": "Иван Петров",
"phone": "+375291234567",
"email": "ivan@example.com"
}
При ошибке:
{
"success": false,
"error": "Клиент с таким номером телефона уже существует"
}
"""
try:
data = json.loads(request.body)
# Нормализуем данные
name = data.get('name', '').strip() if data.get('name') else ''
phone = data.get('phone', '').strip() if data.get('phone') else ''
email = data.get('email', '').strip() if data.get('email') else ''
# Подготавливаем данные для формы
form_data = {
'name': name,
'phone': phone if phone else None,
'email': email if email else None,
}
# Используем форму для валидации и создания
form = CustomerForm(data=form_data)
if form.is_valid():
# Сохраняем клиента через форму (автоматически вызывает все валидации)
customer = form.save()
phone_display = str(customer.phone) if customer.phone else ''
return JsonResponse({
'success': True,
'id': customer.pk,
'name': customer.name,
'phone': phone_display,
'email': customer.email if customer.email else '',
'wallet_balance': float(customer.wallet_balance),
}, status=201)
else:
# Собираем ошибки валидации с указанием полей
errors = []
field_labels = {
'name': 'Имя клиента',
'phone': 'Телефон',
'email': 'Email',
}
for field, field_errors in form.errors.items():
field_label = field_labels.get(field, field)
for error in field_errors:
errors.append(f'{field_label}: {error}')
# Возвращаем все ошибки
error_message = '<br>'.join(errors) if errors else 'Ошибка валидации данных'
return JsonResponse({
'success': False,
'error': error_message,
'errors': form.errors # Добавляем детальную информацию об ошибках
}, status=400)
except json.JSONDecodeError:
return JsonResponse({
'success': False,
'error': 'Некорректный JSON'
}, status=400)
except Exception as e:
return JsonResponse({
'success': False,
'error': f'Ошибка сервера: {str(e)}'
}, status=500)
@manager_or_owner_required
@require_http_methods(["POST"])
def wallet_deposit(request, pk):
"""Пополнение кошелька клиента"""
customer = get_object_or_404(Customer, pk=pk)
if customer.is_system_customer:
messages.error(request, 'Операции с кошельком недоступны для системного клиента.')
return redirect('customers:customer-detail', pk=pk)
amount_str = request.POST.get('amount') or ''
description = (request.POST.get('description') or '').strip()
try:
amount = Decimal(amount_str.replace(',', '.'))
except Exception:
messages.error(request, 'Некорректное значение суммы для пополнения.')
return redirect('customers:customer-detail', pk=pk)
try:
customer.adjust_wallet(amount, description, request.user)
messages.success(request, f'Кошелёк клиента пополнен на {amount:.2f} руб.')
except ValueError as e:
messages.error(request, str(e))
except ValidationError as e:
messages.error(request, '; '.join(e.messages) if hasattr(e, 'messages') else str(e))
return redirect('customers:customer-detail', pk=pk)
@manager_or_owner_required
@require_http_methods(["POST"])
def wallet_withdraw(request, pk):
"""Возврат / списание с кошелька клиента"""
customer = get_object_or_404(Customer, pk=pk)
if customer.is_system_customer:
messages.error(request, 'Операции с кошельком недоступны для системного клиента.')
return redirect('customers:customer-detail', pk=pk)
amount_str = request.POST.get('amount') or ''
description = (request.POST.get('description') or '').strip()
try:
amount = Decimal(amount_str.replace(',', '.'))
except Exception:
messages.error(request, 'Некорректное значение суммы для списания.')
return redirect('customers:customer-detail', pk=pk)
# Для списания делаем сумму отрицательной
withdraw_amount = -amount
try:
customer.adjust_wallet(withdraw_amount, description, request.user)
messages.success(request, f'С кошелька клиента списано {amount:.2f} руб.')
except ValueError as e:
messages.error(request, str(e))
except ValidationError as e:
messages.error(request, '; '.join(e.messages) if hasattr(e, 'messages') else str(e))
return redirect('customers:customer-detail', pk=pk)
@login_required
@manager_or_owner_required
def customer_import(request):
"""
Импорт клиентов из CSV/Excel файла.
"""
import os
from pathlib import Path
from django.conf import settings
from .services.import_export import CustomerImporter
if request.method == 'POST':
file = request.FILES.get('file')
update_existing = request.POST.get('update_existing') == 'on'
if not file:
messages.error(request, 'Файл не был загружен.')
return redirect('customers:customer-import')
# Выполняем импорт
importer = CustomerImporter()
result = importer.import_from_file(file, update_existing=update_existing)
# Формируем сообщения о результате
if result['success']:
success_parts = []
if result['created'] > 0:
success_parts.append(f"создано {result['created']}")
if result['enriched'] > 0:
success_parts.append(f"дополнено {result['enriched']}")
if result['updated'] > 0:
success_parts.append(f"обновлено {result['updated']}")
success_msg = f"Импорт завершён: {', '.join(success_parts) if success_parts else 'нет изменений'}"
if result.get('duplicate_count', 0) > 0:
success_msg += f", пропущено дубликатов: {result['duplicate_count']}"
if result.get('conflicts_resolved', 0) > 0:
success_msg += f", создано альтернативных контактов: {result['conflicts_resolved']}"
messages.success(request, success_msg)
else:
messages.error(request, result['message'])
# Если есть реальные ошибки валидации - генерируем файл
if result.get('real_error_count', 0) > 0:
error_file_data = importer.generate_error_file()
if error_file_data:
content, filename = error_file_data
# Сохраняем временный файл
temp_dir = Path(settings.MEDIA_ROOT) / 'temp_imports'
temp_dir.mkdir(parents=True, exist_ok=True)
temp_file_path = temp_dir / filename
with open(temp_file_path, 'wb') as f:
f.write(content)
# Сохраняем путь в сессии
request.session['import_error_file'] = str(temp_file_path)
request.session['import_error_filename'] = filename
messages.warning(
request,
f'Обнаружено {result["real_error_count"]} ошибок валидации. '
f'Скачайте файл с ошибками для исправления.'
)
# Передаём результаты в шаблон
context = {
'title': 'Импорт клиентов',
'import_result': result,
'has_error_file': 'import_error_file' in request.session,
}
return render(request, 'customers/customer_import.html', context)
context = {
'title': 'Импорт клиентов',
}
return render(request, 'customers/customer_import.html', context)
@login_required
@manager_or_owner_required
def customer_import_download_errors(request):
"""
Скачивание файла с ошибками импорта и немедленное удаление.
"""
import os
from django.http import FileResponse, Http404
file_path = request.session.get('import_error_file')
filename = request.session.get('import_error_filename', 'errors.csv')
if not file_path or not os.path.exists(file_path):
messages.error(request, 'Файл с ошибками не найден или уже был удалён.')
return redirect('customers:customer-import')
try:
# Открываем файл для чтения
response = FileResponse(
open(file_path, 'rb'),
as_attachment=True,
filename=filename
)
# Удаляем из сессии
del request.session['import_error_file']
del request.session['import_error_filename']
# Планируем удаление файла после отправки
# (FileResponse закроет файл автоматически, затем удаляем)
def cleanup_file():
try:
if os.path.exists(file_path):
os.remove(file_path)
except Exception:
pass
# Django FileResponse автоматически закрывает файл после отправки
# Используем middleware или сигнал для очистки, но проще - удалим сразу после response
# Поскольку FileResponse читает файл в память при малом размере, удаляем сразу
import atexit
atexit.register(cleanup_file)
# Альтернатива: читаем файл в память и сразу удаляем
with open(file_path, 'rb') as f:
file_content = f.read()
# Удаляем файл немедленно
try:
os.remove(file_path)
except Exception:
pass
# Возвращаем содержимое из памяти
from django.http import HttpResponse
response = HttpResponse(file_content, content_type='application/octet-stream')
response['Content-Disposition'] = f'attachment; filename="{filename}"'
return response
except Exception as e:
messages.error(request, f'Ошибка при скачивании файла: {str(e)}')
return redirect('customers:customer-import')
@login_required
@manager_or_owner_required
def customer_export(request):
"""
Экспорт клиентов в CSV файл.
"""
from .services.import_export import CustomerExporter
exporter = CustomerExporter()
return exporter.export_to_csv()