Files
octopus/myproject/products/tasks.py
Andrey Smakotin 0f19542ac9 Добавлен асинхронный импорт товаров с параллельной загрузкой фото + исправлен баг со счётчиком SKU
- Реализован импорт Product из CSV/XLSX через Celery с прогресс-баром
- Параллельная загрузка фото товаров с внешних URL (масштабируемость до 500+ товаров)
- Добавлена модель ProductImportJob для отслеживания статуса импорта
- Создан таск download_product_photo_async для загрузки фото в фоне
- Интеграция с существующим ImageProcessor (синхронная обработка через use_async=False)
- Добавлены view и template для импорта с real-time обновлением через AJAX

FIX: Исправлен баг со счётчиком SKU - инкремент только после успешного сохранения
- Добавлен SKUCounter.peek_next_value() - возвращает следующий номер БЕЗ инкремента
- Добавлен SKUCounter.increment_counter() - инкрементирует счётчик
- generate_product_sku() использует peek_next_value() вместо get_next_value()
- Добавлен post_save сигнал increment_sku_counter_after_save() для инкремента после создания
- Предотвращает пропуски номеров при ошибках валидации (например cost_price NULL)

FIX: Исправлена ошибка с is_main в ProductPhoto
- ProductPhoto не имеет поля is_main, используется только order
- Первое фото (order=0) автоматически считается главным
- Удалён параметр is_main из download_product_photo_async и _collect_photo_tasks

Изменены файлы:
- products/models/base.py - методы для управления счётчиком SKU
- products/models/import_job.py - модель для отслеживания импорта
- products/services/import_export.py - сервис импорта с поддержкой Celery
- products/tasks.py - таски для асинхронного импорта и загрузки фото
- products/signals.py - сигнал для инкремента счётчика после сохранения
- products/utils/sku_generator.py - использование peek_next_value()
- products/views/product_import_views.py - view для импорта
- products/templates/products/product_import*.html - UI для импорта
- docker/entrypoint.sh - настройка Celery worker (concurrency=4)
- requirements.txt - добавлен requests для загрузки фото
2026-01-06 07:10:12 +03:00

