""" Celery задачи для асинхронной обработки фото товаров. ВАЖНО: django-tenants мультитенантность! Все задачи получают schema_name и активируют нужную схему для изоляции данных. МУЛЬТИТЕНАНТНОСТЬ В ФАЙЛАХ: На диске файлы хранятся как: tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext TenantAwareFileSystemStorage добавляет tenant_id при сохранении/удалении файлов. Безопасность: Каждый Celery worker активирует правильную схему БД через connection.set_schema(). """ import logging from celery import shared_task from django.db import connection from django.apps import apps from django.core.files.storage import default_storage logger = logging.getLogger(__name__) @shared_task( bind=True, name='products.tasks.process_product_photo_async', max_retries=3, default_retry_delay=60, # Повторить через 60 секунд при ошибке ) def process_product_photo_async(self, photo_id, photo_model_class, schema_name): """ Асинхронная обработка загруженного фото. Args: photo_id: ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto photo_model_class: Строка с путем к модели ('products.ProductPhoto') schema_name: Имя схемы тенанта для активации правильной БД Returns: dict: Результат обработки с информацией о качестве и путях к файлам """ from .utils.image_processor import ImageProcessor try: # КРИТИЧНО: Активируем схему тенанта # Это гарантирует что мы работаем с данными правильного тенанта connection.set_schema(schema_name) logger.info(f"[Celery] Activated schema: {schema_name} for photo_id: {photo_id}") # Получаем модель по строке пути ('products.ProductPhoto') app_label, model_name = photo_model_class.split('.') PhotoModel = apps.get_model(app_label, model_name) # Загружаем объект фото из БД photo_obj = PhotoModel.objects.get(pk=photo_id) entity = photo_obj.get_entity() logger.info(f"[Celery] Processing photo {photo_id} for {entity.__class__.__name__} #{entity.id}") # Проверяем что фото еще не обработано if not photo_obj.image: logger.warning(f"[Celery] Photo {photo_id} has no image file") return {'status': 'error', 'reason': 'no_image'} # Сохраняем путь к временному файлу до перезаписи поля image temp_path = photo_obj.image.name # Получаем entity type для правильного пути сохранения entity_type = photo_obj.get_entity_type() # ОСНОВНАЯ РАБОТА: Обрабатываем изображение # Это операция занимает время (resize, convert formats, etc) logger.info(f"[Celery] Starting image processing for photo {photo_id} in {schema_name}") processed_paths = ImageProcessor.process_image( photo_obj.image, entity_type, entity_id=entity.id, photo_id=photo_obj.id ) # Обновляем объект фото с новыми путями и метаданными качества photo_obj.image = processed_paths['original'] photo_obj.quality_level = processed_paths.get('quality_level', 'acceptable') photo_obj.quality_warning = processed_paths.get('quality_warning', False) photo_obj.save(update_fields=['image', 'quality_level', 'quality_warning']) # Удаляем временный файл из temp после успешной обработки try: if temp_path and default_storage.exists(temp_path): default_storage.delete(temp_path) logger.info(f"[Celery] Deleted temp file: {temp_path}") except Exception as del_exc: logger.warning(f"[Celery] Could not delete temp file {temp_path}: {del_exc}") logger.info(f"[Celery] ✓ Photo {photo_id} processed successfully " f"(quality: {processed_paths.get('quality_level')})") return { 'status': 'success', 'photo_id': photo_id, 'schema_name': schema_name, 'quality_level': processed_paths.get('quality_level'), 'paths': { 'original': processed_paths['original'], 'large': processed_paths.get('large'), 'medium': processed_paths.get('medium'), 'thumbnail': processed_paths.get('thumbnail'), } } except PhotoModel.DoesNotExist as e: logger.error(f"[Celery] Photo {photo_id} not found in schema {schema_name}") # Retry briefly to allow DB transaction to commit (race condition on first photo) try: raise self.retry(exc=e, countdown=5) except self.MaxRetriesExceededError: # Final failure: attempt to delete the orphan temp file if we recorded it try: from .models.photos import PhotoProcessingStatus status = (PhotoProcessingStatus.objects .filter(photo_id=photo_id, photo_model=photo_model_class) .order_by('-created_at') .first()) temp_path = (status.result_data or {}).get('temp_path') if status else None if temp_path and default_storage.exists(temp_path): default_storage.delete(temp_path) logger.info(f"[Celery] Deleted temp file (not_found): {temp_path}") except Exception as del_exc: logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on not_found: {del_exc}") return {'status': 'error', 'reason': 'not_found', 'photo_id': photo_id} except Exception as exc: logger.error(f"[Celery] Error processing photo {photo_id} in {schema_name}: {str(exc)}", exc_info=True) # Повторить задачу при ошибке (макс 3 раза с 60 сек интервалом) try: raise self.retry(exc=exc, countdown=60) except self.MaxRetriesExceededError: logger.error(f"[Celery] Max retries exceeded for photo {photo_id}. Task failed permanently.") return { 'status': 'error', 'reason': 'max_retries_exceeded', 'photo_id': photo_id, 'error': str(exc) } @shared_task(name='products.tasks.process_multiple_photos_async') def process_multiple_photos_async(photo_ids, photo_model_class, schema_name): """ Обработка нескольких фото параллельно (chord pattern). Это позволяет обрабатывать несколько фото одновременно если загружено много фото за раз. Args: photo_ids: Список ID фотографий photo_model_class: Путь к модели ('products.ProductPhoto') schema_name: Схема тенанта Returns: dict: Информация о submitted задачах """ from celery import group logger.info(f"[Celery] Submitting {len(photo_ids)} photos for batch processing in {schema_name}") # Создаем группу задач для параллельной обработки # Celery автоматически распределит их между доступными workers job = group( process_product_photo_async.s(photo_id, photo_model_class, schema_name) for photo_id in photo_ids ) # Запускаем группу задач асинхронно result = job.apply_async() return { 'status': 'submitted', 'count': len(photo_ids), 'group_id': result.id, 'schema_name': schema_name }