From 0c953b9795f3c2160e311a609d572362f2f88301 Mon Sep 17 00:00:00 2001 From: Oliver Walters Date: Sat, 23 May 2026 06:40:58 +0000 Subject: [PATCH] additional unit testing for further coverage --- src/backend/InvenTree/build/test_build.py | 303 +++++++++++++++++++++- 1 file changed, 296 insertions(+), 7 deletions(-) diff --git a/src/backend/InvenTree/build/test_build.py b/src/backend/InvenTree/build/test_build.py index bb1007dff2..5c571252ec 100644 --- a/src/backend/InvenTree/build/test_build.py +++ b/src/backend/InvenTree/build/test_build.py @@ -27,6 +27,7 @@ from InvenTree.unit_test import ( from order.models import PurchaseOrder, PurchaseOrderLineItem from part.models import BomItem, BomItemSubstitute, Part, PartTestTemplate from stock.models import StockItem, StockItemTestResult, StockLocation +from stock.status_codes import StockStatus from users.models import Owner logger = structlog.get_logger('inventree') @@ -557,14 +558,29 @@ class BuildTest(BuildTestBase): bo.clean() def test_cancel(self): - """Test cancellation of the build.""" - # TODO - """ - self.allocate_stock(50, 50, 200, self.output_1) - self.build.cancel_build(None) + """Test build cancellation: status is updated and allocations are removed by default.""" + self.build.issue_build() - self.assertEqual(BuildItem.objects.count(), 0) - """ + self.allocate_stock(None, {self.stock_1_2: 50}) + self.assertGreater(self.build.allocated_stock.count(), 0) + + initial_output_count = self.build.build_outputs.filter(is_building=True).count() + self.assertGreater(initial_output_count, 0) + + self.build.cancel_build(None) + self.build.refresh_from_db() + + self.assertEqual(self.build.status, BuildStatus.CANCELLED) + + # Allocations removed (but stock not consumed) by default + self.assertEqual(self.build.allocated_stock.count(), 0) + self.assertIsNone(StockItem.objects.get(pk=self.stock_1_2.pk).consumed_by) + + # Incomplete outputs preserved by default (remove_incomplete_outputs=False) + self.assertEqual( + self.build.build_outputs.filter(is_building=True).count(), + initial_output_count, + ) def test_complete(self): """Test completion of a build output.""" @@ -1107,3 +1123,276 @@ class ExternalBuildTest(InvenTreeAPITestCase): # Filter by 'not external' response = self.get(url, {'external': 'false'}) self.assertEqual(len(response.data), 2) + + +class BuildTaskTests(BuildTestBase): + """Direct unit tests for the background task functions in build/tasks.py. + + These tests call task functions directly (synchronously) to verify the + business logic they encapsulate, independently of the API and offload mechanism. + """ + + def setUp(self): + """Create a stock location available to all task tests.""" + super().setUp() + self.location = StockLocation.objects.create(name='Task Test Location') + + def allocate_stock(self, output, allocations): + """Create BuildItem allocations against self.build for the given output.""" + items_to_create = [] + for item, quantity in allocations.items(): + line = BuildLine.objects.filter( + build=self.build, bom_item__sub_part=item.part + ).first() + items_to_create.append( + BuildItem( + build_line=line, + stock_item=item, + quantity=quantity, + install_into=output, + ) + ) + BuildItem.objects.bulk_create(items_to_create) + + def _setup_complete_build(self): + """Helper: allocate stock fully and complete all outputs so the build is ready to complete.""" + self.stock_1_1.quantity = 1000 + self.stock_1_1.save() + self.stock_2_1.quantity = 30 + self.stock_2_1.save() + + self.build.issue_build() + + # Allocate untracked parts + self.allocate_stock( + None, {self.stock_1_1: 50, self.stock_1_2: 10, self.stock_2_1: 30} + ) + # Allocate tracked parts to each output + self.allocate_stock(self.output_1, {self.stock_3_1: 6}) + self.allocate_stock(self.output_2, {self.stock_3_1: 14}) + + self.build.complete_build_output(self.output_1, None) + self.build.complete_build_output(self.output_2, None) + + # ----------------------------------------------------------------------- + # cancel_build task + # ----------------------------------------------------------------------- + + def test_cancel_task_discards_allocations(self): + """cancel_build with remove_allocated_stock=False: allocations deleted, stock not consumed.""" + self.build.issue_build() + self.allocate_stock(None, {self.stock_1_2: 50}) + self.assertGreater(self.build.allocated_stock.count(), 0) + + build.tasks.cancel_build( + self.build.pk, self.user.pk, remove_allocated_stock=False + ) + + # BuildItem rows gone + self.assertEqual(self.build.allocated_stock.count(), 0) + # Stock item was NOT consumed + self.assertIsNone(StockItem.objects.get(pk=self.stock_1_2.pk).consumed_by) + + def test_cancel_task_consumes_allocations(self): + """cancel_build with remove_allocated_stock=True: stock items are marked consumed.""" + self.build.issue_build() + self.allocate_stock(None, {self.stock_1_2: 50}) + + build.tasks.cancel_build( + self.build.pk, self.user.pk, remove_allocated_stock=True + ) + + # All BuildItem rows gone + self.assertEqual(self.build.allocated_stock.count(), 0) + # The allocated (non-trackable) stock was consumed + self.assertGreater(self.build.consumed_stock.count(), 0) + + def test_cancel_task_removes_incomplete_outputs(self): + """cancel_build with remove_incomplete_outputs=True: in-progress outputs are deleted.""" + self.build.issue_build() + initial_count = self.build.build_outputs.filter(is_building=True).count() + self.assertGreater(initial_count, 0) + + build.tasks.cancel_build( + self.build.pk, self.user.pk, remove_incomplete_outputs=True + ) + + self.assertEqual(self.build.build_outputs.filter(is_building=True).count(), 0) + + def test_cancel_task_preserves_incomplete_outputs(self): + """cancel_build with remove_incomplete_outputs=False: in-progress outputs are kept.""" + self.build.issue_build() + initial_count = self.build.build_outputs.filter(is_building=True).count() + self.assertGreater(initial_count, 0) + + build.tasks.cancel_build( + self.build.pk, self.user.pk, remove_incomplete_outputs=False + ) + + self.assertEqual( + self.build.build_outputs.filter(is_building=True).count(), initial_count + ) + + # ----------------------------------------------------------------------- + # complete_build task + # ----------------------------------------------------------------------- + + @override_settings( + TESTING_TABLE_EVENTS=True, + PLUGIN_TESTING_EVENTS=True, + PLUGIN_TESTING_EVENTS_ASYNC=True, + ) + def test_complete_build_task_triggers_event(self): + """complete_build task fires the BuildEvents.COMPLETED event.""" + from django_q.models import OrmQ + + from build.events import BuildEvents + + set_global_setting('ENABLE_PLUGINS_EVENTS', True) + OrmQ.objects.all().delete() + + self._setup_complete_build() + self.build.complete_build(self.user) + + task = findOffloadedEvent(BuildEvents.COMPLETED, matching_kwargs=['id']) + self.assertIsNotNone(task) + self.assertEqual(task.kwargs()['id'], self.build.pk) + + set_global_setting('ENABLE_PLUGINS_EVENTS', False) + + def test_complete_build_task_trim_stock(self): + """complete_build with trim_allocated_stock=True removes over-allocations before consuming.""" + self.stock_1_2.quantity = 100 + self.stock_1_2.save() + self.stock_2_1.quantity = 30 + self.stock_2_1.save() + + self.build.issue_build() + + # Over-allocate sub_part_1: need 50, allocate 100 + self.allocate_stock(None, {self.stock_1_2: 100, self.stock_2_1: 30}) + self.allocate_stock(self.output_1, {self.stock_3_1: 6}) + self.allocate_stock(self.output_2, {self.stock_3_1: 14}) + + self.assertTrue(self.build.is_overallocated()) + + self.build.complete_build_output(self.output_1, None) + self.build.complete_build_output(self.output_2, None) + self.assertTrue(self.build.can_complete) + + self.build.complete_build(self.user, trim_allocated_stock=True) + self.build.refresh_from_db() + self.assertEqual(self.build.status, BuildStatus.COMPLETE) + + # Only 50 units of sub_part_1 should have been consumed (not 100) + consumed_qty = StockItem.objects.filter( + consumed_by=self.build, part=self.sub_part_1 + ).aggregate(total=Sum('quantity'))['total'] + self.assertEqual(consumed_qty, 50) + + # ----------------------------------------------------------------------- + # delete_build_outputs task + # ----------------------------------------------------------------------- + + def test_delete_build_outputs_skips_missing_id(self): + """delete_build_outputs silently skips nonexistent output IDs and deletes valid ones.""" + from build.tasks import delete_build_outputs + + # Create an output directly to avoid serial-number requirements on the trackable assembly + output = StockItem.objects.create( + part=self.assembly, quantity=3, is_building=True, build=self.build + ) + real_id = output.pk + + # Mix a valid ID with a nonexistent one — must not raise + delete_build_outputs(self.build.pk, [real_id, 99999]) + + self.assertFalse(StockItem.objects.filter(pk=real_id).exists()) + + # ----------------------------------------------------------------------- + # scrap_build_outputs task + # ----------------------------------------------------------------------- + + def test_scrap_build_outputs_discard_allocations(self): + """scrap_build_outputs with discard_allocations=True removes allocations without consuming stock.""" + from build.tasks import scrap_build_outputs + + self.build.issue_build() + # Allocate tracked stock to output_1 + self.allocate_stock(self.output_1, {self.stock_3_1: 6}) + self.assertGreater(self.output_1.items_to_install.count(), 0) + + scrap_build_outputs( + self.build.pk, + [{'output_id': self.output_1.pk, 'quantity': self.output_1.quantity}], + location_id=self.location.pk, + notes='discard test', + discard_allocations=True, + user_id=None, + ) + + self.output_1.refresh_from_db() + self.assertEqual(self.output_1.status, StockStatus.REJECTED.value) + self.assertFalse(self.output_1.is_building) + + # Allocation rows should be gone + self.assertEqual(self.output_1.items_to_install.count(), 0) + # Stock was discarded (not consumed or installed) + self.stock_3_1.refresh_from_db() + self.assertIsNone(self.stock_3_1.consumed_by) + self.assertIsNone(self.stock_3_1.belongs_to) + + def test_scrap_build_outputs_consume_allocations(self): + """scrap_build_outputs with discard_allocations=False (default) consumes/installs stock.""" + from build.tasks import scrap_build_outputs + + self.build.issue_build() + self.allocate_stock(self.output_1, {self.stock_3_1: 6}) + + scrap_build_outputs( + self.build.pk, + [{'output_id': self.output_1.pk, 'quantity': self.output_1.quantity}], + location_id=self.location.pk, + notes='consume test', + discard_allocations=False, + user_id=None, + ) + + self.output_1.refresh_from_db() + self.assertEqual(self.output_1.status, StockStatus.REJECTED.value) + self.assertFalse(self.output_1.is_building) + + # complete_allocation splits stock_3_1 and installs the split piece into output_1 + # (stock_3_1 quantity=1000, only 6 allocated, so a child item is created) + self.assertTrue( + StockItem.objects.filter(belongs_to=self.output_1).exists(), + 'Expected a tracked stock item to be installed into the output', + ) + + # ----------------------------------------------------------------------- + # complete_build_outputs task + # ----------------------------------------------------------------------- + + def test_complete_build_outputs_with_status_none(self): + """complete_build_outputs with status=None falls back to StockStatus.OK in the model.""" + from build.tasks import complete_build_outputs + + self.build.issue_build() + # Create output directly to avoid serial-number requirements on the trackable assembly + output = StockItem.objects.create( + part=self.assembly, quantity=5, is_building=True, build=self.build + ) + + complete_build_outputs( + self.build.pk, + [{'output_id': output.pk}], + location_id=self.location.pk, + status=None, + notes='status none test', + user_id=None, + ) + + output.refresh_from_db() + self.assertFalse(output.is_building) + # status=None should resolve to StockStatus.OK (the model default) + self.assertEqual(output.status, StockStatus.OK.value)