2
0
mirror of https://github.com/inventree/InvenTree.git synced 2025-04-29 12:06:44 +00:00

Improve logic for automatically updating part pricing (#8090)

* Improve logic for automatically updating part pricing

* Simplify logic

* Update unit tests to ensure pricing flows upwards

* Unit test update

* Add unit tests for handling of "multi level" BOM pricing

* ADjust unit tests

* Improve efficiency of operation

* Adjust testing for pricing

- Only allow pricing updates in testing if TESTING_PRICING flag is set

* Tweak when pricing updates are performed

* More tweaks
This commit is contained in:
Oliver 2024-09-11 10:26:38 +10:00 committed by GitHub
parent 9219d8307e
commit c5e3ea537d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 202 additions and 90 deletions

View File

@ -1280,6 +1280,9 @@ PLUGIN_FILE_CHECKED = False # Was the plugin file checked?
# Flag to allow table events during testing # Flag to allow table events during testing
TESTING_TABLE_EVENTS = False TESTING_TABLE_EVENTS = False
# Flag to allow pricing recalculations during testing
TESTING_PRICING = False
# User interface customization values # User interface customization values
CUSTOM_LOGO = get_custom_file( CUSTOM_LOGO = get_custom_file(
'INVENTREE_CUSTOM_LOGO', 'customize.logo', 'custom logo', lookup_media=True 'INVENTREE_CUSTOM_LOGO', 'customize.logo', 'custom logo', lookup_media=True

View File

@ -263,7 +263,7 @@ class InvenTreeAPITestCase(ExchangeRateMixin, UserMixin, APITestCase):
MAX_QUERY_TIME = 7.5 MAX_QUERY_TIME = 7.5
@contextmanager @contextmanager
def assertNumQueriesLessThan(self, value, using='default', verbose=None, url=None): def assertNumQueriesLessThan(self, value, using='default', verbose=False, url=None):
"""Context manager to check that the number of queries is less than a certain value. """Context manager to check that the number of queries is less than a certain value.
Example: Example:
@ -281,7 +281,7 @@ class InvenTreeAPITestCase(ExchangeRateMixin, UserMixin, APITestCase):
f'Query count exceeded at {url}: Expected < {value} queries, got {n}' f'Query count exceeded at {url}: Expected < {value} queries, got {n}'
) # pragma: no cover ) # pragma: no cover
if verbose or n >= value: if verbose and n >= value:
msg = f'\r\n{json.dumps(context.captured_queries, indent=4)}' # pragma: no cover msg = f'\r\n{json.dumps(context.captured_queries, indent=4)}' # pragma: no cover
else: else:
msg = None msg = None

View File

@ -1039,6 +1039,12 @@ class BuildOverallocationTest(BuildAPITest):
outputs = cls.build.build_outputs.all() outputs = cls.build.build_outputs.all()
cls.build.complete_build_output(outputs[0], cls.user) cls.build.complete_build_output(outputs[0], cls.user)
def setUp(self):
"""Basic operation as part of test suite setup"""
super().setUp()
self.generate_exchange_rates()
def test_setup(self): def test_setup(self):
"""Validate expected state after set-up.""" """Validate expected state after set-up."""
self.assertEqual(self.build.incomplete_outputs.count(), 0) self.assertEqual(self.build.incomplete_outputs.count(), 0)
@ -1067,7 +1073,7 @@ class BuildOverallocationTest(BuildAPITest):
'accept_overallocated': 'accept', 'accept_overallocated': 'accept',
}, },
expected_code=201, expected_code=201,
max_query_count=550, # TODO: Come back and refactor this max_query_count=1000, # TODO: Come back and refactor this
) )
self.build.refresh_from_db() self.build.refresh_from_db()
@ -1088,9 +1094,11 @@ class BuildOverallocationTest(BuildAPITest):
'accept_overallocated': 'trim', 'accept_overallocated': 'trim',
}, },
expected_code=201, expected_code=201,
max_query_count=600, # TODO: Come back and refactor this max_query_count=1000, # TODO: Come back and refactor this
) )
# Note: Large number of queries is due to pricing recalculation for each stock item
self.build.refresh_from_db() self.build.refresh_from_db()
# Build should have been marked as complete # Build should have been marked as complete

