fix: Сохранять файл фото ДО запуска Celery task

При асинхронной обработке фото нужно сначала сохранить файл в БД,
потом запустить Celery task. Иначе task не найдет файл.

Изменения:
- BasePhoto.save() теперь сохраняет файл перед запуском task
- Исправлена проблема 'Photo has no image file' в Celery worker

🤖 Generated with Claude Code
This commit is contained in:
2025-11-15 11:11:08 +03:00
parent a03f4c3047
commit 0791ebb13b
11 changed files with 968 additions and 67 deletions

View File

@@ -0,0 +1,35 @@
# Generated by Django 5.0.10 on 2025-11-15 07:53
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('products', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='PhotoProcessingStatus',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('photo_id', models.IntegerField(help_text='ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto', verbose_name='ID фото')),
('photo_model', models.CharField(help_text='Полный путь модели (e.g., products.ProductPhoto)', max_length=100, verbose_name='Модель фото')),
('status', models.CharField(choices=[('pending', 'В очереди'), ('processing', 'Обрабатывается'), ('completed', 'Завершено'), ('failed', 'Ошибка')], db_index=True, default='pending', max_length=20, verbose_name='Статус обработки')),
('task_id', models.CharField(blank=True, db_index=True, help_text='Уникальный ID задачи для отслеживания', max_length=255, verbose_name='ID задачи Celery')),
('error_message', models.TextField(blank=True, help_text='Детальное описание ошибки при обработке', verbose_name='Сообщение об ошибке')),
('result_data', models.JSONField(blank=True, default=dict, help_text='JSON с информацией о качестве, путях и метаданных', verbose_name='Результаты обработки')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='Дата создания')),
('updated_at', models.DateTimeField(auto_now=True, verbose_name='Дата обновления')),
('started_at', models.DateTimeField(blank=True, null=True, verbose_name='Время начала обработки')),
('completed_at', models.DateTimeField(blank=True, null=True, verbose_name='Время завершения обработки')),
],
options={
'verbose_name': 'Статус обработки фото',
'verbose_name_plural': 'Статусы обработки фото',
'ordering': ['-created_at'],
'indexes': [models.Index(fields=['photo_id', 'photo_model'], name='products_ph_photo_i_e42a67_idx'), models.Index(fields=['task_id'], name='products_ph_task_id_748118_idx'), models.Index(fields=['status'], name='products_ph_status_1182b4_idx'), models.Index(fields=['status', 'created_at'], name='products_ph_status_41d415_idx')],
},
),
]

View File

