Resolves critical bug where photos of products with the same ID in different tenants were overwriting each other. Implemented complete isolation of media files between tenants using custom Django storage backend. ## Changes ### New Files - products/utils/storage.py: TenantAwareFileSystemStorage backend * Automatically adds tenant_id to file paths on disk * Prevents cross-tenant file access with security checks * Stores clean paths in DB for portability - products/tests/test_multi_tenant_photos.py: Comprehensive tests * 5 tests covering isolation, security, and configuration * All tests passing ✅ - MULTITENANT_PHOTO_FIX.md: Complete documentation ### Modified Files - settings.py: Configured DEFAULT_FILE_STORAGE to use TenantAwareFileSystemStorage - products/models/photos.py: * Converted upload_to from strings to callable functions * Updated ProductPhoto, ProductKitPhoto, ProductCategoryPhoto * Added tenant isolation documentation - products/tasks.py: Added documentation about file structure - products/utils/image_processor.py: Added documentation - products/utils/image_service.py: Added documentation ## Architecture **On disk:** media/tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext **In DB:** products/{entity_id}/{photo_id}/{size}.ext Tenant ID is automatically added/removed during file operations. ## Security - Storage rejects cross-tenant file access - Proper tenant context validation - Integration with django-tenants schema system ## Testing - All 5 multi-tenant photo tests pass - Verified photo paths are isolated per tenant - Verified storage rejects cross-tenant access - Verified configuration is correct ## Future-proof - Ready for S3 migration (just change storage backend) - No breaking changes to existing code - Clean separation of concerns Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
223 lines
9.8 KiB
Python
223 lines
9.8 KiB
Python
"""
|
||
Custom FileSystemStorage с поддержкой мультитенантности.
|
||
Автоматически добавляет tenant_id в путь сохранения файлов для изоляции файлов между тенантами.
|
||
|
||
Структура пути:
|
||
media/tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext
|
||
media/tenants/{tenant_id}/kits/{entity_id}/{photo_id}/{size}.ext
|
||
media/tenants/{tenant_id}/categories/{entity_id}/{photo_id}/{size}.ext
|
||
"""
|
||
import logging
|
||
from django.core.files.storage import FileSystemStorage
|
||
from django.db import connection
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TenantAwareFileSystemStorage(FileSystemStorage):
|
||
"""
|
||
Расширение стандартного FileSystemStorage для поддержки мультитенантности.
|
||
Автоматически добавляет tenant_id в путь сохранения файлов.
|
||
"""
|
||
|
||
def _get_tenant_id(self):
|
||
"""
|
||
Получить ID текущего тенанта из контекста django-tenants.
|
||
|
||
Returns:
|
||
int or str: ID тенанта
|
||
|
||
Raises:
|
||
RuntimeError: Если не удается определить тенант
|
||
"""
|
||
try:
|
||
# Получаем текущую схему (для django-tenants)
|
||
schema_name = connection.schema_name
|
||
|
||
# Если это публичная схема, это ошибка при попытке загрузить фото
|
||
if schema_name == 'public':
|
||
raise RuntimeError(
|
||
"Cannot determine tenant ID - working in 'public' schema. "
|
||
"File uploads are only allowed in tenant schemas."
|
||
)
|
||
|
||
# Парсим schema_name для получения tenant_id
|
||
# Стандартные форматы:
|
||
# - 'tenant_<id>' (продакшен)
|
||
# - 'test' или '<name>' (для тестов и других схем)
|
||
if schema_name.startswith('tenant_'):
|
||
tenant_id = schema_name.replace('tenant_', '')
|
||
logger.debug(f"[Storage] Extracted tenant_id={tenant_id} from schema={schema_name}")
|
||
return tenant_id
|
||
else:
|
||
# Используем schema_name как есть (для тестов, локальной разработки и т.д.)
|
||
logger.debug(f"[Storage] Using schema_name as tenant_id: {schema_name}")
|
||
return schema_name
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Storage] Failed to get tenant_id: {str(e)}")
|
||
raise RuntimeError(f"Failed to determine tenant context: {str(e)}")
|
||
|
||
def _get_tenant_path(self, name):
|
||
"""
|
||
Добавить tenant_id в путь файла.
|
||
Проверяет что путь еще не содержит tenant_id чтобы избежать двойного добавления.
|
||
|
||
Args:
|
||
name (str): Исходный путь (например, 'products/temp/image.jpg')
|
||
|
||
Returns:
|
||
str: Путь с tenant_id (например, 'tenants/1/products/temp/image.jpg')
|
||
"""
|
||
# Если путь уже содержит tenants/, не добавляем еще раз
|
||
if name.startswith('tenants/'):
|
||
logger.debug(f"[Storage] Path already has tenant prefix: {name}")
|
||
return name
|
||
|
||
tenant_id = self._get_tenant_id()
|
||
return f"tenants/{tenant_id}/{name}"
|
||
|
||
def get_available_name(self, name, max_length=None):
|
||
"""
|
||
Переопределяем для проверки уникальности с учетом tenant_id на диске,
|
||
но возвращаем путь БЕЗ tenant_id для сохранения в БД.
|
||
|
||
Диск: tenants/{tenant_id}/products/{id}/{photo_id}/file.ext
|
||
БД: products/{id}/{photo_id}/file.ext
|
||
|
||
Args:
|
||
name (str): Исходное имя файла с путем
|
||
max_length (int): Максимальная длина пути
|
||
|
||
Returns:
|
||
str: Путь БЕЗ tenant_id для сохранения в БД
|
||
"""
|
||
# Добавляем tenant_id для проверки на диске
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
|
||
logger.debug(f"[Storage] get_available_name: {name} → checking disk with: {tenant_aware_name}")
|
||
|
||
# Вызываем родительский метод с tenant_id для проверки уникальности на диске
|
||
available_tenant_aware = super().get_available_name(tenant_aware_name, max_length)
|
||
|
||
# Но возвращаем путь БЕЗ tenant_id для сохранения в БД
|
||
# Удаляем prefix 'tenants/{tenant_id}/' из пути
|
||
available_name = available_tenant_aware
|
||
if available_name.startswith('tenants/'):
|
||
# Удаляем 'tenants/{tenant_id}/' из пути
|
||
parts = available_name.split('/', 2) # Split на первые два /
|
||
if len(parts) == 3:
|
||
available_name = parts[2]
|
||
logger.debug(f"[Storage] Stripped tenant prefix: {available_tenant_aware} → {available_name}")
|
||
|
||
logger.debug(f"[Storage] get_available_name returns for DB: {available_name}")
|
||
return available_name
|
||
|
||
def _save(self, name, content):
|
||
"""
|
||
Переопределяем для добавления tenant_id в путь на диске,
|
||
но сохраняем в БД путь БЕЗ tenant_id (для экономии и мобильности).
|
||
|
||
Args:
|
||
name (str): Имя файла
|
||
content: Содержимое файла
|
||
|
||
Returns:
|
||
str: Путь БЕЗ tenant_id для сохранения в БД
|
||
"""
|
||
# Добавляем tenant_id в путь для сохранения на диск
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
|
||
logger.debug(f"[Storage] _save: {name} → {tenant_aware_name} (DB will store: {name})")
|
||
|
||
# Сохраняем файл на диск с tenant_id, но возвращаем исходный путь для БД
|
||
super()._save(tenant_aware_name, content)
|
||
|
||
# Возвращаем путь БЕЗ tenant_id для сохранения в БД
|
||
# Это позволяет:
|
||
# 1. На диске: tenants/{tenant_id}/products/{id}/{photo_id}/file.ext
|
||
# 2. В БД: products/{id}/{photo_id}/file.ext
|
||
return name
|
||
|
||
def delete(self, name):
|
||
"""
|
||
Удалить файл, убедившись что он принадлежит текущему тенанту.
|
||
|
||
Args:
|
||
name (str): Путь к файлу
|
||
"""
|
||
# Получаем tenant_id для проверки
|
||
tenant_id = self._get_tenant_id()
|
||
|
||
# Проверяем что файл принадлежит текущему тенанту
|
||
if not name.startswith(f"tenants/{tenant_id}/"):
|
||
logger.warning(
|
||
f"[Storage] Security: Attempted to delete file from different tenant! "
|
||
f"Current tenant: {tenant_id}, file: {name}"
|
||
)
|
||
raise RuntimeError(
|
||
f"Cannot delete file - it belongs to a different tenant. "
|
||
f"Current tenant: {tenant_id}"
|
||
)
|
||
|
||
logger.debug(f"[Storage] delete: {name}")
|
||
return super().delete(name)
|
||
|
||
def exists(self, name):
|
||
"""
|
||
Проверить существование файла, убедившись что он принадлежит текущему тенанту.
|
||
|
||
Args:
|
||
name (str): Путь к файлу
|
||
|
||
Returns:
|
||
bool: True если файл существует и принадлежит текущему тенанту
|
||
"""
|
||
# Получаем tenant_id
|
||
tenant_id = self._get_tenant_id()
|
||
|
||
# Если путь уже содержит tenants/, не добавляем еще раз
|
||
if name.startswith("tenants/"):
|
||
# Проверяем что файл принадлежит текущему тенанту
|
||
if not name.startswith(f"tenants/{tenant_id}/"):
|
||
logger.warning(
|
||
f"[Storage] Security: Attempted to check file from different tenant! "
|
||
f"Current tenant: {tenant_id}, file: {name}"
|
||
)
|
||
return False
|
||
return super().exists(name)
|
||
|
||
# Иначе добавляем tenant_id
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
return super().exists(tenant_aware_name)
|
||
|
||
def url(self, name):
|
||
"""
|
||
Получить URL файла, убедившись что он принадлежит текущему тенанту.
|
||
|
||
Args:
|
||
name (str): Путь к файлу
|
||
|
||
Returns:
|
||
str: URL файла
|
||
"""
|
||
# Получаем tenant_id
|
||
tenant_id = self._get_tenant_id()
|
||
|
||
# Если путь уже содержит tenants/, проверяем принадлежность тенанту
|
||
if name.startswith("tenants/"):
|
||
if not name.startswith(f"tenants/{tenant_id}/"):
|
||
logger.warning(
|
||
f"[Storage] Security: Attempted to get URL for file from different tenant! "
|
||
f"Current tenant: {tenant_id}, file: {name}"
|
||
)
|
||
raise RuntimeError(
|
||
f"Cannot get URL for file - it belongs to a different tenant. "
|
||
f"Current tenant: {tenant_id}"
|
||
)
|
||
return super().url(name)
|
||
|
||
# Иначе добавляем tenant_id
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
return super().url(tenant_aware_name)
|