Files
octopus/myproject/products/tasks.py
Andrey Smakotin 6669d47cdf feat(orders): add recipient management and enhance order forms
- Introduced Recipient model to manage order recipients separately from customers.
- Updated Order model to link to Recipient, replacing recipient_name and recipient_phone fields.
- Enhanced OrderForm to include recipient selection modes: customer, history, and new.
- Added AJAX endpoint to fetch recipient history for customers.
- Updated admin interface to manage recipients and display recipient information in order details.
- Refactored address handling to accommodate new recipient logic.
- Improved demo order creation to include random recipients.
2025-12-23 00:08:41 +03:00

374 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Celery задачи для асинхронной обработки фото товаров.
ВАЖНО: django-tenants мультитенантность!
Все задачи получают schema_name и активируют нужную схему для изоляции данных.
МУЛЬТИТЕНАНТНОСТЬ В ФАЙЛАХ:
На диске файлы хранятся как: tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext
TenantAwareFileSystemStorage добавляет tenant_id при сохранении/удалении файлов.
Безопасность: Каждый Celery worker активирует правильную схему БД через connection.set_schema().
"""
import os
import time
import logging
from celery import shared_task
from django.db import connection
from django.apps import apps
from django.conf import settings
from django.core.files.storage import default_storage
from django.utils import timezone
from tenants.models import RESERVED_SCHEMA_NAMES
logger = logging.getLogger(__name__)
# Регистрация декодеров HEIF/AVIF для Pillow в Celery worker
# Это критично для обработки HEIC/HEIF фото с iPhone
try:
from pillow_heif import register_heif_opener
register_heif_opener()
logger.info("[Celery] HEIF/AVIF decoders registered successfully")
except ImportError:
logger.warning("[Celery] pillow-heif not available - HEIC/HEIF/AVIF formats will not be supported")
@shared_task(
bind=True,
name='products.tasks.process_product_photo_async',
max_retries=3,
default_retry_delay=60, # Повторить через 60 секунд при ошибке
)
def process_product_photo_async(self, photo_id, photo_model_class, schema_name):
"""
Асинхронная обработка загруженного фото.
Args:
photo_id: ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto
photo_model_class: Строка с путем к модели ('products.ProductPhoto')
schema_name: Имя схемы тенанта для активации правильной БД
Returns:
dict: Результат обработки с информацией о качестве и путях к файлам
"""
from .utils.image_processor import ImageProcessor
try:
# КРИТИЧНО: Активируем схему тенанта
# Это гарантирует что мы работаем с данными правильного тенанта
connection.set_schema(schema_name)
logger.info(f"[Celery] Activated schema: {schema_name} for photo_id: {photo_id}")
# Получаем модель по строке пути ('products.ProductPhoto')
app_label, model_name = photo_model_class.split('.')
PhotoModel = apps.get_model(app_label, model_name)
# Загружаем объект фото из БД
photo_obj = PhotoModel.objects.get(pk=photo_id)
entity = photo_obj.get_entity()
logger.info(f"[Celery] Processing photo {photo_id} for {entity.__class__.__name__} #{entity.id}")
# Проверяем что фото еще не обработано
if not photo_obj.image:
logger.warning(f"[Celery] Photo {photo_id} has no image file")
return {'status': 'error', 'reason': 'no_image'}
# Сохраняем путь к временному файлу до перезаписи поля image
temp_path = photo_obj.image.name
# КРИТИЧНО: Проверяем что файл существует перед обработкой
# Это предотвращает бесполезные retry если файл был удален вручную
if not default_storage.exists(temp_path):
logger.error(f"[Celery] File does not exist: {temp_path}")
raise FileNotFoundError(f"File not found: {temp_path}")
# Получаем entity type для правильного пути сохранения
entity_type = photo_obj.get_entity_type()
# ОСНОВНАЯ РАБОТА: Обрабатываем изображение
# Это операция занимает время (resize, convert formats, etc)
logger.info(f"[Celery] Starting image processing for photo {photo_id} in {schema_name}")
processed_paths = ImageProcessor.process_image(
photo_obj.image,
entity_type,
entity_id=entity.id,
photo_id=photo_obj.id
)
# Обновляем объект фото с новыми путями и метаданными качества
photo_obj.image = processed_paths['original']
photo_obj.quality_level = processed_paths.get('quality_level', 'acceptable')
photo_obj.quality_warning = processed_paths.get('quality_warning', False)
photo_obj.save(update_fields=['image', 'quality_level', 'quality_warning'])
# Удаляем временный файл из temp после успешной обработки
try:
if temp_path and default_storage.exists(temp_path):
default_storage.delete(temp_path)
logger.info(f"[Celery] Deleted temp file: {temp_path}")
except Exception as del_exc:
logger.warning(f"[Celery] Could not delete temp file {temp_path}: {del_exc}")
logger.info(f"[Celery] ✓ Photo {photo_id} processed successfully "
f"(quality: {processed_paths.get('quality_level')})")
return {
'status': 'success',
'photo_id': photo_id,
'schema_name': schema_name,
'quality_level': processed_paths.get('quality_level'),
'paths': {
'original': processed_paths['original'],
'large': processed_paths.get('large'),
'medium': processed_paths.get('medium'),
'thumbnail': processed_paths.get('thumbnail'),
}
}
except PhotoModel.DoesNotExist as e:
logger.error(f"[Celery] Photo {photo_id} not found in schema {schema_name}")
# Retry briefly to allow DB transaction to commit (race condition on first photo)
try:
raise self.retry(exc=e, countdown=5)
except self.MaxRetriesExceededError:
# Final failure: attempt to delete the orphan temp file if we recorded it
try:
from .models.photos import PhotoProcessingStatus
status = (PhotoProcessingStatus.objects
.filter(photo_id=photo_id, photo_model=photo_model_class)
.order_by('-created_at')
.first())
temp_path = (status.result_data or {}).get('temp_path') if status else None
if temp_path and default_storage.exists(temp_path):
default_storage.delete(temp_path)
logger.info(f"[Celery] Deleted temp file (not_found): {temp_path}")
except Exception as del_exc:
logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on not_found: {del_exc}")
return {'status': 'error', 'reason': 'not_found', 'photo_id': photo_id}
except FileNotFoundError as exc:
# Файл не найден - не имеет смысла повторять task
# Это обычно означает что файл был удален до обработки
logger.error(f"[Celery] File not found for photo {photo_id} in {schema_name}: {str(exc)}")
logger.error(f"[Celery] This usually means the file was deleted before processing. Marking as failed.")
# Обновляем статус обработки без retry
try:
from .models.photos import PhotoProcessingStatus
status = (PhotoProcessingStatus.objects
.filter(photo_id=photo_id, photo_model=photo_model_class)
.order_by('-created_at')
.first())
if status:
status.status = 'failed'
status.error_message = f"File not found: {str(exc)}"
status.completed_at = timezone.now()
status.save()
logger.info(f"[Celery] Updated PhotoProcessingStatus to 'failed' for photo {photo_id}")
except Exception as status_exc:
logger.warning(f"[Celery] Could not update PhotoProcessingStatus: {status_exc}")
return {
'status': 'error',
'reason': 'file_not_found',
'photo_id': photo_id,
'error': str(exc)
}
except Exception as exc:
logger.error(f"[Celery] Error processing photo {photo_id} in {schema_name}: {str(exc)}",
exc_info=True)
# Повторить задачу при ошибке (макс 3 раза с 60 сек интервалом)
try:
raise self.retry(exc=exc, countdown=60)
except self.MaxRetriesExceededError:
logger.error(f"[Celery] Max retries exceeded for photo {photo_id}. Task failed permanently.")
# Попытка удалить temp файл при окончательном провале
try:
from .models.photos import PhotoProcessingStatus
status = (PhotoProcessingStatus.objects
.filter(photo_id=photo_id, photo_model=photo_model_class)
.order_by('-created_at')
.first())
temp_path = (status.result_data or {}).get('temp_path') if status else None
if temp_path and default_storage.exists(temp_path):
default_storage.delete(temp_path)
logger.info(f"[Celery] Deleted orphaned temp file (max_retries): {temp_path}")
except Exception as del_exc:
logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on max_retries: {del_exc}")
return {
'status': 'error',
'reason': 'max_retries_exceeded',
'photo_id': photo_id,
'error': str(exc)
}
@shared_task(name='products.tasks.process_multiple_photos_async')
def process_multiple_photos_async(photo_ids, photo_model_class, schema_name):
"""
Обработка нескольких фото параллельно (chord pattern).
Это позволяет обрабатывать несколько фото одновременно
если загружено много фото за раз.
Args:
photo_ids: Список ID фотографий
photo_model_class: Путь к модели ('products.ProductPhoto')
schema_name: Схема тенанта
Returns:
dict: Информация о submitted задачах
"""
from celery import group
logger.info(f"[Celery] Submitting {len(photo_ids)} photos for batch processing in {schema_name}")
# Создаем группу задач для параллельной обработки
# Celery автоматически распределит их между доступными workers
job = group(
process_product_photo_async.s(photo_id, photo_model_class, schema_name)
for photo_id in photo_ids
)
# Запускаем группу задач асинхронно
result = job.apply_async()
return {
'status': 'submitted',
'count': len(photo_ids),
'group_id': result.id,
'schema_name': schema_name
}
@shared_task(name='products.tasks.cleanup_temp_media_for_schema')
def cleanup_temp_media_for_schema(schema_name, ttl_hours=None):
"""
Очистка временных файлов изображений для указанной схемы тенанта.
Удаляет файлы старше TTL из папок: products/temp, kits/temp, categories/temp.
Args:
schema_name: Имя схемы тенанта
ttl_hours: Время жизни файла в часах (по умолчанию из settings.TEMP_MEDIA_TTL_HOURS)
Returns:
dict: Результат очистки с количеством удаленных файлов
"""
from django.conf import settings
try:
# Пропускаем зарезервированные схемы (public, admin и т.д.)
if schema_name in RESERVED_SCHEMA_NAMES:
ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24))
logger.info(f"[Cleanup:{schema_name}] Skipping reserved schema")
return {
'status': 'skipped',
'schema_name': schema_name,
'reason': 'reserved_schema',
'deleted': 0,
'scanned': 0,
'ttl_hours': ttl
}
# Активируем схему тенанта
connection.set_schema(schema_name)
ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24))
cutoff_seconds = ttl * 3600
now = time.time()
temp_dirs = ['products/temp', 'kits/temp', 'categories/temp']
deleted_count = 0
scanned_count = 0
for rel_dir in temp_dirs:
try:
# Получаем полный путь с учётом tenant_id
try:
full_dir = default_storage.path(rel_dir)
except RuntimeError as storage_error:
# Если не удается определить tenant_id (например, для public схемы)
if 'Cannot determine tenant ID' in str(storage_error):
logger.warning(f"[Cleanup:{schema_name}] Skipping {rel_dir}: {storage_error}")
continue
raise # Перебрасываем другие ошибки
if not os.path.isdir(full_dir):
continue
for filename in os.listdir(full_dir):
scanned_count += 1
full_path = os.path.join(full_dir, filename)
# Пропускаем поддиректории, работаем только с файлами
if not os.path.isfile(full_path):
continue
# Проверяем возраст файла
age_seconds = now - os.path.getmtime(full_path)
if age_seconds >= cutoff_seconds:
# Формируем относительный путь для storage.delete
storage_rel_path = os.path.join(rel_dir, filename).replace('\\', '/')
try:
if default_storage.exists(storage_rel_path):
default_storage.delete(storage_rel_path)
deleted_count += 1
logger.info(f"[Cleanup:{schema_name}] Deleted: {storage_rel_path} (age: {age_seconds/3600:.1f}h)")
except Exception as del_exc:
logger.warning(f"[Cleanup:{schema_name}] Could not delete {storage_rel_path}: {del_exc}")
except Exception as dir_exc:
logger.warning(f"[Cleanup:{schema_name}] Error scanning {rel_dir}: {dir_exc}")
logger.info(f"[Cleanup:{schema_name}] Complete. scanned={scanned_count}, deleted={deleted_count}, ttl_hours={ttl}")
return {
'status': 'success',
'schema_name': schema_name,
'deleted': deleted_count,
'scanned': scanned_count,
'ttl_hours': ttl
}
except Exception as exc:
logger.error(f"[Cleanup:{schema_name}] Failed: {exc}", exc_info=True)
return {'status': 'error', 'schema_name': schema_name, 'error': str(exc)}
@shared_task(name='products.tasks.cleanup_temp_media_all')
def cleanup_temp_media_all(ttl_hours=None):
"""
Мастер-задача: перечисляет всех тенантов и запускает очистку temp для каждого.
Запускается периодически через Celery Beat.
Args:
ttl_hours: Время жизни файла в часах (передается в подзадачи)
Returns:
dict: Информация о количестве тенантов и запущенных задачах
"""
from django.conf import settings
try:
# Работаем из public для списка тенантов
connection.set_schema('public')
from tenants.models import Client
# Фильтруем зарезервированные схемы, чтобы не запускать задачи для них
schemas = [s for s in Client.objects.values_list('schema_name', flat=True)
if s not in RESERVED_SCHEMA_NAMES]
ttl = ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24)
logger.info(f"[CleanupAll] Scheduling cleanup for {len(schemas)} tenants (TTL: {ttl}h)")
for schema in schemas:
cleanup_temp_media_for_schema.delay(schema, ttl)
return {
'status': 'submitted',
'tenants_count': len(schemas),
'ttl_hours': ttl
}
except Exception as exc:
logger.error(f"[CleanupAll] Failed to schedule: {exc}", exc_info=True)
return {'status': 'error', 'error': str(exc)}