- Реализован импорт Product из CSV/XLSX через Celery с прогресс-баром - Параллельная загрузка фото товаров с внешних URL (масштабируемость до 500+ товаров) - Добавлена модель ProductImportJob для отслеживания статуса импорта - Создан таск download_product_photo_async для загрузки фото в фоне - Интеграция с существующим ImageProcessor (синхронная обработка через use_async=False) - Добавлены view и template для импорта с real-time обновлением через AJAX FIX: Исправлен баг со счётчиком SKU - инкремент только после успешного сохранения - Добавлен SKUCounter.peek_next_value() - возвращает следующий номер БЕЗ инкремента - Добавлен SKUCounter.increment_counter() - инкрементирует счётчик - generate_product_sku() использует peek_next_value() вместо get_next_value() - Добавлен post_save сигнал increment_sku_counter_after_save() для инкремента после создания - Предотвращает пропуски номеров при ошибках валидации (например cost_price NULL) FIX: Исправлена ошибка с is_main в ProductPhoto - ProductPhoto не имеет поля is_main, используется только order - Первое фото (order=0) автоматически считается главным - Удалён параметр is_main из download_product_photo_async и _collect_photo_tasks Изменены файлы: - products/models/base.py - методы для управления счётчиком SKU - products/models/import_job.py - модель для отслеживания импорта - products/services/import_export.py - сервис импорта с поддержкой Celery - products/tasks.py - таски для асинхронного импорта и загрузки фото - products/signals.py - сигнал для инкремента счётчика после сохранения - products/utils/sku_generator.py - использование peek_next_value() - products/views/product_import_views.py - view для импорта - products/templates/products/product_import*.html - UI для импорта - docker/entrypoint.sh - настройка Celery worker (concurrency=4) - requirements.txt - добавлен requests для загрузки фото
622 lines
28 KiB
Python
622 lines
28 KiB
Python
"""Celery задачи для асинхронной обработки фото товаров.
|
||
|
||
ВАЖНО: django-tenants мультитенантность!
|
||
Все задачи получают schema_name и активируют нужную схему для изоляции данных.
|
||
|
||
МУЛЬТИТЕНАНТНОСТЬ В ФАЙЛАХ:
|
||
На диске файлы хранятся как: tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext
|
||
TenantAwareFileSystemStorage добавляет tenant_id при сохранении/удалении файлов.
|
||
Безопасность: Каждый Celery worker активирует правильную схему БД через connection.set_schema().
|
||
"""
|
||
import os
|
||
import time
|
||
import logging
|
||
from celery import shared_task
|
||
from django.db import connection
|
||
from django.apps import apps
|
||
from django.conf import settings
|
||
from django.core.files.storage import default_storage
|
||
from django.utils import timezone
|
||
from tenants.models import RESERVED_SCHEMA_NAMES
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Регистрация декодеров HEIF/AVIF для Pillow в Celery worker
|
||
# Это критично для обработки HEIC/HEIF фото с iPhone
|
||
try:
|
||
from pillow_heif import register_heif_opener
|
||
register_heif_opener()
|
||
logger.info("[Celery] HEIF/AVIF decoders registered successfully")
|
||
except ImportError:
|
||
logger.warning("[Celery] pillow-heif not available - HEIC/HEIF/AVIF formats will not be supported")
|
||
|
||
|
||
@shared_task(
|
||
bind=True,
|
||
name='products.tasks.process_product_photo_async',
|
||
max_retries=3,
|
||
default_retry_delay=60, # Повторить через 60 секунд при ошибке
|
||
)
|
||
def process_product_photo_async(self, photo_id, photo_model_class, schema_name):
|
||
"""
|
||
Асинхронная обработка загруженного фото.
|
||
|
||
Args:
|
||
photo_id: ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto
|
||
photo_model_class: Строка с путем к модели ('products.ProductPhoto')
|
||
schema_name: Имя схемы тенанта для активации правильной БД
|
||
|
||
Returns:
|
||
dict: Результат обработки с информацией о качестве и путях к файлам
|
||
"""
|
||
from .utils.image_processor import ImageProcessor
|
||
|
||
try:
|
||
# КРИТИЧНО: Активируем схему тенанта
|
||
# Это гарантирует что мы работаем с данными правильного тенанта
|
||
connection.set_schema(schema_name)
|
||
logger.info(f"[Celery] Activated schema: {schema_name} for photo_id: {photo_id}")
|
||
|
||
# Получаем модель по строке пути ('products.ProductPhoto')
|
||
app_label, model_name = photo_model_class.split('.')
|
||
PhotoModel = apps.get_model(app_label, model_name)
|
||
|
||
# Загружаем объект фото из БД
|
||
photo_obj = PhotoModel.objects.get(pk=photo_id)
|
||
entity = photo_obj.get_entity()
|
||
|
||
logger.info(f"[Celery] Processing photo {photo_id} for {entity.__class__.__name__} #{entity.id}")
|
||
|
||
# Проверяем что фото еще не обработано
|
||
if not photo_obj.image:
|
||
logger.warning(f"[Celery] Photo {photo_id} has no image file")
|
||
return {'status': 'error', 'reason': 'no_image'}
|
||
|
||
# Сохраняем путь к временному файлу до перезаписи поля image
|
||
temp_path = photo_obj.image.name
|
||
|
||
# КРИТИЧНО: Проверяем что файл существует перед обработкой
|
||
# Это предотвращает бесполезные retry если файл был удален вручную
|
||
if not default_storage.exists(temp_path):
|
||
logger.error(f"[Celery] File does not exist: {temp_path}")
|
||
raise FileNotFoundError(f"File not found: {temp_path}")
|
||
|
||
# Получаем entity type для правильного пути сохранения
|
||
entity_type = photo_obj.get_entity_type()
|
||
|
||
# ОСНОВНАЯ РАБОТА: Обрабатываем изображение
|
||
# Это операция занимает время (resize, convert formats, etc)
|
||
logger.info(f"[Celery] Starting image processing for photo {photo_id} in {schema_name}")
|
||
|
||
processed_paths = ImageProcessor.process_image(
|
||
photo_obj.image,
|
||
entity_type,
|
||
entity_id=entity.id,
|
||
photo_id=photo_obj.id
|
||
)
|
||
|
||
# Обновляем объект фото с новыми путями и метаданными качества
|
||
photo_obj.image = processed_paths['original']
|
||
photo_obj.quality_level = processed_paths.get('quality_level', 'acceptable')
|
||
photo_obj.quality_warning = processed_paths.get('quality_warning', False)
|
||
photo_obj.save(update_fields=['image', 'quality_level', 'quality_warning'])
|
||
|
||
# Удаляем временный файл из temp после успешной обработки
|
||
try:
|
||
if temp_path and default_storage.exists(temp_path):
|
||
default_storage.delete(temp_path)
|
||
logger.info(f"[Celery] Deleted temp file: {temp_path}")
|
||
except Exception as del_exc:
|
||
logger.warning(f"[Celery] Could not delete temp file {temp_path}: {del_exc}")
|
||
|
||
logger.info(f"[Celery] ✓ Photo {photo_id} processed successfully "
|
||
f"(quality: {processed_paths.get('quality_level')})")
|
||
|
||
return {
|
||
'status': 'success',
|
||
'photo_id': photo_id,
|
||
'schema_name': schema_name,
|
||
'quality_level': processed_paths.get('quality_level'),
|
||
'paths': {
|
||
'original': processed_paths['original'],
|
||
'large': processed_paths.get('large'),
|
||
'medium': processed_paths.get('medium'),
|
||
'thumbnail': processed_paths.get('thumbnail'),
|
||
}
|
||
}
|
||
|
||
except PhotoModel.DoesNotExist as e:
|
||
logger.error(f"[Celery] Photo {photo_id} not found in schema {schema_name}")
|
||
# Retry briefly to allow DB transaction to commit (race condition on first photo)
|
||
try:
|
||
raise self.retry(exc=e, countdown=5)
|
||
except self.MaxRetriesExceededError:
|
||
# Final failure: attempt to delete the orphan temp file if we recorded it
|
||
try:
|
||
from .models.photos import PhotoProcessingStatus
|
||
status = (PhotoProcessingStatus.objects
|
||
.filter(photo_id=photo_id, photo_model=photo_model_class)
|
||
.order_by('-created_at')
|
||
.first())
|
||
temp_path = (status.result_data or {}).get('temp_path') if status else None
|
||
if temp_path and default_storage.exists(temp_path):
|
||
default_storage.delete(temp_path)
|
||
logger.info(f"[Celery] Deleted temp file (not_found): {temp_path}")
|
||
except Exception as del_exc:
|
||
logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on not_found: {del_exc}")
|
||
return {'status': 'error', 'reason': 'not_found', 'photo_id': photo_id}
|
||
|
||
except FileNotFoundError as exc:
|
||
# Файл не найден - не имеет смысла повторять task
|
||
# Это обычно означает что файл был удален до обработки
|
||
logger.error(f"[Celery] File not found for photo {photo_id} in {schema_name}: {str(exc)}")
|
||
logger.error(f"[Celery] This usually means the file was deleted before processing. Marking as failed.")
|
||
|
||
# Обновляем статус обработки без retry
|
||
try:
|
||
from .models.photos import PhotoProcessingStatus
|
||
status = (PhotoProcessingStatus.objects
|
||
.filter(photo_id=photo_id, photo_model=photo_model_class)
|
||
.order_by('-created_at')
|
||
.first())
|
||
if status:
|
||
status.status = 'failed'
|
||
status.error_message = f"File not found: {str(exc)}"
|
||
status.completed_at = timezone.now()
|
||
status.save()
|
||
logger.info(f"[Celery] Updated PhotoProcessingStatus to 'failed' for photo {photo_id}")
|
||
except Exception as status_exc:
|
||
logger.warning(f"[Celery] Could not update PhotoProcessingStatus: {status_exc}")
|
||
|
||
return {
|
||
'status': 'error',
|
||
'reason': 'file_not_found',
|
||
'photo_id': photo_id,
|
||
'error': str(exc)
|
||
}
|
||
|
||
except Exception as exc:
|
||
logger.error(f"[Celery] Error processing photo {photo_id} in {schema_name}: {str(exc)}",
|
||
exc_info=True)
|
||
|
||
# Повторить задачу при ошибке (макс 3 раза с 60 сек интервалом)
|
||
try:
|
||
raise self.retry(exc=exc, countdown=60)
|
||
except self.MaxRetriesExceededError:
|
||
logger.error(f"[Celery] Max retries exceeded for photo {photo_id}. Task failed permanently.")
|
||
# Попытка удалить temp файл при окончательном провале
|
||
try:
|
||
from .models.photos import PhotoProcessingStatus
|
||
status = (PhotoProcessingStatus.objects
|
||
.filter(photo_id=photo_id, photo_model=photo_model_class)
|
||
.order_by('-created_at')
|
||
.first())
|
||
temp_path = (status.result_data or {}).get('temp_path') if status else None
|
||
if temp_path and default_storage.exists(temp_path):
|
||
default_storage.delete(temp_path)
|
||
logger.info(f"[Celery] Deleted orphaned temp file (max_retries): {temp_path}")
|
||
except Exception as del_exc:
|
||
logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on max_retries: {del_exc}")
|
||
return {
|
||
'status': 'error',
|
||
'reason': 'max_retries_exceeded',
|
||
'photo_id': photo_id,
|
||
'error': str(exc)
|
||
}
|
||
|
||
|
||
@shared_task(name='products.tasks.process_multiple_photos_async')
|
||
def process_multiple_photos_async(photo_ids, photo_model_class, schema_name):
|
||
"""
|
||
Обработка нескольких фото параллельно (chord pattern).
|
||
|
||
Это позволяет обрабатывать несколько фото одновременно
|
||
если загружено много фото за раз.
|
||
|
||
Args:
|
||
photo_ids: Список ID фотографий
|
||
photo_model_class: Путь к модели ('products.ProductPhoto')
|
||
schema_name: Схема тенанта
|
||
|
||
Returns:
|
||
dict: Информация о submitted задачах
|
||
"""
|
||
from celery import group
|
||
|
||
logger.info(f"[Celery] Submitting {len(photo_ids)} photos for batch processing in {schema_name}")
|
||
|
||
# Создаем группу задач для параллельной обработки
|
||
# Celery автоматически распределит их между доступными workers
|
||
job = group(
|
||
process_product_photo_async.s(photo_id, photo_model_class, schema_name)
|
||
for photo_id in photo_ids
|
||
)
|
||
|
||
# Запускаем группу задач асинхронно
|
||
result = job.apply_async()
|
||
|
||
return {
|
||
'status': 'submitted',
|
||
'count': len(photo_ids),
|
||
'group_id': result.id,
|
||
'schema_name': schema_name
|
||
}
|
||
|
||
|
||
@shared_task(name='products.tasks.cleanup_temp_media_for_schema')
|
||
def cleanup_temp_media_for_schema(schema_name, ttl_hours=None):
|
||
"""
|
||
Очистка временных файлов изображений для указанной схемы тенанта.
|
||
Удаляет файлы старше TTL из папок: products/temp, kits/temp, categories/temp.
|
||
|
||
Args:
|
||
schema_name: Имя схемы тенанта
|
||
ttl_hours: Время жизни файла в часах (по умолчанию из settings.TEMP_MEDIA_TTL_HOURS)
|
||
|
||
Returns:
|
||
dict: Результат очистки с количеством удаленных файлов
|
||
"""
|
||
from django.conf import settings
|
||
|
||
try:
|
||
# Пропускаем зарезервированные схемы (public, admin и т.д.)
|
||
if schema_name in RESERVED_SCHEMA_NAMES:
|
||
ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24))
|
||
logger.info(f"[Cleanup:{schema_name}] Skipping reserved schema")
|
||
return {
|
||
'status': 'skipped',
|
||
'schema_name': schema_name,
|
||
'reason': 'reserved_schema',
|
||
'deleted': 0,
|
||
'scanned': 0,
|
||
'ttl_hours': ttl
|
||
}
|
||
|
||
# Активируем схему тенанта
|
||
connection.set_schema(schema_name)
|
||
|
||
ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24))
|
||
cutoff_seconds = ttl * 3600
|
||
now = time.time()
|
||
|
||
temp_dirs = ['products/temp', 'kits/temp', 'categories/temp']
|
||
deleted_count = 0
|
||
scanned_count = 0
|
||
|
||
for rel_dir in temp_dirs:
|
||
try:
|
||
# Получаем полный путь с учётом tenant_id
|
||
try:
|
||
full_dir = default_storage.path(rel_dir)
|
||
except RuntimeError as storage_error:
|
||
# Если не удается определить tenant_id (например, для public схемы)
|
||
if 'Cannot determine tenant ID' in str(storage_error):
|
||
logger.warning(f"[Cleanup:{schema_name}] Skipping {rel_dir}: {storage_error}")
|
||
continue
|
||
raise # Перебрасываем другие ошибки
|
||
|
||
if not os.path.isdir(full_dir):
|
||
continue
|
||
|
||
for filename in os.listdir(full_dir):
|
||
scanned_count += 1
|
||
full_path = os.path.join(full_dir, filename)
|
||
|
||
# Пропускаем поддиректории, работаем только с файлами
|
||
if not os.path.isfile(full_path):
|
||
continue
|
||
|
||
# Проверяем возраст файла
|
||
age_seconds = now - os.path.getmtime(full_path)
|
||
if age_seconds >= cutoff_seconds:
|
||
# Формируем относительный путь для storage.delete
|
||
storage_rel_path = os.path.join(rel_dir, filename).replace('\\', '/')
|
||
try:
|
||
if default_storage.exists(storage_rel_path):
|
||
default_storage.delete(storage_rel_path)
|
||
deleted_count += 1
|
||
logger.info(f"[Cleanup:{schema_name}] Deleted: {storage_rel_path} (age: {age_seconds/3600:.1f}h)")
|
||
except Exception as del_exc:
|
||
logger.warning(f"[Cleanup:{schema_name}] Could not delete {storage_rel_path}: {del_exc}")
|
||
except Exception as dir_exc:
|
||
logger.warning(f"[Cleanup:{schema_name}] Error scanning {rel_dir}: {dir_exc}")
|
||
|
||
logger.info(f"[Cleanup:{schema_name}] Complete. scanned={scanned_count}, deleted={deleted_count}, ttl_hours={ttl}")
|
||
return {
|
||
'status': 'success',
|
||
'schema_name': schema_name,
|
||
'deleted': deleted_count,
|
||
'scanned': scanned_count,
|
||
'ttl_hours': ttl
|
||
}
|
||
except Exception as exc:
|
||
logger.error(f"[Cleanup:{schema_name}] Failed: {exc}", exc_info=True)
|
||
return {'status': 'error', 'schema_name': schema_name, 'error': str(exc)}
|
||
|
||
|
||
@shared_task(name='products.tasks.cleanup_temp_media_all')
|
||
def cleanup_temp_media_all(ttl_hours=None):
|
||
"""
|
||
Мастер-задача: перечисляет всех тенантов и запускает очистку temp для каждого.
|
||
Запускается периодически через Celery Beat.
|
||
|
||
Args:
|
||
ttl_hours: Время жизни файла в часах (передается в подзадачи)
|
||
|
||
Returns:
|
||
dict: Информация о количестве тенантов и запущенных задачах
|
||
"""
|
||
from django.conf import settings
|
||
|
||
try:
|
||
# Работаем из public для списка тенантов
|
||
connection.set_schema('public')
|
||
from tenants.models import Client
|
||
|
||
# Фильтруем зарезервированные схемы, чтобы не запускать задачи для них
|
||
schemas = [s for s in Client.objects.values_list('schema_name', flat=True)
|
||
if s not in RESERVED_SCHEMA_NAMES]
|
||
ttl = ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24)
|
||
|
||
logger.info(f"[CleanupAll] Scheduling cleanup for {len(schemas)} tenants (TTL: {ttl}h)")
|
||
|
||
for schema in schemas:
|
||
cleanup_temp_media_for_schema.delay(schema, ttl)
|
||
|
||
return {
|
||
'status': 'submitted',
|
||
'tenants_count': len(schemas),
|
||
'ttl_hours': ttl
|
||
}
|
||
except Exception as exc:
|
||
logger.error(f"[CleanupAll] Failed to schedule: {exc}", exc_info=True)
|
||
return {'status': 'error', 'error': str(exc)}
|
||
|
||
|
||
# ============================================================================
|
||
# ИМПОРТ ТОВАРОВ
|
||
# ============================================================================
|
||
|
||
@shared_task(
|
||
bind=True,
|
||
name='products.tasks.import_products_async',
|
||
max_retries=0, # Не повторяем автоматически
|
||
time_limit=3600, # 1 час максимум
|
||
)
|
||
def import_products_async(self, job_id, file_path, update_existing, schema_name):
|
||
"""
|
||
Асинхронный импорт товаров из CSV/XLSX.
|
||
|
||
Алгоритм:
|
||
1. Читаем файл и парсим заголовки
|
||
2. Для каждой строки:
|
||
- Создаём/обновляем товар (без фото)
|
||
- Собираем URL фото для параллельной загрузки
|
||
- Обновляем прогресс в ProductImportJob
|
||
3. Запускаем параллельную загрузку всех фото (group task)
|
||
4. Удаляем временный файл
|
||
|
||
Args:
|
||
job_id: ID ProductImportJob
|
||
file_path: Путь к загруженному файлу
|
||
update_existing: Обновлять ли существующие товары
|
||
schema_name: Схема тенанта
|
||
|
||
Returns:
|
||
dict: Результат импорта
|
||
"""
|
||
from .services.import_export import ProductImporter
|
||
from .models import ProductImportJob
|
||
from celery import group
|
||
import os
|
||
|
||
try:
|
||
# Активируем схему тенанта
|
||
connection.set_schema(schema_name)
|
||
logger.info(f"[Import] Activated schema: {schema_name} for job {job_id}")
|
||
|
||
# Загружаем задачу
|
||
job = ProductImportJob.objects.get(id=job_id)
|
||
job.status = 'processing'
|
||
job.save(update_fields=['status'])
|
||
|
||
# Открываем файл
|
||
if not default_storage.exists(file_path):
|
||
raise FileNotFoundError(f"File not found: {file_path}")
|
||
|
||
with default_storage.open(file_path, 'rb') as file:
|
||
# Создаём importer с callback для прогресса
|
||
importer = ProductImporter()
|
||
|
||
# Callback для обновления прогресса
|
||
def progress_callback(current, total, created, updated, skipped, errors):
|
||
job.refresh_from_db()
|
||
job.processed_rows = current
|
||
job.total_rows = total
|
||
job.created_count = created
|
||
job.updated_count = updated
|
||
job.skipped_count = skipped
|
||
job.errors_count = len(errors)
|
||
job.save(update_fields=[
|
||
'processed_rows', 'total_rows', 'created_count',
|
||
'updated_count', 'skipped_count', 'errors_count'
|
||
])
|
||
|
||
# Запускаем импорт (без фото, они загрузятся параллельно)
|
||
result = importer.import_from_file(
|
||
file=file,
|
||
update_existing=update_existing,
|
||
progress_callback=progress_callback,
|
||
skip_images=True, # Не загружаем фото синхронно
|
||
schema_name=schema_name # Передаём для задач фото
|
||
)
|
||
|
||
# Обновляем результаты
|
||
job.refresh_from_db()
|
||
job.status = 'completed'
|
||
job.created_count = result['created']
|
||
job.updated_count = result['updated']
|
||
job.skipped_count = result['skipped']
|
||
job.errors_count = result.get('real_error_count', 0)
|
||
job.errors_json = result.get('real_errors', [])
|
||
job.completed_at = timezone.now()
|
||
job.save()
|
||
|
||
# Удаляем временный файл
|
||
try:
|
||
if default_storage.exists(file_path):
|
||
default_storage.delete(file_path)
|
||
logger.info(f"[Import] Deleted temp file: {file_path}")
|
||
except Exception as del_exc:
|
||
logger.warning(f"[Import] Could not delete temp file: {del_exc}")
|
||
|
||
# Запускаем параллельную загрузку фото (если есть)
|
||
if result.get('photo_tasks'):
|
||
photo_tasks = result['photo_tasks']
|
||
logger.info(f"[Import] Starting parallel download of {len(photo_tasks)} photos")
|
||
|
||
# FIX: is_main удалён из сигнатуры download_product_photo_async
|
||
# Создаём группу задач для параллельной загрузки
|
||
photo_group = group(
|
||
download_product_photo_async.s(
|
||
product_id=task['product_id'],
|
||
image_url=task['url'],
|
||
order=task['order'],
|
||
schema_name=schema_name
|
||
)
|
||
for task in photo_tasks
|
||
)
|
||
|
||
# Запускаем группу асинхронно
|
||
photo_group.apply_async()
|
||
logger.info(f"[Import] Photo download tasks submitted")
|
||
|
||
logger.info(f"[Import] Job {job_id} completed successfully: "
|
||
f"created={result['created']}, updated={result['updated']}, "
|
||
f"skipped={result['skipped']}, errors={result.get('real_error_count', 0)}")
|
||
|
||
return {
|
||
'status': 'success',
|
||
'job_id': job_id,
|
||
'result': result
|
||
}
|
||
|
||
except ProductImportJob.DoesNotExist:
|
||
logger.error(f"[Import] Job {job_id} not found in schema {schema_name}")
|
||
return {'status': 'error', 'reason': 'job_not_found'}
|
||
|
||
except Exception as exc:
|
||
logger.error(f"[Import] Job {job_id} failed: {exc}", exc_info=True)
|
||
|
||
# Обновляем статус задачи
|
||
try:
|
||
connection.set_schema(schema_name)
|
||
job = ProductImportJob.objects.get(id=job_id)
|
||
job.status = 'failed'
|
||
job.error_message = str(exc)
|
||
job.completed_at = timezone.now()
|
||
job.save()
|
||
except Exception as save_exc:
|
||
logger.error(f"[Import] Could not update job status: {save_exc}")
|
||
|
||
# Удаляем временный файл
|
||
try:
|
||
if default_storage.exists(file_path):
|
||
default_storage.delete(file_path)
|
||
except Exception:
|
||
pass
|
||
|
||
return {
|
||
'status': 'error',
|
||
'job_id': job_id,
|
||
'error': str(exc)
|
||
}
|
||
|
||
|
||
@shared_task(
|
||
bind=True,
|
||
name='products.tasks.download_product_photo_async',
|
||
max_retries=3,
|
||
default_retry_delay=10,
|
||
)
|
||
def download_product_photo_async(self, product_id, image_url, order, schema_name):
|
||
"""
|
||
Загрузка одного фото товара по URL.
|
||
Запускается параллельно для всех фото.
|
||
|
||
FIX: ProductPhoto не имеет is_main, главное фото определяется по order=0
|
||
|
||
Args:
|
||
product_id: ID товара
|
||
image_url: URL изображения
|
||
order: Порядок фото (первое фото order=0 считается главным)
|
||
schema_name: Схема тенанта
|
||
|
||
Returns:
|
||
dict: Результат загрузки
|
||
"""
|
||
import requests
|
||
from django.core.files.base import ContentFile
|
||
from .models import Product, ProductPhoto
|
||
import urllib.parse
|
||
|
||
try:
|
||
# Активируем схему
|
||
connection.set_schema(schema_name)
|
||
|
||
# Загружаем товар
|
||
product = Product.objects.get(id=product_id)
|
||
|
||
# Скачиваем изображение
|
||
response = requests.get(image_url, timeout=30)
|
||
response.raise_for_status()
|
||
|
||
# Получаем имя файла
|
||
parsed_url = urllib.parse.urlparse(image_url)
|
||
filename = parsed_url.path.split('/')[-1]
|
||
|
||
# Создаём ProductPhoto
|
||
# FIX: ProductPhoto не имеет поля is_main, используется только order
|
||
# Первое фото (order=0) автоматически считается главным
|
||
photo = ProductPhoto(
|
||
product=product,
|
||
order=order # is_main удалён — используется order для определения главного фото
|
||
)
|
||
|
||
# Сохраняем файл
|
||
# ВАЖНО: use_async=False чтобы НЕ запускать дополнительную Celery задачу
|
||
# Обработка будет выполнена синхронно в текущей задаче
|
||
photo.image.save(
|
||
filename,
|
||
ContentFile(response.content),
|
||
save=False # Не сохраняем в БД пока
|
||
)
|
||
photo.save(use_async=False) # Синхронная обработка через ImageProcessor
|
||
|
||
logger.info(f"[PhotoDownload] Downloaded photo for product {product_id}: {image_url}")
|
||
|
||
return {
|
||
'status': 'success',
|
||
'product_id': product_id,
|
||
'photo_id': photo.id,
|
||
'url': image_url
|
||
}
|
||
|
||
except Product.DoesNotExist:
|
||
logger.error(f"[PhotoDownload] Product {product_id} not found in {schema_name}")
|
||
return {'status': 'error', 'reason': 'product_not_found'}
|
||
|
||
except requests.RequestException as exc:
|
||
logger.warning(f"[PhotoDownload] Failed to download {image_url}: {exc}")
|
||
|
||
# Повторяем при ошибках сети
|
||
try:
|
||
raise self.retry(exc=exc, countdown=10)
|
||
except self.MaxRetriesExceededError:
|
||
logger.error(f"[PhotoDownload] Max retries exceeded for {image_url}")
|
||
return {'status': 'error', 'reason': 'max_retries', 'url': image_url}
|
||
|
||
except Exception as exc:
|
||
logger.error(f"[PhotoDownload] Unexpected error for {image_url}: {exc}", exc_info=True)
|
||
return {'status': 'error', 'reason': 'unexpected', 'url': image_url, 'error': str(exc)}
|