""" Custom FileSystemStorage с поддержкой мультитенантности. Автоматически добавляет tenant_id в путь сохранения файлов для изоляции файлов между тенантами. Структура пути: media/tenants/{tenant_id}/products/{entity_id}/{photo_id}/{size}.ext media/tenants/{tenant_id}/kits/{entity_id}/{photo_id}/{size}.ext media/tenants/{tenant_id}/categories/{entity_id}/{photo_id}/{size}.ext """ import logging from django.core.files.storage import FileSystemStorage from django.db import connection logger = logging.getLogger(__name__) class TenantAwareFileSystemStorage(FileSystemStorage): """ Расширение стандартного FileSystemStorage для поддержки мультитенантности. Автоматически добавляет tenant_id в путь сохранения файлов. """ def _get_tenant_id(self): """ Получить ID текущего тенанта из контекста django-tenants. Returns: int or str: ID тенанта Raises: RuntimeError: Если не удается определить тенант """ try: # Получаем текущую схему (для django-tenants) schema_name = connection.schema_name # Если это публичная схема, это ошибка при попытке загрузить фото if schema_name == 'public': raise RuntimeError( "Cannot determine tenant ID - working in 'public' schema. " "File uploads are only allowed in tenant schemas." ) # Парсим schema_name для получения tenant_id # Стандартные форматы: # - 'tenant_' (продакшен) # - 'test' или '' (для тестов и других схем) if schema_name.startswith('tenant_'): tenant_id = schema_name.replace('tenant_', '') logger.debug(f"[Storage] Extracted tenant_id={tenant_id} from schema={schema_name}") return tenant_id else: # Используем schema_name как есть (для тестов, локальной разработки и т.д.) logger.debug(f"[Storage] Using schema_name as tenant_id: {schema_name}") return schema_name except Exception as e: logger.error(f"[Storage] Failed to get tenant_id: {str(e)}") raise RuntimeError(f"Failed to determine tenant context: {str(e)}") def _get_tenant_path(self, name): """ Добавить tenant_id в путь файла. Проверяет что путь еще не содержит tenant_id чтобы избежать двойного добавления. Args: name (str): Исходный путь (например, 'products/temp/image.jpg') Returns: str: Путь с tenant_id (например, 'tenants/1/products/temp/image.jpg') """ # Если путь уже содержит tenants/, не добавляем еще раз if name.startswith('tenants/'): logger.debug(f"[Storage] Path already has tenant prefix: {name}") return name tenant_id = self._get_tenant_id() return f"tenants/{tenant_id}/{name}" def get_available_name(self, name, max_length=None): """ Переопределяем для проверки уникальности с учетом tenant_id на диске, но возвращаем путь БЕЗ tenant_id для сохранения в БД. Диск: tenants/{tenant_id}/products/{id}/{photo_id}/file.ext БД: products/{id}/{photo_id}/file.ext Args: name (str): Исходное имя файла с путем max_length (int): Максимальная длина пути Returns: str: Путь БЕЗ tenant_id для сохранения в БД """ # Добавляем tenant_id для проверки на диске tenant_aware_name = self._get_tenant_path(name) logger.debug(f"[Storage] get_available_name: {name} → checking disk with: {tenant_aware_name}") # Вызываем родительский метод с tenant_id для проверки уникальности на диске available_tenant_aware = super().get_available_name(tenant_aware_name, max_length) # Но возвращаем путь БЕЗ tenant_id для сохранения в БД # Удаляем prefix 'tenants/{tenant_id}/' из пути available_name = available_tenant_aware if available_name.startswith('tenants/'): # Удаляем 'tenants/{tenant_id}/' из пути parts = available_name.split('/', 2) # Split на первые два / if len(parts) == 3: available_name = parts[2] logger.debug(f"[Storage] Stripped tenant prefix: {available_tenant_aware} → {available_name}") logger.debug(f"[Storage] get_available_name returns for DB: {available_name}") return available_name def _save(self, name, content): """ Переопределяем для добавления tenant_id в путь на диске, но сохраняем в БД путь БЕЗ tenant_id (для экономии и мобильности). Args: name (str): Имя файла content: Содержимое файла Returns: str: Путь БЕЗ tenant_id для сохранения в БД """ # Добавляем tenant_id в путь для сохранения на диск tenant_aware_name = self._get_tenant_path(name) logger.info(f"[Storage] _save: {name} → {tenant_aware_name} (DB will store: {name})") try: # Сохраняем файл на диск с tenant_id, но возвращаем исходный путь для БД # Django FileSystemStorage автоматически создаст директории если их нет saved_path = super()._save(tenant_aware_name, content) logger.info(f"[Storage] File saved successfully: {saved_path}") except Exception as e: logger.error(f"[Storage] Error saving file {tenant_aware_name}: {str(e)}", exc_info=True) raise # Возвращаем путь БЕЗ tenant_id для сохранения в БД # Это позволяет: # 1. На диске: tenants/{tenant_id}/products/{id}/{photo_id}/file.ext # 2. В БД: products/{id}/{photo_id}/file.ext return name def delete(self, name): """ Удалить файл, убедившись что он принадлежит текущему тенанту. Args: name (str): Путь к файлу (может быть БЕЗ tenant_id) """ # Получаем tenant_id для проверки tenant_id = self._get_tenant_id() # Если путь уже содержит tenants/, проверяем принадлежность тенанту if name.startswith("tenants/"): if not name.startswith(f"tenants/{tenant_id}/"): logger.warning( f"[Storage] Security: Attempted to delete file from different tenant! " f"Current tenant: {tenant_id}, file: {name}" ) raise RuntimeError( f"Cannot delete file - it belongs to a different tenant. " f"Current tenant: {tenant_id}" ) # Если путь уже содержит tenants/, удаляем его как есть logger.debug(f"[Storage] delete: {name} (already has tenant prefix)") return super().delete(name) # Иначе добавляем tenant_id перед удалением tenant_aware_name = self._get_tenant_path(name) logger.debug(f"[Storage] delete: {name} → {tenant_aware_name}") return super().delete(tenant_aware_name) def exists(self, name): """ Проверить существование файла, убедившись что он принадлежит текущему тенанту. Args: name (str): Путь к файлу Returns: bool: True если файл существует и принадлежит текущему тенанту """ # Получаем tenant_id tenant_id = self._get_tenant_id() logger.info(f"[Storage] exists called: name={name}, tenant_id={tenant_id}, schema={connection.schema_name}") # Если путь уже содержит tenants/, не добавляем еще раз if name.startswith("tenants/"): # Проверяем что файл принадлежит текущему тенанту if not name.startswith(f"tenants/{tenant_id}/"): logger.warning( f"[Storage] Security: Attempted to check file from different tenant! " f"Current tenant: {tenant_id}, file: {name}" ) return False result = super().exists(name) logger.info(f"[Storage] exists (with tenant): {name} → {result}") return result # Иначе добавляем tenant_id tenant_aware_name = self._get_tenant_path(name) # Получаем полный путь для отладки full_path = super().path(tenant_aware_name) result = super().exists(tenant_aware_name) logger.info(f"[Storage] exists: {name} → {tenant_aware_name} → {full_path} → {result}") # Дополнительная проверка через os.path.exists для отладки import os os_result = os.path.exists(full_path) if result != os_result: logger.warning(f"[Storage] Mismatch! storage.exists()={result}, os.path.exists()={os_result} for {full_path}") return result def url(self, name): """ Получить URL файла, убедившись что он принадлежит текущему тенанту. Args: name (str): Путь к файлу Returns: str: URL файла """ # Получаем tenant_id tenant_id = self._get_tenant_id() # Если путь уже содержит tenants/, проверяем принадлежность тенанту if name.startswith("tenants/"): if not name.startswith(f"tenants/{tenant_id}/"): logger.warning( f"[Storage] Security: Attempted to get URL for file from different tenant! " f"Current tenant: {tenant_id}, file: {name}" ) raise RuntimeError( f"Cannot get URL for file - it belongs to a different tenant. " f"Current tenant: {tenant_id}" ) return super().url(name) # Иначе добавляем tenant_id tenant_aware_name = self._get_tenant_path(name) return super().url(tenant_aware_name) def _open(self, name, mode='rb'): """ Открыть файл, добавив tenant_id в путь если необходимо. Это критически важно для Celery задач, которые читают файлы из БД. Когда ImageProcessor вызывает Image.open(photo_obj.image), Django вызывает это метод. БД содержит путь БЕЗ tenant_id (например, 'products/temp/image.jpg'), но файл находится на диске с tenant_id (tenants/{tenant_id}/products/temp/image.jpg). Args: name (str): Путь к файлу (может быть БЕЗ tenant_id) mode (str): Режим открытия файла Returns: File-like object """ # Получаем tenant_id tenant_id = self._get_tenant_id() # Если путь уже содержит tenants/, проверяем принадлежность тенанту if name.startswith("tenants/"): if not name.startswith(f"tenants/{tenant_id}/"): logger.warning( f"[Storage] Security: Attempted to open file from different tenant! " f"Current tenant: {tenant_id}, file: {name}" ) raise RuntimeError( f"Cannot open file - it belongs to a different tenant. " f"Current tenant: {tenant_id}" ) # Если путь уже содержит tenants/, используем его как есть logger.debug(f"[Storage] _open: {name} (already has tenant prefix)") return super()._open(name, mode) # Иначе добавляем tenant_id перед открытием tenant_aware_name = self._get_tenant_path(name) logger.debug(f"[Storage] _open: {name} → {tenant_aware_name}") return super()._open(tenant_aware_name, mode) def path(self, name): """ Получить полный системный путь к файлу, добавив tenant_id если необходимо. Используется для конвертации 'products/temp/image.jpg' в '/media/tenants/papa/products/temp/image.jpg' Args: name (str): Путь к файлу (может быть БЕЗ tenant_id) Returns: str: Полный системный путь к файлу """ # Получаем tenant_id tenant_id = self._get_tenant_id() # Если путь уже содержит tenants/, проверяем принадлежность тенанту if name.startswith("tenants/"): if not name.startswith(f"tenants/{tenant_id}/"): logger.warning( f"[Storage] Security: Attempted to get path for file from different tenant! " f"Current tenant: {tenant_id}, file: {name}" ) raise RuntimeError( f"Cannot get path for file - it belongs to a different tenant. " f"Current tenant: {tenant_id}" ) # Если путь уже содержит tenants/, используем его как есть logger.debug(f"[Storage] path: {name} (already has tenant prefix)") return super().path(name) # Иначе добавляем tenant_id перед получением пути tenant_aware_name = self._get_tenant_path(name) logger.debug(f"[Storage] path: {name} → {tenant_aware_name}") return super().path(tenant_aware_name)