diff --git a/myproject/docker/entrypoint.sh b/myproject/docker/entrypoint.sh index a359174..263a8ec 100644 --- a/myproject/docker/entrypoint.sh +++ b/myproject/docker/entrypoint.sh @@ -214,11 +214,10 @@ case "$1" in wait_for_postgres wait_for_redis setup_directories - echo "Starting Celery Worker..." + echo "Starting Celery Worker for photo processing and product import..." exec celery -A myproject worker \ -l info \ - -Q celery,photo_processing \ - --concurrency=2 + --concurrency=4 ;; celery-beat) wait_for_postgres diff --git a/myproject/products/migrations/0002_productimportjob.py b/myproject/products/migrations/0002_productimportjob.py new file mode 100644 index 0000000..d474db6 --- /dev/null +++ b/myproject/products/migrations/0002_productimportjob.py @@ -0,0 +1,45 @@ +# Generated by Django 5.0.10 on 2026-01-06 03:40 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('products', '0001_initial'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name='ProductImportJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('task_id', models.CharField(max_length=255, unique=True, verbose_name='ID задачи Celery')), + ('file_name', models.CharField(max_length=255, verbose_name='Имя файла')), + ('file_path', models.CharField(help_text='Временный путь для обработки', max_length=500, verbose_name='Путь к файлу')), + ('update_existing', models.BooleanField(default=False, verbose_name='Обновлять существующие')), + ('status', models.CharField(choices=[('pending', 'Ожидает'), ('processing', 'Обрабатывается'), ('completed', 'Завершён'), ('failed', 'Ошибка')], db_index=True, default='pending', max_length=20, verbose_name='Статус')), + ('total_rows', models.IntegerField(default=0, verbose_name='Всего строк')), + ('processed_rows', models.IntegerField(default=0, verbose_name='Обработано строк')), + ('created_count', models.IntegerField(default=0, verbose_name='Создано товаров')), + ('updated_count', models.IntegerField(default=0, verbose_name='Обновлено товаров')), + ('skipped_count', models.IntegerField(default=0, verbose_name='Пропущено')), + ('errors_count', models.IntegerField(default=0, verbose_name='Ошибок')), + ('errors_json', models.JSONField(blank=True, default=list, verbose_name='Детали ошибок')), + ('error_message', models.TextField(blank=True, verbose_name='Сообщение об ошибке')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='Создано')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='Обновлено')), + ('completed_at', models.DateTimeField(blank=True, null=True, verbose_name='Завершено')), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='product_import_jobs', to=settings.AUTH_USER_MODEL, verbose_name='Пользователь')), + ], + options={ + 'verbose_name': 'Задача импорта товаров', + 'verbose_name_plural': 'Задачи импорта товаров', + 'ordering': ['-created_at'], + 'indexes': [models.Index(fields=['task_id'], name='products_pr_task_id_d8dc9f_idx'), models.Index(fields=['status', '-created_at'], name='products_pr_status_326f2c_idx'), models.Index(fields=['user', '-created_at'], name='products_pr_user_id_e32ca9_idx')], + }, + ), + ] diff --git a/myproject/products/models/__init__.py b/myproject/products/models/__init__.py index f7fb840..6fd455a 100644 --- a/myproject/products/models/__init__.py +++ b/myproject/products/models/__init__.py @@ -46,6 +46,9 @@ from .units import UnitOfMeasure, ProductSalesUnit # Фотографии from .photos import BasePhoto, ProductPhoto, ProductKitPhoto, ProductCategoryPhoto, PhotoProcessingStatus +# Задачи импорта +from .import_job import ProductImportJob + # Явно указываем, что экспортируется при импорте * __all__ = [ # Managers @@ -92,4 +95,7 @@ __all__ = [ 'ProductKitPhoto', 'ProductCategoryPhoto', 'PhotoProcessingStatus', + + # Import Jobs + 'ProductImportJob', ] diff --git a/myproject/products/models/base.py b/myproject/products/models/base.py index b4037bb..185bffa 100644 --- a/myproject/products/models/base.py +++ b/myproject/products/models/base.py @@ -53,6 +53,55 @@ class SKUCounter(models.Model): """ Получить следующее значение счетчика (thread-safe). Использует select_for_update для предотвращения race conditions. + + DEPRECATED: Используйте peek_next_value() + increment_counter() вместо этого метода. + Этот метод инкрементирует счётчик немедленно, что может привести к пропускам номеров + при ошибках сохранения объекта. + """ + with transaction.atomic(): + counter, created = cls.objects.select_for_update().get_or_create( + counter_type=counter_type, + defaults={'current_value': 0} + ) + counter.current_value += 1 + counter.save() + return counter.current_value + + @classmethod + def peek_next_value(cls, counter_type): + """ + FIX: SKU counter bug - increment only after successful save + + Получить следующее значение счетчика БЕЗ инкремента (thread-safe). + Используется для генерации SKU перед сохранением объекта. + Фактический инкремент выполняется в post_save сигнале после успешного создания. + + Args: + counter_type: Тип счётчика ('product', 'kit', 'category', 'configurable') + + Returns: + int: Следующее значение счётчика (current_value + 1) + """ + with transaction.atomic(): + counter, created = cls.objects.select_for_update().get_or_create( + counter_type=counter_type, + defaults={'current_value': 0} + ) + return counter.current_value + 1 + + @classmethod + def increment_counter(cls, counter_type): + """ + FIX: SKU counter bug - increment only after successful save + + Инкрементировать счётчик (thread-safe). + Вызывается в post_save сигнале после успешного создания объекта с автогенерированным SKU. + + Args: + counter_type: Тип счётчика ('product', 'kit', 'category', 'configurable') + + Returns: + int: Новое значение счётчика после инкремента """ with transaction.atomic(): counter, created = cls.objects.select_for_update().get_or_create( diff --git a/myproject/products/models/import_job.py b/myproject/products/models/import_job.py new file mode 100644 index 0000000..5dfbeaa --- /dev/null +++ b/myproject/products/models/import_job.py @@ -0,0 +1,142 @@ +""" +Модель для отслеживания задач импорта товаров. +""" +from django.db import models +from django.contrib.auth import get_user_model + +User = get_user_model() + + +class ProductImportJob(models.Model): + """ + Задача импорта товаров через Celery. + Отслеживает прогресс и результаты импорта. + """ + STATUS_CHOICES = [ + ('pending', 'Ожидает'), + ('processing', 'Обрабатывается'), + ('completed', 'Завершён'), + ('failed', 'Ошибка'), + ] + + # Celery task ID + task_id = models.CharField( + max_length=255, + unique=True, + verbose_name="ID задачи Celery" + ) + + # Пользователь + user = models.ForeignKey( + User, + on_delete=models.CASCADE, + related_name='product_import_jobs', + verbose_name="Пользователь" + ) + + # Информация о файле + file_name = models.CharField( + max_length=255, + verbose_name="Имя файла" + ) + file_path = models.CharField( + max_length=500, + verbose_name="Путь к файлу", + help_text="Временный путь для обработки" + ) + + # Параметры импорта + update_existing = models.BooleanField( + default=False, + verbose_name="Обновлять существующие" + ) + + # Статус + status = models.CharField( + max_length=20, + choices=STATUS_CHOICES, + default='pending', + db_index=True, + verbose_name="Статус" + ) + + # Прогресс + total_rows = models.IntegerField( + default=0, + verbose_name="Всего строк" + ) + processed_rows = models.IntegerField( + default=0, + verbose_name="Обработано строк" + ) + + # Результаты + created_count = models.IntegerField( + default=0, + verbose_name="Создано товаров" + ) + updated_count = models.IntegerField( + default=0, + verbose_name="Обновлено товаров" + ) + skipped_count = models.IntegerField( + default=0, + verbose_name="Пропущено" + ) + errors_count = models.IntegerField( + default=0, + verbose_name="Ошибок" + ) + + # Детали ошибок (JSON) + errors_json = models.JSONField( + default=list, + blank=True, + verbose_name="Детали ошибок" + ) + + # Сообщение об ошибке (если task упал) + error_message = models.TextField( + blank=True, + verbose_name="Сообщение об ошибке" + ) + + # Временные метки + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name="Создано" + ) + updated_at = models.DateTimeField( + auto_now=True, + verbose_name="Обновлено" + ) + completed_at = models.DateTimeField( + null=True, + blank=True, + verbose_name="Завершено" + ) + + class Meta: + verbose_name = "Задача импорта товаров" + verbose_name_plural = "Задачи импорта товаров" + ordering = ['-created_at'] + indexes = [ + models.Index(fields=['task_id']), + models.Index(fields=['status', '-created_at']), + models.Index(fields=['user', '-created_at']), + ] + + def __str__(self): + return f"Импорт {self.file_name} ({self.get_status_display()})" + + @property + def progress_percent(self): + """Процент выполнения""" + if self.total_rows == 0: + return 0 + return int((self.processed_rows / self.total_rows) * 100) + + @property + def is_finished(self): + """Завершена ли задача (успешно или с ошибкой)""" + return self.status in ['completed', 'failed'] diff --git a/myproject/products/services/import_export.py b/myproject/products/services/import_export.py new file mode 100644 index 0000000..8201f12 --- /dev/null +++ b/myproject/products/services/import_export.py @@ -0,0 +1,616 @@ +""" +Сервис для импорта и экспорта товаров. + +Этот модуль содержит логику импорта/экспорта товаров в различных форматах (CSV, Excel). +Разделение на отдельный модуль улучшает организацию кода и следует принципам SRP. +""" +import csv +import io +from decimal import Decimal, InvalidOperation +from django.utils import timezone +from ..models import Product + +try: + from openpyxl import load_workbook +except ImportError: + load_workbook = None + +import re + + +class ProductImporter: + """ + Универсальный импорт товаров из CSV/XLSX. + + Поддерживаемые форматы: + - CSV (UTF-8, заголовок в первой строке) + - XLSX (первая строка — заголовки) + + Алгоритм: + - Читаем файл → получаем headers и list[dict] строк + - По headers строим маппинг на поля Product + - Для каждой строки: + - пропускаем полностью пустые строки + - ищем товар по sku, потом по name + - если update_existing=False и товар найден → пропуск + - если update_existing=True → обновляем найденного + - если не найден → создаём нового + - Вся валидация делается через Product.full_clean() + """ + + FIELD_ALIASES = { + "name": ["name", "название", "наименование", "товар", "продукт", "имя"], + "sku": ["sku", "артикул", "код", "code", "article"], + "price": ["price", "цена", "ценапродажи", "стоимость", "pricesale"], + "description": ["description", "описание", "desc"], + "short_description": ["shortdescription", "краткоеописание", "короткоеописание", "краткое"], + "unit": ["unit", "единица", "ед", "едизм", "единицаизмерения"], + "cost_price": ["costprice", "себестоимость", "закупочнаяцена", "cost"], + "sale_price": ["saleprice", "ценасоскидкой", "скидка", "discount", "discountprice"], + "images": ["images", "изображения", "фото", "картинки", "photos", "pictures", "image"], + } + + def __init__(self): + self.errors = [] + self.success_count = 0 + self.update_count = 0 + self.skip_count = 0 + # Сохраняем исходные данные для генерации error-файла + self.original_headers = [] + self.original_rows = [] + self.file_format = None + self.real_errors = [] + # Для Celery: собираем задачи загрузки фото + self.photo_tasks = [] + + def import_from_file(self, file, update_existing: bool = False, progress_callback=None, skip_images: bool = False, schema_name: str = None) -> dict: + """ + Импорт товаров из загруженного файла. + + Args: + file: UploadedFile + update_existing: обновлять ли существующих товаров (по sku/name) + progress_callback: callback для обновления прогресса (current, total, created, updated, skipped, errors) + skip_images: пропустить загрузку фото (для Celery) + schema_name: схема тенанта (для Celery задач фото) + + Returns: + dict: результат импорта + """ + file_format = self._detect_format(file) + + if file_format is None: + return { + "success": False, + "message": "Неподдерживаемый формат файла. Ожидается CSV или XLSX.", + "created": 0, + "updated": 0, + "skipped": 0, + "errors": [{"row": None, "reason": "Unsupported file type"}], + } + + if file_format == "xlsx" and load_workbook is None: + return { + "success": False, + "message": "Для импорта XLSX необходим пакет openpyxl. Установите его и повторите попытку.", + "created": 0, + "updated": 0, + "skipped": 0, + "errors": [{"row": None, "reason": "openpyxl is not installed"}], + } + + # Сохраняем формат файла + self.file_format = file_format + + try: + if file_format == "csv": + headers, rows = self._read_csv(file) + else: + headers, rows = self._read_xlsx(file) + + # Сохраняем исходные данные + self.original_headers = headers + self.original_rows = rows + except Exception as exc: + return { + "success": False, + "message": f"Ошибка чтения файла: {exc}", + "created": 0, + "updated": 0, + "skipped": 0, + "errors": [{"row": None, "reason": str(exc)}], + } + + if not headers: + return { + "success": False, + "message": "В файле не найдены заголовки.", + "created": 0, + "updated": 0, + "skipped": 0, + "errors": [{"row": None, "reason": "Empty header row"}], + } + + mapping = self._build_mapping(headers) + + # Минимальное требование: name ИЛИ sku + if not any(field in mapping for field in ("name", "sku")): + return { + "success": False, + "message": "Не удалось сопоставить обязательные поля (название или артикул).", + "created": 0, + "updated": 0, + "skipped": len(rows), + "errors": [ + { + "row": None, + "reason": "No required fields (name/sku) mapped from headers", + } + ], + } + + for index, row in enumerate(rows, start=2): # первая строка — заголовки + self._process_row(index, row, mapping, update_existing, skip_images, schema_name) + + # Обновляем прогресс (если есть callback) + if progress_callback: + progress_callback( + current=index - 1, # текущая строка + total=len(rows), + created=self.success_count, + updated=self.update_count, + skipped=self.skip_count, + errors=self.errors + ) + + total_errors = len(self.errors) + real_error_count = len(self.real_errors) + success = (self.success_count + self.update_count) > 0 + + if success and total_errors == 0: + message = "Импорт завершён успешно." + elif success and total_errors > 0: + message = "Импорт завершён с ошибками." + else: + message = "Не удалось импортировать данные." + + return { + "success": success, + "message": message, + "created": self.success_count, + "updated": self.update_count, + "skipped": self.skip_count, + "errors": self.errors, + "real_errors": self.real_errors, + "real_error_count": real_error_count, + "photo_tasks": self.photo_tasks, # Для Celery + } + + def _detect_format(self, file) -> str | None: + name = (getattr(file, "name", None) or "").lower() + if name.endswith(".csv"): + return "csv" + if name.endswith(".xlsx") or name.endswith(".xls"): + return "xlsx" + return None + + def _read_csv(self, file): + file.seek(0) + raw = file.read() + if isinstance(raw, bytes): + text = raw.decode("utf-8-sig") + else: + text = raw + f = io.StringIO(text) + reader = csv.DictReader(f) + headers = reader.fieldnames or [] + rows = list(reader) + return headers, rows + + def _read_xlsx(self, file): + file.seek(0) + wb = load_workbook(file, read_only=True, data_only=True) + ws = wb.active + + headers = [] + rows = [] + first_row = True + + for row in ws.iter_rows(values_only=True): + if first_row: + headers = [str(v).strip() if v is not None else "" for v in row] + first_row = False + continue + + if not any(row): + continue + + row_dict = {} + for idx, value in enumerate(row): + if idx < len(headers): + header = headers[idx] or f"col_{idx}" + row_dict[header] = value + rows.append(row_dict) + + return headers, rows + + def _normalize_header(self, header: str) -> str: + if header is None: + return "" + cleaned = "".join(ch for ch in str(header).strip().lower() if ch.isalnum()) + return cleaned + + def _build_mapping(self, headers): + mapping = {} + normalized_aliases = { + field: {self._normalize_header(a) for a in aliases} + for field, aliases in self.FIELD_ALIASES.items() + } + + for header in headers: + norm = self._normalize_header(header) + if not norm: + continue + for field, alias_set in normalized_aliases.items(): + if norm in alias_set and field not in mapping: + mapping[field] = header + break + + return mapping + + def _clean_value(self, value): + if value is None: + return "" + return str(value).strip() + + def _parse_decimal(self, value: str) -> Decimal | None: + """ + Парсинг decimal значения (цена, себестоимость). + Обрабатывает разные форматы: запятая/точка, пробелы. + """ + if not value: + return None + + # Убираем пробелы + value = value.replace(' ', '') + + # Заменяем запятую на точку + value = value.replace(',', '.') + + try: + return Decimal(value) + except (InvalidOperation, ValueError): + return None + + def _process_row(self, row_number: int, row: dict, mapping: dict, update_existing: bool, skip_images: bool = False, schema_name: str = None): + name = self._clean_value(row.get(mapping.get("name", ""), "")) + sku = self._clean_value(row.get(mapping.get("sku", ""), "")) + price_raw = self._clean_value(row.get(mapping.get("price", ""), "")) + description = self._clean_value(row.get(mapping.get("description", ""), "")) + short_description = self._clean_value(row.get(mapping.get("short_description", ""), "")) + unit = self._clean_value(row.get(mapping.get("unit", ""), "")) + cost_price_raw = self._clean_value(row.get(mapping.get("cost_price", ""), "")) + sale_price_raw = self._clean_value(row.get(mapping.get("sale_price", ""), "")) + images_raw = self._clean_value(row.get(mapping.get("images", ""), "")) + + # Пропускаем полностью пустые строки + if not any([name, sku, price_raw, description]): + self.skip_count += 1 + return + + # Минимальное требование: name ИЛИ sku + if not name and not sku: + self.skip_count += 1 + error_record = { + "row": row_number, + "name": name or None, + "sku": sku or None, + "reason": "Требуется хотя бы одно: название или артикул", + } + self.errors.append(error_record) + self.real_errors.append(error_record) + return + + # Парсим цены + price = self._parse_decimal(price_raw) + cost_price = self._parse_decimal(cost_price_raw) + sale_price = self._parse_decimal(sale_price_raw) + + # Цена обязательна + if price is None: + self.skip_count += 1 + error_record = { + "row": row_number, + "name": name or None, + "sku": sku or None, + "reason": "Требуется корректная цена (price)", + } + self.errors.append(error_record) + self.real_errors.append(error_record) + return + + # Единица измерения по умолчанию + if not unit: + unit = 'шт' + + # Валидация единицы измерения + valid_units = [choice[0] for choice in Product.UNIT_CHOICES] + if unit not in valid_units: + unit = 'шт' # fallback + + # Пытаемся найти существующего товара + existing = None + if sku: + existing = Product.objects.filter(sku=sku, status='active').first() + if existing is None and name: + existing = Product.objects.filter(name=name, status='active').first() + + if existing and not update_existing: + self.skip_count += 1 + self.errors.append({ + "row": row_number, + "name": name or None, + "sku": sku or None, + "reason": "Товар с таким артикулом/названием уже существует, обновление отключено.", + }) + return + + if existing and update_existing: + # Обновление существующего товара + if name: + existing.name = name + if sku: + existing.sku = sku + if description: + existing.description = description + if short_description: + existing.short_description = short_description + if unit: + existing.unit = unit + + existing.price = price + if cost_price is not None: + existing.cost_price = cost_price + if sale_price is not None: + existing.sale_price = sale_price + + try: + existing.full_clean() + existing.save() + self.update_count += 1 + + # Обрабатываем изображения (если есть) + if images_raw: + if skip_images: + # Для Celery: собираем задачи загрузки фото + self._collect_photo_tasks(existing, images_raw) + else: + # Синхронно загружаем фото + self._process_product_images(existing, images_raw) + + except Exception as exc: + self.skip_count += 1 + error_record = { + "row": row_number, + "name": name or None, + "sku": sku or None, + "reason": str(exc), + } + self.errors.append(error_record) + self.real_errors.append(error_record) + return + + # Создание нового товара + product = Product( + name=name or f"Товар {sku}", # fallback если нет имени + sku=sku or None, + description=description or "", + short_description=short_description or "", + unit=unit, + price=price, + cost_price=cost_price or 0, # Устанавливаем 0 вместо None (для CostPriceHistory) + sale_price=sale_price, + status='active', + ) + + try: + product.full_clean() + product.save() + self.success_count += 1 + + # Обрабатываем изображения (если есть) + if images_raw: + if skip_images: + # Для Celery: собираем задачи загрузки фото + self._collect_photo_tasks(product, images_raw) + else: + # Синхронно загружаем фото + self._process_product_images(product, images_raw) + + except Exception as exc: + self.skip_count += 1 + error_record = { + "row": row_number, + "name": name or None, + "sku": sku or None, + "reason": str(exc), + } + self.errors.append(error_record) + self.real_errors.append(error_record) + + def _collect_photo_tasks(self, product, images_raw: str): + """ + Собираем задачи загрузки фото для параллельной обработки через Celery. + + Args: + product: Объект Product + images_raw: Строка с URL изображений + """ + # Разбиваем на отдельные URL + urls = [] + for line in images_raw.split('\n'): + line = line.strip() + if not line: + continue + # Если в строке несколько URL через запятую + for url in line.split(','): + url = url.strip() + if url.startswith('http'): + urls.append(url) + + if not urls: + return + + # Создаём задачи для каждого URL + for idx, url in enumerate(urls): + # FIX: ProductPhoto не имеет is_main, используется только order + task = { + 'product_id': product.id, + 'url': url, + 'order': idx, # Первое фото (order=0) автоматически главное + } + self.photo_tasks.append(task) + + def _process_product_images(self, product, images_raw: str): + """ + Обработка изображений товара из URL. + + Поддерживаемые форматы: + - Один URL: https://example.com/image.jpg + - Несколько URL через перенос строки или запятую + + Args: + product: Объект Product + images_raw: Строка с URL изображений + """ + import requests + from django.core.files.base import ContentFile + from ..models import ProductPhoto + import urllib.parse + + # Разбиваем на отдельные URL + urls = [] + for line in images_raw.split('\n'): + line = line.strip() + if not line: + continue + # Если в строке несколько URL через запятую + for url in line.split(','): + url = url.strip() + if url.startswith('http'): + urls.append(url) + + if not urls: + return + + # Скачиваем и сохраняем каждое изображение + for idx, url in enumerate(urls): + try: + # Скачиваем изображение + response = requests.get(url, timeout=10) + response.raise_for_status() + + # Получаем имя файла из URL + parsed_url = urllib.parse.urlparse(url) + filename = parsed_url.path.split('/')[-1] + + # Создаём ProductPhoto + # FIX: ProductPhoto не имеет is_main, используется только order + photo = ProductPhoto( + product=product, + order=idx # Первое фото (order=0) автоматически главное + ) + + # Сохраняем файл + photo.image.save( + filename, + ContentFile(response.content), + save=True + ) + + except Exception as e: + # Игнорируем ошибки загрузки изображений + # чтобы не прерывать импорт товара + continue + + def generate_error_file(self) -> tuple[bytes, str] | None: + """ + Генерирует файл с ошибочными строками. + + Возвращает тот же формат, что был загружен (CSV или XLSX). + Добавляет колонку "Ошибка" с описанием проблемы. + + Returns: + tuple[bytes, str]: (file_content, filename) или None если нет ошибок + """ + if not self.real_errors or not self.original_headers: + return None + + # Создаём mapping row_number -> error + error_map = {err['row']: err for err in self.real_errors if err.get('row')} + + if not error_map: + return None + + # Собираем ошибочные строки + error_rows = [] + for index, row in enumerate(self.original_rows, start=2): + if index in error_map: + error_info = error_map[index] + row_with_error = dict(row) + row_with_error['Ошибка'] = error_info['reason'] + error_rows.append(row_with_error) + + if not error_rows: + return None + + headers_with_error = list(self.original_headers) + ['Ошибка'] + timestamp = timezone.now().strftime("%Y%m%d_%H%M%S") + + if self.file_format == 'csv': + return self._generate_csv_error_file(headers_with_error, error_rows, timestamp) + else: + return self._generate_xlsx_error_file(headers_with_error, error_rows, timestamp) + + def _generate_csv_error_file(self, headers: list, rows: list[dict], timestamp: str) -> tuple[bytes, str]: + """Генерирует CSV файл с ошибками.""" + output = io.StringIO() + output.write('\ufeff') # BOM для Excel + + writer = csv.DictWriter(output, fieldnames=headers, extrasaction='ignore') + writer.writeheader() + writer.writerows(rows) + + content = output.getvalue().encode('utf-8') + filename = f'product_import_errors_{timestamp}.csv' + + return content, filename + + def _generate_xlsx_error_file(self, headers: list, rows: list[dict], timestamp: str) -> tuple[bytes, str] | None: + """Генерирует XLSX файл с ошибками.""" + if load_workbook is None: + return self._generate_csv_error_file(headers, rows, timestamp) + + try: + from openpyxl import Workbook + + wb = Workbook() + ws = wb.active + ws.title = "Ошибки импорта" + + ws.append(headers) + + for row_dict in rows: + row_data = [row_dict.get(h, '') for h in headers] + ws.append(row_data) + + output = io.BytesIO() + wb.save(output) + output.seek(0) + + content = output.read() + filename = f'product_import_errors_{timestamp}.xlsx' + + return content, filename + except Exception: + return self._generate_csv_error_file(headers, rows, timestamp) diff --git a/myproject/products/signals.py b/myproject/products/signals.py index dfa33cf..39a6e92 100644 --- a/myproject/products/signals.py +++ b/myproject/products/signals.py @@ -2,13 +2,15 @@ Signals для приложения products. Логирует изменения себестоимости товара через CostPriceHistory. +FIX: SKU counter bug - инкрементирует счётчик после успешного создания Product. """ from django.db.models.signals import post_save from django.dispatch import receiver from django.db import models +import re -from .models import Product, CostPriceHistory +from .models import Product, CostPriceHistory, SKUCounter @receiver(post_save, sender=Product) @@ -48,3 +50,24 @@ def log_cost_price_changes(sender, instance, created, **kwargs): reason='recalculation', notes='Себестоимость пересчитана на основе партий товара' ) + + +@receiver(post_save, sender=Product) +def increment_sku_counter_after_save(sender, instance, created, **kwargs): + """ + FIX: SKU counter bug - increment only after successful save + + Инкрементирует счётчик SKU ПОСЛЕ успешного создания товара с автогенерированным артикулом. + + Это предотвращает пропуски номеров при ошибках сохранения (например, валидация cost_price). + Счётчик инкрементируется только если: + - Товар только что создан (created=True) + - SKU соответствует автогенерированному формату (PROD-XXXXXX) + """ + if not created: + return + + # Проверяем что SKU был автогенерирован (формат PROD-XXXXXX) + if instance.sku and re.match(r'^PROD-\d{6}$', instance.sku): + # Инкрементируем счётчик product + SKUCounter.increment_counter('product') diff --git a/myproject/products/tasks.py b/myproject/products/tasks.py index d7f5ab0..adecf7a 100644 --- a/myproject/products/tasks.py +++ b/myproject/products/tasks.py @@ -371,3 +371,251 @@ def cleanup_temp_media_all(ttl_hours=None): except Exception as exc: logger.error(f"[CleanupAll] Failed to schedule: {exc}", exc_info=True) return {'status': 'error', 'error': str(exc)} + + +# ============================================================================ +# ИМПОРТ ТОВАРОВ +# ============================================================================ + +@shared_task( + bind=True, + name='products.tasks.import_products_async', + max_retries=0, # Не повторяем автоматически + time_limit=3600, # 1 час максимум +) +def import_products_async(self, job_id, file_path, update_existing, schema_name): + """ + Асинхронный импорт товаров из CSV/XLSX. + + Алгоритм: + 1. Читаем файл и парсим заголовки + 2. Для каждой строки: + - Создаём/обновляем товар (без фото) + - Собираем URL фото для параллельной загрузки + - Обновляем прогресс в ProductImportJob + 3. Запускаем параллельную загрузку всех фото (group task) + 4. Удаляем временный файл + + Args: + job_id: ID ProductImportJob + file_path: Путь к загруженному файлу + update_existing: Обновлять ли существующие товары + schema_name: Схема тенанта + + Returns: + dict: Результат импорта + """ + from .services.import_export import ProductImporter + from .models import ProductImportJob + from celery import group + import os + + try: + # Активируем схему тенанта + connection.set_schema(schema_name) + logger.info(f"[Import] Activated schema: {schema_name} for job {job_id}") + + # Загружаем задачу + job = ProductImportJob.objects.get(id=job_id) + job.status = 'processing' + job.save(update_fields=['status']) + + # Открываем файл + if not default_storage.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") + + with default_storage.open(file_path, 'rb') as file: + # Создаём importer с callback для прогресса + importer = ProductImporter() + + # Callback для обновления прогресса + def progress_callback(current, total, created, updated, skipped, errors): + job.refresh_from_db() + job.processed_rows = current + job.total_rows = total + job.created_count = created + job.updated_count = updated + job.skipped_count = skipped + job.errors_count = len(errors) + job.save(update_fields=[ + 'processed_rows', 'total_rows', 'created_count', + 'updated_count', 'skipped_count', 'errors_count' + ]) + + # Запускаем импорт (без фото, они загрузятся параллельно) + result = importer.import_from_file( + file=file, + update_existing=update_existing, + progress_callback=progress_callback, + skip_images=True, # Не загружаем фото синхронно + schema_name=schema_name # Передаём для задач фото + ) + + # Обновляем результаты + job.refresh_from_db() + job.status = 'completed' + job.created_count = result['created'] + job.updated_count = result['updated'] + job.skipped_count = result['skipped'] + job.errors_count = result.get('real_error_count', 0) + job.errors_json = result.get('real_errors', []) + job.completed_at = timezone.now() + job.save() + + # Удаляем временный файл + try: + if default_storage.exists(file_path): + default_storage.delete(file_path) + logger.info(f"[Import] Deleted temp file: {file_path}") + except Exception as del_exc: + logger.warning(f"[Import] Could not delete temp file: {del_exc}") + + # Запускаем параллельную загрузку фото (если есть) + if result.get('photo_tasks'): + photo_tasks = result['photo_tasks'] + logger.info(f"[Import] Starting parallel download of {len(photo_tasks)} photos") + + # FIX: is_main удалён из сигнатуры download_product_photo_async + # Создаём группу задач для параллельной загрузки + photo_group = group( + download_product_photo_async.s( + product_id=task['product_id'], + image_url=task['url'], + order=task['order'], + schema_name=schema_name + ) + for task in photo_tasks + ) + + # Запускаем группу асинхронно + photo_group.apply_async() + logger.info(f"[Import] Photo download tasks submitted") + + logger.info(f"[Import] Job {job_id} completed successfully: " + f"created={result['created']}, updated={result['updated']}, " + f"skipped={result['skipped']}, errors={result.get('real_error_count', 0)}") + + return { + 'status': 'success', + 'job_id': job_id, + 'result': result + } + + except ProductImportJob.DoesNotExist: + logger.error(f"[Import] Job {job_id} not found in schema {schema_name}") + return {'status': 'error', 'reason': 'job_not_found'} + + except Exception as exc: + logger.error(f"[Import] Job {job_id} failed: {exc}", exc_info=True) + + # Обновляем статус задачи + try: + connection.set_schema(schema_name) + job = ProductImportJob.objects.get(id=job_id) + job.status = 'failed' + job.error_message = str(exc) + job.completed_at = timezone.now() + job.save() + except Exception as save_exc: + logger.error(f"[Import] Could not update job status: {save_exc}") + + # Удаляем временный файл + try: + if default_storage.exists(file_path): + default_storage.delete(file_path) + except Exception: + pass + + return { + 'status': 'error', + 'job_id': job_id, + 'error': str(exc) + } + + +@shared_task( + bind=True, + name='products.tasks.download_product_photo_async', + max_retries=3, + default_retry_delay=10, +) +def download_product_photo_async(self, product_id, image_url, order, schema_name): + """ + Загрузка одного фото товара по URL. + Запускается параллельно для всех фото. + + FIX: ProductPhoto не имеет is_main, главное фото определяется по order=0 + + Args: + product_id: ID товара + image_url: URL изображения + order: Порядок фото (первое фото order=0 считается главным) + schema_name: Схема тенанта + + Returns: + dict: Результат загрузки + """ + import requests + from django.core.files.base import ContentFile + from .models import Product, ProductPhoto + import urllib.parse + + try: + # Активируем схему + connection.set_schema(schema_name) + + # Загружаем товар + product = Product.objects.get(id=product_id) + + # Скачиваем изображение + response = requests.get(image_url, timeout=30) + response.raise_for_status() + + # Получаем имя файла + parsed_url = urllib.parse.urlparse(image_url) + filename = parsed_url.path.split('/')[-1] + + # Создаём ProductPhoto + # FIX: ProductPhoto не имеет поля is_main, используется только order + # Первое фото (order=0) автоматически считается главным + photo = ProductPhoto( + product=product, + order=order # is_main удалён — используется order для определения главного фото + ) + + # Сохраняем файл + # ВАЖНО: use_async=False чтобы НЕ запускать дополнительную Celery задачу + # Обработка будет выполнена синхронно в текущей задаче + photo.image.save( + filename, + ContentFile(response.content), + save=False # Не сохраняем в БД пока + ) + photo.save(use_async=False) # Синхронная обработка через ImageProcessor + + logger.info(f"[PhotoDownload] Downloaded photo for product {product_id}: {image_url}") + + return { + 'status': 'success', + 'product_id': product_id, + 'photo_id': photo.id, + 'url': image_url + } + + except Product.DoesNotExist: + logger.error(f"[PhotoDownload] Product {product_id} not found in {schema_name}") + return {'status': 'error', 'reason': 'product_not_found'} + + except requests.RequestException as exc: + logger.warning(f"[PhotoDownload] Failed to download {image_url}: {exc}") + + # Повторяем при ошибках сети + try: + raise self.retry(exc=exc, countdown=10) + except self.MaxRetriesExceededError: + logger.error(f"[PhotoDownload] Max retries exceeded for {image_url}") + return {'status': 'error', 'reason': 'max_retries', 'url': image_url} + + except Exception as exc: + logger.error(f"[PhotoDownload] Unexpected error for {image_url}: {exc}", exc_info=True) + return {'status': 'error', 'reason': 'unexpected', 'url': image_url, 'error': str(exc)} diff --git a/myproject/products/templates/products/product_import.html b/myproject/products/templates/products/product_import.html new file mode 100644 index 0000000..06d1a5a --- /dev/null +++ b/myproject/products/templates/products/product_import.html @@ -0,0 +1,158 @@ +{% extends "base.html" %} +{% load static %} + +{% block title %}Импорт товаров{% endblock %} + +{% block content %} +
Поддерживаемые форматы: CSV, XLSX
+Обязательные колонки:
+Название или Артикул (хотя бы одно)Цена (обязательно для новых товаров)Опциональные колонки:
+ОписаниеКраткое описаниеЕдиница (шт, м, г, л, кг)СебестоимостьЦена со скидкойИзображения (URL изображений, каждый с новой строки)+ Система автоматически распознает колонки на русском и английском языках. +
+| Строка | +Название | +Артикул | +Причина | +
|---|---|---|---|
| {{ error.row|default:"-" }} | +{{ error.name|default:"-" }} | +{{ error.sku|default:"-" }} | +{{ error.reason }} | +
Название,Цена,Артикул,Описание,Единица,Себестоимость,Цена со скидкой
+Роза красная 50см,150.00,R-001,Красивая роза,шт,80.00,
+Лента атласная,50.00,L-001,,м,,45.00
+Упаковка крафт,30.00,P-001,Крафт-бумага,шт,15.00,
+