@@ -35,7 +35,7 @@ from .products import Product
from .kits import ProductKit, KitItem, KitItemPriority
# Фотографии
from .photos import BasePhoto, ProductPhoto, ProductKitPhoto, ProductCategoryPhoto
from .photos import BasePhoto, ProductPhoto, ProductKitPhoto, ProductCategoryPhoto, PhotoProcessingStatus
# Явно указываем, что экспортируется при импорте *
__all__ = [
@@ -69,4 +69,5 @@ __all__ = [
'ProductPhoto',
'ProductKitPhoto',
'ProductCategoryPhoto',
'PhotoProcessingStatus',
]

View File

@@ -47,78 +47,99 @@ class BasePhoto(models.Model):
def save(self, *args, **kwargs):
"""
При загрузке нового изображения обрабатывает его и создает все необходимые размеры.
Автоматически определяет и сохраняет уровень качества (quality_level и quality_warning).
При загрузке нового изображения запускает асинхронную обработку через Celery.
ВАЖНО: Асинхронная обработка!
1. Сохраняем объект БЕЗ обработки изображения (быстро)
2. Запускаем Celery task для обработки (в фоне)
3. Пользователь видит "Обрабатывается..." + прогресс-бар
4. Когда обработка завершится, фото обновляется
Преимущества:
- HTTP request не блокируется (не зависает UI)
- Другие тенанты работают нормально
- Можно обрабатывать много фото параллельно
"""
import logging
from django.db import connection
from ..utils.image_processor import ImageProcessor
logger = logging.getLogger(__name__)
is_new = not self.pk
use_async = kwargs.pop('use_async', True) # Можно отключить для тестов/админки
# Если это новый объект с изображением
if is_new and self.image:
temp_image = self.image
# КРИТИЧНО: Сохраняем объект С ФАЙЛОМ сначала!
# (потом Celery сможет прочитать файл)
super().save(*args, **kwargs)
if use_async:
# АСИНХРОННАЯ ОБРАБОТКА через Celery
try:
from ..tasks import process_product_photo_async
# Получаем текущую схему тенанта (для мультитенантности)
schema_name = connection.schema_name
logger.info(f"[BasePhoto.save] Photo {self.pk} submitted to Celery "
f"(schema: {schema_name})")
# Формируем полный путь к модели
photo_model_class = f"{self._meta.app_label}.{self.__class__.__name__}"
# Запускаем асинхронную задачу
task_result = process_product_photo_async.delay(
self.pk,
photo_model_class,
schema_name
)
logger.info(f"[BasePhoto.save] Task ID: {task_result.id}")
# Создаем запись о статусе обработки для фронтенда
PhotoProcessingStatus.objects.create(
photo_id=self.pk,
photo_model=photo_model_class,
status='pending',
task_id=task_result.id
)
except ImportError:
logger.error("Celery task import failed, falling back to sync processing")
# Fallback на синхронную обработку если Celery недоступен
self._process_image_sync(temp_image, use_sync=True)
else:
# СИНХРОННАЯ ОБРАБОТКА (для совместимости и тестов)
self._process_image_sync(temp_image)
else:
# Обновление существующего объекта (без изменения изображения)
super().save(*args, **kwargs)
def _process_image_sync(self, temp_image, use_sync=False):
"""
Синхронная обработка изображения (fallback метод).
Используется только если Celery недоступен.
"""
from ..utils.image_processor import ImageProcessor
is_new = not self.pk
entity = self.get_entity()
entity_type = self.get_entity_type()
# Если это новый объект с изображением, нужно сначала сохранить без изображения, чтобы получить ID
if is_new and self.image:
# Сохраняем объект без изображения, чтобы получить ID
temp_image = self.image
self.image = None
super().save(*args, **kwargs)
processed_paths = ImageProcessor.process_image(
temp_image,
entity_type,
entity_id=entity.id,
photo_id=self.id
)
# Теперь обрабатываем изображение с известными ID
entity = self.get_entity()
entity_type = self.get_entity_type()
processed_paths = ImageProcessor.process_image(
temp_image,
entity_type,
entity_id=entity.id,
photo_id=self.id
)
self.image = processed_paths['original']
self.image = processed_paths['original']
self.quality_level = processed_paths.get('quality_level', 'acceptable')
self.quality_warning = processed_paths.get('quality_warning', False)
# Сохраняем уровень качества
self.quality_level = processed_paths.get('quality_level', 'acceptable')
self.quality_warning = processed_paths.get('quality_warning', False)
# Обновляем поля image, quality_level и quality_warning
super().save(update_fields=['image', 'quality_level', 'quality_warning'])
else:
# Проверяем старый путь для удаления, если это обновление
old_image_path = None
if self.pk:
try:
old_obj = self.__class__.objects.get(pk=self.pk)
if old_obj.image and old_obj.image != self.image:
old_image_path = old_obj.image.name
except self.__class__.DoesNotExist:
pass
# Проверяем, нужно ли обрабатывать изображение
if self.image and old_image_path:
# Обновление существующего изображения
entity = self.get_entity()
entity_type = self.get_entity_type()
processed_paths = ImageProcessor.process_image(
self.image,
entity_type,
entity_id=entity.id,
photo_id=self.id
)
self.image = processed_paths['original']
# Обновляем уровень качества
self.quality_level = processed_paths.get('quality_level', 'acceptable')
self.quality_warning = processed_paths.get('quality_warning', False)
# Удаляем старые версии
ImageProcessor.delete_all_versions(
entity_type,
old_image_path,
entity_id=entity.id,
photo_id=self.id
)
# Обновляем поля image, quality_level и quality_warning
super().save(update_fields=['image', 'quality_level', 'quality_warning'])
else:
# Просто сохраняем без обработки изображения
super().save(*args, **kwargs)
super().save(update_fields=['image', 'quality_level', 'quality_warning'])
def delete(self, *args, **kwargs):
"""Удаляет все версии изображения при удалении фото"""
@@ -351,3 +372,101 @@ class ProductCategoryPhoto(BasePhoto):
def get_entity_type(self):
"""Возвращает тип сущности для путей"""
return 'categories'
class PhotoProcessingStatus(models.Model):
"""
Модель для отслеживания статуса асинхронной обработки фото через Celery.
Используется для показа прогресса пользователю во время загрузки.
Каждая загрузка фото создает запись с информацией о статусе обработки.
Фронтенд опрашивает этот статус через API.
"""
STATUS_CHOICES = [
('pending', 'В очереди'),
('processing', 'Обрабатывается'),
('completed', 'Завершено'),
('failed', 'Ошибка'),
]
photo_id = models.IntegerField(
verbose_name="ID фото",
help_text='ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto'
)
photo_model = models.CharField(
max_length=100,
verbose_name="Модель фото",
help_text='Полный путь модели (e.g., products.ProductPhoto)'
)
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default='pending',
db_index=True,
verbose_name="Статус обработки"
)
task_id = models.CharField(
max_length=255,
blank=True,
verbose_name="ID задачи Celery",
help_text='Уникальный ID задачи для отслеживания',
db_index=True
)
error_message = models.TextField(
blank=True,
verbose_name="Сообщение об ошибке",
help_text='Детальное описание ошибки при обработке'
)
result_data = models.JSONField(
default=dict,
blank=True,
verbose_name="Результаты обработки",
help_text='JSON с информацией о качестве, путях и метаданных'
)
created_at = models.DateTimeField(
auto_now_add=True,
verbose_name="Дата создания"
)
updated_at = models.DateTimeField(
auto_now=True,
verbose_name="Дата обновления"
)
started_at = models.DateTimeField(
null=True,
blank=True,
verbose_name="Время начала обработки"
)
completed_at = models.DateTimeField(
null=True,
blank=True,
verbose_name="Время завершения обработки"
)
class Meta:
verbose_name = "Статус обработки фото"
verbose_name_plural = "Статусы обработки фото"
ordering = ['-created_at']
indexes = [
models.Index(fields=['photo_id', 'photo_model']),
models.Index(fields=['task_id']),
models.Index(fields=['status']),
models.Index(fields=['status', 'created_at']),
]
def __str__(self):
return f"{self.photo_model}#{self.photo_id} - {self.get_status_display()}"
@property
def is_processing(self):
"""Проверяет находится ли фото в обработке"""
return self.status in ['pending', 'processing']
@property
def is_completed(self):
"""Проверяет завершена ли обработка успешно"""
return self.status == 'completed'
@property
def is_failed(self):
"""Проверяет произошла ли ошибка"""
return self.status == 'failed'