View File

@ -1049,7 +1049,10 @@ class SupplierPriceBreak(common.models.PriceBreak):
) )
def after_save_supplier_price(sender, instance, created, **kwargs): def after_save_supplier_price(sender, instance, created, **kwargs):
"""Callback function when a SupplierPriceBreak is created or updated.""" """Callback function when a SupplierPriceBreak is created or updated."""
if InvenTree.ready.canAppAccessDatabase() and not InvenTree.ready.isImportingData(): if (
InvenTree.ready.canAppAccessDatabase(allow_test=settings.TESTING_PRICING)
and not InvenTree.ready.isImportingData()
):
if instance.part and instance.part.part: if instance.part and instance.part.part:
instance.part.part.schedule_pricing_update(create=True) instance.part.part.schedule_pricing_update(create=True)
@ -1061,6 +1064,9 @@ def after_save_supplier_price(sender, instance, created, **kwargs):
) )
def after_delete_supplier_price(sender, instance, **kwargs): def after_delete_supplier_price(sender, instance, **kwargs):
"""Callback function when a SupplierPriceBreak is deleted.""" """Callback function when a SupplierPriceBreak is deleted."""
if InvenTree.ready.canAppAccessDatabase() and not InvenTree.ready.isImportingData(): if (
InvenTree.ready.canAppAccessDatabase(allow_test=settings.TESTING_PRICING)
and not InvenTree.ready.isImportingData()
):
if instance.part and instance.part.part: if instance.part and instance.part.part:
instance.part.part.schedule_pricing_update(create=False) instance.part.part.schedule_pricing_update(create=False)

View File

