feat(inventory): add support for selling in negative stock

Implement functionality to allow sales even when stock is insufficient, tracking pending quantities and resolving them when new stock arrives via incoming documents. This includes new fields in Sale model (is_pending_cost, pending_quantity), updates to batch manager for negative write-offs, and signal handlers for automatic processing.

- Add is_pending_cost and pending_quantity fields to Sale model
- Modify write_off_by_fifo to support allow_negative flag and return pending quantity
- Update incoming document service to allocate pending sales to new batches
- Enhance sale processor and signals to handle pending sales
- Remove outdated tests.py file
- Add migration for new Sale fields
This commit is contained in:
2026-01-04 12:27:10 +03:00
parent 123f330a26
commit a03f3df086
7 changed files with 223 additions and 570 deletions

View File

@@ -0,0 +1,30 @@
# Generated by Django 5.0.10 on 2026-01-04 06:53
from decimal import Decimal
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('inventory', '0002_initial'),
('orders', '0002_initial'),
('products', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='sale',
name='is_pending_cost',
field=models.BooleanField(default=False, help_text="True если продажа создана без партий (продажа 'в минус')", verbose_name='Ожидает себестоимости'),
),
migrations.AddField(
model_name='sale',
name='pending_quantity',
field=models.DecimalField(decimal_places=3, default=Decimal('0'), help_text='Количество, ожидающее привязки к партиям при приёмке', max_digits=10, verbose_name='Ожидающее количество'),
),
migrations.AddIndex(
model_name='sale',
index=models.Index(fields=['is_pending_cost'], name='inventory_s_is_pend_81a3db_idx'),
),
]

View File

@@ -147,6 +147,20 @@ class Sale(models.Model):
help_text="Название единицы продажи на момент продажи" help_text="Название единицы продажи на момент продажи"
) )
# === ПОЛЯ ДЛЯ ПРОДАЖ "В МИНУС" ===
is_pending_cost = models.BooleanField(
default=False,
verbose_name="Ожидает себестоимости",
help_text="True если продажа создана без партий (продажа 'в минус')"
)
pending_quantity = models.DecimalField(
max_digits=10,
decimal_places=3,
default=Decimal('0'),
verbose_name="Ожидающее количество",
help_text="Количество, ожидающее привязки к партиям при приёмке"
)
class Meta: class Meta:
verbose_name = "Продажа" verbose_name = "Продажа"
verbose_name_plural = "Продажи" verbose_name_plural = "Продажи"
@@ -155,6 +169,7 @@ class Sale(models.Model):
models.Index(fields=['product', 'warehouse']), models.Index(fields=['product', 'warehouse']),
models.Index(fields=['date']), models.Index(fields=['date']),
models.Index(fields=['order']), models.Index(fields=['order']),
models.Index(fields=['is_pending_cost']),
] ]
def __str__(self): def __str__(self):
@@ -727,21 +742,33 @@ class Stock(models.Model):
def refresh_from_batches(self): def refresh_from_batches(self):
""" """
Пересчитать остатки из StockBatch. Пересчитать остатки из StockBatch.
Можно вызвать для синхронизации после операций. Учитывает "ожидающие" продажи (продажи "в минус").
quantity_available может быть отрицательным!
""" """
# Сумма из активных партий
total_qty = StockBatch.objects.filter( total_qty = StockBatch.objects.filter(
product=self.product, product=self.product,
warehouse=self.warehouse, warehouse=self.warehouse,
is_active=True is_active=True
).aggregate(models.Sum('quantity'))['quantity__sum'] or Decimal('0') ).aggregate(models.Sum('quantity'))['quantity__sum'] or Decimal('0')
# Учитываем ожидающие продажи (уменьшают доступное количество)
pending_sales = Sale.objects.filter(
product=self.product,
warehouse=self.warehouse,
is_pending_cost=True
).aggregate(models.Sum('pending_quantity'))['pending_quantity__sum'] or Decimal('0')
# quantity_available может быть отрицательным при продажах "в минус"
self.quantity_available = total_qty - pending_sales
# Резервы остаются как есть
total_reserved = Reservation.objects.filter( total_reserved = Reservation.objects.filter(
product=self.product, product=self.product,
warehouse=self.warehouse, warehouse=self.warehouse,
status='reserved' status='reserved'
).aggregate(models.Sum('quantity_base'))['quantity_base__sum'] or Decimal('0') ).aggregate(models.Sum('quantity_base'))['quantity_base__sum'] or Decimal('0')
self.quantity_available = total_qty
self.quantity_reserved = total_reserved self.quantity_reserved = total_reserved
self.save() self.save()

