mirror of
https://github.com/inventree/InvenTree.git
synced 2026-04-04 02:21:18 +00:00
Reduce load on background worker (#11651)
* Do not save setting with identical value * Prevent task duplication * Logic fixes * Add unit test for task de-duplication * Updated unit test
This commit is contained in:
@@ -159,8 +159,56 @@ def record_task_success(task_name: str):
|
|||||||
set_global_setting(f'_{task_name}_SUCCESS', datetime.now().isoformat(), None)
|
set_global_setting(f'_{task_name}_SUCCESS', datetime.now().isoformat(), None)
|
||||||
|
|
||||||
|
|
||||||
|
def check_existing_task(taskname, group: str, *args, **kwargs) -> Optional[str]:
|
||||||
|
"""Test if an identical task is already registered with the worker.
|
||||||
|
|
||||||
|
This will only return true if the task name, group, args and kwargs all match an existing task.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
taskname: The name of the task to check for, in the format 'app.module.function'
|
||||||
|
group: The group that the task belongs to
|
||||||
|
*args: Positional arguments to match
|
||||||
|
**kwargs: Keyword arguments to match
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Optional[str]: The ID of the matching task, if found, otherwise None
|
||||||
|
"""
|
||||||
|
from django_q.models import OrmQ
|
||||||
|
|
||||||
|
task_id = None
|
||||||
|
|
||||||
|
# Iterate through all available tasks, with the most recent first
|
||||||
|
for task in OrmQ.objects.all().order_by('-id'):
|
||||||
|
if task.func() != taskname and task.task['func'] != taskname:
|
||||||
|
# Task does not match
|
||||||
|
continue
|
||||||
|
|
||||||
|
if task.group() != group:
|
||||||
|
# Group does not match
|
||||||
|
continue
|
||||||
|
|
||||||
|
if task.args() != args:
|
||||||
|
# Task args do not match
|
||||||
|
continue
|
||||||
|
|
||||||
|
if task.kwargs() != kwargs:
|
||||||
|
# Task kwargs do not match
|
||||||
|
continue
|
||||||
|
|
||||||
|
task_id = task.task_id()
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
|
return task_id
|
||||||
|
|
||||||
|
|
||||||
def offload_task(
|
def offload_task(
|
||||||
taskname, *args, force_async=False, force_sync=False, **kwargs
|
taskname,
|
||||||
|
*args,
|
||||||
|
force_async: bool = False,
|
||||||
|
force_sync: bool = False,
|
||||||
|
check_duplicates: bool = True,
|
||||||
|
**kwargs,
|
||||||
) -> str | bool:
|
) -> str | bool:
|
||||||
"""Create an AsyncTask if workers are running. This is different to a 'scheduled' task, in that it only runs once!
|
"""Create an AsyncTask if workers are running. This is different to a 'scheduled' task, in that it only runs once!
|
||||||
|
|
||||||
@@ -171,6 +219,7 @@ def offload_task(
|
|||||||
*args: Positional arguments to be passed to the task function
|
*args: Positional arguments to be passed to the task function
|
||||||
force_async: If True, force the task to be offloaded (even if workers are not running)
|
force_async: If True, force the task to be offloaded (even if workers are not running)
|
||||||
force_sync: If True, force the task to be run synchronously (even if workers are running)
|
force_sync: If True, force the task to be run synchronously (even if workers are running)
|
||||||
|
check_duplicates: If True, check for existing identical tasks before offloading
|
||||||
**kwargs: Keyword arguments to be passed to the task function
|
**kwargs: Keyword arguments to be passed to the task function
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
@@ -205,6 +254,15 @@ def offload_task(
|
|||||||
force_sync = True
|
force_sync = True
|
||||||
|
|
||||||
if force_async or (is_worker_running() and not force_sync):
|
if force_async or (is_worker_running() and not force_sync):
|
||||||
|
# Before offloading, check if a duplicate task exists
|
||||||
|
if not force_sync and check_duplicates:
|
||||||
|
if task_id := check_existing_task(taskname, group, *args, **kwargs):
|
||||||
|
logger.debug(
|
||||||
|
"Skipping duplicate task '%s' with ID '%s'", taskname, task_id
|
||||||
|
)
|
||||||
|
|
||||||
|
return task_id
|
||||||
|
|
||||||
# Running as asynchronous task
|
# Running as asynchronous task
|
||||||
try:
|
try:
|
||||||
task = AsyncTask(taskname, *args, group=group, **kwargs)
|
task = AsyncTask(taskname, *args, group=group, **kwargs)
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ from django.db.utils import NotSupportedError
|
|||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
from django_q.models import Schedule, Task
|
from django_q.models import OrmQ, Schedule, Task
|
||||||
from error_report.models import Error
|
from error_report.models import Error
|
||||||
|
|
||||||
import InvenTree.tasks
|
import InvenTree.tasks
|
||||||
@@ -294,3 +294,74 @@ class InvenTreeTaskTests(PluginRegistryMixin, TestCase):
|
|||||||
)
|
)
|
||||||
msg.timestamp = timestamp
|
msg.timestamp = timestamp
|
||||||
msg.save()
|
msg.save()
|
||||||
|
|
||||||
|
def test_duplicate_tasks(self):
|
||||||
|
"""Test for task duplication."""
|
||||||
|
# Start with a blank slate
|
||||||
|
OrmQ.objects.all().delete()
|
||||||
|
|
||||||
|
# Add some unique tasks
|
||||||
|
for idx in range(10):
|
||||||
|
InvenTree.tasks.offload_task(
|
||||||
|
f'dummy_module.dummy_function_{idx}', force_async=True
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(OrmQ.objects.count(), 10)
|
||||||
|
|
||||||
|
# Add some duplicate tasks
|
||||||
|
for _idx in range(10):
|
||||||
|
InvenTree.tasks.offload_task(
|
||||||
|
'dummy_module.dummy_function_x',
|
||||||
|
1,
|
||||||
|
2,
|
||||||
|
3,
|
||||||
|
animal='cat',
|
||||||
|
vegetable='carrot',
|
||||||
|
force_async=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Only 1 extra task should have been added
|
||||||
|
self.assertEqual(OrmQ.objects.count(), 11)
|
||||||
|
|
||||||
|
# Add some more duplicate tasks, but ignore duplication checks
|
||||||
|
for _idx in range(10):
|
||||||
|
InvenTree.tasks.offload_task(
|
||||||
|
'dummy_module.dummy_function_y',
|
||||||
|
1,
|
||||||
|
2,
|
||||||
|
3,
|
||||||
|
animal='dog',
|
||||||
|
vegetable='yam',
|
||||||
|
force_async=True,
|
||||||
|
check_duplicates=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 10 extra tasks should have been added
|
||||||
|
self.assertEqual(OrmQ.objects.count(), 21)
|
||||||
|
|
||||||
|
# Add more tasks, which are *not* duplicates based on args
|
||||||
|
for idx in range(10):
|
||||||
|
InvenTree.tasks.offload_task(
|
||||||
|
'dummy_module.dummy_function',
|
||||||
|
1,
|
||||||
|
idx,
|
||||||
|
3,
|
||||||
|
animal='cat',
|
||||||
|
vegetable='carrot',
|
||||||
|
force_async=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add more tasks, which are *not* duplicates based on kwargs
|
||||||
|
for idx in range(10):
|
||||||
|
InvenTree.tasks.offload_task(
|
||||||
|
'dummy_module.dummy_function',
|
||||||
|
1,
|
||||||
|
2,
|
||||||
|
3,
|
||||||
|
animal='cat',
|
||||||
|
vegetable=f'vegetable_{idx}',
|
||||||
|
force_async=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 20 more tasks should have been added
|
||||||
|
self.assertEqual(OrmQ.objects.count(), 41)
|
||||||
|
|||||||
@@ -53,6 +53,12 @@ def set_global_setting(key, value, change_user=None, create=True, **kwargs):
|
|||||||
kwargs['change_user'] = change_user
|
kwargs['change_user'] = change_user
|
||||||
kwargs['create'] = create
|
kwargs['create'] = create
|
||||||
|
|
||||||
|
if get_global_setting(key, create=False, cache=False) == value:
|
||||||
|
logger.debug(
|
||||||
|
f'Global setting "{key}" already has the desired value, no update needed'
|
||||||
|
)
|
||||||
|
return True
|
||||||
|
|
||||||
return InvenTreeSetting.set_setting(key, value, **kwargs)
|
return InvenTreeSetting.set_setting(key, value, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user