@ -1949,7 +1949,7 @@ class Part(
return pricing return pricing
def schedule_pricing_update(self, create: bool = False, test: bool = False): def schedule_pricing_update(self, create: bool = False):
"""Helper function to schedule a pricing update. """Helper function to schedule a pricing update.
Importantly, catches any errors which may occur during deletion of related objects, Importantly, catches any errors which may occur during deletion of related objects,
@ -1959,7 +1959,6 @@ class Part(
Arguments: Arguments:
create: Whether or not a new PartPricing object should be created if it does not already exist create: Whether or not a new PartPricing object should be created if it does not already exist
test: Whether or not the pricing update is allowed during unit tests
""" """
try: try:
self.refresh_from_db() self.refresh_from_db()
@ -1970,7 +1969,7 @@ class Part(
pricing = self.pricing pricing = self.pricing
if create or pricing.pk: if create or pricing.pk:
pricing.schedule_for_update(test=test) pricing.schedule_for_update()
except IntegrityError: except IntegrityError:
# If this part instance has been deleted, # If this part instance has been deleted,
# some post-delete or post-save signals may still be fired # some post-delete or post-save signals may still be fired
@ -2550,7 +2549,8 @@ class PartPricing(common.models.MetaMixin):
- Detailed pricing information is very context specific in any case - Detailed pricing information is very context specific in any case
""" """
price_modified = False # When calculating assembly pricing, we limit the depth of the calculation
MAX_PRICING_DEPTH = 50
@property @property
def is_valid(self): def is_valid(self):
@ -2579,14 +2579,10 @@ class PartPricing(common.models.MetaMixin):
return result return result
def schedule_for_update(self, counter: int = 0, test: bool = False): def schedule_for_update(self, counter: int = 0):
"""Schedule this pricing to be updated.""" """Schedule this pricing to be updated."""
import InvenTree.ready import InvenTree.ready
# If we are running within CI, only schedule the update if the test flag is set
if settings.TESTING and not test:
return
# If importing data, skip pricing update # If importing data, skip pricing update
if InvenTree.ready.isImportingData(): if InvenTree.ready.isImportingData():
return return
@ -2630,7 +2626,7 @@ class PartPricing(common.models.MetaMixin):
logger.debug('Pricing for %s already scheduled for update - skipping', p) logger.debug('Pricing for %s already scheduled for update - skipping', p)
return return
if counter > 25: if counter > self.MAX_PRICING_DEPTH:
# Prevent infinite recursion / stack depth issues # Prevent infinite recursion / stack depth issues
logger.debug( logger.debug(
counter, f'Skipping pricing update for {p} - maximum depth exceeded' counter, f'Skipping pricing update for {p} - maximum depth exceeded'
@ -2649,16 +2645,36 @@ class PartPricing(common.models.MetaMixin):
import part.tasks as part_tasks import part.tasks as part_tasks
# Pricing calculations are performed in the background,
# unless the TESTING_PRICING flag is set
background = not settings.TESTING or not settings.TESTING_PRICING
# Offload task to update the pricing # Offload task to update the pricing
# Force async, to prevent running in the foreground # Force async, to prevent running in the foreground (unless in testing mode)
InvenTree.tasks.offload_task( InvenTree.tasks.offload_task(
part_tasks.update_part_pricing, self, counter=counter, force_async=True part_tasks.update_part_pricing,
self,
counter=counter,
force_async=background,
) )
def update_pricing(self, counter: int = 0, cascade: bool = True): def update_pricing(
"""Recalculate all cost data for the referenced Part instance.""" self,
# If importing data, skip pricing update counter: int = 0,
cascade: bool = True,
previous_min=None,
previous_max=None,
):
"""Recalculate all cost data for the referenced Part instance.
Arguments:
counter: Recursion counter (used to prevent infinite recursion)
cascade: If True, update pricing for all assemblies and templates which use this part
previous_min: Previous minimum price (used to prevent further updates if unchanged)
previous_max: Previous maximum price (used to prevent further updates if unchanged)
"""
# If importing data, skip pricing update
if InvenTree.ready.isImportingData(): if InvenTree.ready.isImportingData():
return return
@ -2689,18 +2705,25 @@ class PartPricing(common.models.MetaMixin):
# Background worker processes may try to concurrently update # Background worker processes may try to concurrently update
pass pass
pricing_changed = False
# Without previous pricing data, we assume that the pricing has changed
if previous_min != self.overall_min or previous_max != self.overall_max:
pricing_changed = True
# Update parent assemblies and templates # Update parent assemblies and templates
if cascade and self.price_modified: if pricing_changed and cascade:
self.update_assemblies(counter) self.update_assemblies(counter)
self.update_templates(counter) self.update_templates(counter)
def update_assemblies(self, counter: int = 0): def update_assemblies(self, counter: int = 0):
"""Schedule updates for any assemblies which use this part.""" """Schedule updates for any assemblies which use this part."""
# If the linked Part is used in any assemblies, schedule a pricing update for those assemblies # If the linked Part is used in any assemblies, schedule a pricing update for those assemblies
used_in_parts = self.part.get_used_in() used_in_parts = self.part.get_used_in()
for p in used_in_parts: for p in used_in_parts:
p.pricing.schedule_for_update(counter + 1) p.pricing.schedule_for_update(counter=counter + 1)
def update_templates(self, counter: int = 0): def update_templates(self, counter: int = 0):
"""Schedule updates for any template parts above this part.""" """Schedule updates for any template parts above this part."""
@ -2716,13 +2739,13 @@ class PartPricing(common.models.MetaMixin):
try: try:
self.update_overall_cost() self.update_overall_cost()
except IntegrityError: except Exception:
# If something has happened to the Part model, might throw an error # If something has happened to the Part model, might throw an error
pass pass
try: try:
super().save(*args, **kwargs) super().save(*args, **kwargs)
except IntegrityError: except Exception:
# This error may be thrown if there is already duplicate pricing data # This error may be thrown if there is already duplicate pricing data
pass pass
@ -2790,9 +2813,6 @@ class PartPricing(common.models.MetaMixin):
any_max_elements = True any_max_elements = True
old_bom_cost_min = self.bom_cost_min
old_bom_cost_max = self.bom_cost_max
if any_min_elements: if any_min_elements:
self.bom_cost_min = cumulative_min self.bom_cost_min = cumulative_min
else: else:
@ -2803,12 +2823,6 @@ class PartPricing(common.models.MetaMixin):
else: else:
self.bom_cost_max = None self.bom_cost_max = None
if (
old_bom_cost_min != self.bom_cost_min
or old_bom_cost_max != self.bom_cost_max
):
self.price_modified = True
if save: if save:
self.save() self.save()
@ -2872,12 +2886,6 @@ class PartPricing(common.models.MetaMixin):
if purchase_max is None or cost > purchase_max: if purchase_max is None or cost > purchase_max:
purchase_max = cost purchase_max = cost
if (
self.purchase_cost_min != purchase_min
or self.purchase_cost_max != purchase_max
):
self.price_modified = True
self.purchase_cost_min = purchase_min self.purchase_cost_min = purchase_min
self.purchase_cost_max = purchase_max self.purchase_cost_max = purchase_max
@ -2904,12 +2912,6 @@ class PartPricing(common.models.MetaMixin):
if max_int_cost is None or cost > max_int_cost: if max_int_cost is None or cost > max_int_cost:
max_int_cost = cost max_int_cost = cost
if (
self.internal_cost_min != min_int_cost
or self.internal_cost_max != max_int_cost
):
self.price_modified = True
self.internal_cost_min = min_int_cost self.internal_cost_min = min_int_cost
self.internal_cost_max = max_int_cost self.internal_cost_max = max_int_cost
@ -2945,12 +2947,6 @@ class PartPricing(common.models.MetaMixin):
if max_sup_cost is None or cost > max_sup_cost: if max_sup_cost is None or cost > max_sup_cost:
max_sup_cost = cost max_sup_cost = cost
if (
self.supplier_price_min != min_sup_cost
or self.supplier_price_max != max_sup_cost
):
self.price_modified = True
self.supplier_price_min = min_sup_cost self.supplier_price_min = min_sup_cost
self.supplier_price_max = max_sup_cost self.supplier_price_max = max_sup_cost
@ -2986,9 +2982,6 @@ class PartPricing(common.models.MetaMixin):
if variant_max is None or v_max > variant_max: if variant_max is None or v_max > variant_max:
variant_max = v_max variant_max = v_max
if self.variant_cost_min != variant_min or self.variant_cost_max != variant_max:
self.price_modified = True
self.variant_cost_min = variant_min self.variant_cost_min = variant_min
self.variant_cost_max = variant_max self.variant_cost_max = variant_max
@ -3109,12 +3102,6 @@ class PartPricing(common.models.MetaMixin):
if max_sell_history is None or cost > max_sell_history: if max_sell_history is None or cost > max_sell_history:
max_sell_history = cost max_sell_history = cost
if (
self.sale_history_min != min_sell_history
or self.sale_history_max != max_sell_history
):
self.price_modified = True
self.sale_history_min = min_sell_history self.sale_history_min = min_sell_history
self.sale_history_max = max_sell_history self.sale_history_max = max_sell_history
@ -4525,7 +4512,10 @@ def update_bom_build_lines(sender, instance, created, **kwargs):
def update_pricing_after_edit(sender, instance, created, **kwargs): def update_pricing_after_edit(sender, instance, created, **kwargs):
"""Callback function when a part price break is created or updated.""" """Callback function when a part price break is created or updated."""
# Update part pricing *unless* we are importing data # Update part pricing *unless* we are importing data
if InvenTree.ready.canAppAccessDatabase() and not InvenTree.ready.isImportingData(): if (
InvenTree.ready.canAppAccessDatabase(allow_test=settings.TESTING_PRICING)
and not InvenTree.ready.isImportingData()
):
if instance.part: if instance.part:
instance.part.schedule_pricing_update(create=True) instance.part.schedule_pricing_update(create=True)
@ -4542,7 +4532,10 @@ def update_pricing_after_edit(sender, instance, created, **kwargs):
def update_pricing_after_delete(sender, instance, **kwargs): def update_pricing_after_delete(sender, instance, **kwargs):
"""Callback function when a part price break is deleted.""" """Callback function when a part price break is deleted."""
# Update part pricing *unless* we are importing data # Update part pricing *unless* we are importing data
if InvenTree.ready.canAppAccessDatabase() and not InvenTree.ready.isImportingData(): if (
InvenTree.ready.canAppAccessDatabase(allow_test=settings.TESTING_PRICING)
and not InvenTree.ready.isImportingData()
):
if instance.part: if instance.part:
instance.part.schedule_pricing_update(create=False) instance.part.schedule_pricing_update(create=False)

View File

@ -73,7 +73,11 @@ def update_part_pricing(pricing: part.models.PartPricing, counter: int = 0):
""" """
logger.info('Updating part pricing for %s', pricing.part) logger.info('Updating part pricing for %s', pricing.part)
pricing.update_pricing(counter=counter) pricing.update_pricing(
counter=counter,
previous_min=pricing.overall_min,
previous_max=pricing.overall_max,
)
@scheduled_task(ScheduledTask.DAILY) @scheduled_task(ScheduledTask.DAILY)

View File

@ -1,6 +1,7 @@
"""Unit tests for Part pricing calculations.""" """Unit tests for Part pricing calculations."""
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from django.test.utils import override_settings
from djmoney.contrib.exchange.models import convert_money from djmoney.contrib.exchange.models import convert_money
from djmoney.money import Money from djmoney.money import Money
@ -88,6 +89,7 @@ class PartPricingTests(InvenTreeTestCase):
part=self.sp_2, quantity=10, price=4.55, price_currency='GBP' part=self.sp_2, quantity=10, price=4.55, price_currency='GBP'
) )
@override_settings(TESTING_PRICING=True)
def test_pricing_data(self): def test_pricing_data(self):
"""Test link between Part and PartPricing model.""" """Test link between Part and PartPricing model."""
# Initially there is no associated Pricing data # Initially there is no associated Pricing data
@ -112,6 +114,7 @@ class PartPricingTests(InvenTreeTestCase):
def test_invalid_rate(self): def test_invalid_rate(self):
"""Ensure that conversion behaves properly with missing rates.""" """Ensure that conversion behaves properly with missing rates."""
@override_settings(TESTING_PRICING=True)
def test_simple(self): def test_simple(self):
"""Tests for hard-coded values.""" """Tests for hard-coded values."""
pricing = self.part.pricing pricing = self.part.pricing
@ -143,6 +146,7 @@ class PartPricingTests(InvenTreeTestCase):
self.assertEqual(pricing.overall_min, Money('0.111111', 'USD')) self.assertEqual(pricing.overall_min, Money('0.111111', 'USD'))
self.assertEqual(pricing.overall_max, Money('25', 'USD')) self.assertEqual(pricing.overall_max, Money('25', 'USD'))
@override_settings(TESTING_PRICING=True)
def test_supplier_part_pricing(self): def test_supplier_part_pricing(self):
"""Test for supplier part pricing.""" """Test for supplier part pricing."""
pricing = self.part.pricing pricing = self.part.pricing
@ -156,19 +160,22 @@ class PartPricingTests(InvenTreeTestCase):
# Creating price breaks will cause the pricing to be updated # Creating price breaks will cause the pricing to be updated
self.create_price_breaks() self.create_price_breaks()
pricing.update_pricing() pricing = self.part.pricing
pricing.refresh_from_db()
self.assertAlmostEqual(float(pricing.overall_min.amount), 2.015, places=2) self.assertAlmostEqual(float(pricing.overall_min.amount), 2.015, places=2)
self.assertAlmostEqual(float(pricing.overall_max.amount), 3.06, places=2) self.assertAlmostEqual(float(pricing.overall_max.amount), 3.06, places=2)
# Delete all supplier parts and re-calculate # Delete all supplier parts and re-calculate
self.part.supplier_parts.all().delete() self.part.supplier_parts.all().delete()
pricing.update_pricing()
pricing = self.part.pricing
pricing.refresh_from_db() pricing.refresh_from_db()
self.assertIsNone(pricing.supplier_price_min) self.assertIsNone(pricing.supplier_price_min)
self.assertIsNone(pricing.supplier_price_max) self.assertIsNone(pricing.supplier_price_max)
@override_settings(TESTING_PRICING=True)
def test_internal_pricing(self): def test_internal_pricing(self):
"""Tests for internal price breaks.""" """Tests for internal price breaks."""
# Ensure internal pricing is enabled # Ensure internal pricing is enabled
@ -188,7 +195,8 @@ class PartPricingTests(InvenTreeTestCase):
part=self.part, quantity=ii + 1, price=10 - ii, price_currency=currency part=self.part, quantity=ii + 1, price=10 - ii, price_currency=currency
) )
pricing.update_internal_cost() pricing = self.part.pricing
pricing.refresh_from_db()
# Expected money value # Expected money value
m_expected = Money(10 - ii, currency) m_expected = Money(10 - ii, currency)
@ -201,6 +209,7 @@ class PartPricingTests(InvenTreeTestCase):
self.assertEqual(pricing.internal_cost_max, Money(10, currency)) self.assertEqual(pricing.internal_cost_max, Money(10, currency))
self.assertEqual(pricing.overall_max, Money(10, currency)) self.assertEqual(pricing.overall_max, Money(10, currency))
@override_settings(TESTING_PRICING=True)
def test_stock_item_pricing(self): def test_stock_item_pricing(self):
"""Test for stock item pricing data.""" """Test for stock item pricing data."""
# Create a part # Create a part
@ -243,6 +252,7 @@ class PartPricingTests(InvenTreeTestCase):
self.assertEqual(pricing.overall_min, Money(1.176471, 'USD')) self.assertEqual(pricing.overall_min, Money(1.176471, 'USD'))
self.assertEqual(pricing.overall_max, Money(6.666667, 'USD')) self.assertEqual(pricing.overall_max, Money(6.666667, 'USD'))
@override_settings(TESTING_PRICING=True)
def test_bom_pricing(self): def test_bom_pricing(self):
"""Unit test for BOM pricing calculations.""" """Unit test for BOM pricing calculations."""
pricing = self.part.pricing pricing = self.part.pricing
@ -252,7 +262,8 @@ class PartPricingTests(InvenTreeTestCase):
currency = 'AUD' currency = 'AUD'
for ii in range(10): # Create pricing out of order, to ensure min/max values are calculated correctly
for ii in range(5):
# Create a new part for the BOM # Create a new part for the BOM
sub_part = part.models.Part.objects.create( sub_part = part.models.Part.objects.create(
name=f'Sub Part {ii}', name=f'Sub Part {ii}',
@ -273,15 +284,21 @@ class PartPricingTests(InvenTreeTestCase):
part=self.part, sub_part=sub_part, quantity=5 part=self.part, sub_part=sub_part, quantity=5
) )
pricing.update_bom_cost()
# Check that the values have been updated correctly # Check that the values have been updated correctly
self.assertEqual(pricing.currency, 'USD') self.assertEqual(pricing.currency, 'USD')
# Final overall pricing checks # Price range should have been automatically updated
self.assertEqual(pricing.overall_min, Money('366.666665', 'USD')) self.part.refresh_from_db()
self.assertEqual(pricing.overall_max, Money('550', 'USD')) pricing = self.part.pricing
expected_min = 100
expected_max = 150
# Final overall pricing checks
self.assertEqual(pricing.overall_min, Money(expected_min, 'USD'))
self.assertEqual(pricing.overall_max, Money(expected_max, 'USD'))
@override_settings(TESTING_PRICING=True)
def test_purchase_pricing(self): def test_purchase_pricing(self):
"""Unit tests for historical purchase pricing.""" """Unit tests for historical purchase pricing."""
self.create_price_breaks() self.create_price_breaks()
@ -349,6 +366,7 @@ class PartPricingTests(InvenTreeTestCase):
# Max cost in USD # Max cost in USD
self.assertAlmostEqual(float(pricing.purchase_cost_max.amount), 6.95, places=2) self.assertAlmostEqual(float(pricing.purchase_cost_max.amount), 6.95, places=2)
@override_settings(TESTING_PRICING=True)
def test_delete_with_pricing(self): def test_delete_with_pricing(self):
"""Test for deleting a part which has pricing information.""" """Test for deleting a part which has pricing information."""
# Create some pricing data # Create some pricing data
@ -373,6 +391,7 @@ class PartPricingTests(InvenTreeTestCase):
with self.assertRaises(part.models.PartPricing.DoesNotExist): with self.assertRaises(part.models.PartPricing.DoesNotExist):
pricing.refresh_from_db() pricing.refresh_from_db()
@override_settings(TESTING_PRICING=True)
def test_delete_without_pricing(self): def test_delete_without_pricing(self):
"""Test that we can delete a part which does not have pricing information.""" """Test that we can delete a part which does not have pricing information."""
pricing = self.part.pricing pricing = self.part.pricing
@ -388,6 +407,7 @@ class PartPricingTests(InvenTreeTestCase):
with self.assertRaises(part.models.Part.DoesNotExist): with self.assertRaises(part.models.Part.DoesNotExist):
self.part.refresh_from_db() self.part.refresh_from_db()
@override_settings(TESTING_PRICING=True)
def test_check_missing_pricing(self): def test_check_missing_pricing(self):
"""Tests for check_missing_pricing background task. """Tests for check_missing_pricing background task.
@ -411,6 +431,7 @@ class PartPricingTests(InvenTreeTestCase):
# Check that PartPricing objects have been created # Check that PartPricing objects have been created
self.assertEqual(part.models.PartPricing.objects.count(), 101) self.assertEqual(part.models.PartPricing.objects.count(), 101)
@override_settings(TESTING_PRICING=True)
def test_delete_part_with_stock_items(self): def test_delete_part_with_stock_items(self):
"""Test deleting a part instance with stock items. """Test deleting a part instance with stock items.
@ -431,7 +452,7 @@ class PartPricingTests(InvenTreeTestCase):
) )
# Manually schedule a pricing update (does not happen automatically in testing) # Manually schedule a pricing update (does not happen automatically in testing)
p.schedule_pricing_update(create=True, test=True) p.schedule_pricing_update(create=True)
# Check that a PartPricing object exists # Check that a PartPricing object exists
self.assertTrue(part.models.PartPricing.objects.filter(part=p).exists()) self.assertTrue(part.models.PartPricing.objects.filter(part=p).exists())
@ -443,5 +464,84 @@ class PartPricingTests(InvenTreeTestCase):
self.assertFalse(part.models.PartPricing.objects.filter(part=p).exists()) self.assertFalse(part.models.PartPricing.objects.filter(part=p).exists())
# Try to update pricing (should fail gracefully as the Part has been deleted) # Try to update pricing (should fail gracefully as the Part has been deleted)
p.schedule_pricing_update(create=False, test=True) p.schedule_pricing_update(create=False)
self.assertFalse(part.models.PartPricing.objects.filter(part=p).exists()) self.assertFalse(part.models.PartPricing.objects.filter(part=p).exists())
@override_settings(TESTING_PRICING=True)
def test_multi_level_bom(self):
"""Test that pricing for multi-level BOMs is calculated correctly."""
# Create some parts
A1 = part.models.Part.objects.create(
name='A1', description='A1', assembly=True, component=True
)
B1 = part.models.Part.objects.create(
name='B1', description='B1', assembly=True, component=True
)
C1 = part.models.Part.objects.create(
name='C1', description='C1', assembly=True, component=True
)
D1 = part.models.Part.objects.create(
name='D1', description='D1', assembly=True, component=True
)
D2 = part.models.Part.objects.create(
name='D2', description='D2', assembly=True, component=True
)
D3 = part.models.Part.objects.create(
name='D3', description='D3', assembly=True, component=True
)
# BOM Items
part.models.BomItem.objects.create(part=A1, sub_part=B1, quantity=10)
part.models.BomItem.objects.create(part=B1, sub_part=C1, quantity=2)
part.models.BomItem.objects.create(part=C1, sub_part=D1, quantity=3)
part.models.BomItem.objects.create(part=C1, sub_part=D2, quantity=4)
part.models.BomItem.objects.create(part=C1, sub_part=D3, quantity=5)
# Pricing data (only for low-level D parts)
P1 = D1.pricing
P1.override_min = 4.50
P1.override_max = 5.50
P1.save()
P1.update_pricing()
P2 = D2.pricing
P2.override_min = 6.50
P2.override_max = 7.50
P2.save()
P2.update_pricing()
P3 = D3.pricing
P3.override_min = 8.50
P3.override_max = 9.50
P3.save()
P3.update_pricing()
# Simple checks for low-level BOM items
self.assertEqual(D1.pricing.overall_min, Money(4.50, 'USD'))
self.assertEqual(D1.pricing.overall_max, Money(5.50, 'USD'))
self.assertEqual(D2.pricing.overall_min, Money(6.50, 'USD'))
self.assertEqual(D2.pricing.overall_max, Money(7.50, 'USD'))
self.assertEqual(D3.pricing.overall_min, Money(8.50, 'USD'))
self.assertEqual(D3.pricing.overall_max, Money(9.50, 'USD'))
# Calculate pricing for "C" level part
c_min = 3 * 4.50 + 4 * 6.50 + 5 * 8.50
c_max = 3 * 5.50 + 4 * 7.50 + 5 * 9.50
self.assertEqual(C1.pricing.overall_min, Money(c_min, 'USD'))
self.assertEqual(C1.pricing.overall_max, Money(c_max, 'USD'))
# Calculate pricing for "A" and "B" level parts
b_min = 2 * c_min
b_max = 2 * c_max
a_min = 10 * b_min
a_max = 10 * b_max
self.assertEqual(B1.pricing.overall_min, Money(b_min, 'USD'))
self.assertEqual(B1.pricing.overall_max, Money(b_max, 'USD'))
self.assertEqual(A1.pricing.overall_min, Money(a_min, 'USD'))
self.assertEqual(A1.pricing.overall_max, Money(a_max, 'USD'))