View File

@@ -70,11 +70,10 @@ class StockBatchManager:
return batch return batch
@staticmethod @staticmethod
def write_off_by_fifo(product, warehouse, quantity_to_write_off, exclude_order=None, exclude_transformation=None): def write_off_by_fifo(product, warehouse, quantity_to_write_off, exclude_order=None, exclude_transformation=None, allow_negative=False):
""" """
Списать товар по FIFO (старые партии первыми). Списать товар по FIFO (старые партии первыми).
ВАЖНО: Учитывает зарезервированное количество товара. ВАЖНО: Учитывает зарезервированное количество товара.
Возвращает список (batch, written_off_quantity) кортежей.
Args: Args:
product: объект Product product: объект Product
@@ -86,12 +85,16 @@ class StockBatchManager:
exclude_transformation: (опционально) объект Transformation - исключить резервы этой трансформации из расчёта. exclude_transformation: (опционально) объект Transformation - исключить резервы этой трансформации из расчёта.
Используется при переводе трансформации в 'completed', когда резервы Используется при переводе трансформации в 'completed', когда резервы
трансформации ещё не переведены в 'converted_to_transformation'. трансформации ещё не переведены в 'converted_to_transformation'.
allow_negative: (опционально) bool - разрешить продажи "в минус".
Если True и товара не хватает, возвращает pending_quantity вместо исключения.
Returns: Returns:
list: [(batch, qty_written), ...] - какие партии и сколько списано tuple: (allocations, pending_quantity)
- allocations: [(batch, qty_written), ...] - какие партии и сколько списано
- pending_quantity: Decimal - сколько не удалось списать (для продаж "в минус")
Raises: Raises:
ValueError: если недостаточно свободного товара на складе ValueError: если недостаточно свободного товара на складе и allow_negative=False
""" """
from inventory.models import Reservation from inventory.models import Reservation
@@ -191,16 +194,21 @@ class StockBatchManager:
batch.save(update_fields=['is_active']) batch.save(update_fields=['is_active'])
if remaining > 0: if remaining > 0:
raise ValueError( if allow_negative:
f"Недостаточно СВОБОДНОГО товара на складе. " # Возвращаем сколько не удалось списать (для продаж "в минус")
f"Требуется {quantity_to_write_off}, доступно {quantity_to_write_off - remaining}. " StockBatchManager.refresh_stock_cache(product, warehouse)
f"(Общий резерв: {total_reserved})" return (allocations, remaining)
) else:
raise ValueError(
f"Недостаточно СВОБОДНОГО товара на складе. "
f"Требуется {quantity_to_write_off}, доступно {quantity_to_write_off - remaining}. "
f"(Общий резерв: {total_reserved})"
)
# Обновляем кеш остатков # Обновляем кеш остатков
StockBatchManager.refresh_stock_cache(product, warehouse) StockBatchManager.refresh_stock_cache(product, warehouse)
return allocations return (allocations, Decimal('0'))
@staticmethod @staticmethod
def transfer_batch(batch, to_warehouse, quantity): def transfer_batch(batch, to_warehouse, quantity):

View File

