Добавлен асинхронный импорт товаров с параллельной загрузкой фото + исправлен баг со счётчиком SKU

- Реализован импорт Product из CSV/XLSX через Celery с прогресс-баром
- Параллельная загрузка фото товаров с внешних URL (масштабируемость до 500+ товаров)
- Добавлена модель ProductImportJob для отслеживания статуса импорта
- Создан таск download_product_photo_async для загрузки фото в фоне
- Интеграция с существующим ImageProcessor (синхронная обработка через use_async=False)
- Добавлены view и template для импорта с real-time обновлением через AJAX

FIX: Исправлен баг со счётчиком SKU - инкремент только после успешного сохранения
- Добавлен SKUCounter.peek_next_value() - возвращает следующий номер БЕЗ инкремента
- Добавлен SKUCounter.increment_counter() - инкрементирует счётчик
- generate_product_sku() использует peek_next_value() вместо get_next_value()
- Добавлен post_save сигнал increment_sku_counter_after_save() для инкремента после создания
- Предотвращает пропуски номеров при ошибках валидации (например cost_price NULL)

FIX: Исправлена ошибка с is_main в ProductPhoto
- ProductPhoto не имеет поля is_main, используется только order
- Первое фото (order=0) автоматически считается главным
- Удалён параметр is_main из download_product_photo_async и _collect_photo_tasks

Изменены файлы:
- products/models/base.py - методы для управления счётчиком SKU
- products/models/import_job.py - модель для отслеживания импорта
- products/services/import_export.py - сервис импорта с поддержкой Celery
- products/tasks.py - таски для асинхронного импорта и загрузки фото
- products/signals.py - сигнал для инкремента счётчика после сохранения
- products/utils/sku_generator.py - использование peek_next_value()
- products/views/product_import_views.py - view для импорта
- products/templates/products/product_import*.html - UI для импорта
- docker/entrypoint.sh - настройка Celery worker (concurrency=4)
- requirements.txt - добавлен requests для загрузки фото
This commit is contained in:
2026-01-06 07:10:12 +03:00
parent d44ae0b598
commit 0f19542ac9
16 changed files with 1678 additions and 6 deletions

View File

