"""Celery задачи для асинхронной обработки фото товаров. ВАЖНО: django-tenants мультитенантность! Все задачи получают schema_name и активируют нужную схему для изоляции данных. МУЛЬТИТЕНАНТНОСТЬ В ФАЙЛАХ: На диске файлы хранятся как: tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext TenantAwareFileSystemStorage добавляет tenant_id при сохранении/удалении файлов. Безопасность: Каждый Celery worker активирует правильную схему БД через connection.set_schema(). """ import os import time import logging from celery import shared_task from django.db import connection from django.apps import apps from django.conf import settings from django.core.files.storage import default_storage logger = logging.getLogger(__name__) # Регистрация декодеров HEIF/AVIF для Pillow в Celery worker # Это критично для обработки HEIC/HEIF фото с iPhone try: from pillow_heif import register_heif_opener register_heif_opener() logger.info("[Celery] HEIF/AVIF decoders registered successfully") except ImportError: logger.warning("[Celery] pillow-heif not available - HEIC/HEIF/AVIF formats will not be supported") @shared_task( bind=True, name='products.tasks.process_product_photo_async', max_retries=3, default_retry_delay=60, # Повторить через 60 секунд при ошибке ) def process_product_photo_async(self, photo_id, photo_model_class, schema_name): """ Асинхронная обработка загруженного фото. Args: photo_id: ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto photo_model_class: Строка с путем к модели ('products.ProductPhoto') schema_name: Имя схемы тенанта для активации правильной БД Returns: dict: Результат обработки с информацией о качестве и путях к файлам """ from .utils.image_processor import ImageProcessor try: # КРИТИЧНО: Активируем схему тенанта # Это гарантирует что мы работаем с данными правильного тенанта connection.set_schema(schema_name) logger.info(f"[Celery] Activated schema: {schema_name} for photo_id: {photo_id}") # Получаем модель по строке пути ('products.ProductPhoto') app_label, model_name = photo_model_class.split('.') PhotoModel = apps.get_model(app_label, model_name) # Загружаем объект фото из БД photo_obj = PhotoModel.objects.get(pk=photo_id) entity = photo_obj.get_entity() logger.info(f"[Celery] Processing photo {photo_id} for {entity.__class__.__name__} #{entity.id}") # Проверяем что фото еще не обработано if not photo_obj.image: logger.warning(f"[Celery] Photo {photo_id} has no image file") return {'status': 'error', 'reason': 'no_image'} # Сохраняем путь к временному файлу до перезаписи поля image temp_path = photo_obj.image.name # Получаем entity type для правильного пути сохранения entity_type = photo_obj.get_entity_type() # ОСНОВНАЯ РАБОТА: Обрабатываем изображение # Это операция занимает время (resize, convert formats, etc) logger.info(f"[Celery] Starting image processing for photo {photo_id} in {schema_name}") processed_paths = ImageProcessor.process_image( photo_obj.image, entity_type, entity_id=entity.id, photo_id=photo_obj.id ) # Обновляем объект фото с новыми путями и метаданными качества photo_obj.image = processed_paths['original'] photo_obj.quality_level = processed_paths.get('quality_level', 'acceptable') photo_obj.quality_warning = processed_paths.get('quality_warning', False) photo_obj.save(update_fields=['image', 'quality_level', 'quality_warning']) # Удаляем временный файл из temp после успешной обработки try: if temp_path and default_storage.exists(temp_path): default_storage.delete(temp_path) logger.info(f"[Celery] Deleted temp file: {temp_path}") except Exception as del_exc: logger.warning(f"[Celery] Could not delete temp file {temp_path}: {del_exc}") logger.info(f"[Celery] ✓ Photo {photo_id} processed successfully " f"(quality: {processed_paths.get('quality_level')})") return { 'status': 'success', 'photo_id': photo_id, 'schema_name': schema_name, 'quality_level': processed_paths.get('quality_level'), 'paths': { 'original': processed_paths['original'], 'large': processed_paths.get('large'), 'medium': processed_paths.get('medium'), 'thumbnail': processed_paths.get('thumbnail'), } } except PhotoModel.DoesNotExist as e: logger.error(f"[Celery] Photo {photo_id} not found in schema {schema_name}") # Retry briefly to allow DB transaction to commit (race condition on first photo) try: raise self.retry(exc=e, countdown=5) except self.MaxRetriesExceededError: # Final failure: attempt to delete the orphan temp file if we recorded it try: from .models.photos import PhotoProcessingStatus status = (PhotoProcessingStatus.objects .filter(photo_id=photo_id, photo_model=photo_model_class) .order_by('-created_at') .first()) temp_path = (status.result_data or {}).get('temp_path') if status else None if temp_path and default_storage.exists(temp_path): default_storage.delete(temp_path) logger.info(f"[Celery] Deleted temp file (not_found): {temp_path}") except Exception as del_exc: logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on not_found: {del_exc}") return {'status': 'error', 'reason': 'not_found', 'photo_id': photo_id} except Exception as exc: logger.error(f"[Celery] Error processing photo {photo_id} in {schema_name}: {str(exc)}", exc_info=True) # Повторить задачу при ошибке (макс 3 раза с 60 сек интервалом) try: raise self.retry(exc=exc, countdown=60) except self.MaxRetriesExceededError: logger.error(f"[Celery] Max retries exceeded for photo {photo_id}. Task failed permanently.") # Попытка удалить temp файл при окончательном провале try: from .models.photos import PhotoProcessingStatus status = (PhotoProcessingStatus.objects .filter(photo_id=photo_id, photo_model=photo_model_class) .order_by('-created_at') .first()) temp_path = (status.result_data or {}).get('temp_path') if status else None if temp_path and default_storage.exists(temp_path): default_storage.delete(temp_path) logger.info(f"[Celery] Deleted orphaned temp file (max_retries): {temp_path}") except Exception as del_exc: logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on max_retries: {del_exc}") return { 'status': 'error', 'reason': 'max_retries_exceeded', 'photo_id': photo_id, 'error': str(exc) } @shared_task(name='products.tasks.process_multiple_photos_async') def process_multiple_photos_async(photo_ids, photo_model_class, schema_name): """ Обработка нескольких фото параллельно (chord pattern). Это позволяет обрабатывать несколько фото одновременно если загружено много фото за раз. Args: photo_ids: Список ID фотографий photo_model_class: Путь к модели ('products.ProductPhoto') schema_name: Схема тенанта Returns: dict: Информация о submitted задачах """ from celery import group logger.info(f"[Celery] Submitting {len(photo_ids)} photos for batch processing in {schema_name}") # Создаем группу задач для параллельной обработки # Celery автоматически распределит их между доступными workers job = group( process_product_photo_async.s(photo_id, photo_model_class, schema_name) for photo_id in photo_ids ) # Запускаем группу задач асинхронно result = job.apply_async() return { 'status': 'submitted', 'count': len(photo_ids), 'group_id': result.id, 'schema_name': schema_name } @shared_task(name='products.tasks.cleanup_temp_media_for_schema') def cleanup_temp_media_for_schema(schema_name, ttl_hours=None): """ Очистка временных файлов изображений для указанной схемы тенанта. Удаляет файлы старше TTL из папок: products/temp, kits/temp, categories/temp. Args: schema_name: Имя схемы тенанта ttl_hours: Время жизни файла в часах (по умолчанию из settings.TEMP_MEDIA_TTL_HOURS) Returns: dict: Результат очистки с количеством удаленных файлов """ from django.conf import settings try: # Активируем схему тенанта connection.set_schema(schema_name) ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24)) cutoff_seconds = ttl * 3600 now = time.time() temp_dirs = ['products/temp', 'kits/temp', 'categories/temp'] deleted_count = 0 scanned_count = 0 for rel_dir in temp_dirs: try: # Получаем полный путь с учётом tenant_id full_dir = default_storage.path(rel_dir) if not os.path.isdir(full_dir): continue for filename in os.listdir(full_dir): scanned_count += 1 full_path = os.path.join(full_dir, filename) # Пропускаем поддиректории, работаем только с файлами if not os.path.isfile(full_path): continue # Проверяем возраст файла age_seconds = now - os.path.getmtime(full_path) if age_seconds >= cutoff_seconds: # Формируем относительный путь для storage.delete storage_rel_path = os.path.join(rel_dir, filename).replace('\\', '/') try: if default_storage.exists(storage_rel_path): default_storage.delete(storage_rel_path) deleted_count += 1 logger.info(f"[Cleanup:{schema_name}] Deleted: {storage_rel_path} (age: {age_seconds/3600:.1f}h)") except Exception as del_exc: logger.warning(f"[Cleanup:{schema_name}] Could not delete {storage_rel_path}: {del_exc}") except Exception as dir_exc: logger.warning(f"[Cleanup:{schema_name}] Error scanning {rel_dir}: {dir_exc}") logger.info(f"[Cleanup:{schema_name}] Complete. scanned={scanned_count}, deleted={deleted_count}, ttl_hours={ttl}") return { 'status': 'success', 'schema_name': schema_name, 'deleted': deleted_count, 'scanned': scanned_count, 'ttl_hours': ttl } except Exception as exc: logger.error(f"[Cleanup:{schema_name}] Failed: {exc}", exc_info=True) return {'status': 'error', 'schema_name': schema_name, 'error': str(exc)} @shared_task(name='products.tasks.cleanup_temp_media_all') def cleanup_temp_media_all(ttl_hours=None): """ Мастер-задача: перечисляет всех тенантов и запускает очистку temp для каждого. Запускается периодически через Celery Beat. Args: ttl_hours: Время жизни файла в часах (передается в подзадачи) Returns: dict: Информация о количестве тенантов и запущенных задачах """ from django.conf import settings try: # Работаем из public для списка тенантов connection.set_schema('public') from tenants.models import Client schemas = list(Client.objects.values_list('schema_name', flat=True)) ttl = ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24) logger.info(f"[CleanupAll] Scheduling cleanup for {len(schemas)} tenants (TTL: {ttl}h)") for schema in schemas: cleanup_temp_media_for_schema.delay(schema, ttl) return { 'status': 'submitted', 'tenants_count': len(schemas), 'ttl_hours': ttl } except Exception as exc: logger.error(f"[CleanupAll] Failed to schedule: {exc}", exc_info=True) return {'status': 'error', 'error': str(exc)}