@@ -203,6 +203,13 @@ class IncomingDocumentService:
batches_created.append(stock_batch) batches_created.append(stock_batch)
total_cost += item.total_cost total_cost += item.total_cost
# Обрабатываем ожидающие продажи "в минус" для этого товара
cls._process_pending_sales(
product=item.product,
warehouse=document.warehouse,
new_batch=stock_batch
)
# Обновляем или создаем запись в Stock # Обновляем или создаем запись в Stock
stock, _ = Stock.objects.get_or_create( stock, _ = Stock.objects.get_or_create(
product=item.product, product=item.product,
@@ -224,6 +231,58 @@ class IncomingDocumentService:
'total_cost': total_cost 'total_cost': total_cost
} }
@classmethod
def _process_pending_sales(cls, product, warehouse, new_batch):
"""
Привязать ожидающие продажи "в минус" к новой партии по FIFO.
Себестоимость берётся из этой партии.
Args:
product: объект Product
warehouse: объект Warehouse
new_batch: объект StockBatch (только что созданная партия)
"""
from inventory.models import Sale, SaleBatchAllocation
# Ожидающие продажи по дате (старые первыми - FIFO)
pending_sales = Sale.objects.filter(
product=product,
warehouse=warehouse,
is_pending_cost=True,
pending_quantity__gt=0
).order_by('date')
available_qty = new_batch.quantity
for sale in pending_sales:
if available_qty <= 0:
break
qty_to_allocate = min(sale.pending_quantity, available_qty)
# Создаем SaleBatchAllocation с себестоимостью из приёмки
SaleBatchAllocation.objects.create(
sale=sale,
batch=new_batch,
quantity=qty_to_allocate,
cost_price=new_batch.cost_price
)
# Уменьшаем pending в Sale
sale.pending_quantity -= qty_to_allocate
if sale.pending_quantity <= 0:
sale.is_pending_cost = False
sale.save(update_fields=['pending_quantity', 'is_pending_cost'])
# Уменьшаем партию (товар уже был "продан" ранее)
new_batch.quantity -= qty_to_allocate
available_qty -= qty_to_allocate
# Сохраняем партию с оставшимся количеством
if new_batch.quantity <= 0:
new_batch.is_active = False
new_batch.save(update_fields=['quantity', 'is_active'])
@classmethod @classmethod
@transaction.atomic @transaction.atomic
def cancel_document(cls, document): def cancel_document(cls, document):

View File

@@ -104,33 +104,38 @@ class SaleProcessor:
document_number=document_number, document_number=document_number,
processed=True, # Сразу отмечаем как обработанную processed=True, # Сразу отмечаем как обработанную
sales_unit=sales_unit, sales_unit=sales_unit,
unit_name_snapshot=unit_name_snapshot unit_name_snapshot=unit_name_snapshot,
is_pending_cost=False,
pending_quantity=Decimal('0')
) )
try: # Списываем товар по FIFO в БАЗОВЫХ единицах
# Списываем товар по FIFO в БАЗОВЫХ единицах # exclude_order позволяет не считать резервы этого заказа как занятые
# exclude_order позволяет не считать резервы этого заказа как занятые # (резервы переводятся в converted_to_sale ПОСЛЕ создания Sale)
# (резервы переводятся в converted_to_sale ПОСЛЕ создания Sale) # allow_negative=True разрешает продажи "в минус"
allocations = StockBatchManager.write_off_by_fifo( allocations, pending = StockBatchManager.write_off_by_fifo(
product, warehouse, quantity_base, exclude_order=order product, warehouse, quantity_base, exclude_order=order, allow_negative=True
)
# Фиксируем распределение для аудита
for batch, qty_allocated in allocations:
SaleBatchAllocation.objects.create(
sale=sale,
batch=batch,
quantity=qty_allocated,
cost_price=batch.cost_price
) )
# Фиксируем распределение для аудита # Если есть pending - это продажа "в минус"
for batch, qty_allocated in allocations: if pending > 0:
SaleBatchAllocation.objects.create( sale.is_pending_cost = True
sale=sale, sale.pending_quantity = pending
batch=batch, sale.save(update_fields=['is_pending_cost', 'pending_quantity'])
quantity=qty_allocated,
cost_price=batch.cost_price
)
# processed уже установлен в True при создании Sale # Обновляем Stock (теперь учитывает pending_sales)
return sale StockBatchManager.refresh_stock_cache(product, warehouse)
except ValueError as e: return sale
# Если ошибка при списании - удаляем Sale и пробрасываем исключение
sale.delete()
raise
@staticmethod @staticmethod
def get_sale_cost_analysis(sale): def get_sale_cost_analysis(sale):

