mirror of
https://github.com/inventree/InvenTree.git
synced 2026-06-06 00:44:25 +00:00
Django admin permission updates (#12036)
* Tweak task offloading - Improved function splitting - Remove call to eval * Read-only ScheduledTask admin - Prevent admin users creating / editing / deleting scheduled tasks * Add action for immediate trigger * Prevent editing of UserSession via django admin - Allow only read or delete - No create or edit permissions * Make StockTrackingAdmin read-only * Reduce permission scope on common admin models * Additional unit tests for task offloading * Only superuser can change certain user fields * Adjust unit test --------- Co-authored-by: Matthias Mair <code@mjmair.com>
This commit is contained in:
@@ -2,7 +2,12 @@
|
||||
|
||||
from django.contrib import admin
|
||||
from django.http.request import HttpRequest
|
||||
from django.utils import timezone
|
||||
|
||||
from allauth.usersessions.admin import UserSessionAdmin
|
||||
from allauth.usersessions.models import UserSession
|
||||
from django_q.admin import ScheduleAdmin
|
||||
from django_q.models import Schedule
|
||||
from djmoney.contrib.exchange.admin import RateAdmin
|
||||
from djmoney.contrib.exchange.models import Rate
|
||||
|
||||
@@ -17,3 +22,51 @@ class CustomRateAdmin(RateAdmin):
|
||||
|
||||
admin.site.unregister(Rate)
|
||||
admin.site.register(Rate, CustomRateAdmin)
|
||||
|
||||
|
||||
def run_schedule_now(modeladmin, request, queryset):
|
||||
"""Immediately queue the selected scheduled tasks for execution."""
|
||||
queryset.update(next_run=timezone.now())
|
||||
count = queryset.count()
|
||||
modeladmin.message_user(request, f'{count} task(s) queued for immediate execution.')
|
||||
|
||||
|
||||
run_schedule_now.short_description = 'Run selected tasks now'
|
||||
|
||||
|
||||
class ReadOnlyScheduleAdmin(ScheduleAdmin):
|
||||
"""Read-only admin interface for django-q Schedule objects."""
|
||||
|
||||
actions = [run_schedule_now]
|
||||
|
||||
def has_add_permission(self, request: HttpRequest) -> bool:
|
||||
"""Prevent adding new Schedule objects."""
|
||||
return False
|
||||
|
||||
def has_change_permission(self, request: HttpRequest, obj=None) -> bool:
|
||||
"""Prevent changing existing Schedule objects."""
|
||||
return False
|
||||
|
||||
def has_delete_permission(self, request: HttpRequest, obj=None) -> bool:
|
||||
"""Prevent deleting Schedule objects."""
|
||||
return False
|
||||
|
||||
|
||||
admin.site.unregister(Schedule)
|
||||
admin.site.register(Schedule, ReadOnlyScheduleAdmin)
|
||||
|
||||
|
||||
class InvenTreeUserSessionAdmin(UserSessionAdmin):
|
||||
"""Admin interface for UserSession - view and delete only, no add or edit."""
|
||||
|
||||
def has_add_permission(self, request: HttpRequest) -> bool:
|
||||
"""Prevent creating sessions via admin."""
|
||||
return False
|
||||
|
||||
def has_change_permission(self, request: HttpRequest, obj=None) -> bool:
|
||||
"""Prevent editing sessions via admin."""
|
||||
return False
|
||||
|
||||
|
||||
admin.site.unregister(UserSession)
|
||||
admin.site.register(UserSession, InvenTreeUserSessionAdmin)
|
||||
|
||||
@@ -56,7 +56,11 @@ INVENTREE_BASE_URL = 'https://inventree.org'
|
||||
INVENTREE_NEWS_URL = f'{INVENTREE_BASE_URL}/news/feed.atom'
|
||||
|
||||
# Determine if we are running in "test" mode e.g. "manage.py test"
|
||||
TESTING = 'test' in sys.argv or 'TESTING' in os.environ or 'pytest' in sys.argv
|
||||
TESTING = (
|
||||
'test' in sys.argv
|
||||
or 'TESTING' in os.environ
|
||||
or any('pytest' in arg for arg in sys.argv)
|
||||
)
|
||||
|
||||
if TESTING:
|
||||
# Use a weaker password hasher for testing (improves testing speed)
|
||||
|
||||
@@ -282,40 +282,31 @@ def offload_task(
|
||||
# function was passed - use that
|
||||
_func = taskname
|
||||
else:
|
||||
# Split path
|
||||
# Split on the last dot: everything before is the module path,
|
||||
# everything after is the function name. rsplit handles any depth
|
||||
# (e.g. 'app.module.func' or 'app.sub.module.func').
|
||||
try:
|
||||
app, mod, func = taskname.split('.')
|
||||
app_mod = app + '.' + mod
|
||||
module_path, func_name = taskname.rsplit('.', 1)
|
||||
except ValueError:
|
||||
raise_warning(
|
||||
f"WARNING: '{taskname}' not started - Malformed function path"
|
||||
)
|
||||
return False
|
||||
|
||||
# Import module from app
|
||||
try:
|
||||
_mod = importlib.import_module(app_mod)
|
||||
_mod = importlib.import_module(module_path)
|
||||
except ModuleNotFoundError:
|
||||
log_error('offload_task', scope='worker')
|
||||
raise_warning(
|
||||
f"WARNING: '{taskname}' not started - No module named '{app_mod}'"
|
||||
f"WARNING: '{taskname}' not started - No module named '{module_path}'"
|
||||
)
|
||||
return False
|
||||
|
||||
# Retrieve function
|
||||
try:
|
||||
_func = getattr(_mod, func)
|
||||
except AttributeError: # pragma: no cover
|
||||
# getattr does not work for local import
|
||||
_func = None
|
||||
|
||||
try:
|
||||
if not _func:
|
||||
_func = eval(func) # pragma: no cover
|
||||
except NameError:
|
||||
_func = getattr(_mod, func_name, None)
|
||||
if _func is None:
|
||||
log_error('offload_task', scope='worker')
|
||||
raise_warning(
|
||||
f"WARNING: '{taskname}' not started - No function named '{func}'"
|
||||
f"WARNING: '{taskname}' not started - No function named '{func_name}'"
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
"""Sample task functions used by offload_task unit tests."""
|
||||
|
||||
|
||||
def get_result():
|
||||
"""Return a fixed value; used to verify 4-part module paths resolve correctly."""
|
||||
return 'abc'
|
||||
@@ -67,26 +67,72 @@ class InvenTreeTaskTests(PluginRegistryMixin, TestCase):
|
||||
# Run with string ref
|
||||
InvenTree.tasks.offload_task('InvenTree.test_tasks.get_result')
|
||||
|
||||
# Error runs
|
||||
# The error-path tests below all use force_sync=True so that the sync
|
||||
# resolution code is exercised regardless of whether background workers
|
||||
# happen to be running in this environment.
|
||||
|
||||
# Malformed taskname
|
||||
with self.assertWarnsMessage(
|
||||
UserWarning, "WARNING: 'InvenTree' not started - Malformed function path"
|
||||
):
|
||||
InvenTree.tasks.offload_task('InvenTree')
|
||||
InvenTree.tasks.offload_task('InvenTree', force_sync=True)
|
||||
|
||||
# Non existent app
|
||||
with self.assertWarnsMessage(
|
||||
UserWarning,
|
||||
"WARNING: 'InvenTreeABC.test_tasks.doesnotmatter' not started - No module named 'InvenTreeABC.test_tasks'",
|
||||
):
|
||||
InvenTree.tasks.offload_task('InvenTreeABC.test_tasks.doesnotmatter')
|
||||
InvenTree.tasks.offload_task(
|
||||
'InvenTreeABC.test_tasks.doesnotmatter', force_sync=True
|
||||
)
|
||||
|
||||
# Non existent function
|
||||
with self.assertWarnsMessage(
|
||||
UserWarning,
|
||||
"WARNING: 'InvenTree.test_tasks.doesnotexist' not started - No function named 'doesnotexist'",
|
||||
):
|
||||
InvenTree.tasks.offload_task('InvenTree.test_tasks.doesnotexist')
|
||||
InvenTree.tasks.offload_task(
|
||||
'InvenTree.test_tasks.doesnotexist', force_sync=True
|
||||
)
|
||||
|
||||
def test_offloading_deep_path(self):
|
||||
"""Test that task paths with more than 3 components are resolved correctly.
|
||||
|
||||
Previously the sync fallback split on '.' requiring exactly 3 parts,
|
||||
so a 4-part path raised ValueError and returned False with no warning.
|
||||
Now rsplit('.', 1) is used, so any depth works.
|
||||
"""
|
||||
# 4-part valid path: module = 'InvenTree.test_helpers.sample_tasks', func = 'get_result'
|
||||
InvenTree.tasks.offload_task(
|
||||
'InvenTree.test_helpers.sample_tasks.get_result', force_sync=True
|
||||
)
|
||||
|
||||
# 4-part path with a non-existent module — should warn cleanly rather than crash
|
||||
with self.assertWarnsMessage(
|
||||
UserWarning,
|
||||
"WARNING: 'InvenTreeABC.sub.tasks.doesnotmatter' not started - No module named 'InvenTreeABC.sub.tasks'",
|
||||
):
|
||||
InvenTree.tasks.offload_task(
|
||||
'InvenTreeABC.sub.tasks.doesnotmatter', force_sync=True
|
||||
)
|
||||
|
||||
def test_offloading_no_eval_bypass(self):
|
||||
"""Verify that names in tasks.py scope cannot be resolved via an eval bypass.
|
||||
|
||||
The previous implementation fell back to eval(func) when getattr returned
|
||||
None, evaluating the function name in the local scope of offload_task rather
|
||||
than in the target module. This meant a Python builtin like 'eval', or any
|
||||
name imported into tasks.py, could be invoked via a crafted taskname. The
|
||||
replacement uses only getattr on the imported module and rejects anything
|
||||
not found there.
|
||||
"""
|
||||
# 'eval' is a Python builtin; it is not an attribute of InvenTree.test_tasks.
|
||||
# Previously eval('eval') in tasks.py's scope would resolve it; now it fails.
|
||||
with self.assertWarnsMessage(
|
||||
UserWarning,
|
||||
"WARNING: 'InvenTree.test_tasks.eval' not started - No function named 'eval'",
|
||||
):
|
||||
InvenTree.tasks.offload_task('InvenTree.test_tasks.eval', force_sync=True)
|
||||
|
||||
def test_task_heartbeat(self):
|
||||
"""Test the task heartbeat."""
|
||||
|
||||
@@ -1488,7 +1488,7 @@ class TestOffloadTask(InvenTreeTestCase):
|
||||
offload_task('dummy_task.numbers', 1, 1, 1, force_sync=True)
|
||||
)
|
||||
|
||||
self.assertIn('Malformed function path', str(log.output))
|
||||
self.assertIn("No module named \\'dummy_task\\'", str(log.output))
|
||||
|
||||
# Offload dummy task with a Part instance
|
||||
# This should succeed, ensuring that the Part instance is correctly pickled
|
||||
|
||||
@@ -79,21 +79,37 @@ class AttachmentAdmin(admin.ModelAdmin):
|
||||
|
||||
@admin.register(common.models.DataOutput)
|
||||
class DataOutputAdmin(admin.ModelAdmin):
|
||||
"""Admin interface for DataOutput objects."""
|
||||
"""Admin interface for DataOutput objects - view and delete only."""
|
||||
|
||||
list_display = ('user', 'created', 'output_type', 'output')
|
||||
|
||||
list_filter = ('user', 'output_type')
|
||||
|
||||
def has_add_permission(self, request):
|
||||
"""Prevent addition of new DataOutput objects via the admin interface."""
|
||||
return False
|
||||
|
||||
def has_change_permission(self, request, obj=None):
|
||||
"""Prevent modification of DataOutput objects via the admin interface."""
|
||||
return False
|
||||
|
||||
|
||||
@admin.register(common.models.BarcodeScanResult)
|
||||
class BarcodeScanResultAdmin(admin.ModelAdmin):
|
||||
"""Admin interface for BarcodeScanResult objects."""
|
||||
"""Admin interface for BarcodeScanResult objects - read-only audit log."""
|
||||
|
||||
list_display = ('data', 'timestamp', 'user', 'endpoint', 'result')
|
||||
|
||||
list_filter = ('user', 'endpoint', 'result')
|
||||
|
||||
def has_add_permission(self, request):
|
||||
"""Prevent addition of new BarcodeScanResult objects via the admin interface."""
|
||||
return False
|
||||
|
||||
def has_change_permission(self, request, obj=None):
|
||||
"""Prevent modification of BarcodeScanResult objects via the admin interface."""
|
||||
return False
|
||||
|
||||
|
||||
@admin.register(common.models.ProjectCode)
|
||||
class ProjectCodeAdmin(admin.ModelAdmin):
|
||||
@@ -139,14 +155,22 @@ class WebhookAdmin(admin.ModelAdmin):
|
||||
|
||||
@admin.register(common.models.NotificationEntry)
|
||||
class NotificationEntryAdmin(admin.ModelAdmin):
|
||||
"""Admin settings for NotificationEntry."""
|
||||
"""Admin settings for NotificationEntry - view and delete only."""
|
||||
|
||||
list_display = ('key', 'uid', 'updated')
|
||||
|
||||
def has_add_permission(self, request):
|
||||
"""Prevent addition of new NotificationEntry objects via the admin interface."""
|
||||
return False
|
||||
|
||||
def has_change_permission(self, request, obj=None):
|
||||
"""Prevent modification of NotificationEntry objects via the admin interface."""
|
||||
return False
|
||||
|
||||
|
||||
@admin.register(common.models.NotificationMessage)
|
||||
class NotificationMessageAdmin(admin.ModelAdmin):
|
||||
"""Admin settings for NotificationMessage."""
|
||||
"""Admin settings for NotificationMessage - view and delete only."""
|
||||
|
||||
list_display = (
|
||||
'age_human',
|
||||
@@ -162,14 +186,46 @@ class NotificationMessageAdmin(admin.ModelAdmin):
|
||||
|
||||
search_fields = ('name', 'category', 'message')
|
||||
|
||||
def has_add_permission(self, request):
|
||||
"""Prevent addition of new NotificationMessage objects via the admin interface."""
|
||||
return False
|
||||
|
||||
def has_change_permission(self, request, obj=None):
|
||||
"""Prevent modification of NotificationMessage objects via the admin interface."""
|
||||
return False
|
||||
|
||||
|
||||
@admin.register(common.models.NewsFeedEntry)
|
||||
class NewsFeedEntryAdmin(admin.ModelAdmin):
|
||||
"""Admin settings for NewsFeedEntry."""
|
||||
"""Admin settings for NewsFeedEntry - view and delete only."""
|
||||
|
||||
list_display = ('title', 'author', 'published', 'summary')
|
||||
|
||||
def has_add_permission(self, request):
|
||||
"""Prevent addition of new NewsFeedEntry objects via the admin interface."""
|
||||
return False
|
||||
|
||||
admin.site.register(common.models.WebhookMessage, admin.ModelAdmin)
|
||||
admin.site.register(common.models.EmailMessage, admin.ModelAdmin)
|
||||
admin.site.register(common.models.EmailThread, admin.ModelAdmin)
|
||||
def has_change_permission(self, request, obj=None):
|
||||
"""Prevent modification of NewsFeedEntry objects via the admin interface."""
|
||||
return False
|
||||
|
||||
|
||||
class ReadOnlyAdmin(admin.ModelAdmin):
|
||||
"""Base admin class that prevents all modifications."""
|
||||
|
||||
def has_add_permission(self, request):
|
||||
"""Prevent addition of new objects via the admin interface."""
|
||||
return False
|
||||
|
||||
def has_change_permission(self, request, obj=None):
|
||||
"""Prevent modification of objects via the admin interface."""
|
||||
return False
|
||||
|
||||
def has_delete_permission(self, request, obj=None):
|
||||
"""Prevent deletion of objects via the admin interface."""
|
||||
return False
|
||||
|
||||
|
||||
admin.site.register(common.models.WebhookMessage, ReadOnlyAdmin)
|
||||
admin.site.register(common.models.EmailMessage, ReadOnlyAdmin)
|
||||
admin.site.register(common.models.EmailThread, ReadOnlyAdmin)
|
||||
|
||||
@@ -77,12 +77,24 @@ class StockItemAdmin(admin.ModelAdmin):
|
||||
|
||||
@admin.register(StockItemTracking)
|
||||
class StockTrackingAdmin(admin.ModelAdmin):
|
||||
"""Admin class for StockTracking."""
|
||||
"""Admin class for StockTracking - read-only to preserve audit trail integrity."""
|
||||
|
||||
list_display = ('item', 'date', 'label')
|
||||
|
||||
autocomplete_fields = ['item']
|
||||
|
||||
def has_add_permission(self, request):
|
||||
"""Prevent addition of new tracking entries via the admin interface."""
|
||||
return False
|
||||
|
||||
def has_change_permission(self, request, obj=None):
|
||||
"""Prevent modification of tracking entries via the admin interface."""
|
||||
return False
|
||||
|
||||
def has_delete_permission(self, request, obj=None):
|
||||
"""Prevent deletion of tracking entries via the admin interface."""
|
||||
return False
|
||||
|
||||
|
||||
@admin.register(StockItemTestResult)
|
||||
class StockItemTestResultAdmin(admin.ModelAdmin):
|
||||
|
||||
@@ -118,10 +118,8 @@ class InvenTreeGroupAdminForm(forms.ModelForm):
|
||||
class InvenTreeUserAdmin(UserAdmin):
|
||||
"""Custom admin page for the User model.
|
||||
|
||||
Hides the "permissions" view as this is now handled
|
||||
entirely by groups and RuleSets.
|
||||
|
||||
(And it's confusing!)
|
||||
- Restrict user creation and editing to superuser accounts
|
||||
- Hides the "permissions" view as this is handled by RuleSets
|
||||
"""
|
||||
|
||||
list_display = (
|
||||
@@ -132,6 +130,7 @@ class InvenTreeUserAdmin(UserAdmin):
|
||||
'is_staff',
|
||||
'last_login',
|
||||
) # display last connection for each user in user admin panel.
|
||||
|
||||
fieldsets = (
|
||||
(None, {'fields': ('username', 'password')}),
|
||||
(_('Personal info'), {'fields': ('first_name', 'last_name', 'email')}),
|
||||
@@ -142,6 +141,15 @@ class InvenTreeUserAdmin(UserAdmin):
|
||||
(_('Important dates'), {'fields': ('last_login', 'date_joined')}),
|
||||
)
|
||||
|
||||
def get_readonly_fields(self, request, obj=None):
|
||||
"""Make all fields read-only for non-superusers."""
|
||||
fields = super().get_readonly_fields(request, obj)
|
||||
|
||||
if not request.user.is_superuser:
|
||||
fields += ('is_staff', 'is_superuser')
|
||||
|
||||
return fields
|
||||
|
||||
|
||||
@admin.register(Owner)
|
||||
class OwnerAdmin(admin.ModelAdmin):
|
||||
|
||||
Reference in New Issue
Block a user