148
myproject/products/tasks.py Normal file
View File

@@ -0,0 +1,148 @@
"""
Celery задачи для асинхронной обработки фото товаров.
ВАЖНО: django-tenants мультитенантность!
Все задачи получают schema_name и активируют нужную схему для изоляции данных.
"""
import logging
from celery import shared_task
from django.db import connection
from django.apps import apps
logger = logging.getLogger(__name__)
@shared_task(
bind=True,
name='products.tasks.process_product_photo_async',
max_retries=3,
default_retry_delay=60, # Повторить через 60 секунд при ошибке
)
def process_product_photo_async(self, photo_id, photo_model_class, schema_name):
"""
Асинхронная обработка загруженного фото.
Args:
photo_id: ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto
photo_model_class: Строка с путем к модели ('products.ProductPhoto')
schema_name: Имя схемы тенанта для активации правильной БД
Returns:
dict: Результат обработки с информацией о качестве и путях к файлам
"""
from .utils.image_processor import ImageProcessor
try:
# КРИТИЧНО: Активируем схему тенанта
# Это гарантирует что мы работаем с данными правильного тенанта
connection.set_schema(schema_name)
logger.info(f"[Celery] Activated schema: {schema_name} for photo_id: {photo_id}")
# Получаем модель по строке пути ('products.ProductPhoto')
app_label, model_name = photo_model_class.split('.')
PhotoModel = apps.get_model(app_label, model_name)
# Загружаем объект фото из БД
photo_obj = PhotoModel.objects.get(pk=photo_id)
entity = photo_obj.get_entity()
logger.info(f"[Celery] Processing photo {photo_id} for {entity.__class__.__name__} #{entity.id}")
# Проверяем что фото еще не обработано
if not photo_obj.image:
logger.warning(f"[Celery] Photo {photo_id} has no image file")
return {'status': 'error', 'reason': 'no_image'}
# Получаем entity type для правильного пути сохранения
entity_type = photo_obj.get_entity_type()
# ОСНОВНАЯ РАБОТА: Обрабатываем изображение
# Это операция занимает время (resize, convert formats, etc)
logger.info(f"[Celery] Starting image processing for photo {photo_id} in {schema_name}")
processed_paths = ImageProcessor.process_image(
photo_obj.image,
entity_type,
entity_id=entity.id,
photo_id=photo_obj.id
)
# Обновляем объект фото с новыми путями и метаданными качества
photo_obj.image = processed_paths['original']
photo_obj.quality_level = processed_paths.get('quality_level', 'acceptable')
photo_obj.quality_warning = processed_paths.get('quality_warning', False)
photo_obj.save(update_fields=['image', 'quality_level', 'quality_warning'])
logger.info(f"[Celery] ✓ Photo {photo_id} processed successfully "
f"(quality: {processed_paths.get('quality_level')})")
return {
'status': 'success',
'photo_id': photo_id,
'schema_name': schema_name,
'quality_level': processed_paths.get('quality_level'),
'paths': {
'original': processed_paths['original'],
'large': processed_paths.get('large'),
'medium': processed_paths.get('medium'),
'thumbnail': processed_paths.get('thumbnail'),
}
}
except PhotoModel.DoesNotExist:
logger.error(f"[Celery] Photo {photo_id} not found in schema {schema_name}")
return {'status': 'error', 'reason': 'not_found', 'photo_id': photo_id}
except Exception as exc:
logger.error(f"[Celery] Error processing photo {photo_id} in {schema_name}: {str(exc)}",
exc_info=True)
# Повторить задачу при ошибке (макс 3 раза с 60 сек интервалом)
try:
raise self.retry(exc=exc, countdown=60)
except self.MaxRetriesExceededError:
logger.error(f"[Celery] Max retries exceeded for photo {photo_id}. Task failed permanently.")
return {
'status': 'error',
'reason': 'max_retries_exceeded',
'photo_id': photo_id,
'error': str(exc)
}
@shared_task(name='products.tasks.process_multiple_photos_async')
def process_multiple_photos_async(photo_ids, photo_model_class, schema_name):
"""
Обработка нескольких фото параллельно (chord pattern).
Это позволяет обрабатывать несколько фото одновременно
если загружено много фото за раз.
Args:
photo_ids: Список ID фотографий
photo_model_class: Путь к модели ('products.ProductPhoto')
schema_name: Схема тенанта
Returns:
dict: Информация о submitted задачах
"""
from celery import group
logger.info(f"[Celery] Submitting {len(photo_ids)} photos for batch processing in {schema_name}")
# Создаем группу задач для параллельной обработки
# Celery автоматически распределит их между доступными workers
job = group(
process_product_photo_async.s(photo_id, photo_model_class, schema_name)
for photo_id in photo_ids
)
# Запускаем группу задач асинхронно
result = job.apply_async()
return {
'status': 'submitted',
'count': len(photo_ids),
'group_id': result.id,
'schema_name': schema_name
}