View File

@@ -574,14 +574,27 @@ def rollback_sale_on_status_change(sender, instance, created, **kwargs):
try: try:
for sale in sales: for sale in sales:
# Запоминаем product и warehouse для обновления Stock
# (важно для pending продаж, которые учитываются в Stock.refresh_from_batches())
stocks_to_refresh.add((sale.product, sale.warehouse))
# Находим все распределения партий # Находим все распределения партий
allocations = SaleBatchAllocation.objects.filter(sale=sale).select_related('batch') allocations = SaleBatchAllocation.objects.filter(sale=sale).select_related('batch')
if not allocations.exists(): if not allocations.exists():
logger.warning( # Для pending продаж (is_pending_cost=True) это нормально -
f"⚠ Sale {sale.id} не имеет SaleBatchAllocation. " # партии ещё не были созданы. При удалении Sale
f"Удаляем Sale без восстановления товара." # Stock.refresh_from_batches() автоматически уберёт pending из расчёта.
) if sale.is_pending_cost:
logger.info(
f" Sale {sale.id} - pending продажа (в минус). "
f"Удаляем Sale, Stock обновится автоматически."
)
else:
logger.warning(
f"⚠ Sale {sale.id} не имеет SaleBatchAllocation. "
f"Удаляем Sale без восстановления товара."
)
sale.delete() sale.delete()
continue continue
@@ -1190,34 +1203,36 @@ def process_sale_fifo(sender, instance, created, **kwargs):
if instance.processed: if instance.processed:
return return
try: # Списываем товар по FIFO с allow_negative=True для поддержки продаж "в минус"
# Списываем товар по FIFO allocations, pending = StockBatchManager.write_off_by_fifo(
allocations = StockBatchManager.write_off_by_fifo( instance.product,
instance.product, instance.warehouse,
instance.warehouse, instance.quantity,
instance.quantity allow_negative=True
)
# Фиксируем распределение для аудита
for batch, qty_allocated in allocations:
SaleBatchAllocation.objects.create(
sale=instance,
batch=batch,
quantity=qty_allocated,
cost_price=batch.cost_price
) )
# Фиксируем распределение для аудита # Если есть pending - это продажа "в минус"
for batch, qty_allocated in allocations: update_fields = ['processed']
SaleBatchAllocation.objects.create( instance.processed = True
sale=instance, if pending > 0:
batch=batch, instance.is_pending_cost = True
quantity=qty_allocated, instance.pending_quantity = pending
cost_price=batch.cost_price update_fields.extend(['is_pending_cost', 'pending_quantity'])
)
# Отмечаем продажу как обработанную (используем update() чтобы избежать повторного срабатывания сигнала) # Отмечаем продажу как обработанную (используем update() чтобы избежать повторного срабатывания сигнала)
Sale.objects.filter(pk=instance.pk).update(processed=True) Sale.objects.filter(pk=instance.pk).update(**{field: getattr(instance, field) for field in update_fields})
# Stock уже обновлен в StockBatchManager.write_off_by_fifo() → refresh_stock_cache() # Stock уже обновлен в StockBatchManager.write_off_by_fifo() → refresh_stock_cache()
# Не нужно вызывать ещё раз чтобы избежать race condition # Не нужно вызывать ещё раз чтобы избежать race condition
except ValueError as e:
# Логируем ошибку, но не прерываем процесс
import logging
logger = logging.getLogger(__name__)
logger.error(f"Ошибка при обработке Sale {instance.id}: {str(e)}")
@receiver(pre_delete, sender=Sale) @receiver(pre_delete, sender=Sale)
@@ -1227,11 +1242,21 @@ def update_order_on_sale_delete(sender, instance, **kwargs):
Вызывается ДО удаления, чтобы можно было получить order. Вызывается ДО удаления, чтобы можно было получить order.
""" """
if instance.order: if instance.order:
# Используем on_commit чтобы обновить после завершения транзакции # Сохраняем order_id для использования в post_delete
from django.db import transaction # (instance.order может быть недоступен после удаления)
transaction.on_commit( instance._order_for_update = instance.order
lambda: update_is_returned_flag(instance.order)
)
@receiver(post_delete, sender=Sale)
def update_order_on_sale_post_delete(sender, instance, **kwargs):
"""
Обновляет флаг is_returned заказа ПОСЛЕ удаления Sale.
Использует order, сохранённый в pre_delete.
"""
order = getattr(instance, '_order_for_update', None)
if order:
# Обновляем флаг напрямую после удаления Sale
update_is_returned_flag(order)
# Сигнал process_inventory_reconciliation удален # Сигнал process_inventory_reconciliation удален
@@ -1582,11 +1607,12 @@ def process_transformation_on_complete(sender, instance, created, **kwargs):
total_input_cost = Decimal('0') total_input_cost = Decimal('0')
for trans_input in instance.inputs.all(): for trans_input in instance.inputs.all():
allocations = StockBatchManager.write_off_by_fifo( allocations, pending = StockBatchManager.write_off_by_fifo(
product=trans_input.product, product=trans_input.product,
warehouse=instance.warehouse, warehouse=instance.warehouse,
quantity_to_write_off=trans_input.quantity, quantity_to_write_off=trans_input.quantity,
exclude_transformation=instance # Исключаем резервы этой трансформации exclude_transformation=instance, # Исключаем резервы этой трансформации
allow_negative=False # Трансформация требует наличия товара
) )
# Суммируем себестоимость списанного # Суммируем себестоимость списанного

