Files
octopus/myproject/inventory/tests/test_concurrent_orders.py
Andrey Smakotin 541ea5e561 Добавлены тесты параллельных операций с заказами
- Тест 1: Параллельное резервирование одинакового количества
- Тест 2: Резервирование при недостатке товара
- Тест 3: Параллельное завершение заказов (проверка race condition)
- Тест 4: Параллельная отмена заказов
- Тест 5: Параллельный танец статусов (cancelled -> draft -> completed)
- Тест 6: Смешанный сценарий (создание + завершение + отмена)

Все тесты проходят успешно, race condition исправлен
2026-01-05 21:29:45 +03:00

1312 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
ТЕСТЫ ПАРАЛЛЕЛЬНЫХ ЗАКАЗОВ: Race Conditions и конкурентный доступ
Проверяем корректность работы при одновременном создании заказов
на один и тот же товар.
ВАЖНО: Продажа "в минус" - это ФИЧА!
- Система позволяет заказывать даже то, чего нет в наличии
- quantity_free может быть отрицательным - это нормально
- Цель тестов: обнаружить race conditions (дублирование, потерю данных), а не запретить продажу в минус
"""
import threading
import time
from decimal import Decimal
from django.test import TransactionTestCase
from django.db import connection
from django.contrib.auth import get_user_model
from django_tenants.utils import schema_context
from tenants.models import Client, Domain
from products.models import Product, ProductCategory
from inventory.models import (
Warehouse, StockBatch, Stock, Reservation,
Sale, SaleBatchAllocation
)
from orders.models import Order, OrderItem, OrderStatus
from customers.models import Customer
User = get_user_model()
class ConcurrentOrderTest(TransactionTestCase):
"""
TransactionTestCase для реальных транзакций БД.
Не используем TestCase, т.к. он оборачивает тест в транзакцию,
которая откатывается после теста. Для тестирования параллельности
нужны реальные COMMIT'ы.
"""
# Отключаем автоматическую очистку через TRUNCATE
# (она вызывает ошибки с foreign keys)
serialized_rollback = False
reset_sequences = False
def _fixture_teardown(self):
"""Переопределяем, чтобы не было flush/truncate - делаем вручную в tearDown"""
# Ничего не делаем - все удаляем вручную в tearDown
pass
@classmethod
def setUpClass(cls):
"""Создаём тестовый тенант один раз для всех тестов"""
super().setUpClass()
# Создаём тестовый тенант
cls.tenant = Client.objects.create(
schema_name='test_concurrent',
name='Test Concurrent Orders Tenant',
is_active=True
)
# Создаём домен
Domain.objects.create(
domain='test_concurrent.localhost',
tenant=cls.tenant,
is_primary=True
)
@classmethod
def tearDownClass(cls):
"""Удаляем тестовый тенант после всех тестов"""
try:
with connection.cursor() as cursor:
cursor.execute(f'DROP SCHEMA IF EXISTS {cls.tenant.schema_name} CASCADE')
except Exception:
pass
try:
cls.tenant.delete()
except Exception:
pass
super().tearDownClass()
def setUp(self):
"""Подготовка перед каждым тестом"""
with schema_context('test_concurrent'):
# Создаём системные сущности
self._create_system_entities()
# Создаём тестовые данные
self._create_test_data()
def tearDown(self):
"""Очистка после каждого теста"""
with schema_context('test_concurrent'):
# Удаляем все тестовые данные вручную
# (TransactionTestCase не откатывает транзакции автоматически)
Sale.objects.all().delete()
SaleBatchAllocation.objects.all().delete()
Reservation.objects.all().delete()
OrderItem.objects.all().delete()
Order.objects.all().delete()
Stock.objects.all().delete()
StockBatch.objects.all().delete()
Product.objects.all().delete()
ProductCategory.objects.all().delete()
OrderStatus.objects.all().delete()
def _create_system_entities(self):
"""Создаёт системные сущности (статусы, склад, клиента)"""
# Создаём статусы заказов
self.status_draft = OrderStatus.objects.create(
code='draft',
name='Черновик',
is_system=True,
is_positive_end=False,
is_negative_end=False,
color='#9E9E9E',
order=0
)
self.status_completed = OrderStatus.objects.create(
code='completed',
name='Выполнен',
is_system=True,
is_positive_end=True,
is_negative_end=False,
color='#4CAF50',
order=50
)
self.status_cancelled = OrderStatus.objects.create(
code='cancelled',
name='Отменен',
is_system=True,
is_positive_end=False,
is_negative_end=True,
color='#F44336',
order=70
)
# Создаём склад
self.warehouse = Warehouse.objects.create(
name='Основной склад'
)
# Создаём системного клиента
self.customer, _ = Customer.objects.get_or_create(
phone='+375291111111',
defaults={
'name': 'Тестовый клиент',
'is_system_customer': False
}
)
def _create_test_data(self):
"""Создаёт тестовые товары и партии"""
# Категория
category = ProductCategory.objects.create(
name='Тестовая категория',
is_active=True
)
# Товар
self.product = Product.objects.create(
name='Тестовый товар (concurrent)',
sku='TEST-CONCURRENT-001',
status='active',
price=Decimal('10.00')
)
self.product.categories.add(category)
# Партия товара (начальное количество будет задаваться в тестах)
# Создаём минимальную партию для начала
self.stock_batch = StockBatch.objects.create(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('100.00'),
cost_price=Decimal('5.00')
)
# Создаём Stock
self.stock, _ = Stock.objects.get_or_create(
product=self.product,
warehouse=self.warehouse
)
self.stock.refresh_from_batches()
def _reset_stock_quantity(self, quantity):
"""Устанавливает количество товара на складе"""
self.stock_batch.quantity = quantity
self.stock_batch.save()
self.stock.refresh_from_batches()
# ==================== ТЕСТ 1: Два заказа параллельно резервируют одинаковое количество ====================
def test_01_two_orders_reserve_same_quantity_concurrently(self):
"""
ТЕСТ #1: Два заказа параллельно резервируют одинаковое количество
Начальное состояние: quantity_available=10
Заказ A: резервирует 10 шт (параллельно)
Заказ B: резервирует 10 шт (параллельно)
Ожидаемое (с учётом "продажи в минус"):
Оба заказа успешны
✅ quantity_reserved = 20 (корректная сумма резервов)
✅ quantity_free = 10 - 20 = -10 (нормально, это "в минус")
НЕТ дублирования резервов
НЕТ потери резервов
Проверяем:
- Количество Reservation = 2 (по одному на каждый заказ)
- Каждый резерв имеет quantity=10
- Stock.quantity_reserved = 20 (сумма обоих)
- НЕТ race condition в подсчёте quantity_reserved
"""
with schema_context('test_concurrent'):
# Устанавливаем начальное количество
self._reset_stock_quantity(Decimal('10.00'))
# Результаты параллельного выполнения
results = {'order_a': None, 'order_b': None, 'errors': []}
barrier = threading.Barrier(2) # Синхронизация старта
def create_order_a():
"""Поток A: создаёт заказ на 10 шт"""
try:
with schema_context('test_concurrent'):
barrier.wait() # Ждём второй поток
# Устанавливаем уникальный номер заказа вручную
# (чтобы избежать race condition в генерации номеров)
order = Order(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('100.00'),
order_number=1001 # Уникальный номер для потока A
)
order.save()
OrderItem.objects.create(
order=order,
product=self.product,
quantity=Decimal('10.00'),
price=Decimal('10.00')
)
results['order_a'] = order.id
except Exception as e:
results['errors'].append(('A', str(e)))
def create_order_b():
"""Поток B: создаёт заказ на 10 шт"""
try:
with schema_context('test_concurrent'):
barrier.wait() # Ждём первый поток
# Устанавливаем уникальный номер заказа вручную
order = Order(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('100.00'),
order_number=1002 # Уникальный номер для потока B
)
order.save()
OrderItem.objects.create(
order=order,
product=self.product,
quantity=Decimal('10.00'),
price=Decimal('10.00')
)
results['order_b'] = order.id
except Exception as e:
results['errors'].append(('B', str(e)))
# Запускаем потоки
thread_a = threading.Thread(target=create_order_a)
thread_b = threading.Thread(target=create_order_b)
thread_a.start()
thread_b.start()
thread_a.join(timeout=10)
thread_b.join(timeout=10)
# Ждём завершения всех сигналов
time.sleep(0.5)
# ========== ПРОВЕРКИ ==========
# 1. Проверяем отсутствие ошибок
self.assertEqual(
len(results['errors']),
0,
f"[TEST 1] Не должно быть ошибок при создании заказов. Ошибки: {results['errors']}"
)
# 2. Оба заказа созданы
self.assertIsNotNone(results['order_a'], "[TEST 1] Заказ A должен быть создан")
self.assertIsNotNone(results['order_b'], "[TEST 1] Заказ B должен быть создан")
# 3. Проверяем количество резервов
total_reservations = Reservation.objects.filter(
product=self.product,
warehouse=self.warehouse
)
self.assertEqual(
total_reservations.count(),
2,
f"[TEST 1] Должно быть ровно 2 резерва (по одному на заказ), но найдено {total_reservations.count()}"
)
# 4. Каждый резерв должен иметь quantity=10
for res in total_reservations:
self.assertEqual(
res.quantity,
Decimal('10.00'),
f"[TEST 1] Каждый резерв должен иметь quantity=10, но найден {res.quantity}"
)
self.assertEqual(
res.status,
'reserved',
f"[TEST 1] Резервы должны быть в статусе 'reserved', но найден {res.status}"
)
# 5. Проверяем Stock
self.stock.refresh_from_db()
self.assertEqual(
self.stock.quantity_available,
Decimal('10.00'),
f"[TEST 1] quantity_available должно остаться 10, но найдено {self.stock.quantity_available}"
)
self.assertEqual(
self.stock.quantity_reserved,
Decimal('20.00'),
f"[TEST 1] quantity_reserved должно быть 20 (10+10), но найдено {self.stock.quantity_reserved}"
)
# 6. quantity_free может быть отрицательным (это "продажа в минус")
expected_free = Decimal('-10.00')
self.assertEqual(
self.stock.quantity_free,
expected_free,
f"[TEST 1] quantity_free должно быть -10 (10-20), но найдено {self.stock.quantity_free}"
)
# 7. Проверяем отсутствие дублей резервов
# Группируем по order_item и считаем
from collections import Counter
order_item_counts = Counter()
for res in total_reservations:
order_item_counts[res.order_item.id] += 1
for order_item_id, count in order_item_counts.items():
self.assertEqual(
count,
1,
f"[TEST 1] Для OrderItem #{order_item_id} должен быть 1 резерв, но найдено {count}"
)
print("\n[OK] ТЕСТ #1 ПРОЙДЕН: Параллельные резервы созданы корректно, нет дублирования")
# ==================== ТЕСТ 2: Параллельное резервирование при недостатке товара ====================
def test_02_over_reservation_when_low_stock(self):
"""
ТЕСТ #2: Параллельное резервирование при недостатке товара
Начальное состояние: quantity_available=20
Заказ A: резервирует 15 шт (параллельно)
Заказ B: резервирует 15 шт (параллельно)
ВАЖНО ДЛЯ АРХИТЕКТУРЫ:
- Заказ без pickup_warehouse использует первый активный склад
Warehouse.objects.filter(is_active=True).first()
- В тесте явно делаем self.warehouse единственным активным складом,
чтобы и партии, и резервы, и Stock были на одном складе.
Ожидаемое (с учётом "продажи в минус"):
Оба заказа успешны (система разрешает "в минус")
✅ quantity_reserved = 30 (15+15)
✅ quantity_free = 20 - 30 = -10 (нормально)
НЕТ дублирования резервов
✅ Корректный подсчёт в Stock
Проверяем:
- Корректность агрегации при параллельных insert
- НЕТ race condition в update_stock_on_reservation_change
"""
with schema_context('test_concurrent'):
# Устанавливаем начальное количество
self._reset_stock_quantity(Decimal('20.00'))
# Делаем текущий self.warehouse единственным активным складом,
# чтобы сигналы резервирования использовали именно его.
Warehouse.objects.all().update(is_active=False)
self.warehouse.is_active = True
self.warehouse.save(update_fields=['is_active'])
# Результаты параллельного выполнения
results = {'order_a': None, 'order_b': None, 'errors': []}
barrier = threading.Barrier(2)
def create_order_a():
"""Поток A: создаёт заказ на 15 шт"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('150.00'),
order_number=2001
)
order.save()
OrderItem.objects.create(
order=order,
product=self.product,
quantity=Decimal('15.00'),
price=Decimal('10.00')
)
results['order_a'] = order.id
except Exception as e:
results['errors'].append(('A', str(e)))
def create_order_b():
"""Поток B: создаёт заказ на 15 шт"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('150.00'),
order_number=2002
)
order.save()
OrderItem.objects.create(
order=order,
product=self.product,
quantity=Decimal('15.00'),
price=Decimal('10.00')
)
results['order_b'] = order.id
except Exception as e:
results['errors'].append(('B', str(e)))
# Запускаем потоки
thread_a = threading.Thread(target=create_order_a)
thread_b = threading.Thread(target=create_order_b)
thread_a.start()
thread_b.start()
thread_a.join(timeout=10)
thread_b.join(timeout=10)
# Ждём завершения всех сигналов
time.sleep(0.5)
# ========== ПРОВЕРКИ ==========
# 1. Проверяем отсутствие ошибок
self.assertEqual(
len(results['errors']),
0,
f"[TEST 2] Не должно быть ошибок при создании заказов. Ошибки: {results['errors']}"
)
# 2. Оба заказа созданы
self.assertIsNotNone(results['order_a'], "[TEST 2] Заказ A должен быть создан")
self.assertIsNotNone(results['order_b'], "[TEST 2] Заказ B должен быть создан")
# 3. Проверяем количество резервов на том складе, который реально используется
total_reservations = Reservation.objects.filter(
product=self.product,
warehouse=self.warehouse
)
self.assertEqual(
total_reservations.count(),
2,
f"[TEST 2] Должно быть ровно 2 резерва, но найдено {total_reservations.count()}"
)
# 4. Каждый резерв должен иметь quantity=15 и статус 'reserved'
for res in total_reservations:
self.assertEqual(
res.quantity,
Decimal('15.00'),
f"[TEST 2] Каждый резерв должен иметь quantity=15, но найден {res.quantity}"
)
self.assertEqual(
res.status,
'reserved',
f"[TEST 2] Резерв #{res.id} должен быть в статусе 'reserved', но имеет {res.status}"
)
# 5. Проверяем Stock для этого же склада
self.stock.refresh_from_db()
self.assertEqual(
self.stock.quantity_available,
Decimal('20.00'),
f"[TEST 2] quantity_available должно остаться 20, но найдено {self.stock.quantity_available}"
)
self.assertEqual(
self.stock.quantity_reserved,
Decimal('30.00'),
f"[TEST 2] quantity_reserved должно быть 30 (15+15), но найдено {self.stock.quantity_reserved}"
)
# 6. quantity_free может быть отрицательным (это "продажа в минус")
expected_free = Decimal('-10.00')
self.assertEqual(
self.stock.quantity_free,
expected_free,
f"[TEST 2] quantity_free должно быть -10 (20-30), но найдено {self.stock.quantity_free}"
)
print("\n[OK] ТЕСТ #2 ПРОЙДЕН: Параллельное резервирование с недостатком товара корректно")
# ==================== ТЕСТ 3: Параллельное выполнение заказов (draft → completed) ====================
def test_03_concurrent_order_completion(self):
"""
ТЕСТ #3: Параллельное выполнение (завершение) заказов
Создаём 2 заказа со статусом draft, затем параллельно переводим оба в completed.
Проверяем:
- Не создаётся дублирование Sale
- Резервы корректно преобразуются в продажи
- Stock корректно обновляется (quantity_available уменьшается)
- SaleBatchAllocation создаётся корректно (FIFO)
"""
print("\nТЕСТ #3: Параллельное выполнение заказов (draft -> completed)")
with schema_context('test_concurrent'):
# ВАЖНО: Изолируем склад, чтобы сигналы использовали именно self.warehouse
Warehouse.objects.all().update(is_active=False)
self.warehouse.is_active = True
self.warehouse.save(update_fields=['is_active'])
# Устанавливаем начальное количество
self._reset_stock_quantity(Decimal('50.00'))
# Создаём 2 заказа со статусом draft
order_a = Order.objects.create(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('100.00'),
order_number=3001
)
OrderItem.objects.create(
order=order_a,
product=self.product,
quantity=Decimal('10.00'),
price=Decimal('10.00')
)
order_b = Order.objects.create(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('150.00'),
order_number=3002
)
OrderItem.objects.create(
order=order_b,
product=self.product,
quantity=Decimal('15.00'),
price=Decimal('10.00')
)
time.sleep(0.5) # Ждём создания резервов
# Проверяем начальное состояние
initial_stock = Stock.objects.get(product=self.product, warehouse=self.warehouse)
self.assertEqual(
initial_stock.quantity_reserved,
Decimal('25.00'),
f"[TEST 3] Начальный резерв должен быть 25.00, но найдено {initial_stock.quantity_reserved}"
)
# Результаты параллельного выполнения
results = {'errors': []}
barrier = threading.Barrier(2)
def complete_order_a():
"""Поток A: переводит заказ A в completed"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.get(id=order_a.id)
order.status = self.status_completed
order.save()
except Exception as e:
results['errors'].append(('A', str(e)))
def complete_order_b():
"""Поток B: переводит заказ B в completed"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.get(id=order_b.id)
order.status = self.status_completed
order.save()
except Exception as e:
results['errors'].append(('B', str(e)))
# Запускаем потоки
thread_a = threading.Thread(target=complete_order_a)
thread_b = threading.Thread(target=complete_order_b)
thread_a.start()
thread_b.start()
thread_a.join(timeout=10)
thread_b.join(timeout=10)
# Ждём завершения всех сигналов
time.sleep(0.5)
# ========== ПРОВЕРКИ ==========
# 1. Проверяем отсутствие ошибок
self.assertEqual(
len(results['errors']),
0,
f"[TEST 3] Не должно быть ошибок при выполнении заказов. Ошибки: {results['errors']}"
)
# 2. Должно быть ровно 2 Sale (по одному на заказ)
sales = Sale.objects.filter(order__in=[order_a, order_b])
print(f"\n[DEBUG] Найдено Sale: {sales.count()}")
for sale in sales:
print(f" - Sale #{sale.id}: order={sale.order.order_number}, product={sale.product.name}, quantity={sale.quantity}")
self.assertEqual(
sales.count(),
2,
f"[TEST 3] Должно быть ровно 2 Sale, но найдено {sales.count()}"
)
# 3. Резервы должны быть преобразованы в продажи
reservations = Reservation.objects.filter(
order_item__order__in=[order_a, order_b]
)
for res in reservations:
self.assertEqual(
res.status,
'converted_to_sale',
f"[TEST 3] Резерв #{res.id} должен иметь статус converted_to_sale, но имеет {res.status}"
)
# 4. Stock должен обновиться корректно
# Принудительно пересчитываем Stock для проверки
self.stock.refresh_from_batches()
self.stock.refresh_from_db()
# Проверяем партии
batches = StockBatch.objects.filter(product=self.product, warehouse=self.warehouse, is_active=True)
print(f"\n[DEBUG] Партии после продаж:")
for batch in batches:
print(f" - Batch #{batch.id}: quantity={batch.quantity}, cost={batch.cost_price}")
# Проверяем SaleBatchAllocation
allocations = SaleBatchAllocation.objects.filter(sale__in=sales)
print(f"\n[DEBUG] SaleBatchAllocation:")
for alloc in allocations:
print(f" - Alloc: sale={alloc.sale.order.order_number}, batch={alloc.batch_id}, qty={alloc.quantity}")
print(f"\n[DEBUG] Stock после пересчета:")
print(f" quantity_available: {self.stock.quantity_available}")
print(f" quantity_reserved: {self.stock.quantity_reserved}")
print(f" quantity_free: {self.stock.quantity_free}")
# ПРОВЕРКА: Теперь с select_for_update() race condition должен быть исправлен
self.assertEqual(
self.stock.quantity_available,
Decimal('25.00'), # 50 - 25 (продано)
f"[TEST 3] quantity_available должно быть 25, но найдено {self.stock.quantity_available}"
)
self.assertEqual(
self.stock.quantity_reserved,
Decimal('0.00'), # Резервы преобразованы
f"[TEST 3] quantity_reserved должно быть 0, но найдено {self.stock.quantity_reserved}"
)
# 5. SaleBatchAllocation должны быть созданы
allocations = SaleBatchAllocation.objects.filter(sale__in=sales)
total_allocated = sum(a.quantity for a in allocations)
self.assertEqual(
total_allocated,
Decimal('25.00'),
f"[TEST 3] Общее количество в аллокациях должно быть 25, но найдено {total_allocated}"
)
print("\n[OK] ТЕСТ #3 ПРОЙДЕН: Параллельное завершение заказов корректно")
# ==================== ТЕСТ 4: Параллельная отмена заказов ====================
def test_04_concurrent_order_cancellation(self):
"""
ТЕСТ #4: Параллельная отмена заказов (draft -> cancelled)
Создаём 2 заказа со статусом draft, затем параллельно отменяем оба.
Проверяем:
- Резервы освобождаются корректно (reserved -> released)
- Stock обновляется (quantity_reserved уменьшается)
- Нет дублирования операций освобождения
"""
print("\nТЕСТ #4: Параллельная отмена заказов (draft -> cancelled)")
with schema_context('test_concurrent'):
# ВАЖНО: Изолируем склад
Warehouse.objects.all().update(is_active=False)
self.warehouse.is_active = True
self.warehouse.save(update_fields=['is_active'])
# Устанавливаем начальное количество
self._reset_stock_quantity(Decimal('50.00'))
# Создаём 2 заказа со статусом draft
order_a = Order.objects.create(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('100.00'),
order_number=4001
)
OrderItem.objects.create(
order=order_a,
product=self.product,
quantity=Decimal('10.00'),
price=Decimal('10.00')
)
order_b = Order.objects.create(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('150.00'),
order_number=4002
)
OrderItem.objects.create(
order=order_b,
product=self.product,
quantity=Decimal('15.00'),
price=Decimal('10.00')
)
time.sleep(0.5) # Ждём создания резервов
# Проверяем начальное состояние резервов
initial_reservations = Reservation.objects.filter(
order_item__order__in=[order_a, order_b],
status='reserved'
)
self.assertEqual(
initial_reservations.count(),
2,
f"[TEST 4] Должно быть 2 резерва, но найдено {initial_reservations.count()}"
)
# Проверяем начальное состояние Stock
initial_stock = Stock.objects.get(product=self.product, warehouse=self.warehouse)
self.assertEqual(
initial_stock.quantity_reserved,
Decimal('25.00'),
f"[TEST 4] Начальный резерв должен быть 25.00, но найдено {initial_stock.quantity_reserved}"
)
# Результаты параллельного выполнения
results = {'errors': []}
barrier = threading.Barrier(2)
def cancel_order_a():
"""Поток A: отменяет заказ A"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.get(id=order_a.id)
order.status = self.status_cancelled
order.save()
except Exception as e:
results['errors'].append(('A', str(e)))
def cancel_order_b():
"""Поток B: отменяет заказ B"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.get(id=order_b.id)
order.status = self.status_cancelled
order.save()
except Exception as e:
results['errors'].append(('B', str(e)))
# Запускаем потоки
thread_a = threading.Thread(target=cancel_order_a)
thread_b = threading.Thread(target=cancel_order_b)
thread_a.start()
thread_b.start()
thread_a.join(timeout=10)
thread_b.join(timeout=10)
# Ждём завершения всех сигналов
time.sleep(0.5)
# ========== ПРОВЕРКИ ==========
# 1. Проверяем отсутствие ошибок
self.assertEqual(
len(results['errors']),
0,
f"[TEST 4] Не должно быть ошибок при отмене заказов. Ошибки: {results['errors']}"
)
# 2. Резервы должны быть освобождены
reservations = Reservation.objects.filter(
order_item__order__in=[order_a, order_b]
)
print(f"\n[DEBUG] Резервы после отмены:")
for res in reservations:
print(f" - Reservation #{res.id}: status={res.status}, quantity={res.quantity}")
for res in reservations:
self.assertEqual(
res.status,
'released',
f"[TEST 4] Резерв #{res.id} должен иметь статус released, но имеет {res.status}"
)
# 3. Stock должен обновиться корректно
self.stock.refresh_from_batches()
self.stock.refresh_from_db()
print(f"\n[DEBUG] Stock после отмены:")
print(f" quantity_available: {self.stock.quantity_available}")
print(f" quantity_reserved: {self.stock.quantity_reserved}")
print(f" quantity_free: {self.stock.quantity_free}")
self.assertEqual(
self.stock.quantity_reserved,
Decimal('0.00'),
f"[TEST 4] quantity_reserved должно быть 0, но найдено {self.stock.quantity_reserved}"
)
self.assertEqual(
self.stock.quantity_available,
Decimal('50.00'), # Не изменилось, т.к. заказы не были завершены
f"[TEST 4] quantity_available должно быть 50, но найдено {self.stock.quantity_available}"
)
self.assertEqual(
self.stock.quantity_free,
Decimal('50.00'), # available - reserved = 50 - 0
f"[TEST 4] quantity_free должно быть 50, но найдено {self.stock.quantity_free}"
)
print("\n[OK] ТЕСТ #4 ПРОЙДЕН: Параллельная отмена заказов корректна")
# ==================== ТЕСТ 5: Параллельный танец статусов (cancelled → completed) ====================
def test_05_concurrent_cancelled_to_completed(self):
"""
ТЕСТ #5: Параллельный танец статусов (cancelled -> draft -> completed)
Создаём 2 заказа в статусе cancelled, затем параллельно переводим в completed.
При этом заказ должен пройти через draft (потому что cancelled -> completed непосредственно невозможен).
Проверяем:
- Резервы создаются при переходе в draft
- Резервы корректно преобразуются в Sale при переходе в completed
- Stock корректно обновляется
- Нет race condition при параллельных переходах
"""
print("\nТЕСТ #5: Параллельный танец статусов (cancelled -> draft -> completed)")
with schema_context('test_concurrent'):
# ВАЖНО: Изолируем склад
Warehouse.objects.all().update(is_active=False)
self.warehouse.is_active = True
self.warehouse.save(update_fields=['is_active'])
# Устанавливаем начальное количество
self._reset_stock_quantity(Decimal('50.00'))
# Создаём 2 заказа в статусе draft (создадутся резервы)
order_a = Order.objects.create(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('100.00'),
order_number=5001
)
OrderItem.objects.create(
order=order_a,
product=self.product,
quantity=Decimal('10.00'),
price=Decimal('10.00')
)
order_b = Order.objects.create(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('150.00'),
order_number=5002
)
OrderItem.objects.create(
order=order_b,
product=self.product,
quantity=Decimal('15.00'),
price=Decimal('10.00')
)
time.sleep(0.5) # Ждём создания резервов
# Отменяем заказы (резервы освободятся)
order_a.status = self.status_cancelled
order_a.save()
order_b.status = self.status_cancelled
order_b.save()
time.sleep(0.5) # Ждём освобождения резервов
# Проверяем что резервы освобождены
initial_reservations = Reservation.objects.filter(
order_item__order__in=[order_a, order_b],
status='reserved'
)
self.assertEqual(
initial_reservations.count(),
0,
f"[TEST 5] У cancelled заказов не должно быть активных резервов, но найдено {initial_reservations.count()}"
)
# Результаты параллельного выполнения
results = {'errors': []}
barrier = threading.Barrier(2)
def reactivate_order_a():
"""Поток A: переводит заказ A в completed через draft"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.get(id=order_a.id)
# Сначала в draft (создадутся резервы)
order.status = OrderStatus.objects.get(code='draft')
order.save()
time.sleep(0.2) # Ждём создания резервов
# Затем в completed (создадутся Sale)
order.refresh_from_db()
order.status = OrderStatus.objects.get(code='completed')
order.save()
except Exception as e:
results['errors'].append(('A', str(e)))
def reactivate_order_b():
"""Поток B: переводит заказ B в completed через draft"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.get(id=order_b.id)
# Сначала в draft
order.status = OrderStatus.objects.get(code='draft')
order.save()
time.sleep(0.2)
# Затем в completed
order.refresh_from_db()
order.status = OrderStatus.objects.get(code='completed')
order.save()
except Exception as e:
results['errors'].append(('B', str(e)))
# Запускаем потоки
thread_a = threading.Thread(target=reactivate_order_a)
thread_b = threading.Thread(target=reactivate_order_b)
thread_a.start()
thread_b.start()
thread_a.join(timeout=15)
thread_b.join(timeout=15)
# Ждём завершения всех сигналов
time.sleep(0.5)
# ========== ПРОВЕРКИ ==========
# 1. Проверяем отсутствие ошибок
self.assertEqual(
len(results['errors']),
0,
f"[TEST 5] Не должно быть ошибок. Ошибки: {results['errors']}"
)
# 2. Sale должны быть созданы
sales = Sale.objects.filter(
order__in=[order_a, order_b]
)
print(f"\n[DEBUG] Найдено Sale: {sales.count()}")
for sale in sales:
print(f" - Sale #{sale.id}: order={sale.order.order_number}, quantity={sale.quantity}")
self.assertEqual(
sales.count(),
2,
f"[TEST 5] Должно быть 2 Sale, но найдено {sales.count()}"
)
# 3. Резервы должны быть преобразованы
reservations = Reservation.objects.filter(
order_item__order__in=[order_a, order_b]
)
print(f"\n[DEBUG] Резервы:")
for res in reservations:
print(f" - Reservation #{res.id}: status={res.status}, quantity={res.quantity}")
for res in reservations:
self.assertEqual(
res.status,
'converted_to_sale',
f"[TEST 5] Резерв #{res.id} должен иметь статус converted_to_sale, но имеет {res.status}"
)
# 4. Stock должен обновиться корректно
self.stock.refresh_from_batches()
self.stock.refresh_from_db()
print(f"\n[DEBUG] Stock после завершения:")
print(f" quantity_available: {self.stock.quantity_available}")
print(f" quantity_reserved: {self.stock.quantity_reserved}")
print(f" quantity_free: {self.stock.quantity_free}")
# Проверяем партии
batches = StockBatch.objects.filter(product=self.product, warehouse=self.warehouse, is_active=True)
print(f"\n[DEBUG] Партии после продаж:")
for batch in batches:
print(f" - Batch #{batch.id}: quantity={batch.quantity}")
self.assertEqual(
self.stock.quantity_available,
Decimal('25.00'), # 50 - 25 (продано)
f"[TEST 5] quantity_available должно быть 25, но найдено {self.stock.quantity_available}"
)
self.assertEqual(
self.stock.quantity_reserved,
Decimal('0.00'), # Резервы преобразованы
f"[TEST 5] quantity_reserved должно быть 0, но найдено {self.stock.quantity_reserved}"
)
print("\n[OK] ТЕСТ #5 ПРОЙДЕН: Параллельный танец статусов корректен")
# ==================== ТЕСТ 6: Смешанный сценарий (создание + завершение + отмена) ====================
def test_06_mixed_concurrent_operations(self):
"""
ТЕСТ #6: Смешанный сценарий - все операции одновременно
Параллельно выполняем 3 операции:
1. Создание нового заказа (с резервированием)
2. Завершение существующего заказа (draft -> completed)
3. Отмена существующего заказа (draft -> cancelled)
Проверяем:
- Все операции выполняются без ошибок
- Stock корректно обновляется
- Резервы и Sale создаются корректно
- Нет race condition при смешанных операциях
"""
print("\nТЕСТ #6: Смешанный сценарий - все операции одновременно")
with schema_context('test_concurrent'):
# ВАЖНО: Изолируем склад
Warehouse.objects.all().update(is_active=False)
self.warehouse.is_active = True
self.warehouse.save(update_fields=['is_active'])
# Устанавливаем начальное количество
self._reset_stock_quantity(Decimal('100.00'))
# Создаём 2 заказа в draft заранее (для завершения и отмены)
order_to_complete = Order.objects.create(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('200.00'),
order_number=6001
)
OrderItem.objects.create(
order=order_to_complete,
product=self.product,
quantity=Decimal('20.00'),
price=Decimal('10.00')
)
order_to_cancel = Order.objects.create(
customer=self.customer,
status=self.status_draft,
total_amount=Decimal('150.00'),
order_number=6002
)
OrderItem.objects.create(
order=order_to_cancel,
product=self.product,
quantity=Decimal('15.00'),
price=Decimal('10.00')
)
time.sleep(0.5) # Ждём создания резервов
# Проверяем начальное состояние
initial_stock = Stock.objects.get(product=self.product, warehouse=self.warehouse)
self.assertEqual(
initial_stock.quantity_reserved,
Decimal('35.00'), # 20 + 15
f"[TEST 6] Начальный резерв должен быть 35, но найдено {initial_stock.quantity_reserved}"
)
# Результаты параллельного выполнения
results = {'errors': [], 'new_order_id': None}
barrier = threading.Barrier(3)
def create_new_order():
"""Поток 1: Создаёт новый заказ"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.create(
customer=Customer.objects.first(),
status=OrderStatus.objects.get(code='draft'),
total_amount=Decimal('100.00'),
order_number=6003
)
OrderItem.objects.create(
order=order,
product=Product.objects.first(),
quantity=Decimal('10.00'),
price=Decimal('10.00')
)
results['new_order_id'] = order.id
except Exception as e:
results['errors'].append(('CREATE', str(e)))
def complete_order():
"""Поток 2: Завершает заказ"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.get(id=order_to_complete.id)
order.status = OrderStatus.objects.get(code='completed')
order.save()
except Exception as e:
results['errors'].append(('COMPLETE', str(e)))
def cancel_order():
"""Поток 3: Отменяет заказ"""
try:
with schema_context('test_concurrent'):
barrier.wait()
order = Order.objects.get(id=order_to_cancel.id)
order.status = OrderStatus.objects.get(code='cancelled')
order.save()
except Exception as e:
results['errors'].append(('CANCEL', str(e)))
# Запускаем все 3 потока
thread_create = threading.Thread(target=create_new_order)
thread_complete = threading.Thread(target=complete_order)
thread_cancel = threading.Thread(target=cancel_order)
thread_create.start()
thread_complete.start()
thread_cancel.start()
thread_create.join(timeout=15)
thread_complete.join(timeout=15)
thread_cancel.join(timeout=15)
# Ждём завершения всех сигналов
time.sleep(0.5)
# ========== ПРОВЕРКИ ==========
# 1. Проверяем отсутствие ошибок
self.assertEqual(
len(results['errors']),
0,
f"[TEST 6] Не должно быть ошибок. Ошибки: {results['errors']}"
)
# 2. Проверяем что новый заказ создан
self.assertIsNotNone(
results['new_order_id'],
"[TEST 6] Новый заказ должен быть создан"
)
new_order = Order.objects.get(id=results['new_order_id'])
print(f"\n[DEBUG] Новый заказ: #{new_order.order_number}, статус={new_order.status.code}")
# 3. Проверяем Sale для завершённого заказа
completed_sales = Sale.objects.filter(order=order_to_complete)
self.assertEqual(
completed_sales.count(),
1,
f"[TEST 6] Должен быть 1 Sale для завершённого заказа, но найдено {completed_sales.count()}"
)
print(f"\n[DEBUG] Sale для завершённого заказа: quantity={completed_sales.first().quantity}")
# 4. Проверяем резервы
all_reservations = Reservation.objects.filter(
order_item__order__in=[order_to_complete, order_to_cancel, new_order]
)
print(f"\n[DEBUG] Резервы:")
for res in all_reservations:
print(f" - Reservation #{res.id}: order={res.order_item.order.order_number}, status={res.status}, quantity={res.quantity}")
# Резерв завершённого заказа должен быть converted_to_sale
completed_reservations = Reservation.objects.filter(order_item__order=order_to_complete)
for res in completed_reservations:
self.assertEqual(
res.status,
'converted_to_sale',
f"[TEST 6] Резерв завершённого заказа должен использовать converted_to_sale, но {res.status}"
)
# Резерв отменённого заказа должен быть released
cancelled_reservations = Reservation.objects.filter(order_item__order=order_to_cancel)
for res in cancelled_reservations:
self.assertEqual(
res.status,
'released',
f"[TEST 6] Резерв отменённого заказа должен быть released, но {res.status}"
)
# Резерв нового заказа должен быть reserved
new_reservations = Reservation.objects.filter(order_item__order=new_order)
for res in new_reservations:
self.assertEqual(
res.status,
'reserved',
f"[TEST 6] Резерв нового заказа должен быть reserved, но {res.status}"
)
# 5. Stock должен обновиться корректно
self.stock.refresh_from_batches()
self.stock.refresh_from_db()
print(f"\n[DEBUG] Stock после смешанных операций:")
print(f" quantity_available: {self.stock.quantity_available}")
print(f" quantity_reserved: {self.stock.quantity_reserved}")
print(f" quantity_free: {self.stock.quantity_free}")
# Проверяем партии
batches = StockBatch.objects.filter(product=self.product, warehouse=self.warehouse, is_active=True)
print(f"\n[DEBUG] Партии:")
for batch in batches:
print(f" - Batch #{batch.id}: quantity={batch.quantity}")
# Ожидаем:
# - Начально: 100
# - Завершён заказ на 20: 100 - 20 = 80
# - Отменён заказ на 15: резерв освобождён
# - Создан заказ на 10: резерв 10
# quantity_available = 80, quantity_reserved = 10, quantity_free = 70
self.assertEqual(
self.stock.quantity_available,
Decimal('80.00'), # 100 - 20 (продано)
f"[TEST 6] quantity_available должно быть 80, но найдено {self.stock.quantity_available}"
)
self.assertEqual(
self.stock.quantity_reserved,
Decimal('10.00'), # Резерв нового заказа
f"[TEST 6] quantity_reserved должно быть 10, но найдено {self.stock.quantity_reserved}"
)
self.assertEqual(
self.stock.quantity_free,
Decimal('70.00'), # 80 - 10
f"[TEST 6] quantity_free должно быть 70, но найдено {self.stock.quantity_free}"
)
print("\n[OK] ТЕСТ #6 ПРОЙДЕН: Смешанные параллельные операции корректны")