@@ -371,3 +371,251 @@ def cleanup_temp_media_all(ttl_hours=None):
except Exception as exc:
logger.error(f"[CleanupAll] Failed to schedule: {exc}", exc_info=True)
return {'status': 'error', 'error': str(exc)}
# ============================================================================
# ИМПОРТ ТОВАРОВ
# ============================================================================
@shared_task(
bind=True,
name='products.tasks.import_products_async',
max_retries=0, # Не повторяем автоматически
time_limit=3600, # 1 час максимум
)
def import_products_async(self, job_id, file_path, update_existing, schema_name):
"""
Асинхронный импорт товаров из CSV/XLSX.
Алгоритм:
1. Читаем файл и парсим заголовки
2. Для каждой строки:
- Создаём/обновляем товар (без фото)
- Собираем URL фото для параллельной загрузки
- Обновляем прогресс в ProductImportJob
3. Запускаем параллельную загрузку всех фото (group task)
4. Удаляем временный файл
Args:
job_id: ID ProductImportJob
file_path: Путь к загруженному файлу
update_existing: Обновлять ли существующие товары
schema_name: Схема тенанта
Returns:
dict: Результат импорта
"""
from .services.import_export import ProductImporter
from .models import ProductImportJob
from celery import group
import os
try:
# Активируем схему тенанта
connection.set_schema(schema_name)
logger.info(f"[Import] Activated schema: {schema_name} for job {job_id}")
# Загружаем задачу
job = ProductImportJob.objects.get(id=job_id)
job.status = 'processing'
job.save(update_fields=['status'])
# Открываем файл
if not default_storage.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
with default_storage.open(file_path, 'rb') as file:
# Создаём importer с callback для прогресса
importer = ProductImporter()
# Callback для обновления прогресса
def progress_callback(current, total, created, updated, skipped, errors):
job.refresh_from_db()
job.processed_rows = current
job.total_rows = total
job.created_count = created
job.updated_count = updated
job.skipped_count = skipped
job.errors_count = len(errors)
job.save(update_fields=[
'processed_rows', 'total_rows', 'created_count',
'updated_count', 'skipped_count', 'errors_count'
])
# Запускаем импорт (без фото, они загрузятся параллельно)
result = importer.import_from_file(
file=file,
update_existing=update_existing,
progress_callback=progress_callback,
skip_images=True, # Не загружаем фото синхронно
schema_name=schema_name # Передаём для задач фото
)
# Обновляем результаты
job.refresh_from_db()
job.status = 'completed'
job.created_count = result['created']
job.updated_count = result['updated']
job.skipped_count = result['skipped']
job.errors_count = result.get('real_error_count', 0)
job.errors_json = result.get('real_errors', [])
job.completed_at = timezone.now()
job.save()
# Удаляем временный файл
try:
if default_storage.exists(file_path):
default_storage.delete(file_path)
logger.info(f"[Import] Deleted temp file: {file_path}")
except Exception as del_exc:
logger.warning(f"[Import] Could not delete temp file: {del_exc}")
# Запускаем параллельную загрузку фото (если есть)
if result.get('photo_tasks'):
photo_tasks = result['photo_tasks']
logger.info(f"[Import] Starting parallel download of {len(photo_tasks)} photos")
# FIX: is_main удалён из сигнатуры download_product_photo_async
# Создаём группу задач для параллельной загрузки
photo_group = group(
download_product_photo_async.s(
product_id=task['product_id'],
image_url=task['url'],
order=task['order'],
schema_name=schema_name
)
for task in photo_tasks
)
# Запускаем группу асинхронно
photo_group.apply_async()
logger.info(f"[Import] Photo download tasks submitted")
logger.info(f"[Import] Job {job_id} completed successfully: "
f"created={result['created']}, updated={result['updated']}, "
f"skipped={result['skipped']}, errors={result.get('real_error_count', 0)}")
return {
'status': 'success',
'job_id': job_id,
'result': result
}
except ProductImportJob.DoesNotExist:
logger.error(f"[Import] Job {job_id} not found in schema {schema_name}")
return {'status': 'error', 'reason': 'job_not_found'}
except Exception as exc:
logger.error(f"[Import] Job {job_id} failed: {exc}", exc_info=True)
# Обновляем статус задачи
try:
connection.set_schema(schema_name)
job = ProductImportJob.objects.get(id=job_id)
job.status = 'failed'
job.error_message = str(exc)
job.completed_at = timezone.now()
job.save()
except Exception as save_exc:
logger.error(f"[Import] Could not update job status: {save_exc}")
# Удаляем временный файл
try:
if default_storage.exists(file_path):
default_storage.delete(file_path)
except Exception:
pass
return {
'status': 'error',
'job_id': job_id,
'error': str(exc)
}
@shared_task(
bind=True,
name='products.tasks.download_product_photo_async',
max_retries=3,
default_retry_delay=10,
)
def download_product_photo_async(self, product_id, image_url, order, schema_name):
"""
Загрузка одного фото товара по URL.
Запускается параллельно для всех фото.
FIX: ProductPhoto не имеет is_main, главное фото определяется по order=0
Args:
product_id: ID товара
image_url: URL изображения
order: Порядок фото (первое фото order=0 считается главным)
schema_name: Схема тенанта
Returns:
dict: Результат загрузки
"""
import requests
from django.core.files.base import ContentFile
from .models import Product, ProductPhoto
import urllib.parse
try:
# Активируем схему
connection.set_schema(schema_name)
# Загружаем товар
product = Product.objects.get(id=product_id)
# Скачиваем изображение
response = requests.get(image_url, timeout=30)
response.raise_for_status()
# Получаем имя файла
parsed_url = urllib.parse.urlparse(image_url)
filename = parsed_url.path.split('/')[-1]
# Создаём ProductPhoto
# FIX: ProductPhoto не имеет поля is_main, используется только order
# Первое фото (order=0) автоматически считается главным
photo = ProductPhoto(
product=product,
order=order # is_main удалён — используется order для определения главного фото
)
# Сохраняем файл
# ВАЖНО: use_async=False чтобы НЕ запускать дополнительную Celery задачу
# Обработка будет выполнена синхронно в текущей задаче
photo.image.save(
filename,
ContentFile(response.content),
save=False # Не сохраняем в БД пока
)
photo.save(use_async=False) # Синхронная обработка через ImageProcessor
logger.info(f"[PhotoDownload] Downloaded photo for product {product_id}: {image_url}")
return {
'status': 'success',
'product_id': product_id,
'photo_id': photo.id,
'url': image_url
}
except Product.DoesNotExist:
logger.error(f"[PhotoDownload] Product {product_id} not found in {schema_name}")
return {'status': 'error', 'reason': 'product_not_found'}
except requests.RequestException as exc:
logger.warning(f"[PhotoDownload] Failed to download {image_url}: {exc}")
# Повторяем при ошибках сети
try:
raise self.retry(exc=exc, countdown=10)
except self.MaxRetriesExceededError:
logger.error(f"[PhotoDownload] Max retries exceeded for {image_url}")
return {'status': 'error', 'reason': 'max_retries', 'url': image_url}
except Exception as exc:
logger.error(f"[PhotoDownload] Unexpected error for {image_url}: {exc}", exc_info=True)
return {'status': 'error', 'reason': 'unexpected', 'url': image_url, 'error': str(exc)}