View File

@@ -1,502 +0,0 @@
"""
Тесты для складского учета с FIFO логикой.
"""
from decimal import Decimal
from django.test import TestCase
from products.models import Product
from inventory.models import Warehouse, StockBatch, Sale, Transfer, Inventory, InventoryLine, Reservation, Stock
from inventory.services import StockBatchManager, SaleProcessor, InventoryProcessor
from orders.models import Order, OrderItem
from customers.models import Customer
class WarehouseModelTest(TestCase):
"""Тесты модели Warehouse."""
def setUp(self):
self.warehouse = Warehouse.objects.create(
name='Основной склад',
description='Главный склад компании'
)
def test_warehouse_creation(self):
"""Тест создания склада."""
self.assertEqual(self.warehouse.name, 'Основной склад')
self.assertTrue(self.warehouse.is_active)
self.assertIsNotNone(self.warehouse.created_at)
def test_warehouse_str(self):
"""Тест строкового представления склада."""
self.assertEqual(str(self.warehouse), 'Основной склад')
class StockBatchManagerFIFOTest(TestCase):
"""Тесты FIFO логики для партий товаров."""
def setUp(self):
"""Подготовка тестовых данных."""
# Создаем склад
self.warehouse = Warehouse.objects.create(name='Склад 1')
# Создаем товар
self.product = Product.objects.create(
name='Роза красная',
cost_price=Decimal('10.00'),
sale_price=Decimal('30.00')
)
def test_create_batch(self):
"""Тест создания новой партии."""
batch = StockBatchManager.create_batch(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('100'),
cost_price=Decimal('10.00')
)
self.assertEqual(batch.quantity, Decimal('100'))
self.assertEqual(batch.cost_price, Decimal('10.00'))
self.assertTrue(batch.is_active)
def test_fifo_write_off_single_batch(self):
"""Тест FIFO списания из одной партии."""
# Создаем партию
batch = StockBatchManager.create_batch(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('100'),
cost_price=Decimal('10.00')
)
# Списываем 50 шт
allocations = StockBatchManager.write_off_by_fifo(
product=self.product,
warehouse=self.warehouse,
quantity_to_write_off=Decimal('50')
)
# Проверяем результат
self.assertEqual(len(allocations), 1)
self.assertEqual(allocations[0][1], Decimal('50')) # qty_written
# Проверяем остаток в партии
batch.refresh_from_db()
self.assertEqual(batch.quantity, Decimal('50'))
self.assertTrue(batch.is_active)
def test_fifo_write_off_multiple_batches(self):
"""Тест FIFO списания из нескольких партий (старые первыми)."""
# Создаем 3 партии в разные моменты
batch1 = StockBatchManager.create_batch(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('30'),
cost_price=Decimal('10.00') # Старейшая
)
batch2 = StockBatchManager.create_batch(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('40'),
cost_price=Decimal('12.00')
)
batch3 = StockBatchManager.create_batch(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('50'),
cost_price=Decimal('15.00') # Новейшая
)
# Списываем 100 шт (должно быть: вся batch1, вся batch2, 30 из batch3)
allocations = StockBatchManager.write_off_by_fifo(
product=self.product,
warehouse=self.warehouse,
quantity_to_write_off=Decimal('100')
)
# Проверяем FIFO порядок
self.assertEqual(len(allocations), 3)
self.assertEqual(allocations[0][0].id, batch1.id) # Первая списана batch1
self.assertEqual(allocations[0][1], Decimal('30')) # Всё из batch1
self.assertEqual(allocations[1][0].id, batch2.id) # Вторая списана batch2
self.assertEqual(allocations[1][1], Decimal('40')) # Всё из batch2
self.assertEqual(allocations[2][0].id, batch3.id) # Третья batch3
self.assertEqual(allocations[2][1], Decimal('30')) # 30 из batch3
# Проверяем остатки
batch1.refresh_from_db()
batch2.refresh_from_db()
batch3.refresh_from_db()
self.assertEqual(batch1.quantity, Decimal('0'))
self.assertFalse(batch1.is_active) # Деактивирована
self.assertEqual(batch2.quantity, Decimal('0'))
self.assertFalse(batch2.is_active)
self.assertEqual(batch3.quantity, Decimal('20'))
self.assertTrue(batch3.is_active)
def test_insufficient_stock_error(self):
"""Тест ошибки при недостаточном товаре на складе."""
batch = StockBatchManager.create_batch(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('50'),
cost_price=Decimal('10.00')
)
# Пытаемся списать больше, чем есть
with self.assertRaises(ValueError) as context:
StockBatchManager.write_off_by_fifo(
product=self.product,
warehouse=self.warehouse,
quantity_to_write_off=Decimal('100')
)
self.assertIn('Недостаточно товара', str(context.exception))
def test_transfer_batch(self):
"""Тест перемещения товара между складами с сохранением цены."""
warehouse2 = Warehouse.objects.create(name='Склад 2')
# Создаем партию на первом складе
batch1 = StockBatchManager.create_batch(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('100'),
cost_price=Decimal('10.00')
)
# Переносим 40 шт на второй склад
new_batch = StockBatchManager.transfer_batch(
batch=batch1,
to_warehouse=warehouse2,
quantity=Decimal('40')
)
# Проверяем результаты
batch1.refresh_from_db()
self.assertEqual(batch1.quantity, Decimal('60'))
self.assertEqual(new_batch.warehouse, warehouse2)
self.assertEqual(new_batch.quantity, Decimal('40'))
self.assertEqual(new_batch.cost_price, Decimal('10.00')) # Цена сохранена!
class SaleProcessorTest(TestCase):
"""Тесты обработки продаж с FIFO списанием."""
def setUp(self):
self.warehouse = Warehouse.objects.create(name='Склад 1')
self.product = Product.objects.create(
name='Гвоздика',
cost_price=Decimal('5.00'),
sale_price=Decimal('20.00')
)
def test_create_sale_with_fifo(self):
"""Тест создания продажи с FIFO списанием."""
# Создаем партии
batch1 = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('30'), Decimal('5.00')
)
batch2 = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('50'), Decimal('6.00')
)
# Создаем продажу 40 шт
sale = SaleProcessor.create_sale(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('40'),
sale_price=Decimal('20.00')
)
# Проверяем Sale
self.assertTrue(sale.processed)
self.assertEqual(sale.quantity, Decimal('40'))
# Проверяем FIFO распределение
allocations = list(sale.batch_allocations.all())
self.assertEqual(len(allocations), 2)
self.assertEqual(allocations[0].quantity, Decimal('30')) # Всё из batch1
self.assertEqual(allocations[1].quantity, Decimal('10')) # 10 из batch2
def test_sale_cost_analysis(self):
"""Тест анализа себестоимости продажи."""
# Создаем партии с разными ценами
batch1 = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('30'), Decimal('5.00')
)
batch2 = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('50'), Decimal('10.00')
)
# Создаем продажу
sale = SaleProcessor.create_sale(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('40'),
sale_price=Decimal('25.00')
)
# Анализируем прибыль
analysis = SaleProcessor.get_sale_cost_analysis(sale)
# Проверяем финансы
# batch1: 30 * 5 = 150 себестоимость, 30 * 25 = 750 выручка
# batch2: 10 * 10 = 100 себестоимость, 10 * 25 = 250 выручка
# Итого: 250 себестоимость, 1000 выручка, 750 прибыль
self.assertEqual(analysis['total_cost'], Decimal('250'))
self.assertEqual(analysis['total_revenue'], Decimal('1000'))
self.assertEqual(analysis['total_profit'], Decimal('750'))
self.assertEqual(analysis['profit_margin'], Decimal('75.00')) # 750/1000*100
class InventoryProcessorTest(TestCase):
"""Тесты обработки инвентаризации."""
def setUp(self):
self.warehouse = Warehouse.objects.create(name='Склад 1')
self.product = Product.objects.create(
name='Тюльпан',
cost_price=Decimal('8.00'),
sale_price=Decimal('25.00')
)
def test_process_inventory_deficit(self):
"""Тест обработки недостачи при инвентаризации."""
# Создаем партию
batch = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('100'), Decimal('8.00')
)
# Создаем инвентаризацию
inventory = Inventory.objects.create(
warehouse=self.warehouse,
status='draft'
)
# Строка: в системе 100, по факту 85 (недостача 15)
line = InventoryLine.objects.create(
inventory=inventory,
product=self.product,
quantity_system=Decimal('100'),
quantity_fact=Decimal('85')
)
# Обрабатываем инвентаризацию
result = InventoryProcessor.process_inventory(inventory.id)
# Проверяем результат
self.assertEqual(result['processed_lines'], 1)
self.assertIsNotNone(result['writeoff_document'])
self.assertIsNone(result['incoming_document'])
# Проверяем, что создался документ списания (черновик)
writeoff_doc = result['writeoff_document']
self.assertEqual(writeoff_doc.status, 'draft')
self.assertEqual(writeoff_doc.inventory, inventory)
# Проверяем, что в документе есть позиция
items = writeoff_doc.items.all()
self.assertEqual(items.count(), 1)
self.assertEqual(items.first().product, self.product)
self.assertEqual(items.first().quantity, Decimal('15'))
# Проверяем, что документ еще не проведен - остаток не изменился
batch.refresh_from_db()
self.assertEqual(batch.quantity, Decimal('100')) # Остаток не изменился, т.к. документ не проведен
def test_process_inventory_surplus(self):
"""Тест обработки излишка при инвентаризации."""
# Создаем партию
batch = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('100'), Decimal('8.00')
)
# Создаем инвентаризацию
inventory = Inventory.objects.create(
warehouse=self.warehouse,
status='draft'
)
# Строка: в системе 100, по факту 120 (излишек 20)
line = InventoryLine.objects.create(
inventory=inventory,
product=self.product,
quantity_system=Decimal('100'),
quantity_fact=Decimal('120')
)
# Обрабатываем инвентаризацию
result = InventoryProcessor.process_inventory(inventory.id)
# Проверяем результат
self.assertEqual(result['processed_lines'], 1)
self.assertIsNone(result['writeoff_document'])
self.assertIsNotNone(result['incoming_document'])
# Проверяем, что создался документ оприходования (черновик)
incoming_doc = result['incoming_document']
self.assertEqual(incoming_doc.status, 'draft')
self.assertEqual(incoming_doc.inventory, inventory)
self.assertEqual(incoming_doc.receipt_type, 'inventory')
# Проверяем, что в документе есть позиция
items = incoming_doc.items.all()
self.assertEqual(items.count(), 1)
self.assertEqual(items.first().product, self.product)
self.assertEqual(items.first().quantity, Decimal('20'))
# Проверяем, что документ еще не проведен - новый StockBatch не создан
from inventory.models import StockBatch
batches = StockBatch.objects.filter(product=self.product, warehouse=self.warehouse)
self.assertEqual(batches.count(), 1) # Только исходная партия, новая не создана
class ReservationSignalsTest(TestCase):
"""Тесты автоматического резервирования через сигналы."""
def setUp(self):
self.warehouse = Warehouse.objects.create(name='Склад 1')
self.product = Product.objects.create(
name='Нарцисс',
cost_price=Decimal('6.00'),
sale_price=Decimal('18.00')
)
self.customer = Customer.objects.create(
name='Иван Иванов',
phone='+375291234567'
)
def test_reservation_on_order_create(self):
"""Тест создания резервирования при создании заказа."""
# Создаем заказ
order = Order.objects.create(
customer=self.customer,
order_number='ORD-20250101-0001',
delivery_type='courier'
)
# Добавляем товар в заказ
item = OrderItem.objects.create(
order=order,
product=self.product,
quantity=5,
price=Decimal('18.00')
)
# Проверяем, что резерв создан
reservations = Reservation.objects.filter(order_item=item)
self.assertEqual(reservations.count(), 1)
res = reservations.first()
self.assertEqual(res.quantity, Decimal('5'))
self.assertEqual(res.status, 'reserved')
def test_release_reservation_on_order_delete(self):
"""Тест освобождения резервирования при удалении заказа."""
# Создаем заказ с товаром
order = Order.objects.create(
customer=self.customer,
order_number='ORD-20250101-0002',
delivery_type='courier'
)
item = OrderItem.objects.create(
order=order,
product=self.product,
quantity=10,
price=Decimal('18.00')
)
# Проверяем, что резерв создан
res = Reservation.objects.get(order_item=item)
self.assertEqual(res.status, 'reserved')
# Удаляем заказ
order.delete()
# Проверяем, что резерв освобожден
res.refresh_from_db()
self.assertEqual(res.status, 'released')
self.assertIsNotNone(res.released_at)
class StockCacheTest(TestCase):
"""Тесты кеширования остатков в модели Stock."""
def setUp(self):
self.warehouse = Warehouse.objects.create(name='Склад 1')
self.product = Product.objects.create(
name='Лилия',
cost_price=Decimal('12.00'),
sale_price=Decimal('40.00')
)
def test_stock_refresh_from_batches(self):
"""Тест пересчета остатков из партий."""
# Создаем партии
batch1 = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('50'), Decimal('12.00')
)
batch2 = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('75'), Decimal('13.00')
)
# Получаем или создаем Stock
stock, created = Stock.objects.get_or_create(
product=self.product,
warehouse=self.warehouse
)
# Обновляем из батчей
stock.refresh_from_batches()
# Проверяем результат
self.assertEqual(stock.quantity_available, Decimal('125'))
def test_stock_quantity_free(self):
"""Тест расчета свободного количества."""
batch = StockBatchManager.create_batch(
self.product, self.warehouse,
Decimal('100'), Decimal('12.00')
)
# Создаем резерв
Reservation.objects.create(
product=self.product,
warehouse=self.warehouse,
quantity=Decimal('30'),
status='reserved'
)
# Получаем Stock и обновляем
stock, created = Stock.objects.get_or_create(
product=self.product,
warehouse=self.warehouse
)
stock.refresh_from_batches()
# Проверяем: доступно 100, зарезервировано 30, свободно 70
self.assertEqual(stock.quantity_available, Decimal('100'))
self.assertEqual(stock.quantity_reserved, Decimal('30'))
self.assertEqual(stock.quantity_free, Decimal('70'))