- Fix MEDIA_ROOT path to match Docker volume mount (/app/myproject/media) - Update docker-compose.yml volume mounts to match MEDIA_ROOT - Add setup_directories() function in entrypoint.sh to create media directories with proper permissions - Add logging to TenantAwareFileSystemStorage for debugging - Fix is_returned flag logic improvements (from previous work)
313 lines
15 KiB
Python
313 lines
15 KiB
Python
"""
|
||
Custom FileSystemStorage с поддержкой мультитенантности.
|
||
Автоматически добавляет tenant_id в путь сохранения файлов для изоляции файлов между тенантами.
|
||
|
||
Структура пути:
|
||
media/tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext
|
||
media/tenants/{tenant_id}/kits/{entity_id}/{photo_id}/{size}.ext
|
||
media/tenants/{tenant_id}/categories/{entity_id}/{photo_id}/{size}.ext
|
||
"""
|
||
import logging
|
||
from django.core.files.storage import FileSystemStorage
|
||
from django.db import connection
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class TenantAwareFileSystemStorage(FileSystemStorage):
|
||
"""
|
||
Расширение стандартного FileSystemStorage для поддержки мультитенантности.
|
||
Автоматически добавляет tenant_id в путь сохранения файлов.
|
||
"""
|
||
|
||
def _get_tenant_id(self):
|
||
"""
|
||
Получить ID текущего тенанта из контекста django-tenants.
|
||
|
||
Returns:
|
||
int or str: ID тенанта
|
||
|
||
Raises:
|
||
RuntimeError: Если не удается определить тенант
|
||
"""
|
||
try:
|
||
# Получаем текущую схему (для django-tenants)
|
||
schema_name = connection.schema_name
|
||
|
||
# Если это публичная схема, это ошибка при попытке загрузить фото
|
||
if schema_name == 'public':
|
||
raise RuntimeError(
|
||
"Cannot determine tenant ID - working in 'public' schema. "
|
||
"File uploads are only allowed in tenant schemas."
|
||
)
|
||
|
||
# Парсим schema_name для получения tenant_id
|
||
# Стандартные форматы:
|
||
# - 'tenant_<id>' (продакшен)
|
||
# - 'test' или '<name>' (для тестов и других схем)
|
||
if schema_name.startswith('tenant_'):
|
||
tenant_id = schema_name.replace('tenant_', '')
|
||
logger.debug(f"[Storage] Extracted tenant_id={tenant_id} from schema={schema_name}")
|
||
return tenant_id
|
||
else:
|
||
# Используем schema_name как есть (для тестов, локальной разработки и т.д.)
|
||
logger.debug(f"[Storage] Using schema_name as tenant_id: {schema_name}")
|
||
return schema_name
|
||
|
||
except Exception as e:
|
||
logger.error(f"[Storage] Failed to get tenant_id: {str(e)}")
|
||
raise RuntimeError(f"Failed to determine tenant context: {str(e)}")
|
||
|
||
def _get_tenant_path(self, name):
|
||
"""
|
||
Добавить tenant_id в путь файла.
|
||
Проверяет что путь еще не содержит tenant_id чтобы избежать двойного добавления.
|
||
|
||
Args:
|
||
name (str): Исходный путь (например, 'products/temp/image.jpg')
|
||
|
||
Returns:
|
||
str: Путь с tenant_id (например, 'tenants/1/products/temp/image.jpg')
|
||
"""
|
||
# Если путь уже содержит tenants/, не добавляем еще раз
|
||
if name.startswith('tenants/'):
|
||
logger.debug(f"[Storage] Path already has tenant prefix: {name}")
|
||
return name
|
||
|
||
tenant_id = self._get_tenant_id()
|
||
return f"tenants/{tenant_id}/{name}"
|
||
|
||
def get_available_name(self, name, max_length=None):
|
||
"""
|
||
Переопределяем для проверки уникальности с учетом tenant_id на диске,
|
||
но возвращаем путь БЕЗ tenant_id для сохранения в БД.
|
||
|
||
Диск: tenants/{tenant_id}/products/{id}/{photo_id}/file.ext
|
||
БД: products/{id}/{photo_id}/file.ext
|
||
|
||
Args:
|
||
name (str): Исходное имя файла с путем
|
||
max_length (int): Максимальная длина пути
|
||
|
||
Returns:
|
||
str: Путь БЕЗ tenant_id для сохранения в БД
|
||
"""
|
||
# Добавляем tenant_id для проверки на диске
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
|
||
logger.debug(f"[Storage] get_available_name: {name} → checking disk with: {tenant_aware_name}")
|
||
|
||
# Вызываем родительский метод с tenant_id для проверки уникальности на диске
|
||
available_tenant_aware = super().get_available_name(tenant_aware_name, max_length)
|
||
|
||
# Но возвращаем путь БЕЗ tenant_id для сохранения в БД
|
||
# Удаляем prefix 'tenants/{tenant_id}/' из пути
|
||
available_name = available_tenant_aware
|
||
if available_name.startswith('tenants/'):
|
||
# Удаляем 'tenants/{tenant_id}/' из пути
|
||
parts = available_name.split('/', 2) # Split на первые два /
|
||
if len(parts) == 3:
|
||
available_name = parts[2]
|
||
logger.debug(f"[Storage] Stripped tenant prefix: {available_tenant_aware} → {available_name}")
|
||
|
||
logger.debug(f"[Storage] get_available_name returns for DB: {available_name}")
|
||
return available_name
|
||
|
||
def _save(self, name, content):
|
||
"""
|
||
Переопределяем для добавления tenant_id в путь на диске,
|
||
но сохраняем в БД путь БЕЗ tenant_id (для экономии и мобильности).
|
||
|
||
Args:
|
||
name (str): Имя файла
|
||
content: Содержимое файла
|
||
|
||
Returns:
|
||
str: Путь БЕЗ tenant_id для сохранения в БД
|
||
"""
|
||
# Добавляем tenant_id в путь для сохранения на диск
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
|
||
logger.info(f"[Storage] _save: {name} → {tenant_aware_name} (DB will store: {name})")
|
||
|
||
try:
|
||
# Сохраняем файл на диск с tenant_id, но возвращаем исходный путь для БД
|
||
# Django FileSystemStorage автоматически создаст директории если их нет
|
||
saved_path = super()._save(tenant_aware_name, content)
|
||
logger.info(f"[Storage] File saved successfully: {saved_path}")
|
||
except Exception as e:
|
||
logger.error(f"[Storage] Error saving file {tenant_aware_name}: {str(e)}", exc_info=True)
|
||
raise
|
||
|
||
# Возвращаем путь БЕЗ tenant_id для сохранения в БД
|
||
# Это позволяет:
|
||
# 1. На диске: tenants/{tenant_id}/products/{id}/{photo_id}/file.ext
|
||
# 2. В БД: products/{id}/{photo_id}/file.ext
|
||
return name
|
||
|
||
def delete(self, name):
|
||
"""
|
||
Удалить файл, убедившись что он принадлежит текущему тенанту.
|
||
|
||
Args:
|
||
name (str): Путь к файлу (может быть БЕЗ tenant_id)
|
||
"""
|
||
# Получаем tenant_id для проверки
|
||
tenant_id = self._get_tenant_id()
|
||
|
||
# Если путь уже содержит tenants/, проверяем принадлежность тенанту
|
||
if name.startswith("tenants/"):
|
||
if not name.startswith(f"tenants/{tenant_id}/"):
|
||
logger.warning(
|
||
f"[Storage] Security: Attempted to delete file from different tenant! "
|
||
f"Current tenant: {tenant_id}, file: {name}"
|
||
)
|
||
raise RuntimeError(
|
||
f"Cannot delete file - it belongs to a different tenant. "
|
||
f"Current tenant: {tenant_id}"
|
||
)
|
||
# Если путь уже содержит tenants/, удаляем его как есть
|
||
logger.debug(f"[Storage] delete: {name} (already has tenant prefix)")
|
||
return super().delete(name)
|
||
|
||
# Иначе добавляем tenant_id перед удалением
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
logger.debug(f"[Storage] delete: {name} → {tenant_aware_name}")
|
||
return super().delete(tenant_aware_name)
|
||
|
||
def exists(self, name):
|
||
"""
|
||
Проверить существование файла, убедившись что он принадлежит текущему тенанту.
|
||
|
||
Args:
|
||
name (str): Путь к файлу
|
||
|
||
Returns:
|
||
bool: True если файл существует и принадлежит текущему тенанту
|
||
"""
|
||
# Получаем tenant_id
|
||
tenant_id = self._get_tenant_id()
|
||
logger.info(f"[Storage] exists called: name={name}, tenant_id={tenant_id}, schema={connection.schema_name}")
|
||
|
||
# Если путь уже содержит tenants/, не добавляем еще раз
|
||
if name.startswith("tenants/"):
|
||
# Проверяем что файл принадлежит текущему тенанту
|
||
if not name.startswith(f"tenants/{tenant_id}/"):
|
||
logger.warning(
|
||
f"[Storage] Security: Attempted to check file from different tenant! "
|
||
f"Current tenant: {tenant_id}, file: {name}"
|
||
)
|
||
return False
|
||
result = super().exists(name)
|
||
logger.info(f"[Storage] exists (with tenant): {name} → {result}")
|
||
return result
|
||
|
||
# Иначе добавляем tenant_id
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
result = super().exists(tenant_aware_name)
|
||
logger.info(f"[Storage] exists: {name} → {tenant_aware_name} → {result}")
|
||
return result
|
||
|
||
def url(self, name):
|
||
"""
|
||
Получить URL файла, убедившись что он принадлежит текущему тенанту.
|
||
|
||
Args:
|
||
name (str): Путь к файлу
|
||
|
||
Returns:
|
||
str: URL файла
|
||
"""
|
||
# Получаем tenant_id
|
||
tenant_id = self._get_tenant_id()
|
||
|
||
# Если путь уже содержит tenants/, проверяем принадлежность тенанту
|
||
if name.startswith("tenants/"):
|
||
if not name.startswith(f"tenants/{tenant_id}/"):
|
||
logger.warning(
|
||
f"[Storage] Security: Attempted to get URL for file from different tenant! "
|
||
f"Current tenant: {tenant_id}, file: {name}"
|
||
)
|
||
raise RuntimeError(
|
||
f"Cannot get URL for file - it belongs to a different tenant. "
|
||
f"Current tenant: {tenant_id}"
|
||
)
|
||
return super().url(name)
|
||
|
||
# Иначе добавляем tenant_id
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
return super().url(tenant_aware_name)
|
||
|
||
def _open(self, name, mode='rb'):
|
||
"""
|
||
Открыть файл, добавив tenant_id в путь если необходимо.
|
||
Это критически важно для Celery задач, которые читают файлы из БД.
|
||
|
||
Когда ImageProcessor вызывает Image.open(photo_obj.image), Django вызывает
|
||
это метод. БД содержит путь БЕЗ tenant_id (например, 'products/temp/image.jpg'),
|
||
но файл находится на диске с tenant_id (tenants/{tenant_id}/products/temp/image.jpg).
|
||
|
||
Args:
|
||
name (str): Путь к файлу (может быть БЕЗ tenant_id)
|
||
mode (str): Режим открытия файла
|
||
|
||
Returns:
|
||
File-like object
|
||
"""
|
||
# Получаем tenant_id
|
||
tenant_id = self._get_tenant_id()
|
||
|
||
# Если путь уже содержит tenants/, проверяем принадлежность тенанту
|
||
if name.startswith("tenants/"):
|
||
if not name.startswith(f"tenants/{tenant_id}/"):
|
||
logger.warning(
|
||
f"[Storage] Security: Attempted to open file from different tenant! "
|
||
f"Current tenant: {tenant_id}, file: {name}"
|
||
)
|
||
raise RuntimeError(
|
||
f"Cannot open file - it belongs to a different tenant. "
|
||
f"Current tenant: {tenant_id}"
|
||
)
|
||
# Если путь уже содержит tenants/, используем его как есть
|
||
logger.debug(f"[Storage] _open: {name} (already has tenant prefix)")
|
||
return super()._open(name, mode)
|
||
|
||
# Иначе добавляем tenant_id перед открытием
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
logger.debug(f"[Storage] _open: {name} → {tenant_aware_name}")
|
||
return super()._open(tenant_aware_name, mode)
|
||
|
||
def path(self, name):
|
||
"""
|
||
Получить полный системный путь к файлу, добавив tenant_id если необходимо.
|
||
Используется для конвертации 'products/temp/image.jpg' в '/media/tenants/papa/products/temp/image.jpg'
|
||
|
||
Args:
|
||
name (str): Путь к файлу (может быть БЕЗ tenant_id)
|
||
|
||
Returns:
|
||
str: Полный системный путь к файлу
|
||
"""
|
||
# Получаем tenant_id
|
||
tenant_id = self._get_tenant_id()
|
||
|
||
# Если путь уже содержит tenants/, проверяем принадлежность тенанту
|
||
if name.startswith("tenants/"):
|
||
if not name.startswith(f"tenants/{tenant_id}/"):
|
||
logger.warning(
|
||
f"[Storage] Security: Attempted to get path for file from different tenant! "
|
||
f"Current tenant: {tenant_id}, file: {name}"
|
||
)
|
||
raise RuntimeError(
|
||
f"Cannot get path for file - it belongs to a different tenant. "
|
||
f"Current tenant: {tenant_id}"
|
||
)
|
||
# Если путь уже содержит tenants/, используем его как есть
|
||
logger.debug(f"[Storage] path: {name} (already has tenant prefix)")
|
||
return super().path(name)
|
||
|
||
# Иначе добавляем tenant_id перед получением пути
|
||
tenant_aware_name = self._get_tenant_path(name)
|
||
logger.debug(f"[Storage] path: {name} → {tenant_aware_name}")
|
||
return super().path(tenant_aware_name)
|