2
0
mirror of https://github.com/inventree/InvenTree.git synced 2026-05-28 03:49:20 +00:00

additional unit testing for further coverage

This commit is contained in:
Oliver Walters
2026-05-23 06:40:58 +00:00
parent 81d7d13096
commit 0c953b9795
+296 -7
View File
@@ -27,6 +27,7 @@ from InvenTree.unit_test import (
from order.models import PurchaseOrder, PurchaseOrderLineItem
from part.models import BomItem, BomItemSubstitute, Part, PartTestTemplate
from stock.models import StockItem, StockItemTestResult, StockLocation
from stock.status_codes import StockStatus
from users.models import Owner
logger = structlog.get_logger('inventree')
@@ -557,14 +558,29 @@ class BuildTest(BuildTestBase):
bo.clean()
def test_cancel(self):
"""Test cancellation of the build."""
# TODO
"""
self.allocate_stock(50, 50, 200, self.output_1)
self.build.cancel_build(None)
"""Test build cancellation: status is updated and allocations are removed by default."""
self.build.issue_build()
self.assertEqual(BuildItem.objects.count(), 0)
"""
self.allocate_stock(None, {self.stock_1_2: 50})
self.assertGreater(self.build.allocated_stock.count(), 0)
initial_output_count = self.build.build_outputs.filter(is_building=True).count()
self.assertGreater(initial_output_count, 0)
self.build.cancel_build(None)
self.build.refresh_from_db()
self.assertEqual(self.build.status, BuildStatus.CANCELLED)
# Allocations removed (but stock not consumed) by default
self.assertEqual(self.build.allocated_stock.count(), 0)
self.assertIsNone(StockItem.objects.get(pk=self.stock_1_2.pk).consumed_by)
# Incomplete outputs preserved by default (remove_incomplete_outputs=False)
self.assertEqual(
self.build.build_outputs.filter(is_building=True).count(),
initial_output_count,
)
def test_complete(self):
"""Test completion of a build output."""
@@ -1107,3 +1123,276 @@ class ExternalBuildTest(InvenTreeAPITestCase):
# Filter by 'not external'
response = self.get(url, {'external': 'false'})
self.assertEqual(len(response.data), 2)
class BuildTaskTests(BuildTestBase):
"""Direct unit tests for the background task functions in build/tasks.py.
These tests call task functions directly (synchronously) to verify the
business logic they encapsulate, independently of the API and offload mechanism.
"""
def setUp(self):
"""Create a stock location available to all task tests."""
super().setUp()
self.location = StockLocation.objects.create(name='Task Test Location')
def allocate_stock(self, output, allocations):
"""Create BuildItem allocations against self.build for the given output."""
items_to_create = []
for item, quantity in allocations.items():
line = BuildLine.objects.filter(
build=self.build, bom_item__sub_part=item.part
).first()
items_to_create.append(
BuildItem(
build_line=line,
stock_item=item,
quantity=quantity,
install_into=output,
)
)
BuildItem.objects.bulk_create(items_to_create)
def _setup_complete_build(self):
"""Helper: allocate stock fully and complete all outputs so the build is ready to complete."""
self.stock_1_1.quantity = 1000
self.stock_1_1.save()
self.stock_2_1.quantity = 30
self.stock_2_1.save()
self.build.issue_build()
# Allocate untracked parts
self.allocate_stock(
None, {self.stock_1_1: 50, self.stock_1_2: 10, self.stock_2_1: 30}
)
# Allocate tracked parts to each output
self.allocate_stock(self.output_1, {self.stock_3_1: 6})
self.allocate_stock(self.output_2, {self.stock_3_1: 14})
self.build.complete_build_output(self.output_1, None)
self.build.complete_build_output(self.output_2, None)
# -----------------------------------------------------------------------
# cancel_build task
# -----------------------------------------------------------------------
def test_cancel_task_discards_allocations(self):
"""cancel_build with remove_allocated_stock=False: allocations deleted, stock not consumed."""
self.build.issue_build()
self.allocate_stock(None, {self.stock_1_2: 50})
self.assertGreater(self.build.allocated_stock.count(), 0)
build.tasks.cancel_build(
self.build.pk, self.user.pk, remove_allocated_stock=False
)
# BuildItem rows gone
self.assertEqual(self.build.allocated_stock.count(), 0)
# Stock item was NOT consumed
self.assertIsNone(StockItem.objects.get(pk=self.stock_1_2.pk).consumed_by)
def test_cancel_task_consumes_allocations(self):
"""cancel_build with remove_allocated_stock=True: stock items are marked consumed."""
self.build.issue_build()
self.allocate_stock(None, {self.stock_1_2: 50})
build.tasks.cancel_build(
self.build.pk, self.user.pk, remove_allocated_stock=True
)
# All BuildItem rows gone
self.assertEqual(self.build.allocated_stock.count(), 0)
# The allocated (non-trackable) stock was consumed
self.assertGreater(self.build.consumed_stock.count(), 0)
def test_cancel_task_removes_incomplete_outputs(self):
"""cancel_build with remove_incomplete_outputs=True: in-progress outputs are deleted."""
self.build.issue_build()
initial_count = self.build.build_outputs.filter(is_building=True).count()
self.assertGreater(initial_count, 0)
build.tasks.cancel_build(
self.build.pk, self.user.pk, remove_incomplete_outputs=True
)
self.assertEqual(self.build.build_outputs.filter(is_building=True).count(), 0)
def test_cancel_task_preserves_incomplete_outputs(self):
"""cancel_build with remove_incomplete_outputs=False: in-progress outputs are kept."""
self.build.issue_build()
initial_count = self.build.build_outputs.filter(is_building=True).count()
self.assertGreater(initial_count, 0)
build.tasks.cancel_build(
self.build.pk, self.user.pk, remove_incomplete_outputs=False
)
self.assertEqual(
self.build.build_outputs.filter(is_building=True).count(), initial_count
)
# -----------------------------------------------------------------------
# complete_build task
# -----------------------------------------------------------------------
@override_settings(
TESTING_TABLE_EVENTS=True,
PLUGIN_TESTING_EVENTS=True,
PLUGIN_TESTING_EVENTS_ASYNC=True,
)
def test_complete_build_task_triggers_event(self):
"""complete_build task fires the BuildEvents.COMPLETED event."""
from django_q.models import OrmQ
from build.events import BuildEvents
set_global_setting('ENABLE_PLUGINS_EVENTS', True)
OrmQ.objects.all().delete()
self._setup_complete_build()
self.build.complete_build(self.user)
task = findOffloadedEvent(BuildEvents.COMPLETED, matching_kwargs=['id'])
self.assertIsNotNone(task)
self.assertEqual(task.kwargs()['id'], self.build.pk)
set_global_setting('ENABLE_PLUGINS_EVENTS', False)
def test_complete_build_task_trim_stock(self):
"""complete_build with trim_allocated_stock=True removes over-allocations before consuming."""
self.stock_1_2.quantity = 100
self.stock_1_2.save()
self.stock_2_1.quantity = 30
self.stock_2_1.save()
self.build.issue_build()
# Over-allocate sub_part_1: need 50, allocate 100
self.allocate_stock(None, {self.stock_1_2: 100, self.stock_2_1: 30})
self.allocate_stock(self.output_1, {self.stock_3_1: 6})
self.allocate_stock(self.output_2, {self.stock_3_1: 14})
self.assertTrue(self.build.is_overallocated())
self.build.complete_build_output(self.output_1, None)
self.build.complete_build_output(self.output_2, None)
self.assertTrue(self.build.can_complete)
self.build.complete_build(self.user, trim_allocated_stock=True)
self.build.refresh_from_db()
self.assertEqual(self.build.status, BuildStatus.COMPLETE)
# Only 50 units of sub_part_1 should have been consumed (not 100)
consumed_qty = StockItem.objects.filter(
consumed_by=self.build, part=self.sub_part_1
).aggregate(total=Sum('quantity'))['total']
self.assertEqual(consumed_qty, 50)
# -----------------------------------------------------------------------
# delete_build_outputs task
# -----------------------------------------------------------------------
def test_delete_build_outputs_skips_missing_id(self):
"""delete_build_outputs silently skips nonexistent output IDs and deletes valid ones."""
from build.tasks import delete_build_outputs
# Create an output directly to avoid serial-number requirements on the trackable assembly
output = StockItem.objects.create(
part=self.assembly, quantity=3, is_building=True, build=self.build
)
real_id = output.pk
# Mix a valid ID with a nonexistent one — must not raise
delete_build_outputs(self.build.pk, [real_id, 99999])
self.assertFalse(StockItem.objects.filter(pk=real_id).exists())
# -----------------------------------------------------------------------
# scrap_build_outputs task
# -----------------------------------------------------------------------
def test_scrap_build_outputs_discard_allocations(self):
"""scrap_build_outputs with discard_allocations=True removes allocations without consuming stock."""
from build.tasks import scrap_build_outputs
self.build.issue_build()
# Allocate tracked stock to output_1
self.allocate_stock(self.output_1, {self.stock_3_1: 6})
self.assertGreater(self.output_1.items_to_install.count(), 0)
scrap_build_outputs(
self.build.pk,
[{'output_id': self.output_1.pk, 'quantity': self.output_1.quantity}],
location_id=self.location.pk,
notes='discard test',
discard_allocations=True,
user_id=None,
)
self.output_1.refresh_from_db()
self.assertEqual(self.output_1.status, StockStatus.REJECTED.value)
self.assertFalse(self.output_1.is_building)
# Allocation rows should be gone
self.assertEqual(self.output_1.items_to_install.count(), 0)
# Stock was discarded (not consumed or installed)
self.stock_3_1.refresh_from_db()
self.assertIsNone(self.stock_3_1.consumed_by)
self.assertIsNone(self.stock_3_1.belongs_to)
def test_scrap_build_outputs_consume_allocations(self):
"""scrap_build_outputs with discard_allocations=False (default) consumes/installs stock."""
from build.tasks import scrap_build_outputs
self.build.issue_build()
self.allocate_stock(self.output_1, {self.stock_3_1: 6})
scrap_build_outputs(
self.build.pk,
[{'output_id': self.output_1.pk, 'quantity': self.output_1.quantity}],
location_id=self.location.pk,
notes='consume test',
discard_allocations=False,
user_id=None,
)
self.output_1.refresh_from_db()
self.assertEqual(self.output_1.status, StockStatus.REJECTED.value)
self.assertFalse(self.output_1.is_building)
# complete_allocation splits stock_3_1 and installs the split piece into output_1
# (stock_3_1 quantity=1000, only 6 allocated, so a child item is created)
self.assertTrue(
StockItem.objects.filter(belongs_to=self.output_1).exists(),
'Expected a tracked stock item to be installed into the output',
)
# -----------------------------------------------------------------------
# complete_build_outputs task
# -----------------------------------------------------------------------
def test_complete_build_outputs_with_status_none(self):
"""complete_build_outputs with status=None falls back to StockStatus.OK in the model."""
from build.tasks import complete_build_outputs
self.build.issue_build()
# Create output directly to avoid serial-number requirements on the trackable assembly
output = StockItem.objects.create(
part=self.assembly, quantity=5, is_building=True, build=self.build
)
complete_build_outputs(
self.build.pk,
[{'output_id': output.pk}],
location_id=self.location.pk,
status=None,
notes='status none test',
user_id=None,
)
output.refresh_from_db()
self.assertFalse(output.is_building)
# status=None should resolve to StockStatus.OK (the model default)
self.assertEqual(output.status, StockStatus.OK.value)