mirror of
https://github.com/inventree/InvenTree.git
synced 2025-04-30 20:46:47 +00:00
Task group (#8332)
* Add 'group' to offload_task - Make use of 'group' field in AsyncTask model - Allows better db filtering * Log error if low_stock check cannot be performed * Ensure low-stock checks are performed by the background worker * Change encoding of arguments to 'notify_low_stock_if_required' - Pass part ID, not part instance - Safer, but requires DB hit * Fix typo * Fix to allow tests to run
This commit is contained in:
parent
ba3bac10a7
commit
fb17dcce9a
@ -86,4 +86,5 @@ def send_email(subject, body, recipients, from_email=None, html_message=None):
|
||||
recipients,
|
||||
fail_silently=False,
|
||||
html_message=html_message,
|
||||
group='notification',
|
||||
)
|
||||
|
@ -174,6 +174,9 @@ def offload_task(
|
||||
"""
|
||||
from InvenTree.exceptions import log_error
|
||||
|
||||
# Extract group information from kwargs
|
||||
group = kwargs.pop('group', 'inventree')
|
||||
|
||||
try:
|
||||
import importlib
|
||||
|
||||
@ -200,7 +203,7 @@ def offload_task(
|
||||
if force_async or (is_worker_running() and not force_sync):
|
||||
# Running as asynchronous task
|
||||
try:
|
||||
task = AsyncTask(taskname, *args, **kwargs)
|
||||
task = AsyncTask(taskname, *args, group=group, **kwargs)
|
||||
task.run()
|
||||
except ImportError:
|
||||
raise_warning(f"WARNING: '{taskname}' not offloaded - Function not found")
|
||||
|
@ -645,7 +645,8 @@ class Build(
|
||||
if not InvenTree.tasks.offload_task(
|
||||
build.tasks.complete_build_allocations,
|
||||
self.pk,
|
||||
user.pk if user else None
|
||||
user.pk if user else None,
|
||||
group='build'
|
||||
):
|
||||
raise ValidationError(_("Failed to offload task to complete build allocations"))
|
||||
|
||||
@ -772,7 +773,8 @@ class Build(
|
||||
if not InvenTree.tasks.offload_task(
|
||||
build.tasks.complete_build_allocations,
|
||||
self.pk,
|
||||
user.pk if user else None
|
||||
user.pk if user else None,
|
||||
group='build',
|
||||
):
|
||||
raise ValidationError(_("Failed to offload task to complete build allocations"))
|
||||
|
||||
@ -1441,7 +1443,11 @@ def after_save_build(sender, instance: Build, created: bool, **kwargs):
|
||||
instance.create_build_line_items()
|
||||
|
||||
# Run checks on required parts
|
||||
InvenTree.tasks.offload_task(build_tasks.check_build_stock, instance)
|
||||
InvenTree.tasks.offload_task(
|
||||
build_tasks.check_build_stock,
|
||||
instance,
|
||||
group='build'
|
||||
)
|
||||
|
||||
# Notify the responsible users that the build order has been created
|
||||
InvenTree.helpers_model.notify_responsible(instance, sender, exclude=instance.issued_by)
|
||||
|
@ -191,6 +191,7 @@ class BuildSerializer(NotesFieldMixin, DataImportExportSerializerMixin, InvenTre
|
||||
InvenTree.tasks.offload_task(
|
||||
build.tasks.create_child_builds,
|
||||
build_order.pk,
|
||||
group='build',
|
||||
)
|
||||
|
||||
return build_order
|
||||
@ -1134,7 +1135,8 @@ class BuildAutoAllocationSerializer(serializers.Serializer):
|
||||
exclude_location=data.get('exclude_location', None),
|
||||
interchangeable=data['interchangeable'],
|
||||
substitutes=data['substitutes'],
|
||||
optional_items=data['optional_items']
|
||||
optional_items=data['optional_items'],
|
||||
group='build'
|
||||
):
|
||||
raise ValidationError(_("Failed to start auto-allocation task"))
|
||||
|
||||
|
@ -216,7 +216,8 @@ def create_child_builds(build_id: int) -> None:
|
||||
# Offload the child build order creation to the background task queue
|
||||
InvenTree.tasks.offload_task(
|
||||
create_child_builds,
|
||||
sub_order.pk
|
||||
sub_order.pk,
|
||||
group='build'
|
||||
)
|
||||
|
||||
|
||||
|
@ -113,7 +113,9 @@ def after_change_currency(setting) -> None:
|
||||
InvenTree.tasks.update_exchange_rates(force=True)
|
||||
|
||||
# Offload update of part prices to a background task
|
||||
InvenTree.tasks.offload_task(part_tasks.check_missing_pricing, force_async=True)
|
||||
InvenTree.tasks.offload_task(
|
||||
part_tasks.check_missing_pricing, force_async=True, group='pricing'
|
||||
)
|
||||
|
||||
|
||||
def validate_currency_codes(value):
|
||||
|
@ -1919,6 +1919,7 @@ class SalesOrderShipment(
|
||||
order.tasks.complete_sales_order_shipment,
|
||||
shipment_id=self.pk,
|
||||
user_id=user.pk if user else None,
|
||||
group='sales_order',
|
||||
)
|
||||
|
||||
trigger_event('salesordershipment.completed', id=self.pk)
|
||||
|
@ -2550,7 +2550,7 @@ class Part(
|
||||
@receiver(post_save, sender=Part, dispatch_uid='part_post_save_log')
|
||||
def after_save_part(sender, instance: Part, created, **kwargs):
|
||||
"""Function to be executed after a Part is saved."""
|
||||
from pickle import PicklingError
|
||||
from django.conf import settings
|
||||
|
||||
from part import tasks as part_tasks
|
||||
|
||||
@ -2558,17 +2558,19 @@ def after_save_part(sender, instance: Part, created, **kwargs):
|
||||
# Check part stock only if we are *updating* the part (not creating it)
|
||||
|
||||
# Run this check in the background
|
||||
try:
|
||||
InvenTree.tasks.offload_task(
|
||||
part_tasks.notify_low_stock_if_required, instance
|
||||
)
|
||||
except PicklingError:
|
||||
# Can sometimes occur if the referenced Part has issues
|
||||
pass
|
||||
InvenTree.tasks.offload_task(
|
||||
part_tasks.notify_low_stock_if_required,
|
||||
instance.pk,
|
||||
group='notification',
|
||||
force_async=not settings.TESTING, # Force async unless in testing mode
|
||||
)
|
||||
|
||||
# Schedule a background task to rebuild any supplier parts
|
||||
InvenTree.tasks.offload_task(
|
||||
part_tasks.rebuild_supplier_parts, instance.pk, force_async=True
|
||||
part_tasks.rebuild_supplier_parts,
|
||||
instance.pk,
|
||||
force_async=True,
|
||||
group='part',
|
||||
)
|
||||
|
||||
|
||||
@ -2705,6 +2707,7 @@ class PartPricing(common.models.MetaMixin):
|
||||
self,
|
||||
counter=counter,
|
||||
force_async=background,
|
||||
group='pricing',
|
||||
)
|
||||
|
||||
def update_pricing(
|
||||
@ -3856,7 +3859,10 @@ def post_save_part_parameter_template(sender, instance, created, **kwargs):
|
||||
if not created:
|
||||
# Schedule a background task to rebuild the parameters against this template
|
||||
InvenTree.tasks.offload_task(
|
||||
part_tasks.rebuild_parameters, instance.pk, force_async=True
|
||||
part_tasks.rebuild_parameters,
|
||||
instance.pk,
|
||||
force_async=True,
|
||||
group='part',
|
||||
)
|
||||
|
||||
|
||||
@ -4548,7 +4554,9 @@ def update_bom_build_lines(sender, instance, created, **kwargs):
|
||||
if InvenTree.ready.canAppAccessDatabase() and not InvenTree.ready.isImportingData():
|
||||
import build.tasks
|
||||
|
||||
InvenTree.tasks.offload_task(build.tasks.update_build_order_lines, instance.pk)
|
||||
InvenTree.tasks.offload_task(
|
||||
build.tasks.update_build_order_lines, instance.pk, group='build'
|
||||
)
|
||||
|
||||
|
||||
@receiver(post_save, sender=BomItem, dispatch_uid='post_save_bom_item')
|
||||
|
@ -1316,6 +1316,7 @@ class PartStocktakeReportGenerateSerializer(serializers.Serializer):
|
||||
exclude_external=data.get('exclude_external', True),
|
||||
generate_report=data.get('generate_report', True),
|
||||
update_parts=data.get('update_parts', True),
|
||||
group='report',
|
||||
)
|
||||
|
||||
|
||||
|
@ -14,7 +14,7 @@ import company.models
|
||||
import InvenTree.helpers
|
||||
import InvenTree.helpers_model
|
||||
import InvenTree.tasks
|
||||
import part.models
|
||||
import part.models as part_models
|
||||
import part.stocktake
|
||||
from common.settings import get_global_setting
|
||||
from InvenTree.tasks import (
|
||||
@ -27,7 +27,7 @@ from InvenTree.tasks import (
|
||||
logger = logging.getLogger('inventree')
|
||||
|
||||
|
||||
def notify_low_stock(part: part.models.Part):
|
||||
def notify_low_stock(part: part_models.Part):
|
||||
"""Notify interested users that a part is 'low stock'.
|
||||
|
||||
Rules:
|
||||
@ -51,20 +51,28 @@ def notify_low_stock(part: part.models.Part):
|
||||
)
|
||||
|
||||
|
||||
def notify_low_stock_if_required(part: part.models.Part):
|
||||
def notify_low_stock_if_required(part_id: int):
|
||||
"""Check if the stock quantity has fallen below the minimum threshold of part.
|
||||
|
||||
If true, notify the users who have subscribed to the part
|
||||
"""
|
||||
try:
|
||||
part = part_models.Part.objects.get(pk=part_id)
|
||||
except part_models.Part.DoesNotExist:
|
||||
logger.warning(
|
||||
'notify_low_stock_if_required: Part with ID %s does not exist', part_id
|
||||
)
|
||||
return
|
||||
|
||||
# Run "up" the tree, to allow notification for "parent" parts
|
||||
parts = part.get_ancestors(include_self=True, ascending=True)
|
||||
|
||||
for p in parts:
|
||||
if p.is_part_low_on_stock():
|
||||
InvenTree.tasks.offload_task(notify_low_stock, p)
|
||||
InvenTree.tasks.offload_task(notify_low_stock, p, group='notification')
|
||||
|
||||
|
||||
def update_part_pricing(pricing: part.models.PartPricing, counter: int = 0):
|
||||
def update_part_pricing(pricing: part_models.PartPricing, counter: int = 0):
|
||||
"""Update cached pricing data for the specified PartPricing instance.
|
||||
|
||||
Arguments:
|
||||
@ -93,7 +101,7 @@ def check_missing_pricing(limit=250):
|
||||
limit: Maximum number of parts to process at once
|
||||
"""
|
||||
# Find parts for which pricing information has never been updated
|
||||
results = part.models.PartPricing.objects.filter(updated=None)[:limit]
|
||||
results = part_models.PartPricing.objects.filter(updated=None)[:limit]
|
||||
|
||||
if results.count() > 0:
|
||||
logger.info('Found %s parts with empty pricing', results.count())
|
||||
@ -105,7 +113,7 @@ def check_missing_pricing(limit=250):
|
||||
days = int(get_global_setting('PRICING_UPDATE_DAYS', 30))
|
||||
stale_date = datetime.now().date() - timedelta(days=days)
|
||||
|
||||
results = part.models.PartPricing.objects.filter(updated__lte=stale_date)[:limit]
|
||||
results = part_models.PartPricing.objects.filter(updated__lte=stale_date)[:limit]
|
||||
|
||||
if results.count() > 0:
|
||||
logger.info('Found %s stale pricing entries', results.count())
|
||||
@ -115,7 +123,7 @@ def check_missing_pricing(limit=250):
|
||||
|
||||
# Find any pricing data which is in the wrong currency
|
||||
currency = common.currency.currency_code_default()
|
||||
results = part.models.PartPricing.objects.exclude(currency=currency)
|
||||
results = part_models.PartPricing.objects.exclude(currency=currency)
|
||||
|
||||
if results.count() > 0:
|
||||
logger.info('Found %s pricing entries in the wrong currency', results.count())
|
||||
@ -124,7 +132,7 @@ def check_missing_pricing(limit=250):
|
||||
pp.schedule_for_update()
|
||||
|
||||
# Find any parts which do not have pricing information
|
||||
results = part.models.Part.objects.filter(pricing_data=None)[:limit]
|
||||
results = part_models.Part.objects.filter(pricing_data=None)[:limit]
|
||||
|
||||
if results.count() > 0:
|
||||
logger.info('Found %s parts without pricing', results.count())
|
||||
@ -152,7 +160,7 @@ def scheduled_stocktake_reports():
|
||||
get_global_setting('STOCKTAKE_DELETE_REPORT_DAYS', 30, cache=False)
|
||||
)
|
||||
threshold = datetime.now() - timedelta(days=delete_n_days)
|
||||
old_reports = part.models.PartStocktakeReport.objects.filter(date__lt=threshold)
|
||||
old_reports = part_models.PartStocktakeReport.objects.filter(date__lt=threshold)
|
||||
|
||||
if old_reports.count() > 0:
|
||||
logger.info('Deleting %s stale stocktake reports', old_reports.count())
|
||||
@ -187,11 +195,11 @@ def rebuild_parameters(template_id):
|
||||
which may cause the base unit to be adjusted.
|
||||
"""
|
||||
try:
|
||||
template = part.models.PartParameterTemplate.objects.get(pk=template_id)
|
||||
except part.models.PartParameterTemplate.DoesNotExist:
|
||||
template = part_models.PartParameterTemplate.objects.get(pk=template_id)
|
||||
except part_models.PartParameterTemplate.DoesNotExist:
|
||||
return
|
||||
|
||||
parameters = part.models.PartParameter.objects.filter(template=template)
|
||||
parameters = part_models.PartParameter.objects.filter(template=template)
|
||||
|
||||
n = 0
|
||||
|
||||
@ -216,8 +224,8 @@ def rebuild_supplier_parts(part_id):
|
||||
which may cause the native units of any supplier parts to be updated
|
||||
"""
|
||||
try:
|
||||
prt = part.models.Part.objects.get(pk=part_id)
|
||||
except part.models.Part.DoesNotExist:
|
||||
prt = part_models.Part.objects.get(pk=part_id)
|
||||
except part_models.Part.DoesNotExist:
|
||||
return
|
||||
|
||||
supplier_parts = company.models.SupplierPart.objects.filter(part=prt)
|
||||
|
@ -40,7 +40,7 @@ def trigger_event(event, *args, **kwargs):
|
||||
if 'force_async' not in kwargs and not settings.PLUGIN_TESTING_EVENTS:
|
||||
kwargs['force_async'] = True
|
||||
|
||||
offload_task(register_event, event, *args, **kwargs)
|
||||
offload_task(register_event, event, *args, group='plugin', **kwargs)
|
||||
|
||||
|
||||
def register_event(event, *args, **kwargs):
|
||||
@ -77,7 +77,9 @@ def register_event(event, *args, **kwargs):
|
||||
kwargs['force_async'] = True
|
||||
|
||||
# Offload a separate task for each plugin
|
||||
offload_task(process_event, slug, event, *args, **kwargs)
|
||||
offload_task(
|
||||
process_event, slug, event, *args, group='plugin', **kwargs
|
||||
)
|
||||
|
||||
|
||||
def process_event(plugin_slug, event, *args, **kwargs):
|
||||
|
@ -185,7 +185,12 @@ class LabelPrintingMixin:
|
||||
# Exclude the 'context' object - cannot be pickled
|
||||
print_args.pop('context', None)
|
||||
|
||||
offload_task(plugin_label.print_label, self.plugin_slug(), **print_args)
|
||||
offload_task(
|
||||
plugin_label.print_label,
|
||||
self.plugin_slug(),
|
||||
group='plugin',
|
||||
**print_args,
|
||||
)
|
||||
|
||||
# Update the progress of the print job
|
||||
output.progress += int(100 / N)
|
||||
|
@ -59,7 +59,11 @@ class LocatePluginView(GenericAPIView):
|
||||
StockItem.objects.get(pk=item_pk)
|
||||
|
||||
offload_task(
|
||||
registry.call_plugin_function, plugin, 'locate_stock_item', item_pk
|
||||
registry.call_plugin_function,
|
||||
plugin,
|
||||
'locate_stock_item',
|
||||
item_pk,
|
||||
group='plugin',
|
||||
)
|
||||
|
||||
data['item'] = item_pk
|
||||
@ -78,6 +82,7 @@ class LocatePluginView(GenericAPIView):
|
||||
plugin,
|
||||
'locate_stock_location',
|
||||
location_pk,
|
||||
group='plugin',
|
||||
)
|
||||
|
||||
data['location'] = location_pk
|
||||
|
@ -95,7 +95,9 @@ class InvenTreeLabelPlugin(LabelPrintingMixin, InvenTreePlugin):
|
||||
if driver.USE_BACKGROUND_WORKER is False:
|
||||
return driver.print_labels(machine, label, items, **print_kwargs)
|
||||
|
||||
offload_task(driver.print_labels, machine, label, items, **print_kwargs)
|
||||
offload_task(
|
||||
driver.print_labels, machine, label, items, group='plugin', **print_kwargs
|
||||
)
|
||||
|
||||
return JsonResponse({
|
||||
'success': True,
|
||||
|
@ -236,7 +236,9 @@ class PluginConfig(InvenTree.models.MetadataMixin, models.Model):
|
||||
|
||||
if active:
|
||||
offload_task(check_for_migrations)
|
||||
offload_task(plugin.staticfiles.copy_plugin_static_files, self.key)
|
||||
offload_task(
|
||||
plugin.staticfiles.copy_plugin_static_files, self.key, group='plugin'
|
||||
)
|
||||
|
||||
|
||||
class PluginSetting(common.models.BaseInvenTreeSetting):
|
||||
|
@ -1680,7 +1680,7 @@ class StockItem(
|
||||
|
||||
# Rebuild the stock tree
|
||||
InvenTree.tasks.offload_task(
|
||||
stock.tasks.rebuild_stock_item_tree, tree_id=self.tree_id
|
||||
stock.tasks.rebuild_stock_item_tree, tree_id=self.tree_id, group='part'
|
||||
)
|
||||
|
||||
@transaction.atomic
|
||||
@ -1915,7 +1915,7 @@ class StockItem(
|
||||
# Rebuild stock trees as required
|
||||
for tree_id in tree_ids:
|
||||
InvenTree.tasks.offload_task(
|
||||
stock.tasks.rebuild_stock_item_tree, tree_id=tree_id
|
||||
stock.tasks.rebuild_stock_item_tree, tree_id=tree_id, group='stock'
|
||||
)
|
||||
|
||||
@transaction.atomic
|
||||
@ -2012,7 +2012,7 @@ class StockItem(
|
||||
|
||||
# Rebuild the tree for this parent item
|
||||
InvenTree.tasks.offload_task(
|
||||
stock.tasks.rebuild_stock_item_tree, tree_id=self.tree_id
|
||||
stock.tasks.rebuild_stock_item_tree, tree_id=self.tree_id, group='stock'
|
||||
)
|
||||
|
||||
# Attempt to reload the new item from the database
|
||||
@ -2405,7 +2405,10 @@ def after_delete_stock_item(sender, instance: StockItem, **kwargs):
|
||||
if InvenTree.ready.canAppAccessDatabase(allow_test=True):
|
||||
# Run this check in the background
|
||||
InvenTree.tasks.offload_task(
|
||||
part_tasks.notify_low_stock_if_required, instance.part
|
||||
part_tasks.notify_low_stock_if_required,
|
||||
instance.part.pk,
|
||||
group='notification',
|
||||
force_async=True,
|
||||
)
|
||||
|
||||
if InvenTree.ready.canAppAccessDatabase(allow_test=settings.TESTING_PRICING):
|
||||
@ -2422,7 +2425,10 @@ def after_save_stock_item(sender, instance: StockItem, created, **kwargs):
|
||||
if created and not InvenTree.ready.isImportingData():
|
||||
if InvenTree.ready.canAppAccessDatabase(allow_test=True):
|
||||
InvenTree.tasks.offload_task(
|
||||
part_tasks.notify_low_stock_if_required, instance.part
|
||||
part_tasks.notify_low_stock_if_required,
|
||||
instance.part.pk,
|
||||
group='notification',
|
||||
force_async=True,
|
||||
)
|
||||
|
||||
if InvenTree.ready.canAppAccessDatabase(allow_test=settings.TESTING_PRICING):
|
||||
|
Loading…
x
Reference in New Issue
Block a user