"""Celery задачи для асинхронной обработки фото товаров. ВАЖНО: django-tenants мультитенантность! Все задачи получают schema_name и активируют нужную схему для изоляции данных. МУЛЬТИТЕНАНТНОСТЬ В ФАЙЛАХ: На диске файлы хранятся как: tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext TenantAwareFileSystemStorage добавляет tenant_id при сохранении/удалении файлов. Безопасность: Каждый Celery worker активирует правильную схему БД через connection.set_schema(). """ import os import time import logging from celery import shared_task from django.db import connection from django.apps import apps from django.conf import settings from django.core.files.storage import default_storage from django.utils import timezone from tenants.models import RESERVED_SCHEMA_NAMES logger = logging.getLogger(__name__) # Регистрация декодеров HEIF/AVIF для Pillow в Celery worker # Это критично для обработки HEIC/HEIF фото с iPhone try: from pillow_heif import register_heif_opener register_heif_opener() logger.info("[Celery] HEIF/AVIF decoders registered successfully") except ImportError: logger.warning("[Celery] pillow-heif not available - HEIC/HEIF/AVIF formats will not be supported") @shared_task( bind=True, name='products.tasks.process_product_photo_async', max_retries=3, default_retry_delay=60, # Повторить через 60 секунд при ошибке ) def process_product_photo_async(self, photo_id, photo_model_class, schema_name): """ Асинхронная обработка загруженного фото. Args: photo_id: ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto photo_model_class: Строка с путем к модели ('products.ProductPhoto') schema_name: Имя схемы тенанта для активации правильной БД Returns: dict: Результат обработки с информацией о качестве и путях к файлам """ from .utils.image_processor import ImageProcessor try: # КРИТИЧНО: Активируем схему тенанта # Это гарантирует что мы работаем с данными правильного тенанта connection.set_schema(schema_name) logger.info(f"[Celery] Activated schema: {schema_name} for photo_id: {photo_id}") # Получаем модель по строке пути ('products.ProductPhoto') app_label, model_name = photo_model_class.split('.') PhotoModel = apps.get_model(app_label, model_name) # Загружаем объект фото из БД photo_obj = PhotoModel.objects.get(pk=photo_id) entity = photo_obj.get_entity() logger.info(f"[Celery] Processing photo {photo_id} for {entity.__class__.__name__} #{entity.id}") # Проверяем что фото еще не обработано if not photo_obj.image: logger.warning(f"[Celery] Photo {photo_id} has no image file") return {'status': 'error', 'reason': 'no_image'} # Сохраняем путь к временному файлу до перезаписи поля image temp_path = photo_obj.image.name # КРИТИЧНО: Проверяем что файл существует перед обработкой # Это предотвращает бесполезные retry если файл был удален вручную if not default_storage.exists(temp_path): logger.error(f"[Celery] File does not exist: {temp_path}") raise FileNotFoundError(f"File not found: {temp_path}") # Получаем entity type для правильного пути сохранения entity_type = photo_obj.get_entity_type() # ОСНОВНАЯ РАБОТА: Обрабатываем изображение # Это операция занимает время (resize, convert formats, etc) logger.info(f"[Celery] Starting image processing for photo {photo_id} in {schema_name}") processed_paths = ImageProcessor.process_image( photo_obj.image, entity_type, entity_id=entity.id, photo_id=photo_obj.id ) # Обновляем объект фото с новыми путями и метаданными качества photo_obj.image = processed_paths['original'] photo_obj.quality_level = processed_paths.get('quality_level', 'acceptable') photo_obj.quality_warning = processed_paths.get('quality_warning', False) photo_obj.save(update_fields=['image', 'quality_level', 'quality_warning']) # Удаляем временный файл из temp после успешной обработки try: if temp_path and default_storage.exists(temp_path): default_storage.delete(temp_path) logger.info(f"[Celery] Deleted temp file: {temp_path}") except Exception as del_exc: logger.warning(f"[Celery] Could not delete temp file {temp_path}: {del_exc}") logger.info(f"[Celery] ✓ Photo {photo_id} processed successfully " f"(quality: {processed_paths.get('quality_level')})") return { 'status': 'success', 'photo_id': photo_id, 'schema_name': schema_name, 'quality_level': processed_paths.get('quality_level'), 'paths': { 'original': processed_paths['original'], 'large': processed_paths.get('large'), 'medium': processed_paths.get('medium'), 'thumbnail': processed_paths.get('thumbnail'), } } except PhotoModel.DoesNotExist as e: logger.error(f"[Celery] Photo {photo_id} not found in schema {schema_name}") # Retry briefly to allow DB transaction to commit (race condition on first photo) try: raise self.retry(exc=e, countdown=5) except self.MaxRetriesExceededError: # Final failure: attempt to delete the orphan temp file if we recorded it try: from .models.photos import PhotoProcessingStatus status = (PhotoProcessingStatus.objects .filter(photo_id=photo_id, photo_model=photo_model_class) .order_by('-created_at') .first()) temp_path = (status.result_data or {}).get('temp_path') if status else None if temp_path and default_storage.exists(temp_path): default_storage.delete(temp_path) logger.info(f"[Celery] Deleted temp file (not_found): {temp_path}") except Exception as del_exc: logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on not_found: {del_exc}") return {'status': 'error', 'reason': 'not_found', 'photo_id': photo_id} except FileNotFoundError as exc: # Файл не найден - не имеет смысла повторять task # Это обычно означает что файл был удален до обработки logger.error(f"[Celery] File not found for photo {photo_id} in {schema_name}: {str(exc)}") logger.error(f"[Celery] This usually means the file was deleted before processing. Marking as failed.") # Обновляем статус обработки без retry try: from .models.photos import PhotoProcessingStatus status = (PhotoProcessingStatus.objects .filter(photo_id=photo_id, photo_model=photo_model_class) .order_by('-created_at') .first()) if status: status.status = 'failed' status.error_message = f"File not found: {str(exc)}" status.completed_at = timezone.now() status.save() logger.info(f"[Celery] Updated PhotoProcessingStatus to 'failed' for photo {photo_id}") except Exception as status_exc: logger.warning(f"[Celery] Could not update PhotoProcessingStatus: {status_exc}") return { 'status': 'error', 'reason': 'file_not_found', 'photo_id': photo_id, 'error': str(exc) } except Exception as exc: logger.error(f"[Celery] Error processing photo {photo_id} in {schema_name}: {str(exc)}", exc_info=True) # Повторить задачу при ошибке (макс 3 раза с 60 сек интервалом) try: raise self.retry(exc=exc, countdown=60) except self.MaxRetriesExceededError: logger.error(f"[Celery] Max retries exceeded for photo {photo_id}. Task failed permanently.") # Попытка удалить temp файл при окончательном провале try: from .models.photos import PhotoProcessingStatus status = (PhotoProcessingStatus.objects .filter(photo_id=photo_id, photo_model=photo_model_class) .order_by('-created_at') .first()) temp_path = (status.result_data or {}).get('temp_path') if status else None if temp_path and default_storage.exists(temp_path): default_storage.delete(temp_path) logger.info(f"[Celery] Deleted orphaned temp file (max_retries): {temp_path}") except Exception as del_exc: logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on max_retries: {del_exc}") return { 'status': 'error', 'reason': 'max_retries_exceeded', 'photo_id': photo_id, 'error': str(exc) } @shared_task(name='products.tasks.process_multiple_photos_async') def process_multiple_photos_async(photo_ids, photo_model_class, schema_name): """ Обработка нескольких фото параллельно (chord pattern). Это позволяет обрабатывать несколько фото одновременно если загружено много фото за раз. Args: photo_ids: Список ID фотографий photo_model_class: Путь к модели ('products.ProductPhoto') schema_name: Схема тенанта Returns: dict: Информация о submitted задачах """ from celery import group logger.info(f"[Celery] Submitting {len(photo_ids)} photos for batch processing in {schema_name}") # Создаем группу задач для параллельной обработки # Celery автоматически распределит их между доступными workers job = group( process_product_photo_async.s(photo_id, photo_model_class, schema_name) for photo_id in photo_ids ) # Запускаем группу задач асинхронно result = job.apply_async() return { 'status': 'submitted', 'count': len(photo_ids), 'group_id': result.id, 'schema_name': schema_name } @shared_task(name='products.tasks.cleanup_temp_media_for_schema') def cleanup_temp_media_for_schema(schema_name, ttl_hours=None): """ Очистка временных файлов изображений для указанной схемы тенанта. Удаляет файлы старше TTL из папок: products/temp, kits/temp, categories/temp. Args: schema_name: Имя схемы тенанта ttl_hours: Время жизни файла в часах (по умолчанию из settings.TEMP_MEDIA_TTL_HOURS) Returns: dict: Результат очистки с количеством удаленных файлов """ from django.conf import settings try: # Пропускаем зарезервированные схемы (public, admin и т.д.) if schema_name in RESERVED_SCHEMA_NAMES: ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24)) logger.info(f"[Cleanup:{schema_name}] Skipping reserved schema") return { 'status': 'skipped', 'schema_name': schema_name, 'reason': 'reserved_schema', 'deleted': 0, 'scanned': 0, 'ttl_hours': ttl } # Активируем схему тенанта connection.set_schema(schema_name) ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24)) cutoff_seconds = ttl * 3600 now = time.time() temp_dirs = ['products/temp', 'kits/temp', 'categories/temp'] deleted_count = 0 scanned_count = 0 for rel_dir in temp_dirs: try: # Получаем полный путь с учётом tenant_id try: full_dir = default_storage.path(rel_dir) except RuntimeError as storage_error: # Если не удается определить tenant_id (например, для public схемы) if 'Cannot determine tenant ID' in str(storage_error): logger.warning(f"[Cleanup:{schema_name}] Skipping {rel_dir}: {storage_error}") continue raise # Перебрасываем другие ошибки if not os.path.isdir(full_dir): continue for filename in os.listdir(full_dir): scanned_count += 1 full_path = os.path.join(full_dir, filename) # Пропускаем поддиректории, работаем только с файлами if not os.path.isfile(full_path): continue # Проверяем возраст файла age_seconds = now - os.path.getmtime(full_path) if age_seconds >= cutoff_seconds: # Формируем относительный путь для storage.delete storage_rel_path = os.path.join(rel_dir, filename).replace('\\', '/') try: if default_storage.exists(storage_rel_path): default_storage.delete(storage_rel_path) deleted_count += 1 logger.info(f"[Cleanup:{schema_name}] Deleted: {storage_rel_path} (age: {age_seconds/3600:.1f}h)") except Exception as del_exc: logger.warning(f"[Cleanup:{schema_name}] Could not delete {storage_rel_path}: {del_exc}") except Exception as dir_exc: logger.warning(f"[Cleanup:{schema_name}] Error scanning {rel_dir}: {dir_exc}") logger.info(f"[Cleanup:{schema_name}] Complete. scanned={scanned_count}, deleted={deleted_count}, ttl_hours={ttl}") return { 'status': 'success', 'schema_name': schema_name, 'deleted': deleted_count, 'scanned': scanned_count, 'ttl_hours': ttl } except Exception as exc: logger.error(f"[Cleanup:{schema_name}] Failed: {exc}", exc_info=True) return {'status': 'error', 'schema_name': schema_name, 'error': str(exc)} @shared_task(name='products.tasks.cleanup_temp_media_all') def cleanup_temp_media_all(ttl_hours=None): """ Мастер-задача: перечисляет всех тенантов и запускает очистку temp для каждого. Запускается периодически через Celery Beat. Args: ttl_hours: Время жизни файла в часах (передается в подзадачи) Returns: dict: Информация о количестве тенантов и запущенных задачах """ from django.conf import settings try: # Работаем из public для списка тенантов connection.set_schema('public') from tenants.models import Client # Фильтруем зарезервированные схемы, чтобы не запускать задачи для них schemas = [s for s in Client.objects.values_list('schema_name', flat=True) if s not in RESERVED_SCHEMA_NAMES] ttl = ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24) logger.info(f"[CleanupAll] Scheduling cleanup for {len(schemas)} tenants (TTL: {ttl}h)") for schema in schemas: cleanup_temp_media_for_schema.delay(schema, ttl) return { 'status': 'submitted', 'tenants_count': len(schemas), 'ttl_hours': ttl } except Exception as exc: logger.error(f"[CleanupAll] Failed to schedule: {exc}", exc_info=True) return {'status': 'error', 'error': str(exc)} # ============================================================================ # ИМПОРТ ТОВАРОВ # ============================================================================ @shared_task( bind=True, name='products.tasks.import_products_async', max_retries=0, # Не повторяем автоматически time_limit=3600, # 1 час максимум ) def import_products_async(self, job_id, file_path, update_existing, schema_name): """ Асинхронный импорт товаров из CSV/XLSX. Алгоритм: 1. Читаем файл и парсим заголовки 2. Для каждой строки: - Создаём/обновляем товар (без фото) - Собираем URL фото для параллельной загрузки - Обновляем прогресс в ProductImportJob 3. Запускаем параллельную загрузку всех фото (group task) 4. Удаляем временный файл Args: job_id: ID ProductImportJob file_path: Путь к загруженному файлу update_existing: Обновлять ли существующие товары schema_name: Схема тенанта Returns: dict: Результат импорта """ from .services.import_export import ProductImporter from .models import ProductImportJob from celery import group import os try: # Активируем схему тенанта connection.set_schema(schema_name) logger.info(f"[Import] Activated schema: {schema_name} for job {job_id}") # Загружаем задачу job = ProductImportJob.objects.get(id=job_id) job.status = 'processing' job.save(update_fields=['status']) # Открываем файл if not default_storage.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") with default_storage.open(file_path, 'rb') as file: # Создаём importer с callback для прогресса importer = ProductImporter() # Callback для обновления прогресса def progress_callback(current, total, created, updated, skipped, errors): job.refresh_from_db() job.processed_rows = current job.total_rows = total job.created_count = created job.updated_count = updated job.skipped_count = skipped job.errors_count = len(errors) job.save(update_fields=[ 'processed_rows', 'total_rows', 'created_count', 'updated_count', 'skipped_count', 'errors_count' ]) # Запускаем импорт (без фото, они загрузятся параллельно) result = importer.import_from_file( file=file, update_existing=update_existing, progress_callback=progress_callback, skip_images=True, # Не загружаем фото синхронно schema_name=schema_name # Передаём для задач фото ) # Обновляем результаты job.refresh_from_db() job.status = 'completed' job.created_count = result['created'] job.updated_count = result['updated'] job.skipped_count = result['skipped'] job.errors_count = result.get('real_error_count', 0) job.errors_json = result.get('real_errors', []) job.completed_at = timezone.now() job.save() # Удаляем временный файл try: if default_storage.exists(file_path): default_storage.delete(file_path) logger.info(f"[Import] Deleted temp file: {file_path}") except Exception as del_exc: logger.warning(f"[Import] Could not delete temp file: {del_exc}") # Запускаем параллельную загрузку фото (если есть) if result.get('photo_tasks'): photo_tasks = result['photo_tasks'] logger.info(f"[Import] Starting parallel download of {len(photo_tasks)} photos") # FIX: is_main удалён из сигнатуры download_product_photo_async # Создаём группу задач для параллельной загрузки photo_group = group( download_product_photo_async.s( product_id=task['product_id'], image_url=task['url'], order=task['order'], schema_name=schema_name ) for task in photo_tasks ) # Запускаем группу асинхронно photo_group.apply_async() logger.info(f"[Import] Photo download tasks submitted") logger.info(f"[Import] Job {job_id} completed successfully: " f"created={result['created']}, updated={result['updated']}, " f"skipped={result['skipped']}, errors={result.get('real_error_count', 0)}") return { 'status': 'success', 'job_id': job_id, 'result': result } except ProductImportJob.DoesNotExist: logger.error(f"[Import] Job {job_id} not found in schema {schema_name}") return {'status': 'error', 'reason': 'job_not_found'} except Exception as exc: logger.error(f"[Import] Job {job_id} failed: {exc}", exc_info=True) # Обновляем статус задачи try: connection.set_schema(schema_name) job = ProductImportJob.objects.get(id=job_id) job.status = 'failed' job.error_message = str(exc) job.completed_at = timezone.now() job.save() except Exception as save_exc: logger.error(f"[Import] Could not update job status: {save_exc}") # Удаляем временный файл try: if default_storage.exists(file_path): default_storage.delete(file_path) except Exception: pass return { 'status': 'error', 'job_id': job_id, 'error': str(exc) } @shared_task( bind=True, name='products.tasks.download_product_photo_async', max_retries=3, default_retry_delay=10, ) def download_product_photo_async(self, product_id, image_url, order, schema_name): """ Загрузка одного фото товара по URL. Запускается параллельно для всех фото. FIX: ProductPhoto не имеет is_main, главное фото определяется по order=0 Args: product_id: ID товара image_url: URL изображения order: Порядок фото (первое фото order=0 считается главным) schema_name: Схема тенанта Returns: dict: Результат загрузки """ import requests from django.core.files.base import ContentFile from .models import Product, ProductPhoto import urllib.parse try: # Активируем схему connection.set_schema(schema_name) # Загружаем товар product = Product.objects.get(id=product_id) # Скачиваем изображение response = requests.get(image_url, timeout=30) response.raise_for_status() # Получаем имя файла parsed_url = urllib.parse.urlparse(image_url) filename = parsed_url.path.split('/')[-1] # Создаём ProductPhoto # FIX: ProductPhoto не имеет поля is_main, используется только order # Первое фото (order=0) автоматически считается главным photo = ProductPhoto( product=product, order=order # is_main удалён — используется order для определения главного фото ) # Сохраняем файл # ВАЖНО: use_async=False чтобы НЕ запускать дополнительную Celery задачу # Обработка будет выполнена синхронно в текущей задаче photo.image.save( filename, ContentFile(response.content), save=False # Не сохраняем в БД пока ) photo.save(use_async=False) # Синхронная обработка через ImageProcessor logger.info(f"[PhotoDownload] Downloaded photo for product {product_id}: {image_url}") return { 'status': 'success', 'product_id': product_id, 'photo_id': photo.id, 'url': image_url } except Product.DoesNotExist: logger.error(f"[PhotoDownload] Product {product_id} not found in {schema_name}") return {'status': 'error', 'reason': 'product_not_found'} except requests.RequestException as exc: logger.warning(f"[PhotoDownload] Failed to download {image_url}: {exc}") # Повторяем при ошибках сети try: raise self.retry(exc=exc, countdown=10) except self.MaxRetriesExceededError: logger.error(f"[PhotoDownload] Max retries exceeded for {image_url}") return {'status': 'error', 'reason': 'max_retries', 'url': image_url} except Exception as exc: logger.error(f"[PhotoDownload] Unexpected error for {image_url}: {exc}", exc_info=True) return {'status': 'error', 'reason': 'unexpected', 'url': image_url, 'error': str(exc)}