View File

@@ -1,6 +1,7 @@
from django.urls import path
from . import views
from .views import api_views
from .views import photo_status_api
app_name = 'products'
@@ -41,6 +42,10 @@ urlpatterns = [
path('api/tags/create/', api_views.create_tag_api, name='api-tag-create'),
path('api/tags/<int:pk>/toggle/', api_views.toggle_tag_status_api, name='api-tag-toggle'),
# Photo processing status API (for AJAX polling)
path('api/photos/status/<str:task_id>/', photo_status_api.photo_processing_status, name='api-photo-status'),
path('api/photos/batch-status/', photo_status_api.batch_photo_status, name='api-batch-photo-status'),
# CRUD URLs for ProductVariantGroup (Варианты товаров)
path('variant-groups/', views.ProductVariantGroupListView.as_view(), name='variantgroup-list'),
path('variant-groups/create/', views.ProductVariantGroupCreateView.as_view(), name='variantgroup-create'),

View File

@@ -0,0 +1,182 @@
"""
API endpoints для отслеживания статуса асинхронной обработки фото.
Фронтенд опрашивает эти endpoints через AJAX для получения информации о прогрессе.
"""
import logging
from django.http import JsonResponse
from django.views.decorators.http import require_GET
from django.views.decorators.csrf import csrf_exempt
from celery.result import AsyncResult
from products.models import PhotoProcessingStatus
logger = logging.getLogger(__name__)
@csrf_exempt
@require_GET
def photo_processing_status(request, task_id):
"""
Получить статус обработки фото по task_id Celery.
URL: /api/photos/status/<task_id>/
Returns:
{
'status': 'PENDING' | 'STARTED' | 'SUCCESS' | 'FAILURE',
'task_id': str,
'progress': 0-100,
'message': str,
'result': {...} # если завершено успешно
}
Пример JavaScript вызова:
fetch('/api/photos/status/abc123/')
.then(r => r.json())
.then(data => {
if (data.status === 'SUCCESS') {
// Фото обработано успешно
location.reload();
}
})
"""
try:
# Получаем результат задачи Celery по task_id
result = AsyncResult(task_id)
response_data = {
'status': result.state,
'task_id': task_id,
'message': 'Неизвестный статус',
}
if result.state == 'PENDING':
# Задача еще не запущена (в очереди)
response_data['progress'] = 0
response_data['message'] = 'В очереди на обработку...'
elif result.state == 'STARTED':
# Задача выполняется в данный момент
response_data['progress'] = 50
response_data['message'] = 'Обрабатывается изображение...'
elif result.state == 'SUCCESS':
# Задача завершена успешно
response_data['progress'] = 100
response_data['message'] = 'Готово'
response_data['result'] = result.result
logger.info(f"[PhotoStatusAPI] Photo processing completed (task_id: {task_id})")
elif result.state == 'FAILURE':
# Произошла ошибка при обработке
response_data['progress'] = 0
response_data['message'] = 'Ошибка при обработке'
response_data['error'] = str(result.info)
logger.error(f"[PhotoStatusAPI] Photo processing failed (task_id: {task_id}): {str(result.info)}")
elif result.state == 'RETRY':
# Задача повторяется (была ошибка, но пытаемся еще раз)
response_data['progress'] = 25
response_data['message'] = 'Повторная попытка обработки...'
return JsonResponse(response_data, status=200)
except Exception as e:
logger.error(f"[PhotoStatusAPI] Error getting task status: {str(e)}", exc_info=True)
return JsonResponse({
'status': 'error',
'message': 'Ошибка при получении статуса',
'error': str(e)
}, status=500)
@csrf_exempt
@require_GET
def batch_photo_status(request):
"""
Получить статус обработки для нескольких фото одновременно.
URL: /api/photos/batch-status/?task_ids=id1&task_ids=id2&task_ids=id3
Параметры:
task_ids: Список task_id (может передаваться несколько раз или через запятую)
Returns:
{
'results': [
{
'task_id': str,
'status': 'PENDING' | 'SUCCESS' | 'FAILURE',
'progress': 0-100,
'message': str,
},
...
],
'completed': int,
'failed': int,
'processing': int,
}
"""
try:
# Получаем task_ids из query параметров
task_ids = request.GET.getlist('task_ids')
if not task_ids:
return JsonResponse({
'error': 'Параметр task_ids обязателен',
'results': []
}, status=400)
results = []
completed_count = 0
failed_count = 0
processing_count = 0
for task_id in task_ids:
result = AsyncResult(task_id)
status_info = {
'task_id': task_id,
'status': result.state,
}
if result.state == 'PENDING':
status_info['progress'] = 0
status_info['message'] = 'В очереди'
processing_count += 1
elif result.state == 'STARTED':
status_info['progress'] = 50
status_info['message'] = 'Обрабатывается'
processing_count += 1
elif result.state == 'SUCCESS':
status_info['progress'] = 100
status_info['message'] = 'Готово'
status_info['result'] = result.result
completed_count += 1
elif result.state == 'FAILURE':
status_info['progress'] = 0
status_info['message'] = 'Ошибка'
status_info['error'] = str(result.info)
failed_count += 1
results.append(status_info)
return JsonResponse({
'results': results,
'completed': completed_count,
'failed': failed_count,
'processing': processing_count,
'total': len(task_ids),
}, status=200)
except Exception as e:
logger.error(f"[PhotoStatusAPI] Error in batch status: {str(e)}", exc_info=True)
return JsonResponse({
'error': str(e),
'results': []
}, status=500)