# -*- coding: utf-8 -*- """ ТЕСТЫ ПАРАЛЛЕЛЬНЫХ ЗАКАЗОВ: Race Conditions и конкурентный доступ Проверяем корректность работы при одновременном создании заказов на один и тот же товар. ВАЖНО: Продажа "в минус" - это ФИЧА! - Система позволяет заказывать даже то, чего нет в наличии - quantity_free может быть отрицательным - это нормально - Цель тестов: обнаружить race conditions (дублирование, потерю данных), а не запретить продажу в минус """ import threading import time from decimal import Decimal from django.test import TransactionTestCase from django.db import connection from django.contrib.auth import get_user_model from django_tenants.utils import schema_context from tenants.models import Client, Domain from products.models import Product, ProductCategory from inventory.models import ( Warehouse, StockBatch, Stock, Reservation, Sale, SaleBatchAllocation ) from orders.models import Order, OrderItem, OrderStatus from customers.models import Customer User = get_user_model() class ConcurrentOrderTest(TransactionTestCase): """ TransactionTestCase для реальных транзакций БД. Не используем TestCase, т.к. он оборачивает тест в транзакцию, которая откатывается после теста. Для тестирования параллельности нужны реальные COMMIT'ы. """ # Отключаем автоматическую очистку через TRUNCATE # (она вызывает ошибки с foreign keys) serialized_rollback = False reset_sequences = False def _fixture_teardown(self): """Переопределяем, чтобы не было flush/truncate - делаем вручную в tearDown""" # Ничего не делаем - все удаляем вручную в tearDown pass @classmethod def setUpClass(cls): """Создаём тестовый тенант один раз для всех тестов""" super().setUpClass() # Создаём тестовый тенант cls.tenant = Client.objects.create( schema_name='test_concurrent', name='Test Concurrent Orders Tenant', is_active=True ) # Создаём домен Domain.objects.create( domain='test_concurrent.localhost', tenant=cls.tenant, is_primary=True ) @classmethod def tearDownClass(cls): """Удаляем тестовый тенант после всех тестов""" try: with connection.cursor() as cursor: cursor.execute(f'DROP SCHEMA IF EXISTS {cls.tenant.schema_name} CASCADE') except Exception: pass try: cls.tenant.delete() except Exception: pass super().tearDownClass() def setUp(self): """Подготовка перед каждым тестом""" with schema_context('test_concurrent'): # Создаём системные сущности self._create_system_entities() # Создаём тестовые данные self._create_test_data() def tearDown(self): """Очистка после каждого теста""" with schema_context('test_concurrent'): # Удаляем все тестовые данные вручную # (TransactionTestCase не откатывает транзакции автоматически) Sale.objects.all().delete() SaleBatchAllocation.objects.all().delete() Reservation.objects.all().delete() OrderItem.objects.all().delete() Order.objects.all().delete() Stock.objects.all().delete() StockBatch.objects.all().delete() Product.objects.all().delete() ProductCategory.objects.all().delete() OrderStatus.objects.all().delete() def _create_system_entities(self): """Создаёт системные сущности (статусы, склад, клиента)""" # Создаём статусы заказов self.status_draft = OrderStatus.objects.create( code='draft', name='Черновик', is_system=True, is_positive_end=False, is_negative_end=False, color='#9E9E9E', order=0 ) self.status_completed = OrderStatus.objects.create( code='completed', name='Выполнен', is_system=True, is_positive_end=True, is_negative_end=False, color='#4CAF50', order=50 ) self.status_cancelled = OrderStatus.objects.create( code='cancelled', name='Отменен', is_system=True, is_positive_end=False, is_negative_end=True, color='#F44336', order=70 ) # Создаём склад self.warehouse = Warehouse.objects.create( name='Основной склад' ) # Создаём системного клиента self.customer, _ = Customer.objects.get_or_create( phone='+375291111111', defaults={ 'name': 'Тестовый клиент', 'is_system_customer': False } ) def _create_test_data(self): """Создаёт тестовые товары и партии""" # Категория category = ProductCategory.objects.create( name='Тестовая категория', is_active=True ) # Товар self.product = Product.objects.create( name='Тестовый товар (concurrent)', sku='TEST-CONCURRENT-001', status='active', price=Decimal('10.00') ) self.product.categories.add(category) # Партия товара (начальное количество будет задаваться в тестах) # Создаём минимальную партию для начала self.stock_batch = StockBatch.objects.create( product=self.product, warehouse=self.warehouse, quantity=Decimal('100.00'), cost_price=Decimal('5.00') ) # Создаём Stock self.stock, _ = Stock.objects.get_or_create( product=self.product, warehouse=self.warehouse ) self.stock.refresh_from_batches() def _reset_stock_quantity(self, quantity): """Устанавливает количество товара на складе""" self.stock_batch.quantity = quantity self.stock_batch.save() self.stock.refresh_from_batches() # ==================== ТЕСТ 1: Два заказа параллельно резервируют одинаковое количество ==================== def test_01_two_orders_reserve_same_quantity_concurrently(self): """ ТЕСТ #1: Два заказа параллельно резервируют одинаковое количество Начальное состояние: quantity_available=10 Заказ A: резервирует 10 шт (параллельно) Заказ B: резервирует 10 шт (параллельно) Ожидаемое (с учётом "продажи в минус"): ✅ Оба заказа успешны ✅ quantity_reserved = 20 (корректная сумма резервов) ✅ quantity_free = 10 - 20 = -10 (нормально, это "в минус") ✅ НЕТ дублирования резервов ✅ НЕТ потери резервов Проверяем: - Количество Reservation = 2 (по одному на каждый заказ) - Каждый резерв имеет quantity=10 - Stock.quantity_reserved = 20 (сумма обоих) - НЕТ race condition в подсчёте quantity_reserved """ with schema_context('test_concurrent'): # Устанавливаем начальное количество self._reset_stock_quantity(Decimal('10.00')) # Результаты параллельного выполнения results = {'order_a': None, 'order_b': None, 'errors': []} barrier = threading.Barrier(2) # Синхронизация старта def create_order_a(): """Поток A: создаёт заказ на 10 шт""" try: with schema_context('test_concurrent'): barrier.wait() # Ждём второй поток # Устанавливаем уникальный номер заказа вручную # (чтобы избежать race condition в генерации номеров) order = Order( customer=self.customer, status=self.status_draft, total_amount=Decimal('100.00'), order_number=1001 # Уникальный номер для потока A ) order.save() OrderItem.objects.create( order=order, product=self.product, quantity=Decimal('10.00'), price=Decimal('10.00') ) results['order_a'] = order.id except Exception as e: results['errors'].append(('A', str(e))) def create_order_b(): """Поток B: создаёт заказ на 10 шт""" try: with schema_context('test_concurrent'): barrier.wait() # Ждём первый поток # Устанавливаем уникальный номер заказа вручную order = Order( customer=self.customer, status=self.status_draft, total_amount=Decimal('100.00'), order_number=1002 # Уникальный номер для потока B ) order.save() OrderItem.objects.create( order=order, product=self.product, quantity=Decimal('10.00'), price=Decimal('10.00') ) results['order_b'] = order.id except Exception as e: results['errors'].append(('B', str(e))) # Запускаем потоки thread_a = threading.Thread(target=create_order_a) thread_b = threading.Thread(target=create_order_b) thread_a.start() thread_b.start() thread_a.join(timeout=10) thread_b.join(timeout=10) # Ждём завершения всех сигналов time.sleep(0.5) # ========== ПРОВЕРКИ ========== # 1. Проверяем отсутствие ошибок self.assertEqual( len(results['errors']), 0, f"[TEST 1] Не должно быть ошибок при создании заказов. Ошибки: {results['errors']}" ) # 2. Оба заказа созданы self.assertIsNotNone(results['order_a'], "[TEST 1] Заказ A должен быть создан") self.assertIsNotNone(results['order_b'], "[TEST 1] Заказ B должен быть создан") # 3. Проверяем количество резервов total_reservations = Reservation.objects.filter( product=self.product, warehouse=self.warehouse ) self.assertEqual( total_reservations.count(), 2, f"[TEST 1] Должно быть ровно 2 резерва (по одному на заказ), но найдено {total_reservations.count()}" ) # 4. Каждый резерв должен иметь quantity=10 for res in total_reservations: self.assertEqual( res.quantity, Decimal('10.00'), f"[TEST 1] Каждый резерв должен иметь quantity=10, но найден {res.quantity}" ) self.assertEqual( res.status, 'reserved', f"[TEST 1] Резервы должны быть в статусе 'reserved', но найден {res.status}" ) # 5. Проверяем Stock self.stock.refresh_from_db() self.assertEqual( self.stock.quantity_available, Decimal('10.00'), f"[TEST 1] quantity_available должно остаться 10, но найдено {self.stock.quantity_available}" ) self.assertEqual( self.stock.quantity_reserved, Decimal('20.00'), f"[TEST 1] quantity_reserved должно быть 20 (10+10), но найдено {self.stock.quantity_reserved}" ) # 6. quantity_free может быть отрицательным (это "продажа в минус") expected_free = Decimal('-10.00') self.assertEqual( self.stock.quantity_free, expected_free, f"[TEST 1] quantity_free должно быть -10 (10-20), но найдено {self.stock.quantity_free}" ) # 7. Проверяем отсутствие дублей резервов # Группируем по order_item и считаем from collections import Counter order_item_counts = Counter() for res in total_reservations: order_item_counts[res.order_item.id] += 1 for order_item_id, count in order_item_counts.items(): self.assertEqual( count, 1, f"[TEST 1] Для OrderItem #{order_item_id} должен быть 1 резерв, но найдено {count}" ) print("\n[OK] ТЕСТ #1 ПРОЙДЕН: Параллельные резервы созданы корректно, нет дублирования") # ==================== ТЕСТ 2: Параллельное резервирование при недостатке товара ==================== def test_02_over_reservation_when_low_stock(self): """ ТЕСТ #2: Параллельное резервирование при недостатке товара Начальное состояние: quantity_available=20 Заказ A: резервирует 15 шт (параллельно) Заказ B: резервирует 15 шт (параллельно) ВАЖНО ДЛЯ АРХИТЕКТУРЫ: - Заказ без pickup_warehouse использует первый активный склад Warehouse.objects.filter(is_active=True).first() - В тесте явно делаем self.warehouse единственным активным складом, чтобы и партии, и резервы, и Stock были на одном складе. Ожидаемое (с учётом "продажи в минус"): ✅ Оба заказа успешны (система разрешает "в минус") ✅ quantity_reserved = 30 (15+15) ✅ quantity_free = 20 - 30 = -10 (нормально) ✅ НЕТ дублирования резервов ✅ Корректный подсчёт в Stock Проверяем: - Корректность агрегации при параллельных insert - НЕТ race condition в update_stock_on_reservation_change """ with schema_context('test_concurrent'): # Устанавливаем начальное количество self._reset_stock_quantity(Decimal('20.00')) # Делаем текущий self.warehouse единственным активным складом, # чтобы сигналы резервирования использовали именно его. Warehouse.objects.all().update(is_active=False) self.warehouse.is_active = True self.warehouse.save(update_fields=['is_active']) # Результаты параллельного выполнения results = {'order_a': None, 'order_b': None, 'errors': []} barrier = threading.Barrier(2) def create_order_a(): """Поток A: создаёт заказ на 15 шт""" try: with schema_context('test_concurrent'): barrier.wait() order = Order( customer=self.customer, status=self.status_draft, total_amount=Decimal('150.00'), order_number=2001 ) order.save() OrderItem.objects.create( order=order, product=self.product, quantity=Decimal('15.00'), price=Decimal('10.00') ) results['order_a'] = order.id except Exception as e: results['errors'].append(('A', str(e))) def create_order_b(): """Поток B: создаёт заказ на 15 шт""" try: with schema_context('test_concurrent'): barrier.wait() order = Order( customer=self.customer, status=self.status_draft, total_amount=Decimal('150.00'), order_number=2002 ) order.save() OrderItem.objects.create( order=order, product=self.product, quantity=Decimal('15.00'), price=Decimal('10.00') ) results['order_b'] = order.id except Exception as e: results['errors'].append(('B', str(e))) # Запускаем потоки thread_a = threading.Thread(target=create_order_a) thread_b = threading.Thread(target=create_order_b) thread_a.start() thread_b.start() thread_a.join(timeout=10) thread_b.join(timeout=10) # Ждём завершения всех сигналов time.sleep(0.5) # ========== ПРОВЕРКИ ========== # 1. Проверяем отсутствие ошибок self.assertEqual( len(results['errors']), 0, f"[TEST 2] Не должно быть ошибок при создании заказов. Ошибки: {results['errors']}" ) # 2. Оба заказа созданы self.assertIsNotNone(results['order_a'], "[TEST 2] Заказ A должен быть создан") self.assertIsNotNone(results['order_b'], "[TEST 2] Заказ B должен быть создан") # 3. Проверяем количество резервов на том складе, который реально используется total_reservations = Reservation.objects.filter( product=self.product, warehouse=self.warehouse ) self.assertEqual( total_reservations.count(), 2, f"[TEST 2] Должно быть ровно 2 резерва, но найдено {total_reservations.count()}" ) # 4. Каждый резерв должен иметь quantity=15 и статус 'reserved' for res in total_reservations: self.assertEqual( res.quantity, Decimal('15.00'), f"[TEST 2] Каждый резерв должен иметь quantity=15, но найден {res.quantity}" ) self.assertEqual( res.status, 'reserved', f"[TEST 2] Резерв #{res.id} должен быть в статусе 'reserved', но имеет {res.status}" ) # 5. Проверяем Stock для этого же склада self.stock.refresh_from_db() self.assertEqual( self.stock.quantity_available, Decimal('20.00'), f"[TEST 2] quantity_available должно остаться 20, но найдено {self.stock.quantity_available}" ) self.assertEqual( self.stock.quantity_reserved, Decimal('30.00'), f"[TEST 2] quantity_reserved должно быть 30 (15+15), но найдено {self.stock.quantity_reserved}" ) # 6. quantity_free может быть отрицательным (это "продажа в минус") expected_free = Decimal('-10.00') self.assertEqual( self.stock.quantity_free, expected_free, f"[TEST 2] quantity_free должно быть -10 (20-30), но найдено {self.stock.quantity_free}" ) print("\n[OK] ТЕСТ #2 ПРОЙДЕН: Параллельное резервирование с недостатком товара корректно") # ==================== ТЕСТ 3: Параллельное выполнение заказов (draft → completed) ==================== def test_03_concurrent_order_completion(self): """ ТЕСТ #3: Параллельное выполнение (завершение) заказов Создаём 2 заказа со статусом draft, затем параллельно переводим оба в completed. Проверяем: - Не создаётся дублирование Sale - Резервы корректно преобразуются в продажи - Stock корректно обновляется (quantity_available уменьшается) - SaleBatchAllocation создаётся корректно (FIFO) """ print("\nТЕСТ #3: Параллельное выполнение заказов (draft -> completed)") with schema_context('test_concurrent'): # ВАЖНО: Изолируем склад, чтобы сигналы использовали именно self.warehouse Warehouse.objects.all().update(is_active=False) self.warehouse.is_active = True self.warehouse.save(update_fields=['is_active']) # Устанавливаем начальное количество self._reset_stock_quantity(Decimal('50.00')) # Создаём 2 заказа со статусом draft order_a = Order.objects.create( customer=self.customer, status=self.status_draft, total_amount=Decimal('100.00'), order_number=3001 ) OrderItem.objects.create( order=order_a, product=self.product, quantity=Decimal('10.00'), price=Decimal('10.00') ) order_b = Order.objects.create( customer=self.customer, status=self.status_draft, total_amount=Decimal('150.00'), order_number=3002 ) OrderItem.objects.create( order=order_b, product=self.product, quantity=Decimal('15.00'), price=Decimal('10.00') ) time.sleep(0.5) # Ждём создания резервов # Проверяем начальное состояние initial_stock = Stock.objects.get(product=self.product, warehouse=self.warehouse) self.assertEqual( initial_stock.quantity_reserved, Decimal('25.00'), f"[TEST 3] Начальный резерв должен быть 25.00, но найдено {initial_stock.quantity_reserved}" ) # Результаты параллельного выполнения results = {'errors': []} barrier = threading.Barrier(2) def complete_order_a(): """Поток A: переводит заказ A в completed""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.get(id=order_a.id) order.status = self.status_completed order.save() except Exception as e: results['errors'].append(('A', str(e))) def complete_order_b(): """Поток B: переводит заказ B в completed""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.get(id=order_b.id) order.status = self.status_completed order.save() except Exception as e: results['errors'].append(('B', str(e))) # Запускаем потоки thread_a = threading.Thread(target=complete_order_a) thread_b = threading.Thread(target=complete_order_b) thread_a.start() thread_b.start() thread_a.join(timeout=10) thread_b.join(timeout=10) # Ждём завершения всех сигналов time.sleep(0.5) # ========== ПРОВЕРКИ ========== # 1. Проверяем отсутствие ошибок self.assertEqual( len(results['errors']), 0, f"[TEST 3] Не должно быть ошибок при выполнении заказов. Ошибки: {results['errors']}" ) # 2. Должно быть ровно 2 Sale (по одному на заказ) sales = Sale.objects.filter(order__in=[order_a, order_b]) print(f"\n[DEBUG] Найдено Sale: {sales.count()}") for sale in sales: print(f" - Sale #{sale.id}: order={sale.order.order_number}, product={sale.product.name}, quantity={sale.quantity}") self.assertEqual( sales.count(), 2, f"[TEST 3] Должно быть ровно 2 Sale, но найдено {sales.count()}" ) # 3. Резервы должны быть преобразованы в продажи reservations = Reservation.objects.filter( order_item__order__in=[order_a, order_b] ) for res in reservations: self.assertEqual( res.status, 'converted_to_sale', f"[TEST 3] Резерв #{res.id} должен иметь статус converted_to_sale, но имеет {res.status}" ) # 4. Stock должен обновиться корректно # Принудительно пересчитываем Stock для проверки self.stock.refresh_from_batches() self.stock.refresh_from_db() # Проверяем партии batches = StockBatch.objects.filter(product=self.product, warehouse=self.warehouse, is_active=True) print(f"\n[DEBUG] Партии после продаж:") for batch in batches: print(f" - Batch #{batch.id}: quantity={batch.quantity}, cost={batch.cost_price}") # Проверяем SaleBatchAllocation allocations = SaleBatchAllocation.objects.filter(sale__in=sales) print(f"\n[DEBUG] SaleBatchAllocation:") for alloc in allocations: print(f" - Alloc: sale={alloc.sale.order.order_number}, batch={alloc.batch_id}, qty={alloc.quantity}") print(f"\n[DEBUG] Stock после пересчета:") print(f" quantity_available: {self.stock.quantity_available}") print(f" quantity_reserved: {self.stock.quantity_reserved}") print(f" quantity_free: {self.stock.quantity_free}") # ПРОВЕРКА: Теперь с select_for_update() race condition должен быть исправлен self.assertEqual( self.stock.quantity_available, Decimal('25.00'), # 50 - 25 (продано) f"[TEST 3] quantity_available должно быть 25, но найдено {self.stock.quantity_available}" ) self.assertEqual( self.stock.quantity_reserved, Decimal('0.00'), # Резервы преобразованы f"[TEST 3] quantity_reserved должно быть 0, но найдено {self.stock.quantity_reserved}" ) # 5. SaleBatchAllocation должны быть созданы allocations = SaleBatchAllocation.objects.filter(sale__in=sales) total_allocated = sum(a.quantity for a in allocations) self.assertEqual( total_allocated, Decimal('25.00'), f"[TEST 3] Общее количество в аллокациях должно быть 25, но найдено {total_allocated}" ) print("\n[OK] ТЕСТ #3 ПРОЙДЕН: Параллельное завершение заказов корректно") # ==================== ТЕСТ 4: Параллельная отмена заказов ==================== def test_04_concurrent_order_cancellation(self): """ ТЕСТ #4: Параллельная отмена заказов (draft -> cancelled) Создаём 2 заказа со статусом draft, затем параллельно отменяем оба. Проверяем: - Резервы освобождаются корректно (reserved -> released) - Stock обновляется (quantity_reserved уменьшается) - Нет дублирования операций освобождения """ print("\nТЕСТ #4: Параллельная отмена заказов (draft -> cancelled)") with schema_context('test_concurrent'): # ВАЖНО: Изолируем склад Warehouse.objects.all().update(is_active=False) self.warehouse.is_active = True self.warehouse.save(update_fields=['is_active']) # Устанавливаем начальное количество self._reset_stock_quantity(Decimal('50.00')) # Создаём 2 заказа со статусом draft order_a = Order.objects.create( customer=self.customer, status=self.status_draft, total_amount=Decimal('100.00'), order_number=4001 ) OrderItem.objects.create( order=order_a, product=self.product, quantity=Decimal('10.00'), price=Decimal('10.00') ) order_b = Order.objects.create( customer=self.customer, status=self.status_draft, total_amount=Decimal('150.00'), order_number=4002 ) OrderItem.objects.create( order=order_b, product=self.product, quantity=Decimal('15.00'), price=Decimal('10.00') ) time.sleep(0.5) # Ждём создания резервов # Проверяем начальное состояние резервов initial_reservations = Reservation.objects.filter( order_item__order__in=[order_a, order_b], status='reserved' ) self.assertEqual( initial_reservations.count(), 2, f"[TEST 4] Должно быть 2 резерва, но найдено {initial_reservations.count()}" ) # Проверяем начальное состояние Stock initial_stock = Stock.objects.get(product=self.product, warehouse=self.warehouse) self.assertEqual( initial_stock.quantity_reserved, Decimal('25.00'), f"[TEST 4] Начальный резерв должен быть 25.00, но найдено {initial_stock.quantity_reserved}" ) # Результаты параллельного выполнения results = {'errors': []} barrier = threading.Barrier(2) def cancel_order_a(): """Поток A: отменяет заказ A""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.get(id=order_a.id) order.status = self.status_cancelled order.save() except Exception as e: results['errors'].append(('A', str(e))) def cancel_order_b(): """Поток B: отменяет заказ B""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.get(id=order_b.id) order.status = self.status_cancelled order.save() except Exception as e: results['errors'].append(('B', str(e))) # Запускаем потоки thread_a = threading.Thread(target=cancel_order_a) thread_b = threading.Thread(target=cancel_order_b) thread_a.start() thread_b.start() thread_a.join(timeout=10) thread_b.join(timeout=10) # Ждём завершения всех сигналов time.sleep(0.5) # ========== ПРОВЕРКИ ========== # 1. Проверяем отсутствие ошибок self.assertEqual( len(results['errors']), 0, f"[TEST 4] Не должно быть ошибок при отмене заказов. Ошибки: {results['errors']}" ) # 2. Резервы должны быть освобождены reservations = Reservation.objects.filter( order_item__order__in=[order_a, order_b] ) print(f"\n[DEBUG] Резервы после отмены:") for res in reservations: print(f" - Reservation #{res.id}: status={res.status}, quantity={res.quantity}") for res in reservations: self.assertEqual( res.status, 'released', f"[TEST 4] Резерв #{res.id} должен иметь статус released, но имеет {res.status}" ) # 3. Stock должен обновиться корректно self.stock.refresh_from_batches() self.stock.refresh_from_db() print(f"\n[DEBUG] Stock после отмены:") print(f" quantity_available: {self.stock.quantity_available}") print(f" quantity_reserved: {self.stock.quantity_reserved}") print(f" quantity_free: {self.stock.quantity_free}") self.assertEqual( self.stock.quantity_reserved, Decimal('0.00'), f"[TEST 4] quantity_reserved должно быть 0, но найдено {self.stock.quantity_reserved}" ) self.assertEqual( self.stock.quantity_available, Decimal('50.00'), # Не изменилось, т.к. заказы не были завершены f"[TEST 4] quantity_available должно быть 50, но найдено {self.stock.quantity_available}" ) self.assertEqual( self.stock.quantity_free, Decimal('50.00'), # available - reserved = 50 - 0 f"[TEST 4] quantity_free должно быть 50, но найдено {self.stock.quantity_free}" ) print("\n[OK] ТЕСТ #4 ПРОЙДЕН: Параллельная отмена заказов корректна") # ==================== ТЕСТ 5: Параллельный танец статусов (cancelled → completed) ==================== def test_05_concurrent_cancelled_to_completed(self): """ ТЕСТ #5: Параллельный танец статусов (cancelled -> draft -> completed) Создаём 2 заказа в статусе cancelled, затем параллельно переводим в completed. При этом заказ должен пройти через draft (потому что cancelled -> completed непосредственно невозможен). Проверяем: - Резервы создаются при переходе в draft - Резервы корректно преобразуются в Sale при переходе в completed - Stock корректно обновляется - Нет race condition при параллельных переходах """ print("\nТЕСТ #5: Параллельный танец статусов (cancelled -> draft -> completed)") with schema_context('test_concurrent'): # ВАЖНО: Изолируем склад Warehouse.objects.all().update(is_active=False) self.warehouse.is_active = True self.warehouse.save(update_fields=['is_active']) # Устанавливаем начальное количество self._reset_stock_quantity(Decimal('50.00')) # Создаём 2 заказа в статусе draft (создадутся резервы) order_a = Order.objects.create( customer=self.customer, status=self.status_draft, total_amount=Decimal('100.00'), order_number=5001 ) OrderItem.objects.create( order=order_a, product=self.product, quantity=Decimal('10.00'), price=Decimal('10.00') ) order_b = Order.objects.create( customer=self.customer, status=self.status_draft, total_amount=Decimal('150.00'), order_number=5002 ) OrderItem.objects.create( order=order_b, product=self.product, quantity=Decimal('15.00'), price=Decimal('10.00') ) time.sleep(0.5) # Ждём создания резервов # Отменяем заказы (резервы освободятся) order_a.status = self.status_cancelled order_a.save() order_b.status = self.status_cancelled order_b.save() time.sleep(0.5) # Ждём освобождения резервов # Проверяем что резервы освобождены initial_reservations = Reservation.objects.filter( order_item__order__in=[order_a, order_b], status='reserved' ) self.assertEqual( initial_reservations.count(), 0, f"[TEST 5] У cancelled заказов не должно быть активных резервов, но найдено {initial_reservations.count()}" ) # Результаты параллельного выполнения results = {'errors': []} barrier = threading.Barrier(2) def reactivate_order_a(): """Поток A: переводит заказ A в completed через draft""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.get(id=order_a.id) # Сначала в draft (создадутся резервы) order.status = OrderStatus.objects.get(code='draft') order.save() time.sleep(0.2) # Ждём создания резервов # Затем в completed (создадутся Sale) order.refresh_from_db() order.status = OrderStatus.objects.get(code='completed') order.save() except Exception as e: results['errors'].append(('A', str(e))) def reactivate_order_b(): """Поток B: переводит заказ B в completed через draft""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.get(id=order_b.id) # Сначала в draft order.status = OrderStatus.objects.get(code='draft') order.save() time.sleep(0.2) # Затем в completed order.refresh_from_db() order.status = OrderStatus.objects.get(code='completed') order.save() except Exception as e: results['errors'].append(('B', str(e))) # Запускаем потоки thread_a = threading.Thread(target=reactivate_order_a) thread_b = threading.Thread(target=reactivate_order_b) thread_a.start() thread_b.start() thread_a.join(timeout=15) thread_b.join(timeout=15) # Ждём завершения всех сигналов time.sleep(0.5) # ========== ПРОВЕРКИ ========== # 1. Проверяем отсутствие ошибок self.assertEqual( len(results['errors']), 0, f"[TEST 5] Не должно быть ошибок. Ошибки: {results['errors']}" ) # 2. Sale должны быть созданы sales = Sale.objects.filter( order__in=[order_a, order_b] ) print(f"\n[DEBUG] Найдено Sale: {sales.count()}") for sale in sales: print(f" - Sale #{sale.id}: order={sale.order.order_number}, quantity={sale.quantity}") self.assertEqual( sales.count(), 2, f"[TEST 5] Должно быть 2 Sale, но найдено {sales.count()}" ) # 3. Резервы должны быть преобразованы reservations = Reservation.objects.filter( order_item__order__in=[order_a, order_b] ) print(f"\n[DEBUG] Резервы:") for res in reservations: print(f" - Reservation #{res.id}: status={res.status}, quantity={res.quantity}") for res in reservations: self.assertEqual( res.status, 'converted_to_sale', f"[TEST 5] Резерв #{res.id} должен иметь статус converted_to_sale, но имеет {res.status}" ) # 4. Stock должен обновиться корректно self.stock.refresh_from_batches() self.stock.refresh_from_db() print(f"\n[DEBUG] Stock после завершения:") print(f" quantity_available: {self.stock.quantity_available}") print(f" quantity_reserved: {self.stock.quantity_reserved}") print(f" quantity_free: {self.stock.quantity_free}") # Проверяем партии batches = StockBatch.objects.filter(product=self.product, warehouse=self.warehouse, is_active=True) print(f"\n[DEBUG] Партии после продаж:") for batch in batches: print(f" - Batch #{batch.id}: quantity={batch.quantity}") self.assertEqual( self.stock.quantity_available, Decimal('25.00'), # 50 - 25 (продано) f"[TEST 5] quantity_available должно быть 25, но найдено {self.stock.quantity_available}" ) self.assertEqual( self.stock.quantity_reserved, Decimal('0.00'), # Резервы преобразованы f"[TEST 5] quantity_reserved должно быть 0, но найдено {self.stock.quantity_reserved}" ) print("\n[OK] ТЕСТ #5 ПРОЙДЕН: Параллельный танец статусов корректен") # ==================== ТЕСТ 6: Смешанный сценарий (создание + завершение + отмена) ==================== def test_06_mixed_concurrent_operations(self): """ ТЕСТ #6: Смешанный сценарий - все операции одновременно Параллельно выполняем 3 операции: 1. Создание нового заказа (с резервированием) 2. Завершение существующего заказа (draft -> completed) 3. Отмена существующего заказа (draft -> cancelled) Проверяем: - Все операции выполняются без ошибок - Stock корректно обновляется - Резервы и Sale создаются корректно - Нет race condition при смешанных операциях """ print("\nТЕСТ #6: Смешанный сценарий - все операции одновременно") with schema_context('test_concurrent'): # ВАЖНО: Изолируем склад Warehouse.objects.all().update(is_active=False) self.warehouse.is_active = True self.warehouse.save(update_fields=['is_active']) # Устанавливаем начальное количество self._reset_stock_quantity(Decimal('100.00')) # Создаём 2 заказа в draft заранее (для завершения и отмены) order_to_complete = Order.objects.create( customer=self.customer, status=self.status_draft, total_amount=Decimal('200.00'), order_number=6001 ) OrderItem.objects.create( order=order_to_complete, product=self.product, quantity=Decimal('20.00'), price=Decimal('10.00') ) order_to_cancel = Order.objects.create( customer=self.customer, status=self.status_draft, total_amount=Decimal('150.00'), order_number=6002 ) OrderItem.objects.create( order=order_to_cancel, product=self.product, quantity=Decimal('15.00'), price=Decimal('10.00') ) time.sleep(0.5) # Ждём создания резервов # Проверяем начальное состояние initial_stock = Stock.objects.get(product=self.product, warehouse=self.warehouse) self.assertEqual( initial_stock.quantity_reserved, Decimal('35.00'), # 20 + 15 f"[TEST 6] Начальный резерв должен быть 35, но найдено {initial_stock.quantity_reserved}" ) # Результаты параллельного выполнения results = {'errors': [], 'new_order_id': None} barrier = threading.Barrier(3) def create_new_order(): """Поток 1: Создаёт новый заказ""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.create( customer=Customer.objects.first(), status=OrderStatus.objects.get(code='draft'), total_amount=Decimal('100.00'), order_number=6003 ) OrderItem.objects.create( order=order, product=Product.objects.first(), quantity=Decimal('10.00'), price=Decimal('10.00') ) results['new_order_id'] = order.id except Exception as e: results['errors'].append(('CREATE', str(e))) def complete_order(): """Поток 2: Завершает заказ""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.get(id=order_to_complete.id) order.status = OrderStatus.objects.get(code='completed') order.save() except Exception as e: results['errors'].append(('COMPLETE', str(e))) def cancel_order(): """Поток 3: Отменяет заказ""" try: with schema_context('test_concurrent'): barrier.wait() order = Order.objects.get(id=order_to_cancel.id) order.status = OrderStatus.objects.get(code='cancelled') order.save() except Exception as e: results['errors'].append(('CANCEL', str(e))) # Запускаем все 3 потока thread_create = threading.Thread(target=create_new_order) thread_complete = threading.Thread(target=complete_order) thread_cancel = threading.Thread(target=cancel_order) thread_create.start() thread_complete.start() thread_cancel.start() thread_create.join(timeout=15) thread_complete.join(timeout=15) thread_cancel.join(timeout=15) # Ждём завершения всех сигналов time.sleep(0.5) # ========== ПРОВЕРКИ ========== # 1. Проверяем отсутствие ошибок self.assertEqual( len(results['errors']), 0, f"[TEST 6] Не должно быть ошибок. Ошибки: {results['errors']}" ) # 2. Проверяем что новый заказ создан self.assertIsNotNone( results['new_order_id'], "[TEST 6] Новый заказ должен быть создан" ) new_order = Order.objects.get(id=results['new_order_id']) print(f"\n[DEBUG] Новый заказ: #{new_order.order_number}, статус={new_order.status.code}") # 3. Проверяем Sale для завершённого заказа completed_sales = Sale.objects.filter(order=order_to_complete) self.assertEqual( completed_sales.count(), 1, f"[TEST 6] Должен быть 1 Sale для завершённого заказа, но найдено {completed_sales.count()}" ) print(f"\n[DEBUG] Sale для завершённого заказа: quantity={completed_sales.first().quantity}") # 4. Проверяем резервы all_reservations = Reservation.objects.filter( order_item__order__in=[order_to_complete, order_to_cancel, new_order] ) print(f"\n[DEBUG] Резервы:") for res in all_reservations: print(f" - Reservation #{res.id}: order={res.order_item.order.order_number}, status={res.status}, quantity={res.quantity}") # Резерв завершённого заказа должен быть converted_to_sale completed_reservations = Reservation.objects.filter(order_item__order=order_to_complete) for res in completed_reservations: self.assertEqual( res.status, 'converted_to_sale', f"[TEST 6] Резерв завершённого заказа должен использовать converted_to_sale, но {res.status}" ) # Резерв отменённого заказа должен быть released cancelled_reservations = Reservation.objects.filter(order_item__order=order_to_cancel) for res in cancelled_reservations: self.assertEqual( res.status, 'released', f"[TEST 6] Резерв отменённого заказа должен быть released, но {res.status}" ) # Резерв нового заказа должен быть reserved new_reservations = Reservation.objects.filter(order_item__order=new_order) for res in new_reservations: self.assertEqual( res.status, 'reserved', f"[TEST 6] Резерв нового заказа должен быть reserved, но {res.status}" ) # 5. Stock должен обновиться корректно self.stock.refresh_from_batches() self.stock.refresh_from_db() print(f"\n[DEBUG] Stock после смешанных операций:") print(f" quantity_available: {self.stock.quantity_available}") print(f" quantity_reserved: {self.stock.quantity_reserved}") print(f" quantity_free: {self.stock.quantity_free}") # Проверяем партии batches = StockBatch.objects.filter(product=self.product, warehouse=self.warehouse, is_active=True) print(f"\n[DEBUG] Партии:") for batch in batches: print(f" - Batch #{batch.id}: quantity={batch.quantity}") # Ожидаем: # - Начально: 100 # - Завершён заказ на 20: 100 - 20 = 80 # - Отменён заказ на 15: резерв освобождён # - Создан заказ на 10: резерв 10 # quantity_available = 80, quantity_reserved = 10, quantity_free = 70 self.assertEqual( self.stock.quantity_available, Decimal('80.00'), # 100 - 20 (продано) f"[TEST 6] quantity_available должно быть 80, но найдено {self.stock.quantity_available}" ) self.assertEqual( self.stock.quantity_reserved, Decimal('10.00'), # Резерв нового заказа f"[TEST 6] quantity_reserved должно быть 10, но найдено {self.stock.quantity_reserved}" ) self.assertEqual( self.stock.quantity_free, Decimal('70.00'), # 80 - 10 f"[TEST 6] quantity_free должно быть 70, но найдено {self.stock.quantity_free}" ) print("\n[OK] ТЕСТ #6 ПРОЙДЕН: Смешанные параллельные операции корректны")