View File

@ -2278,14 +2278,16 @@ def after_delete_stock_item(sender, instance: StockItem, **kwargs):
"""Function to be executed after a StockItem object is deleted.""" """Function to be executed after a StockItem object is deleted."""
from part import tasks as part_tasks from part import tasks as part_tasks
if not InvenTree.ready.isImportingData() and InvenTree.ready.canAppAccessDatabase( if InvenTree.ready.isImportingData():
allow_test=True return
):
if InvenTree.ready.canAppAccessDatabase(allow_test=True):
# Run this check in the background # Run this check in the background
InvenTree.tasks.offload_task( InvenTree.tasks.offload_task(
part_tasks.notify_low_stock_if_required, instance.part part_tasks.notify_low_stock_if_required, instance.part
) )
if InvenTree.ready.canAppAccessDatabase(allow_test=settings.TESTING_PRICING):
# Schedule an update on parent part pricing # Schedule an update on parent part pricing
if instance.part: if instance.part:
instance.part.schedule_pricing_update(create=False) instance.part.schedule_pricing_update(create=False)
@ -2296,19 +2298,15 @@ def after_save_stock_item(sender, instance: StockItem, created, **kwargs):
"""Hook function to be executed after StockItem object is saved/updated.""" """Hook function to be executed after StockItem object is saved/updated."""
from part import tasks as part_tasks from part import tasks as part_tasks
if ( if created and not InvenTree.ready.isImportingData():
created if InvenTree.ready.canAppAccessDatabase(allow_test=True):
and not InvenTree.ready.isImportingData() InvenTree.tasks.offload_task(
and InvenTree.ready.canAppAccessDatabase(allow_test=True) part_tasks.notify_low_stock_if_required, instance.part
): )
# Run this check in the background
InvenTree.tasks.offload_task(
part_tasks.notify_low_stock_if_required, instance.part
)
# Schedule an update on parent part pricing if InvenTree.ready.canAppAccessDatabase(allow_test=settings.TESTING_PRICING):
if instance.part: if instance.part:
instance.part.schedule_pricing_update(create=True) instance.part.schedule_pricing_update(create=True)
class StockItemTracking(InvenTree.models.InvenTreeModel): class StockItemTracking(InvenTree.models.InvenTreeModel):