622 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Celery задачи для асинхронной обработки фото товаров.
ВАЖНО: django-tenants мультитенантность!
Все задачи получают schema_name и активируют нужную схему для изоляции данных.
МУЛЬТИТЕНАНТНОСТЬ В ФАЙЛАХ:
На диске файлы хранятся как: tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext
TenantAwareFileSystemStorage добавляет tenant_id при сохранении/удалении файлов.
Безопасность: Каждый Celery worker активирует правильную схему БД через connection.set_schema().
"""
import os
import time
import logging
from celery import shared_task
from django.db import connection
from django.apps import apps
from django.conf import settings
from django.core.files.storage import default_storage
from django.utils import timezone
from tenants.models import RESERVED_SCHEMA_NAMES
logger = logging.getLogger(__name__)
# Регистрация декодеров HEIF/AVIF для Pillow в Celery worker
# Это критично для обработки HEIC/HEIF фото с iPhone
try:
from pillow_heif import register_heif_opener
register_heif_opener()
logger.info("[Celery] HEIF/AVIF decoders registered successfully")
except ImportError:
logger.warning("[Celery] pillow-heif not available - HEIC/HEIF/AVIF formats will not be supported")
@shared_task(
bind=True,
name='products.tasks.process_product_photo_async',
max_retries=3,
default_retry_delay=60, # Повторить через 60 секунд при ошибке
)
def process_product_photo_async(self, photo_id, photo_model_class, schema_name):
"""
Асинхронная обработка загруженного фото.
Args:
photo_id: ID объекта ProductPhoto/ProductKitPhoto/ProductCategoryPhoto
photo_model_class: Строка с путем к модели ('products.ProductPhoto')
schema_name: Имя схемы тенанта для активации правильной БД
Returns:
dict: Результат обработки с информацией о качестве и путях к файлам
"""
from .utils.image_processor import ImageProcessor
try:
# КРИТИЧНО: Активируем схему тенанта
# Это гарантирует что мы работаем с данными правильного тенанта
connection.set_schema(schema_name)
logger.info(f"[Celery] Activated schema: {schema_name} for photo_id: {photo_id}")
# Получаем модель по строке пути ('products.ProductPhoto')
app_label, model_name = photo_model_class.split('.')
PhotoModel = apps.get_model(app_label, model_name)
# Загружаем объект фото из БД
photo_obj = PhotoModel.objects.get(pk=photo_id)
entity = photo_obj.get_entity()
logger.info(f"[Celery] Processing photo {photo_id} for {entity.__class__.__name__} #{entity.id}")
# Проверяем что фото еще не обработано
if not photo_obj.image:
logger.warning(f"[Celery] Photo {photo_id} has no image file")
return {'status': 'error', 'reason': 'no_image'}
# Сохраняем путь к временному файлу до перезаписи поля image
temp_path = photo_obj.image.name
# КРИТИЧНО: Проверяем что файл существует перед обработкой
# Это предотвращает бесполезные retry если файл был удален вручную
if not default_storage.exists(temp_path):
logger.error(f"[Celery] File does not exist: {temp_path}")
raise FileNotFoundError(f"File not found: {temp_path}")
# Получаем entity type для правильного пути сохранения
entity_type = photo_obj.get_entity_type()
# ОСНОВНАЯ РАБОТА: Обрабатываем изображение
# Это операция занимает время (resize, convert formats, etc)
logger.info(f"[Celery] Starting image processing for photo {photo_id} in {schema_name}")
processed_paths = ImageProcessor.process_image(
photo_obj.image,
entity_type,
entity_id=entity.id,
photo_id=photo_obj.id
)
# Обновляем объект фото с новыми путями и метаданными качества
photo_obj.image = processed_paths['original']
photo_obj.quality_level = processed_paths.get('quality_level', 'acceptable')
photo_obj.quality_warning = processed_paths.get('quality_warning', False)
photo_obj.save(update_fields=['image', 'quality_level', 'quality_warning'])
# Удаляем временный файл из temp после успешной обработки
try:
if temp_path and default_storage.exists(temp_path):
default_storage.delete(temp_path)
logger.info(f"[Celery] Deleted temp file: {temp_path}")
except Exception as del_exc:
logger.warning(f"[Celery] Could not delete temp file {temp_path}: {del_exc}")
logger.info(f"[Celery] ✓ Photo {photo_id} processed successfully "
f"(quality: {processed_paths.get('quality_level')})")
return {
'status': 'success',
'photo_id': photo_id,
'schema_name': schema_name,
'quality_level': processed_paths.get('quality_level'),
'paths': {
'original': processed_paths['original'],
'large': processed_paths.get('large'),
'medium': processed_paths.get('medium'),
'thumbnail': processed_paths.get('thumbnail'),
}
}
except PhotoModel.DoesNotExist as e:
logger.error(f"[Celery] Photo {photo_id} not found in schema {schema_name}")
# Retry briefly to allow DB transaction to commit (race condition on first photo)
try:
raise self.retry(exc=e, countdown=5)
except self.MaxRetriesExceededError:
# Final failure: attempt to delete the orphan temp file if we recorded it
try:
from .models.photos import PhotoProcessingStatus
status = (PhotoProcessingStatus.objects
.filter(photo_id=photo_id, photo_model=photo_model_class)
.order_by('-created_at')
.first())
temp_path = (status.result_data or {}).get('temp_path') if status else None
if temp_path and default_storage.exists(temp_path):
default_storage.delete(temp_path)
logger.info(f"[Celery] Deleted temp file (not_found): {temp_path}")
except Exception as del_exc:
logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on not_found: {del_exc}")
return {'status': 'error', 'reason': 'not_found', 'photo_id': photo_id}
except FileNotFoundError as exc:
# Файл не найден - не имеет смысла повторять task
# Это обычно означает что файл был удален до обработки
logger.error(f"[Celery] File not found for photo {photo_id} in {schema_name}: {str(exc)}")
logger.error(f"[Celery] This usually means the file was deleted before processing. Marking as failed.")
# Обновляем статус обработки без retry
try:
from .models.photos import PhotoProcessingStatus
status = (PhotoProcessingStatus.objects
.filter(photo_id=photo_id, photo_model=photo_model_class)
.order_by('-created_at')
.first())
if status:
status.status = 'failed'
status.error_message = f"File not found: {str(exc)}"
status.completed_at = timezone.now()
status.save()
logger.info(f"[Celery] Updated PhotoProcessingStatus to 'failed' for photo {photo_id}")
except Exception as status_exc:
logger.warning(f"[Celery] Could not update PhotoProcessingStatus: {status_exc}")
return {
'status': 'error',
'reason': 'file_not_found',
'photo_id': photo_id,
'error': str(exc)
}
except Exception as exc:
logger.error(f"[Celery] Error processing photo {photo_id} in {schema_name}: {str(exc)}",
exc_info=True)
# Повторить задачу при ошибке (макс 3 раза с 60 сек интервалом)
try:
raise self.retry(exc=exc, countdown=60)
except self.MaxRetriesExceededError:
logger.error(f"[Celery] Max retries exceeded for photo {photo_id}. Task failed permanently.")
# Попытка удалить temp файл при окончательном провале
try:
from .models.photos import PhotoProcessingStatus
status = (PhotoProcessingStatus.objects
.filter(photo_id=photo_id, photo_model=photo_model_class)
.order_by('-created_at')
.first())
temp_path = (status.result_data or {}).get('temp_path') if status else None
if temp_path and default_storage.exists(temp_path):
default_storage.delete(temp_path)
logger.info(f"[Celery] Deleted orphaned temp file (max_retries): {temp_path}")
except Exception as del_exc:
logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on max_retries: {del_exc}")
return {
'status': 'error',
'reason': 'max_retries_exceeded',
'photo_id': photo_id,
'error': str(exc)
}
@shared_task(name='products.tasks.process_multiple_photos_async')
def process_multiple_photos_async(photo_ids, photo_model_class, schema_name):
"""
Обработка нескольких фото параллельно (chord pattern).
Это позволяет обрабатывать несколько фото одновременно
если загружено много фото за раз.
Args:
photo_ids: Список ID фотографий
photo_model_class: Путь к модели ('products.ProductPhoto')
schema_name: Схема тенанта
Returns:
dict: Информация о submitted задачах
"""
from celery import group
logger.info(f"[Celery] Submitting {len(photo_ids)} photos for batch processing in {schema_name}")
# Создаем группу задач для параллельной обработки
# Celery автоматически распределит их между доступными workers
job = group(
process_product_photo_async.s(photo_id, photo_model_class, schema_name)
for photo_id in photo_ids
)
# Запускаем группу задач асинхронно
result = job.apply_async()
return {
'status': 'submitted',
'count': len(photo_ids),
'group_id': result.id,
'schema_name': schema_name
}
@shared_task(name='products.tasks.cleanup_temp_media_for_schema')
def cleanup_temp_media_for_schema(schema_name, ttl_hours=None):
"""
Очистка временных файлов изображений для указанной схемы тенанта.
Удаляет файлы старше TTL из папок: products/temp, kits/temp, categories/temp.
Args:
schema_name: Имя схемы тенанта
ttl_hours: Время жизни файла в часах (по умолчанию из settings.TEMP_MEDIA_TTL_HOURS)
Returns:
dict: Результат очистки с количеством удаленных файлов
"""
from django.conf import settings
try:
# Пропускаем зарезервированные схемы (public, admin и т.д.)
if schema_name in RESERVED_SCHEMA_NAMES:
ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24))
logger.info(f"[Cleanup:{schema_name}] Skipping reserved schema")
return {
'status': 'skipped',
'schema_name': schema_name,
'reason': 'reserved_schema',
'deleted': 0,
'scanned': 0,
'ttl_hours': ttl
}
# Активируем схему тенанта
connection.set_schema(schema_name)
ttl = int(ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24))
cutoff_seconds = ttl * 3600
now = time.time()
temp_dirs = ['products/temp', 'kits/temp', 'categories/temp']
deleted_count = 0
scanned_count = 0
for rel_dir in temp_dirs:
try:
# Получаем полный путь с учётом tenant_id
try:
full_dir = default_storage.path(rel_dir)
except RuntimeError as storage_error:
# Если не удается определить tenant_id (например, для public схемы)
if 'Cannot determine tenant ID' in str(storage_error):
logger.warning(f"[Cleanup:{schema_name}] Skipping {rel_dir}: {storage_error}")
continue
raise # Перебрасываем другие ошибки
if not os.path.isdir(full_dir):
continue
for filename in os.listdir(full_dir):
scanned_count += 1
full_path = os.path.join(full_dir, filename)
# Пропускаем поддиректории, работаем только с файлами
if not os.path.isfile(full_path):
continue
# Проверяем возраст файла
age_seconds = now - os.path.getmtime(full_path)
if age_seconds >= cutoff_seconds:
# Формируем относительный путь для storage.delete
storage_rel_path = os.path.join(rel_dir, filename).replace('\\', '/')
try:
if default_storage.exists(storage_rel_path):
default_storage.delete(storage_rel_path)
deleted_count += 1
logger.info(f"[Cleanup:{schema_name}] Deleted: {storage_rel_path} (age: {age_seconds/3600:.1f}h)")
except Exception as del_exc:
logger.warning(f"[Cleanup:{schema_name}] Could not delete {storage_rel_path}: {del_exc}")
except Exception as dir_exc:
logger.warning(f"[Cleanup:{schema_name}] Error scanning {rel_dir}: {dir_exc}")
logger.info(f"[Cleanup:{schema_name}] Complete. scanned={scanned_count}, deleted={deleted_count}, ttl_hours={ttl}")
return {
'status': 'success',
'schema_name': schema_name,
'deleted': deleted_count,
'scanned': scanned_count,
'ttl_hours': ttl
}
except Exception as exc:
logger.error(f"[Cleanup:{schema_name}] Failed: {exc}", exc_info=True)
return {'status': 'error', 'schema_name': schema_name, 'error': str(exc)}
@shared_task(name='products.tasks.cleanup_temp_media_all')
def cleanup_temp_media_all(ttl_hours=None):
"""
Мастер-задача: перечисляет всех тенантов и запускает очистку temp для каждого.
Запускается периодически через Celery Beat.
Args:
ttl_hours: Время жизни файла в часах (передается в подзадачи)
Returns:
dict: Информация о количестве тенантов и запущенных задачах
"""
from django.conf import settings
try:
# Работаем из public для списка тенантов
connection.set_schema('public')
from tenants.models import Client
# Фильтруем зарезервированные схемы, чтобы не запускать задачи для них
schemas = [s for s in Client.objects.values_list('schema_name', flat=True)
if s not in RESERVED_SCHEMA_NAMES]
ttl = ttl_hours or getattr(settings, 'TEMP_MEDIA_TTL_HOURS', 24)
logger.info(f"[CleanupAll] Scheduling cleanup for {len(schemas)} tenants (TTL: {ttl}h)")
for schema in schemas:
cleanup_temp_media_for_schema.delay(schema, ttl)
return {
'status': 'submitted',
'tenants_count': len(schemas),
'ttl_hours': ttl
}
except Exception as exc:
logger.error(f"[CleanupAll] Failed to schedule: {exc}", exc_info=True)
return {'status': 'error', 'error': str(exc)}
# ============================================================================
# ИМПОРТ ТОВАРОВ
# ============================================================================
@shared_task(
bind=True,
name='products.tasks.import_products_async',
max_retries=0, # Не повторяем автоматически
time_limit=3600, # 1 час максимум
)
def import_products_async(self, job_id, file_path, update_existing, schema_name):
"""
Асинхронный импорт товаров из CSV/XLSX.
Алгоритм:
1. Читаем файл и парсим заголовки
2. Для каждой строки:
- Создаём/обновляем товар (без фото)
- Собираем URL фото для параллельной загрузки
- Обновляем прогресс в ProductImportJob
3. Запускаем параллельную загрузку всех фото (group task)
4. Удаляем временный файл
Args:
job_id: ID ProductImportJob
file_path: Путь к загруженному файлу
update_existing: Обновлять ли существующие товары
schema_name: Схема тенанта
Returns:
dict: Результат импорта
"""
from .services.import_export import ProductImporter
from .models import ProductImportJob
from celery import group
import os
try:
# Активируем схему тенанта
connection.set_schema(schema_name)
logger.info(f"[Import] Activated schema: {schema_name} for job {job_id}")
# Загружаем задачу
job = ProductImportJob.objects.get(id=job_id)
job.status = 'processing'
job.save(update_fields=['status'])
# Открываем файл
if not default_storage.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
with default_storage.open(file_path, 'rb') as file:
# Создаём importer с callback для прогресса
importer = ProductImporter()
# Callback для обновления прогресса
def progress_callback(current, total, created, updated, skipped, errors):
job.refresh_from_db()
job.processed_rows = current
job.total_rows = total
job.created_count = created
job.updated_count = updated
job.skipped_count = skipped
job.errors_count = len(errors)
job.save(update_fields=[
'processed_rows', 'total_rows', 'created_count',
'updated_count', 'skipped_count', 'errors_count'
])
# Запускаем импорт (без фото, они загрузятся параллельно)
result = importer.import_from_file(
file=file,
update_existing=update_existing,
progress_callback=progress_callback,
skip_images=True, # Не загружаем фото синхронно
schema_name=schema_name # Передаём для задач фото
)
# Обновляем результаты
job.refresh_from_db()
job.status = 'completed'
job.created_count = result['created']
job.updated_count = result['updated']
job.skipped_count = result['skipped']
job.errors_count = result.get('real_error_count', 0)
job.errors_json = result.get('real_errors', [])
job.completed_at = timezone.now()
job.save()
# Удаляем временный файл
try:
if default_storage.exists(file_path):
default_storage.delete(file_path)
logger.info(f"[Import] Deleted temp file: {file_path}")
except Exception as del_exc:
logger.warning(f"[Import] Could not delete temp file: {del_exc}")
# Запускаем параллельную загрузку фото (если есть)
if result.get('photo_tasks'):
photo_tasks = result['photo_tasks']
logger.info(f"[Import] Starting parallel download of {len(photo_tasks)} photos")
# FIX: is_main удалён из сигнатуры download_product_photo_async
# Создаём группу задач для параллельной загрузки
photo_group = group(
download_product_photo_async.s(
product_id=task['product_id'],
image_url=task['url'],
order=task['order'],
schema_name=schema_name
)
for task in photo_tasks
)
# Запускаем группу асинхронно
photo_group.apply_async()
logger.info(f"[Import] Photo download tasks submitted")
logger.info(f"[Import] Job {job_id} completed successfully: "
f"created={result['created']}, updated={result['updated']}, "
f"skipped={result['skipped']}, errors={result.get('real_error_count', 0)}")
return {
'status': 'success',
'job_id': job_id,
'result': result
}
except ProductImportJob.DoesNotExist:
logger.error(f"[Import] Job {job_id} not found in schema {schema_name}")
return {'status': 'error', 'reason': 'job_not_found'}
except Exception as exc:
logger.error(f"[Import] Job {job_id} failed: {exc}", exc_info=True)
# Обновляем статус задачи
try:
connection.set_schema(schema_name)
job = ProductImportJob.objects.get(id=job_id)
job.status = 'failed'
job.error_message = str(exc)
job.completed_at = timezone.now()
job.save()
except Exception as save_exc:
logger.error(f"[Import] Could not update job status: {save_exc}")
# Удаляем временный файл
try:
if default_storage.exists(file_path):
default_storage.delete(file_path)
except Exception:
pass
return {
'status': 'error',
'job_id': job_id,
'error': str(exc)
}
@shared_task(
bind=True,
name='products.tasks.download_product_photo_async',
max_retries=3,
default_retry_delay=10,
)
def download_product_photo_async(self, product_id, image_url, order, schema_name):
"""
Загрузка одного фото товара по URL.
Запускается параллельно для всех фото.
FIX: ProductPhoto не имеет is_main, главное фото определяется по order=0
Args:
product_id: ID товара
image_url: URL изображения
order: Порядок фото (первое фото order=0 считается главным)
schema_name: Схема тенанта
Returns:
dict: Результат загрузки
"""
import requests
from django.core.files.base import ContentFile
from .models import Product, ProductPhoto
import urllib.parse
try:
# Активируем схему
connection.set_schema(schema_name)
# Загружаем товар
product = Product.objects.get(id=product_id)
# Скачиваем изображение
response = requests.get(image_url, timeout=30)
response.raise_for_status()
# Получаем имя файла
parsed_url = urllib.parse.urlparse(image_url)
filename = parsed_url.path.split('/')[-1]
# Создаём ProductPhoto
# FIX: ProductPhoto не имеет поля is_main, используется только order
# Первое фото (order=0) автоматически считается главным
photo = ProductPhoto(
product=product,
order=order # is_main удалён — используется order для определения главного фото
)
# Сохраняем файл
# ВАЖНО: use_async=False чтобы НЕ запускать дополнительную Celery задачу
# Обработка будет выполнена синхронно в текущей задаче
photo.image.save(
filename,
ContentFile(response.content),
save=False # Не сохраняем в БД пока
)
photo.save(use_async=False) # Синхронная обработка через ImageProcessor
logger.info(f"[PhotoDownload] Downloaded photo for product {product_id}: {image_url}")
return {
'status': 'success',
'product_id': product_id,
'photo_id': photo.id,
'url': image_url
}
except Product.DoesNotExist:
logger.error(f"[PhotoDownload] Product {product_id} not found in {schema_name}")
return {'status': 'error', 'reason': 'product_not_found'}
except requests.RequestException as exc:
logger.warning(f"[PhotoDownload] Failed to download {image_url}: {exc}")
# Повторяем при ошибках сети
try:
raise self.retry(exc=exc, countdown=10)
except self.MaxRetriesExceededError:
logger.error(f"[PhotoDownload] Max retries exceeded for {image_url}")
return {'status': 'error', 'reason': 'max_retries', 'url': image_url}
except Exception as exc:
logger.error(f"[PhotoDownload] Unexpected error for {image_url}: {exc}", exc_info=True)
return {'status': 'error', 'reason': 'unexpected', 'url': image_url, 'error': str(exc)}