Files
octopus/myproject/integrations/views.py

309 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
from django.views.generic import TemplateView
from django.http import JsonResponse
from django.views.decorators.http import require_POST
from user_roles.mixins import OwnerRequiredMixin
from .models import RecommerceIntegration, WooCommerceIntegration
from integrations.recommerce.tasks import sync_products_batch_task
# Реестр доступных интеграций
# Ключ = идентификатор для URL/JS, значение = (модель, название для UI)
INTEGRATION_REGISTRY = {
'recommerce': (RecommerceIntegration, 'Recommerce', 'Маркетплейс'),
'woocommerce': (WooCommerceIntegration, 'WooCommerce', 'Маркетплейс'),
# Добавлять новые интеграции здесь:
# 'shopify': (ShopifyIntegration, 'Shopify', 'Маркетплейс'),
}
def get_integration_model(integration_id: str):
"""Получить модель интеграции по идентификатору"""
if integration_id not in INTEGRATION_REGISTRY:
return None
return INTEGRATION_REGISTRY[integration_id][0]
def get_all_integrations_status():
"""
Получить статус всех интеграций.
Возвращает dict с информацией для UI.
"""
result = {}
for key, (model, label, category) in INTEGRATION_REGISTRY.items():
instance = model.objects.first()
result[key] = {
'id': key,
'label': label,
'category': category,
'is_active': instance.is_active if instance else False,
'is_configured': instance.is_configured if instance else False,
'exists': instance is not None,
}
return result
class IntegrationsListView(OwnerRequiredMixin, TemplateView):
"""Страница настроек интеграций (доступна только владельцу)"""
template_name = "system_settings/integrations.html"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
integrations = get_all_integrations_status()
context['integrations'] = integrations
# JSON для JavaScript
context['integrations_json'] = json.dumps(integrations, ensure_ascii=False)
return context
@require_POST
def toggle_integration(request, integration_id: str):
"""
API endpoint для включения/выключения интеграции.
POST /integrations/toggle/<integration_id>/
Создаёт запись если не существует (singleton).
Переключает is_active.
"""
# Проверка прав (только владелец)
if not hasattr(request, 'user') or not request.user.is_authenticated:
return JsonResponse({'error': 'Unauthorized'}, status=401)
# Получить модель
model = get_integration_model(integration_id)
if not model:
return JsonResponse({'error': f'Unknown integration: {integration_id}'}, status=404)
# Получить или создать singleton
instance, created = model.objects.get_or_create(
defaults={
'name': INTEGRATION_REGISTRY[integration_id][1],
'is_active': False,
}
)
# Переключить состояние
new_state = not instance.is_active
# Проверка: нельзя включить ненастроенную интеграцию
if new_state and not instance.is_configured:
return JsonResponse({
'error': 'Сначала настройте интеграцию (введите credentials)',
'is_active': instance.is_active,
'is_configured': False,
}, status=400)
instance.is_active = new_state
instance.save(update_fields=['is_active', 'updated_at'])
return JsonResponse({
'success': True,
'is_active': instance.is_active,
'is_configured': instance.is_configured,
})
@require_POST
def test_integration_connection(request, integration_id: str):
"""
Тестировать соединение с интеграцией.
POST /integrations/test/<integration_id>/
"""
if not hasattr(request, 'user') or not request.user.is_authenticated:
return JsonResponse({'error': 'Unauthorized'}, status=401)
model = get_integration_model(integration_id)
if not model:
return JsonResponse({'error': f'Unknown integration: {integration_id}'}, status=404)
instance = model.objects.first()
if not instance:
return JsonResponse({'error': 'Интеграция не настроена'}, status=400)
if not instance.is_configured:
return JsonResponse({'error': 'Заполните настройки интеграции'}, status=400)
# Получить сервис для интеграции
service = get_integration_service(integration_id, instance)
if not service:
return JsonResponse({'error': 'Сервис не реализован'}, status=501)
# Выполнить тест
success, message = service.test_connection()
return JsonResponse({
'success': success,
'message': message,
})
def get_integration_service(integration_id: str, instance):
"""Получить сервис для интеграции"""
if integration_id == 'recommerce':
from .services.marketplaces.recommerce import RecommerceService
return RecommerceService(instance)
elif integration_id == 'woocommerce':
# TODO: WooCommerceService
return None
return None
class RecommerceBatchSyncView(TemplateView):
"""
API View для запуска массовой синхронизации с Recommerce.
POST /integrations/recommerce/sync/
"""
def dispatch(self, request, *args, **kwargs):
# Временное логирование для отладки
from user_roles.services import RoleService
import logging
logger = logging.getLogger(__name__)
logger.info(f"User: {request.user}, Authenticated: {request.user.is_authenticated}")
user_role = RoleService.get_user_role(request.user)
logger.info(f"User role: {user_role}")
return super().dispatch(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
try:
data = json.loads(request.body)
product_ids = data.get('product_ids', [])
options = data.get('options', {})
if not product_ids:
return JsonResponse({'error': 'No products selected'}, status=400)
# Запуск Celery задачи с передачей schema_name
from django_tenants.utils import get_tenant_model
Tenant = get_tenant_model()
schema_name = request.tenant.schema_name
task = sync_products_batch_task.delay(product_ids, options, schema_name)
return JsonResponse({
'success': True,
'task_id': task.id,
'message': f'Запущена синхронизация {len(product_ids)} товаров'
})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
@require_POST
def save_integration_settings(request, integration_id: str):
"""
API endpoint для сохранения настроек интеграции.
POST /integrations/settings/<integration_id>/
Body: JSON с полями для обновления
"""
if not hasattr(request, 'user') or not request.user.is_authenticated:
return JsonResponse({'error': 'Unauthorized'}, status=401)
model = get_integration_model(integration_id)
if not model:
return JsonResponse({'error': f'Unknown integration: {integration_id}'}, status=404)
# Получить или создать
instance, created = model.objects.get_or_create(
defaults={'name': INTEGRATION_REGISTRY[integration_id][1]}
)
# Парсинг данных
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
# Обновить только разрешённые поля (не is_active - для этого toggle)
allowed_fields = get_editable_fields(model)
updated_fields = []
for field in allowed_fields:
if field in data:
setattr(instance, field, data[field])
updated_fields.append(field)
if updated_fields:
updated_fields.append('updated_at')
instance.save(update_fields=updated_fields)
return JsonResponse({
'success': True,
'is_configured': instance.is_configured,
'updated_fields': updated_fields,
})
def get_editable_fields(model):
"""Получить список редактируемых полей модели (исключая служебные)"""
excluded = {'id', 'integration_type', 'is_active', 'created_at', 'updated_at', 'extra_config'}
fields = []
for field in model._meta.get_fields():
if hasattr(field, 'name') and field.name not in excluded:
if not field.is_relation: # Исключаем FK/M2M
fields.append(field.name)
return fields
def get_integration_form_data(request, integration_id: str):
"""
GET endpoint для получения текущих настроек интеграции.
Используется для заполнения формы справа.
"""
model = get_integration_model(integration_id)
if not model:
return JsonResponse({'error': f'Unknown integration: {integration_id}'}, status=404)
instance = model.objects.first()
if not instance:
# Вернуть пустую структуру полей
return JsonResponse({
'exists': False,
'fields': get_form_fields_meta(model),
})
# Собрать данные полей (без чувствительных данных полностью)
data = {}
for field_name in get_editable_fields(model):
field = model._meta.get_field(field_name)
value = getattr(instance, field_name, None)
# Маскировать секреты
if 'token' in field_name.lower() or 'secret' in field_name.lower() or 'key' in field_name.lower():
data[field_name] = '••••••••' if value else ''
else:
data[field_name] = value
return JsonResponse({
'exists': True,
'is_active': instance.is_active,
'is_configured': instance.is_configured,
'data': data,
'fields': get_form_fields_meta(model),
})
def get_form_fields_meta(model):
"""Получить метаданные полей для построения формы на фронте"""
fields = []
for field_name in get_editable_fields(model):
field = model._meta.get_field(field_name)
field_info = {
'name': field_name,
'label': getattr(field, 'verbose_name', field_name),
'help_text': getattr(field, 'help_text', ''),
'required': not getattr(field, 'blank', True),
'type': 'text', # default
}
# Определить тип поля
if isinstance(field, model._meta.get_field(field_name).__class__):
if 'BooleanField' in field.__class__.__name__:
field_info['type'] = 'checkbox'
elif 'URLField' in field.__class__.__name__:
field_info['type'] = 'url'
elif 'secret' in field_name.lower() or 'token' in field_name.lower() or 'key' in field_name.lower():
field_info['type'] = 'password'
fields.append(field_info)
return fields