octopus/myproject/products/tasks.py
Andrey Smakotin ff40a9c1f0 Fix: Implement tenant-aware file storage for photo isolation
Resolves a critical bug where photos of products sharing the same ID in
different tenants were overwriting each other. Media files are now fully
isolated between tenants via a custom Django storage backend.

## Changes

### New Files
- products/utils/storage.py: TenantAwareFileSystemStorage backend (see the sketch after this list)
  * Automatically prefixes on-disk file paths with the tenant_id
  * Blocks cross-tenant file access with security checks
  * Stores clean, tenant-free paths in the DB for portability

- products/tests/test_multi_tenant_photos.py: Comprehensive tests
  * 5 tests covering isolation, security, and configuration
  * All tests passing 

- MULTITENANT_PHOTO_FIX.md: Complete documentation
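For illustration, a minimal sketch of what such a backend can look like (an assumption-laden outline, not the actual contents of products/utils/storage.py; it assumes django-tenants exposes the active schema as `connection.schema_name`):

```python
import os

from django.core.exceptions import SuspiciousFileOperation
from django.core.files.storage import FileSystemStorage
from django.db import connection


class TenantAwareFileSystemStorage(FileSystemStorage):
    """Stores clean paths in the DB; prefixes tenants/{schema}/ on disk."""

    def _tenant_prefix(self):
        schema = getattr(connection, 'schema_name', None)
        if not schema or schema == 'public':
            raise SuspiciousFileOperation('No tenant schema active for file access')
        return f'tenants/{schema}'

    def path(self, name):
        # Security check: reject names that already carry another tenant's prefix
        if name.startswith('tenants/') and not name.startswith(self._tenant_prefix() + '/'):
            raise SuspiciousFileOperation(f'Cross-tenant file access rejected: {name}')
        # FileSystemStorage routes open/save/exists/delete through path(), so
        # prefixing here isolates all on-disk operations; a full implementation
        # would also override url() to serve tenant-prefixed media.
        return super().path(os.path.join(self._tenant_prefix(), name))
```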

### Modified Files
- settings.py: Configured DEFAULT_FILE_STORAGE to use TenantAwareFileSystemStorage
- products/models/photos.py:
  * Converted upload_to from path strings to callable functions (see the sketch after this list)
  * Updated ProductPhoto, ProductKitPhoto, ProductCategoryPhoto
  * Added tenant isolation documentation

- products/tasks.py: Added documentation about file structure
- products/utils/image_processor.py: Added documentation
- products/utils/image_service.py: Added documentation
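Roughly, the configuration and the callable `upload_to` could look like this (a sketch under assumptions: the helper and attribute names below are illustrative, and on Django 4.2+ the `STORAGES` setting supersedes `DEFAULT_FILE_STORAGE`):

```python
# settings.py
DEFAULT_FILE_STORAGE = 'products.utils.storage.TenantAwareFileSystemStorage'

# products/models/photos.py
from django.db import models


def product_photo_upload_to(instance, filename):
    # Hypothetical helper: builds the clean, tenant-free DB path;
    # the storage backend adds the tenant prefix on disk.
    return f'products/{instance.product_id}/{instance.pk}/{filename}'


class ProductPhoto(models.Model):
    image = models.ImageField(upload_to=product_photo_upload_to)
```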

## Architecture

**On disk:** media/tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext
**In DB:** products/{entity_id}/{photo_id}/{size}.ext

Tenant ID is automatically added/removed during file operations.
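For example (hypothetical IDs): a DB value of `products/42/7/medium.webp` saved under tenant `acme` lands on disk at `media/tenants/acme/products/42/7/medium.webp`; reading the same DB value under a different tenant resolves to that tenant's own directory instead.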

## Security
- Storage rejects cross-tenant file access
- Proper tenant context validation
- Integration with django-tenants schema system

## Testing
- All 5 multi-tenant photo tests pass
- Verified photo paths are isolated per tenant
- Verified storage rejects cross-tenant access
- Verified configuration is correct
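A test in that suite might look roughly like this (a sketch only: the real tests live in products/tests/test_multi_tenant_photos.py, the schema names are hypothetical, and in practice django-tenants' TenantTestCase would provision the tenant):

```python
from django.core.exceptions import SuspiciousFileOperation
from django.db import connection
from django.test import TestCase

from products.utils.storage import TenantAwareFileSystemStorage


class TenantPhotoIsolationTests(TestCase):
    def test_paths_are_isolated_per_tenant(self):
        storage = TenantAwareFileSystemStorage()
        connection.set_schema('tenant_a')  # hypothetical; normally via TenantTestCase
        self.assertIn('tenants/tenant_a/', storage.path('products/1/1/original.jpg'))

    def test_cross_tenant_access_is_rejected(self):
        storage = TenantAwareFileSystemStorage()
        connection.set_schema('tenant_a')
        with self.assertRaises(SuspiciousFileOperation):
            storage.path('tenants/tenant_b/products/1/1/original.jpg')
```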

## Future-proof
- Ready for S3 migration (just change the storage backend; see the sketch below)
- No breaking changes to existing code
- Clean separation of concerns
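As an illustration of that swap, a hedged sketch using django-storages (assumes django-storages[boto3] is installed; the class name is hypothetical, and `_normalize_name` is the S3Boto3Storage hook that builds object keys):

```python
from django.db import connection
from storages.backends.s3boto3 import S3Boto3Storage


class TenantAwareS3Storage(S3Boto3Storage):
    # Same clean-path-in-DB idea: prefix S3 object keys with the tenant schema
    def _normalize_name(self, name):
        return super()._normalize_name(f'tenants/{connection.schema_name}/{name}')
```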

Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 20:05:20 +03:00

"""
Celery задачи для асинхронной обработки фото товаров.
ВАЖНО: django-tenants мультитенантность!
Все задачи получают schema_name и активируют нужную схему для изоляции данных.
МУЛЬТИТЕНАНТНОСТЬ В ФАЙЛАХ:
На диске файлы хранятся как: tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext
TenantAwareFileSystemStorage добавляет tenant_id при сохранении/удалении файлов.
Безопасность: Каждый Celery worker активирует правильную схему БД через connection.set_schema().
"""
import logging

from celery import shared_task
from django.apps import apps
from django.core.exceptions import ObjectDoesNotExist
from django.core.files.storage import default_storage
from django.db import connection

logger = logging.getLogger(__name__)


@shared_task(
    bind=True,
    name='products.tasks.process_product_photo_async',
    max_retries=3,
    default_retry_delay=60,  # retry after 60 seconds on failure
)
def process_product_photo_async(self, photo_id, photo_model_class, schema_name):
    """
    Asynchronously process an uploaded photo.

    Args:
        photo_id: ID of the ProductPhoto/ProductKitPhoto/ProductCategoryPhoto object
        photo_model_class: dotted path to the model ('products.ProductPhoto')
        schema_name: tenant schema name, used to activate the correct DB

    Returns:
        dict: processing result with quality info and file paths
    """
    from .utils.image_processor import ImageProcessor

    try:
        # CRITICAL: activate the tenant schema.
        # This guarantees we work with the correct tenant's data.
        connection.set_schema(schema_name)
        logger.info(f"[Celery] Activated schema: {schema_name} for photo_id: {photo_id}")

        # Resolve the model from its dotted path ('products.ProductPhoto')
        app_label, model_name = photo_model_class.split('.')
        PhotoModel = apps.get_model(app_label, model_name)

        # Load the photo object from the DB
        photo_obj = PhotoModel.objects.get(pk=photo_id)
        entity = photo_obj.get_entity()
        logger.info(f"[Celery] Processing photo {photo_id} for {entity.__class__.__name__} #{entity.id}")

        # Make sure the photo actually has a file to process
        if not photo_obj.image:
            logger.warning(f"[Celery] Photo {photo_id} has no image file")
            return {'status': 'error', 'reason': 'no_image'}

        # Remember the temp file path before the image field is overwritten
        temp_path = photo_obj.image.name

        # The entity type determines the correct save path
        entity_type = photo_obj.get_entity_type()

        # MAIN WORK: process the image.
        # This is the slow part (resizing, format conversion, etc.)
        logger.info(f"[Celery] Starting image processing for photo {photo_id} in {schema_name}")
        processed_paths = ImageProcessor.process_image(
            photo_obj.image,
            entity_type,
            entity_id=entity.id,
            photo_id=photo_obj.id
        )

        # Update the photo object with the new paths and quality metadata
        photo_obj.image = processed_paths['original']
        photo_obj.quality_level = processed_paths.get('quality_level', 'acceptable')
        photo_obj.quality_warning = processed_paths.get('quality_warning', False)
        photo_obj.save(update_fields=['image', 'quality_level', 'quality_warning'])

        # Delete the temp file once processing has succeeded
        try:
            if temp_path and default_storage.exists(temp_path):
                default_storage.delete(temp_path)
                logger.info(f"[Celery] Deleted temp file: {temp_path}")
        except Exception as del_exc:
            logger.warning(f"[Celery] Could not delete temp file {temp_path}: {del_exc}")

        logger.info(f"[Celery] ✓ Photo {photo_id} processed successfully "
                    f"(quality: {processed_paths.get('quality_level')})")
        return {
            'status': 'success',
            'photo_id': photo_id,
            'schema_name': schema_name,
            'quality_level': processed_paths.get('quality_level'),
            'paths': {
                'original': processed_paths['original'],
                'large': processed_paths.get('large'),
                'medium': processed_paths.get('medium'),
                'thumbnail': processed_paths.get('thumbnail'),
            }
        }

    except ObjectDoesNotExist as e:
        # Catch ObjectDoesNotExist rather than PhotoModel.DoesNotExist:
        # PhotoModel is only bound inside the try block, so referencing it
        # here would raise NameError if the failure happened before get_model().
        logger.error(f"[Celery] Photo {photo_id} not found in schema {schema_name}")
        # Retry briefly to allow the DB transaction to commit (race condition on first photo)
        try:
            raise self.retry(exc=e, countdown=5)
        except self.MaxRetriesExceededError:
            # Final failure: attempt to delete the orphan temp file if we recorded it
            try:
                from .models.photos import PhotoProcessingStatus
                status = (PhotoProcessingStatus.objects
                          .filter(photo_id=photo_id, photo_model=photo_model_class)
                          .order_by('-created_at')
                          .first())
                temp_path = (status.result_data or {}).get('temp_path') if status else None
                if temp_path and default_storage.exists(temp_path):
                    default_storage.delete(temp_path)
                    logger.info(f"[Celery] Deleted temp file (not_found): {temp_path}")
            except Exception as del_exc:
                logger.warning(f"[Celery] Could not delete temp file for photo {photo_id} on not_found: {del_exc}")
            return {'status': 'error', 'reason': 'not_found', 'photo_id': photo_id}

    except Exception as exc:
        logger.error(f"[Celery] Error processing photo {photo_id} in {schema_name}: {str(exc)}",
                     exc_info=True)
        # Retry the task on error (max 3 attempts, 60-second intervals)
        try:
            raise self.retry(exc=exc, countdown=60)
        except self.MaxRetriesExceededError:
            logger.error(f"[Celery] Max retries exceeded for photo {photo_id}. Task failed permanently.")
            return {
                'status': 'error',
                'reason': 'max_retries_exceeded',
                'photo_id': photo_id,
                'error': str(exc)
            }
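
# Usage sketch (hypothetical call site; assumes django-tenants exposes the
# active schema as connection.schema_name):
#
#     process_product_photo_async.delay(
#         photo.pk, 'products.ProductPhoto', connection.schema_name,
#     )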


@shared_task(name='products.tasks.process_multiple_photos_async')
def process_multiple_photos_async(photo_ids, photo_model_class, schema_name):
    """
    Process several photos in parallel (Celery group).

    This allows multiple photos to be processed at the same time
    when many are uploaded at once.

    Args:
        photo_ids: list of photo IDs
        photo_model_class: dotted path to the model ('products.ProductPhoto')
        schema_name: tenant schema

    Returns:
        dict: info about the submitted tasks
    """
    from celery import group

    logger.info(f"[Celery] Submitting {len(photo_ids)} photos for batch processing in {schema_name}")

    # Build a group of tasks for parallel processing.
    # Celery will distribute them across the available workers.
    job = group(
        process_product_photo_async.s(photo_id, photo_model_class, schema_name)
        for photo_id in photo_ids
    )

    # Launch the group asynchronously
    result = job.apply_async()

    return {
        'status': 'submitted',
        'count': len(photo_ids),
        'group_id': result.id,
        'schema_name': schema_name
    }
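
# Batch usage sketch (hypothetical call site, same assumptions as above):
#
#     process_multiple_photos_async.delay(
#         [photo.pk for photo in photos], 'products.ProductPhoto', connection.schema_name,
#     )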