mirror of
https://github.com/inventree/InvenTree.git
synced 2025-06-17 12:35:46 +00:00
Tasks API Endpoint (#6230)
* Add API endpoint for background task overview * Cleanup other pending heartbeat tasks * Adds API endpoint for queued tasks * Adds API endpoint for scheduled tasks * Add API endpoint for failed tasks * Update API version info * Add table for displaying pending tasks * Add failed tasks table * Use accordion * Annotate extra data to scheduled tasks serializer * Extend API functionality * Update tasks.py - Allow skipping of static file step in "invoke update" - Allows for quicker updates in dev mode * Display task result error for failed tasks * Allow delete of failed tasks * Remove old debug message * Adds ability to delete pending tasks * Update table columns * Fix unused imports * Prevent multiple heartbeat functions from being added to the queue at startup * Add unit tests for API
This commit is contained in:
@ -1,11 +1,14 @@
|
||||
"""InvenTree API version information."""
|
||||
|
||||
# InvenTree API version
|
||||
INVENTREE_API_VERSION = 161
|
||||
INVENTREE_API_VERSION = 162
|
||||
"""Increment this API version number whenever there is a significant change to the API that any clients need to know about."""
|
||||
|
||||
INVENTREE_API_TEXT = """
|
||||
|
||||
v162 -> 2024-01-14 : https://github.com/inventree/InvenTree/pull/6230
|
||||
- Adds API endpoints to provide information on background tasks
|
||||
|
||||
v161 -> 2024-01-13 : https://github.com/inventree/InvenTree/pull/6222
|
||||
- Adds API endpoint for system error information
|
||||
|
||||
|
@ -138,12 +138,22 @@ class InvenTreeConfig(AppConfig):
|
||||
Schedule.objects.bulk_update(tasks_to_update, ['schedule_type', 'minutes'])
|
||||
logger.info('Updated %s existing scheduled tasks', len(tasks_to_update))
|
||||
|
||||
# Put at least one task onto the background worker stack,
|
||||
# which will be processed as soon as the worker comes online
|
||||
InvenTree.tasks.offload_task(InvenTree.tasks.heartbeat, force_async=True)
|
||||
self.add_heartbeat()
|
||||
|
||||
logger.info('Started %s scheduled background tasks...', len(tasks))
|
||||
|
||||
def add_heartbeat(self):
|
||||
"""Ensure there is at least one background task in the queue."""
|
||||
import django_q.models
|
||||
|
||||
try:
|
||||
if django_q.models.OrmQ.objects.count() == 0:
|
||||
InvenTree.tasks.offload_task(
|
||||
InvenTree.tasks.heartbeat, force_async=True
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def collect_tasks(self):
|
||||
"""Collect all background tasks."""
|
||||
for app_name, app in apps.app_configs.items():
|
||||
|
@ -347,7 +347,7 @@ def heartbeat():
|
||||
(There is probably a less "hacky" way of achieving this)?
|
||||
"""
|
||||
try:
|
||||
from django_q.models import Success
|
||||
from django_q.models import OrmQ, Success
|
||||
except AppRegistryNotReady: # pragma: no cover
|
||||
logger.info('Could not perform heartbeat task - App registry not ready')
|
||||
return
|
||||
@ -362,6 +362,11 @@ def heartbeat():
|
||||
|
||||
heartbeats.delete()
|
||||
|
||||
# Clear out any other pending heartbeat tasks
|
||||
for task in OrmQ.objects.all():
|
||||
if task.func() == 'InvenTree.tasks.heartbeat':
|
||||
task.delete()
|
||||
|
||||
|
||||
@scheduled_task(ScheduledTask.DAILY)
|
||||
def delete_successful_tasks():
|
||||
|
@ -8,6 +8,7 @@ from django.urls import include, path, re_path
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
|
||||
import django_q.models
|
||||
from django_q.tasks import async_task
|
||||
from djmoney.contrib.exchange.models import ExchangeBackend, Rate
|
||||
from error_report.models import Error
|
||||
@ -509,6 +510,71 @@ class ErrorMessageDetail(RetrieveUpdateDestroyAPI):
|
||||
permission_classes = [permissions.IsAuthenticated, IsAdminUser]
|
||||
|
||||
|
||||
class BackgroundTaskOverview(APIView):
|
||||
"""Provides an overview of the background task queue status."""
|
||||
|
||||
permission_classes = [permissions.IsAuthenticated, IsAdminUser]
|
||||
|
||||
def get(self, request, format=None):
|
||||
"""Return information about the current status of the background task queue."""
|
||||
import django_q.models as q_models
|
||||
|
||||
import InvenTree.status
|
||||
|
||||
serializer = common.serializers.TaskOverviewSerializer({
|
||||
'is_running': InvenTree.status.is_worker_running(),
|
||||
'pending_tasks': q_models.OrmQ.objects.count(),
|
||||
'scheduled_tasks': q_models.Schedule.objects.count(),
|
||||
'failed_tasks': q_models.Failure.objects.count(),
|
||||
})
|
||||
|
||||
return Response(serializer.data)
|
||||
|
||||
|
||||
class PendingTaskList(BulkDeleteMixin, ListAPI):
|
||||
"""Provides a read-only list of currently pending tasks."""
|
||||
|
||||
permission_classes = [permissions.IsAuthenticated, IsAdminUser]
|
||||
|
||||
queryset = django_q.models.OrmQ.objects.all()
|
||||
serializer_class = common.serializers.PendingTaskSerializer
|
||||
|
||||
|
||||
class ScheduledTaskList(ListAPI):
|
||||
"""Provides a read-only list of currently scheduled tasks."""
|
||||
|
||||
permission_classes = [permissions.IsAuthenticated, IsAdminUser]
|
||||
|
||||
queryset = django_q.models.Schedule.objects.all()
|
||||
serializer_class = common.serializers.ScheduledTaskSerializer
|
||||
|
||||
filter_backends = SEARCH_ORDER_FILTER
|
||||
|
||||
ordering_fields = ['pk', 'func', 'last_run', 'next_run']
|
||||
|
||||
search_fields = ['func']
|
||||
|
||||
def get_queryset(self):
|
||||
"""Return annotated queryset."""
|
||||
queryset = super().get_queryset()
|
||||
return common.serializers.ScheduledTaskSerializer.annotate_queryset(queryset)
|
||||
|
||||
|
||||
class FailedTaskList(BulkDeleteMixin, ListAPI):
|
||||
"""Provides a read-only list of currently failed tasks."""
|
||||
|
||||
permission_classes = [permissions.IsAuthenticated, IsAdminUser]
|
||||
|
||||
queryset = django_q.models.Failure.objects.all()
|
||||
serializer_class = common.serializers.FailedTaskSerializer
|
||||
|
||||
filter_backends = SEARCH_ORDER_FILTER
|
||||
|
||||
ordering_fields = ['pk', 'func', 'started', 'stopped']
|
||||
|
||||
search_fields = ['func']
|
||||
|
||||
|
||||
class FlagList(ListAPI):
|
||||
"""List view for feature flags."""
|
||||
|
||||
@ -590,6 +656,24 @@ common_api_urls = [
|
||||
re_path(
|
||||
r'^notes-image-upload/', NotesImageList.as_view(), name='api-notes-image-list'
|
||||
),
|
||||
# Background task information
|
||||
re_path(
|
||||
r'^background-task/',
|
||||
include([
|
||||
re_path(
|
||||
r'^pending/', PendingTaskList.as_view(), name='api-pending-task-list'
|
||||
),
|
||||
re_path(
|
||||
r'^scheduled/',
|
||||
ScheduledTaskList.as_view(),
|
||||
name='api-scheduled-task-list',
|
||||
),
|
||||
re_path(r'^failed/', FailedTaskList.as_view(), name='api-failed-task-list'),
|
||||
re_path(
|
||||
r'^.*$', BackgroundTaskOverview.as_view(), name='api-task-overview'
|
||||
),
|
||||
]),
|
||||
),
|
||||
# Project codes
|
||||
re_path(
|
||||
r'^project-code/',
|
||||
|
@ -1,7 +1,10 @@
|
||||
"""JSON serializers for common components."""
|
||||
|
||||
from django.db.models import OuterRef, Subquery
|
||||
from django.urls import reverse
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
import django_q.models
|
||||
from error_report.models import Error
|
||||
from flags.state import flag_state
|
||||
from rest_framework import serializers
|
||||
@ -316,3 +319,120 @@ class ErrorMessageSerializer(InvenTreeModelSerializer):
|
||||
fields = ['when', 'info', 'data', 'path', 'pk']
|
||||
|
||||
read_only_fields = ['when', 'info', 'data', 'path', 'pk']
|
||||
|
||||
|
||||
class TaskOverviewSerializer(serializers.Serializer):
|
||||
"""Serializer for background task overview."""
|
||||
|
||||
is_running = serializers.BooleanField(
|
||||
label=_('Is Running'),
|
||||
help_text='Boolean value to indicate if the background worker process is running.',
|
||||
read_only=True,
|
||||
)
|
||||
|
||||
pending_tasks = serializers.IntegerField(
|
||||
label=_('Pending Tasks'),
|
||||
help_text='Number of active background tasks',
|
||||
read_only=True,
|
||||
)
|
||||
|
||||
scheduled_tasks = serializers.IntegerField(
|
||||
label=_('Scheduled Tasks'),
|
||||
help_text='Number of scheduled background tasks',
|
||||
read_only=True,
|
||||
)
|
||||
|
||||
failed_tasks = serializers.IntegerField(
|
||||
label=_('Failed Tasks'),
|
||||
help_text='Number of failed background tasks',
|
||||
read_only=True,
|
||||
)
|
||||
|
||||
|
||||
class PendingTaskSerializer(InvenTreeModelSerializer):
|
||||
"""Serializer for an individual pending task object."""
|
||||
|
||||
class Meta:
|
||||
"""Metaclass options for the serializer."""
|
||||
|
||||
model = django_q.models.OrmQ
|
||||
fields = ['pk', 'key', 'lock', 'task_id', 'name', 'func', 'args', 'kwargs']
|
||||
|
||||
task_id = serializers.CharField(label=_('Task ID'), help_text=_('Unique task ID'))
|
||||
|
||||
lock = serializers.DateTimeField(label=_('Lock'), help_text=_('Lock time'))
|
||||
|
||||
name = serializers.CharField(label=_('Name'), help_text=_('Task name'))
|
||||
|
||||
func = serializers.CharField(label=_('Function'), help_text=_('Function name'))
|
||||
|
||||
args = serializers.CharField(label=_('Arguments'), help_text=_('Task arguments'))
|
||||
|
||||
kwargs = serializers.CharField(
|
||||
label=_('Keyword Arguments'), help_text=_('Task keyword arguments')
|
||||
)
|
||||
|
||||
|
||||
class ScheduledTaskSerializer(InvenTreeModelSerializer):
|
||||
"""Serializer for an individual scheduled task object."""
|
||||
|
||||
class Meta:
|
||||
"""Metaclass options for the serializer."""
|
||||
|
||||
model = django_q.models.Schedule
|
||||
fields = [
|
||||
'pk',
|
||||
'name',
|
||||
'func',
|
||||
'args',
|
||||
'kwargs',
|
||||
'schedule_type',
|
||||
'repeats',
|
||||
'last_run',
|
||||
'next_run',
|
||||
'success',
|
||||
'task',
|
||||
]
|
||||
|
||||
last_run = serializers.DateTimeField()
|
||||
success = serializers.BooleanField()
|
||||
|
||||
@staticmethod
|
||||
def annotate_queryset(queryset):
|
||||
"""Add custom annotations to the queryset.
|
||||
|
||||
- last_run: The last time the task was run
|
||||
- success: The outcome status of the last run
|
||||
"""
|
||||
task = django_q.models.Task.objects.filter(id=OuterRef('task'))
|
||||
|
||||
queryset = queryset.annotate(
|
||||
last_run=Subquery(task.values('started')[:1]),
|
||||
success=Subquery(task.values('success')[:1]),
|
||||
)
|
||||
|
||||
return queryset
|
||||
|
||||
|
||||
class FailedTaskSerializer(InvenTreeModelSerializer):
|
||||
"""Serializer for an individual failed task object."""
|
||||
|
||||
class Meta:
|
||||
"""Metaclass options for the serializer."""
|
||||
|
||||
model = django_q.models.Failure
|
||||
fields = [
|
||||
'pk',
|
||||
'name',
|
||||
'func',
|
||||
'args',
|
||||
'kwargs',
|
||||
'started',
|
||||
'stopped',
|
||||
'attempt_count',
|
||||
'result',
|
||||
]
|
||||
|
||||
pk = serializers.CharField(source='id', read_only=True)
|
||||
|
||||
result = serializers.CharField()
|
||||
|
@ -658,6 +658,50 @@ class PluginSettingsApiTest(PluginMixin, InvenTreeAPITestCase):
|
||||
...
|
||||
|
||||
|
||||
class TaskListApiTests(InvenTreeAPITestCase):
|
||||
"""Unit tests for the background task API endpoints."""
|
||||
|
||||
def test_pending_tasks(self):
|
||||
"""Test that the pending tasks endpoint is available."""
|
||||
# Schedule some tasks
|
||||
from django_q.models import OrmQ
|
||||
|
||||
from InvenTree.tasks import offload_task
|
||||
|
||||
n = OrmQ.objects.count()
|
||||
|
||||
for i in range(3):
|
||||
offload_task(f'fake_module.test_{i}', force_async=True)
|
||||
|
||||
self.assertEqual(OrmQ.objects.count(), 3)
|
||||
|
||||
url = reverse('api-pending-task-list')
|
||||
response = self.get(url, expected_code=200)
|
||||
|
||||
self.assertEqual(len(response.data), n + 3)
|
||||
|
||||
for task in response.data:
|
||||
self.assertTrue(task['func'].startswith('fake_module.test_'))
|
||||
|
||||
def test_scheduled_tasks(self):
|
||||
"""Test that the scheduled tasks endpoint is available."""
|
||||
from django_q.models import Schedule
|
||||
|
||||
for i in range(5):
|
||||
Schedule.objects.create(
|
||||
name='time.sleep', func='time.sleep', args=f'{i + 1}'
|
||||
)
|
||||
|
||||
n = Schedule.objects.count()
|
||||
self.assertGreater(n, 0)
|
||||
|
||||
url = reverse('api-scheduled-task-list')
|
||||
response = self.get(url, expected_code=200)
|
||||
|
||||
for task in response.data:
|
||||
self.assertTrue(task['name'] == 'time.sleep')
|
||||
|
||||
|
||||
class WebhookMessageTests(TestCase):
|
||||
"""Tests for webhooks."""
|
||||
|
||||
|
@ -54,9 +54,7 @@ class CompanyList(ListCreateAPI):
|
||||
def get_queryset(self):
|
||||
"""Return annotated queryset for the company list endpoint."""
|
||||
queryset = super().get_queryset()
|
||||
queryset = CompanySerializer.annotate_queryset(queryset)
|
||||
|
||||
return queryset
|
||||
return CompanySerializer.annotate_queryset(queryset)
|
||||
|
||||
filter_backends = SEARCH_ORDER_FILTER
|
||||
|
||||
|
Reference in New Issue
Block a user