mirror of
https://github.com/inventree/InvenTree.git
synced 2026-04-06 11:31:04 +00:00
[API] Monitor task (#11527)
* Enhance docstring * Return the ID of an offloaded task * Add API endpoint for background task detail * Add UI hook for monitoring background task progress * Handle queued tasks (not yet started) * Improve UX * Update frontend lib version * Bump API version * Fix notification * Simplify UI interface * Implement internal hook * Fix API path sequence * Add unit tests for task detail endpoint * Refactor code into reusable model * Explicit operation_id for API endpoints * Further refactoring * Use 200 response code - axios does not like 202, simplify it * Return task response for validation of part BOM * Fix schema * Cleanup * Run background worker during playwright tests - For full e2e integration testing * Improve hooks and unit testing * Rename custom hooks to meet react naming requirements
This commit is contained in:
@@ -1,11 +1,14 @@
|
||||
"""InvenTree API version information."""
|
||||
|
||||
# InvenTree API version
|
||||
INVENTREE_API_VERSION = 463
|
||||
INVENTREE_API_VERSION = 464
|
||||
"""Increment this API version number whenever there is a significant change to the API that any clients need to know about."""
|
||||
|
||||
INVENTREE_API_TEXT = """
|
||||
|
||||
v464 -> 2026-03-15 : https://github.com/inventree/InvenTree/pull/11527
|
||||
- Add API endpoint for monitoring the progress of a particular background task
|
||||
|
||||
v463 -> 2026-03-12 : https://github.com/inventree/InvenTree/pull/11499
|
||||
- Allow "bulk update" actions against StockItem endpoint
|
||||
|
||||
|
||||
@@ -953,6 +953,7 @@ Q_CLUSTER = {
|
||||
'max_attempts': int(
|
||||
get_setting('INVENTREE_BACKGROUND_MAX_ATTEMPTS', 'background.max_attempts', 5)
|
||||
),
|
||||
'save_limit': 1000,
|
||||
'queue_limit': 50,
|
||||
'catch_up': False,
|
||||
'bulk': 10,
|
||||
|
||||
@@ -161,13 +161,20 @@ def record_task_success(task_name: str):
|
||||
|
||||
def offload_task(
|
||||
taskname, *args, force_async=False, force_sync=False, **kwargs
|
||||
) -> bool:
|
||||
) -> str | bool:
|
||||
"""Create an AsyncTask if workers are running. This is different to a 'scheduled' task, in that it only runs once!
|
||||
|
||||
If workers are not running or force_sync flag, is set then the task is ran synchronously.
|
||||
|
||||
Arguments:
|
||||
taskname: The name of the task to be run, in the format 'app.module.function'
|
||||
*args: Positional arguments to be passed to the task function
|
||||
force_async: If True, force the task to be offloaded (even if workers are not running)
|
||||
force_sync: If True, force the task to be run synchronously (even if workers are running)
|
||||
**kwargs: Keyword arguments to be passed to the task function
|
||||
|
||||
Returns:
|
||||
bool: True if the task was offloaded (or ran), False otherwise
|
||||
str | bool: Task ID if the task was offloaded, True if ran synchronously, False otherwise
|
||||
"""
|
||||
from InvenTree.exceptions import log_error
|
||||
|
||||
@@ -203,6 +210,9 @@ def offload_task(
|
||||
task = AsyncTask(taskname, *args, group=group, **kwargs)
|
||||
with tracer.start_as_current_span(f'async worker: {taskname}'):
|
||||
task.run()
|
||||
|
||||
# Return the ID of the offloaded task, so that it can be tracked if needed
|
||||
return task.id
|
||||
except ImportError:
|
||||
raise_warning(f"WARNING: '{taskname}' not offloaded - Function not found")
|
||||
return False
|
||||
@@ -265,6 +275,40 @@ def offload_task(
|
||||
return True
|
||||
|
||||
|
||||
def get_queued_task(task_id: str):
|
||||
"""Find the task in the queue, if it exists.
|
||||
|
||||
Note that the OrmQ table does NOT keep the task ID as a database field,
|
||||
it is instead stored in the payload data.
|
||||
If there are a large number of pending tasks, this query may be inefficient,
|
||||
but there is no other way to find a queued task by ID.
|
||||
"""
|
||||
offset = 0
|
||||
limit = 500
|
||||
|
||||
if not task_id:
|
||||
# Return early if no task ID was provided
|
||||
return None
|
||||
|
||||
task_id = str(task_id)
|
||||
|
||||
from django_q.models import OrmQ
|
||||
|
||||
while True:
|
||||
queued_tasks = OrmQ.objects.all().order_by('id')[offset : offset + limit]
|
||||
if not queued_tasks:
|
||||
break
|
||||
|
||||
for task in queued_tasks:
|
||||
if task.task_id() == task_id:
|
||||
return task
|
||||
|
||||
offset += limit
|
||||
|
||||
# No matching task was discovered
|
||||
return None
|
||||
|
||||
|
||||
@dataclass()
|
||||
class ScheduledTask:
|
||||
"""A scheduled task.
|
||||
|
||||
@@ -17,8 +17,8 @@ from django.views.decorators.csrf import csrf_exempt
|
||||
|
||||
import django_filters.rest_framework.filters as rest_filters
|
||||
import django_q.models
|
||||
import django_q.tasks
|
||||
from django_filters.rest_framework.filterset import FilterSet
|
||||
from django_q.tasks import async_task
|
||||
from djmoney.contrib.exchange.models import ExchangeBackend, Rate
|
||||
from drf_spectacular.utils import OpenApiResponse, extend_schema
|
||||
from error_report.models import Error
|
||||
@@ -115,7 +115,7 @@ class WebhookView(CsrfExemptMixin, APIView):
|
||||
# process data
|
||||
message = self.webhook.save_data(payload, headers, request)
|
||||
if self.run_async:
|
||||
async_task(self._process_payload, message.id)
|
||||
django_q.tasks.async_task(self._process_payload, message.id)
|
||||
else:
|
||||
self._process_result(
|
||||
self.webhook.process_payload(message, payload, headers), message
|
||||
@@ -564,13 +564,29 @@ class ErrorMessageDetail(RetrieveUpdateDestroyAPI):
|
||||
permission_classes = [IsAuthenticatedOrReadScope, IsAdminUser]
|
||||
|
||||
|
||||
class BackgroundTaskDetail(APIView):
|
||||
"""Detail view for a single background task."""
|
||||
|
||||
permission_classes = [IsAuthenticatedOrReadScope]
|
||||
|
||||
@extend_schema(responses={200: common.serializers.TaskDetailSerializer})
|
||||
def get(self, request, task_id, *args, **kwargs):
|
||||
"""Fetch information regarding a particular background task ID."""
|
||||
response = common.serializers.TaskDetailSerializer.from_task(task_id).data
|
||||
|
||||
return Response(response, status=response['http_status'])
|
||||
|
||||
|
||||
class BackgroundTaskOverview(APIView):
|
||||
"""Provides an overview of the background task queue status."""
|
||||
|
||||
permission_classes = [IsAuthenticatedOrReadScope, IsAdminUser]
|
||||
serializer_class = None
|
||||
|
||||
@extend_schema(responses={200: common.serializers.TaskOverviewSerializer})
|
||||
@extend_schema(
|
||||
operation_id='background_task_overview',
|
||||
responses={200: common.serializers.TaskOverviewSerializer},
|
||||
)
|
||||
def get(self, request, fmt=None):
|
||||
"""Return information about the current status of the background task queue."""
|
||||
import django_q.models as q_models
|
||||
@@ -1396,6 +1412,9 @@ common_api_urls = [
|
||||
name='api-scheduled-task-list',
|
||||
),
|
||||
path('failed/', FailedTaskList.as_view(), name='api-failed-task-list'),
|
||||
path(
|
||||
'<str:task_id>/', BackgroundTaskDetail.as_view(), name='api-task-detail'
|
||||
),
|
||||
path('', BackgroundTaskOverview.as_view(), name='api-task-overview'),
|
||||
]),
|
||||
),
|
||||
|
||||
@@ -522,6 +522,78 @@ class ErrorMessageSerializer(InvenTreeModelSerializer):
|
||||
read_only_fields = ['when', 'info', 'data', 'path', 'pk']
|
||||
|
||||
|
||||
class TaskDetailSerializer(serializers.Serializer):
|
||||
"""Serializer for a background task detail."""
|
||||
|
||||
task_id = serializers.CharField(read_only=True)
|
||||
exists = serializers.BooleanField(read_only=True)
|
||||
pending = serializers.BooleanField(read_only=True)
|
||||
complete = serializers.BooleanField(read_only=True)
|
||||
success = serializers.BooleanField(read_only=True)
|
||||
http_status = serializers.IntegerField(read_only=True)
|
||||
|
||||
@classmethod
|
||||
def from_task(cls, task_id: str | bool | None) -> 'TaskDetailSerializer':
|
||||
"""Create a TaskDetailSerializer instance from a django_q Task.
|
||||
|
||||
Arguments:
|
||||
task_id: The ID of the task to retrieve details for.
|
||||
|
||||
Returns:
|
||||
An instance of TaskDetailSerializer with the task details.
|
||||
|
||||
Notes:
|
||||
- If the provided task_id is None, the task has not been run, or has errored out
|
||||
- If the provided task_id is a boolean, the task has been run synchronously, and the boolean value indicates success or failure
|
||||
- If the provided task_id is a string, the task has been offloaded to the background worker, and the details can be from the database
|
||||
|
||||
"""
|
||||
from InvenTree.tasks import get_queued_task
|
||||
|
||||
if task_id is None or type(task_id) is bool:
|
||||
# If the task_id is a boolean, the task has been run synchronously
|
||||
return cls({
|
||||
'task_id': '',
|
||||
'exists': False,
|
||||
'pending': False,
|
||||
'complete': task_id is not None,
|
||||
'success': False if task_id is None else bool(task_id),
|
||||
'http_status': 404 if task_id is None else 200,
|
||||
})
|
||||
|
||||
# A non-boolean result indicates that the task has been offloaded to the background worker
|
||||
success = django_q.models.Success.objects.filter(id=task_id).first()
|
||||
failure = django_q.models.Failure.objects.filter(id=task_id).first()
|
||||
task = (
|
||||
success
|
||||
or failure
|
||||
or django_q.models.Task.objects.filter(id=task_id).first()
|
||||
)
|
||||
queued = False
|
||||
|
||||
exists = bool(success or failure or task)
|
||||
|
||||
if not exists:
|
||||
# If the task has not been started yet, it may be present in the queue
|
||||
queued = bool(get_queued_task(task_id))
|
||||
|
||||
complete = bool(success) or bool(failure)
|
||||
|
||||
# Determine the http_status code for the task
|
||||
# - 200: Task exists and has been completed
|
||||
# - 404: Task does not exist
|
||||
http_status = 200 if exists or queued else 404
|
||||
|
||||
return cls({
|
||||
'task_id': task_id,
|
||||
'exists': exists or queued,
|
||||
'pending': queued,
|
||||
'complete': complete,
|
||||
'success': bool(success),
|
||||
'http_status': http_status,
|
||||
})
|
||||
|
||||
|
||||
class TaskOverviewSerializer(serializers.Serializer):
|
||||
"""Serializer for background task overview."""
|
||||
|
||||
|
||||
@@ -1107,6 +1107,41 @@ class TaskListApiTests(InvenTreeAPITestCase):
|
||||
for task in response.data:
|
||||
self.assertEqual(task['name'], 'time.sleep')
|
||||
|
||||
def test_task_detail(self):
|
||||
"""Test the BackgroundTaskDetail API endpoint."""
|
||||
from InvenTree.tasks import offload_task
|
||||
|
||||
# Force run a task
|
||||
result = offload_task('fake_module.test_task', force_sync=True)
|
||||
self.assertFalse(result)
|
||||
self.assertEqual(type(result), bool)
|
||||
|
||||
# Schedule a dummy task - and ensure it offloads to the worker
|
||||
task_id = offload_task('fake_module.test_task', force_async=True)
|
||||
self.assertIsNotNone(task_id)
|
||||
self.assertEqual(type(task_id), str)
|
||||
|
||||
url = reverse('api-task-detail', kwargs={'task_id': task_id})
|
||||
|
||||
data = self.get(url, expected_code=200).data
|
||||
|
||||
self.assertEqual(data['task_id'], task_id)
|
||||
self.assertTrue(data['exists'])
|
||||
self.assertTrue(data['pending'])
|
||||
self.assertFalse(data['complete'])
|
||||
self.assertFalse(data['success'])
|
||||
|
||||
# Perform a lookup for a non-existent task
|
||||
url = reverse('api-task-detail', kwargs={'task_id': 'doesnotexist'})
|
||||
|
||||
data = self.get(url, expected_code=404).data
|
||||
|
||||
self.assertEqual(data['task_id'], 'doesnotexist')
|
||||
self.assertFalse(data['exists'])
|
||||
self.assertFalse(data['pending'])
|
||||
self.assertFalse(data['complete'])
|
||||
self.assertFalse(data['success'])
|
||||
|
||||
|
||||
class WebhookMessageTests(TestCase):
|
||||
"""Tests for webhooks."""
|
||||
|
||||
@@ -8,10 +8,11 @@ import django_filters.rest_framework.filters as rest_filters
|
||||
from django_filters.rest_framework import DjangoFilterBackend
|
||||
from django_filters.rest_framework.filterset import FilterSet
|
||||
from drf_spectacular.types import OpenApiTypes
|
||||
from drf_spectacular.utils import extend_schema_field
|
||||
from drf_spectacular.utils import extend_schema, extend_schema_field
|
||||
from rest_framework import serializers
|
||||
from rest_framework.response import Response
|
||||
|
||||
import common.serializers
|
||||
import part.tasks as part_tasks
|
||||
from data_exporter.mixins import DataExportViewMixin
|
||||
from InvenTree.api import (
|
||||
@@ -614,8 +615,18 @@ class PartValidateBOM(RetrieveUpdateAPI):
|
||||
queryset = Part.objects.all()
|
||||
serializer_class = part_serializers.PartBomValidateSerializer
|
||||
|
||||
@extend_schema(
|
||||
responses={
|
||||
200: common.serializers.TaskDetailSerializer,
|
||||
404: common.serializers.TaskDetailSerializer,
|
||||
}
|
||||
)
|
||||
def update(self, request, *args, **kwargs):
|
||||
"""Validate the referenced BomItem instance."""
|
||||
"""Validate the referenced BomItem instance.
|
||||
|
||||
As this task if offloaded to the background worker,
|
||||
we return information about the background task which is performing the validation.
|
||||
"""
|
||||
part = self.get_object()
|
||||
|
||||
partial = kwargs.pop('partial', False)
|
||||
@@ -629,7 +640,7 @@ class PartValidateBOM(RetrieveUpdateAPI):
|
||||
valid = str2bool(serializer.validated_data.get('valid', False))
|
||||
|
||||
# BOM validation may take some time, so we offload it to a background task
|
||||
offload_task(
|
||||
task_id = offload_task(
|
||||
part_tasks.validate_bom,
|
||||
part.pk,
|
||||
valid,
|
||||
@@ -637,10 +648,8 @@ class PartValidateBOM(RetrieveUpdateAPI):
|
||||
group='part',
|
||||
)
|
||||
|
||||
# Re-serialize the response
|
||||
serializer = self.get_serializer(part, many=False)
|
||||
|
||||
return Response(serializer.data)
|
||||
response = common.serializers.TaskDetailSerializer.from_task(task_id).data
|
||||
return Response(response, status=response['http_status'])
|
||||
|
||||
|
||||
class PartFilter(FilterSet):
|
||||
|
||||
Reference in New Issue
Block a user