mirror of
https://github.com/inventree/InvenTree.git
synced 2026-05-09 03:03:41 +00:00
Backport (#11686)
* Ensure the MeUserSerializer correctly marks fields as read-only
* fix behaviour
* Add note to plugin docs.
* Adjust logic for PluginListTable
* Add superuser scope to PluginInstall API endpoint
* Update unit test for API endpoint
* Explicitly set PLUGINS_INSTALL_DISABLED if PLUGINS_ENABLED = False
* Check for superuser permission in installer.py
* Additional user checks
* Sanitize package name to protect against OS command injection
* fix(security): use SandboxedEnvironment for PART_NAME_FORMAT rendering
- Switch jinja2.Environment to jinja2.sandbox.SandboxedEnvironment in
part/helpers.py to prevent SSTI via template tags in PART_NAME_FORMAT.
- Set pk=1 on the dummy Part instance in the validator to ensure
conditional expressions like {% if part.pk %} are properly evaluated
during validation, closing the sandbox bypass vector.
Fixes GHSA-84jh-x777-8pqq
* Disable some unit tests for backport
* Fix SSRF in remote image download
Add IP address validation to prevent Server-Side Request Forgery
when downloading images from remote URLs. The resolved IP is now
checked against private, loopback, link-local, and reserved ranges
before connecting.
Redirects are followed manually (up to 5 hops) with SSRF validation
at each step, preventing redirect-based bypass of URL format checks.
* Style fixes
* fix styles
* fix test
* Reintroduce unit tests
---------
Co-authored-by: Paul <morimori-dev@github.com>
Co-authored-by: tikket1 <chrisveres1@gmail.com>
Co-authored-by: Matthias Mair <code@mjmair.com>
This commit is contained in:
@@ -74,6 +74,9 @@ Enter the package name into the form as shown below. You can add a path and a ve
|
||||
|
||||
{{ image("plugin/plugin_install_txt.png", "Plugin.txt file") }}
|
||||
|
||||
!!! info "Superuser Required"
|
||||
Only users with superuser privileges can manage plugins via the web interface.
|
||||
|
||||
#### Local Directory
|
||||
|
||||
Custom plugins can be placed in the `data/plugins/` directory, where they will be automatically discovered. This can be useful for developing and testing plugins, but can prove more difficult in production (e.g. when using Docker).
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
"""Provides helper functions used throughout the InvenTree project that access the database."""
|
||||
|
||||
import io
|
||||
import ipaddress
|
||||
import socket
|
||||
from decimal import Decimal
|
||||
from typing import Optional, cast
|
||||
from urllib.parse import urljoin
|
||||
from urllib.parse import urljoin, urlparse
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.exceptions import ValidationError
|
||||
@@ -88,6 +90,36 @@ def construct_absolute_url(*arg, base_url=None, request=None):
|
||||
return urljoin(base_url, relative_url)
|
||||
|
||||
|
||||
def validate_url_no_ssrf(url):
|
||||
"""Validate that a URL does not point to a private/internal network address.
|
||||
|
||||
Resolves the hostname to an IP address and checks it against private,
|
||||
loopback, link-local, and reserved IP ranges to prevent SSRF attacks.
|
||||
|
||||
Arguments:
|
||||
url: The URL to validate
|
||||
|
||||
Raises:
|
||||
ValueError: If the URL resolves to a private or reserved IP address
|
||||
"""
|
||||
parsed = urlparse(url)
|
||||
hostname = parsed.hostname
|
||||
|
||||
if not hostname:
|
||||
raise ValueError(_('Invalid URL: no hostname'))
|
||||
|
||||
try:
|
||||
addrinfo = socket.getaddrinfo(hostname, None)
|
||||
except socket.gaierror:
|
||||
raise ValueError(_('Invalid URL: hostname could not be resolved'))
|
||||
|
||||
for _family, _type, _proto, _canonname, sockaddr in addrinfo:
|
||||
ip = ipaddress.ip_address(sockaddr[0])
|
||||
|
||||
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
|
||||
raise ValueError(_('URL points to a private or reserved IP address'))
|
||||
|
||||
|
||||
def download_image_from_url(remote_url, timeout=2.5):
|
||||
"""Download an image file from a remote URL.
|
||||
|
||||
@@ -115,6 +147,9 @@ def download_image_from_url(remote_url, timeout=2.5):
|
||||
validator = URLValidator()
|
||||
validator(remote_url)
|
||||
|
||||
# SSRF protection: validate the resolved IP is not private/internal
|
||||
validate_url_no_ssrf(remote_url)
|
||||
|
||||
# Calculate maximum allowable image size (in bytes)
|
||||
max_size = (
|
||||
int(get_global_setting('INVENTREE_DOWNLOAD_IMAGE_MAX_SIZE')) * 1024 * 1024
|
||||
@@ -129,10 +164,36 @@ def download_image_from_url(remote_url, timeout=2.5):
|
||||
response = requests.get(
|
||||
remote_url,
|
||||
timeout=timeout,
|
||||
allow_redirects=True,
|
||||
allow_redirects=False,
|
||||
stream=True,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
# Handle redirects manually to validate each destination
|
||||
max_redirects = 5
|
||||
redirect_count = 0
|
||||
|
||||
while response.is_redirect and redirect_count < max_redirects:
|
||||
redirect_url = response.headers.get('Location')
|
||||
if not redirect_url:
|
||||
break
|
||||
|
||||
# Validate the redirect destination against SSRF
|
||||
validator(redirect_url)
|
||||
validate_url_no_ssrf(redirect_url)
|
||||
|
||||
redirect_count += 1
|
||||
response = requests.get(
|
||||
redirect_url,
|
||||
timeout=timeout,
|
||||
allow_redirects=False,
|
||||
stream=True,
|
||||
headers=headers,
|
||||
)
|
||||
|
||||
if redirect_count >= max_redirects:
|
||||
raise ValueError(_('Too many redirects'))
|
||||
|
||||
# Throw an error if anything goes wrong
|
||||
response.raise_for_status()
|
||||
except requests.exceptions.ConnectionError as exc:
|
||||
@@ -143,6 +204,8 @@ def download_image_from_url(remote_url, timeout=2.5):
|
||||
raise requests.exceptions.HTTPError(
|
||||
_('Server responded with invalid status code') + f': {response.status_code}'
|
||||
)
|
||||
except ValueError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise Exception(_('Exception occurred') + f': {exc!s}')
|
||||
|
||||
|
||||
@@ -198,6 +198,11 @@ PLUGINS_INSTALL_DISABLED = get_boolean_setting(
|
||||
'INVENTREE_PLUGIN_NOINSTALL', 'plugin_noinstall', False
|
||||
)
|
||||
|
||||
if not PLUGINS_ENABLED:
|
||||
PLUGINS_INSTALL_DISABLED = (
|
||||
True # If plugins are disabled, also disable installation
|
||||
)
|
||||
|
||||
PLUGIN_FILE = config.get_plugin_file()
|
||||
|
||||
# Plugin test settings
|
||||
|
||||
@@ -49,7 +49,8 @@ def validate_part_name_format(value):
|
||||
})
|
||||
|
||||
# Attempt to render the template with a dummy Part instance
|
||||
p = Part(name='test part', description='some test part')
|
||||
# Use pk=1 to ensure conditional checks like {% if part.pk %} are evaluated
|
||||
p = Part(pk=1, name='test part', description='some test part')
|
||||
|
||||
try:
|
||||
SandboxedEnvironment().from_string(value).render({'part': p})
|
||||
|
||||
@@ -5,7 +5,7 @@ import os
|
||||
from django.conf import settings
|
||||
|
||||
import structlog
|
||||
from jinja2 import Environment, select_autoescape
|
||||
from jinja2.sandbox import SandboxedEnvironment
|
||||
|
||||
from common.settings import get_global_setting
|
||||
|
||||
@@ -37,11 +37,7 @@ def compile_full_name_template(*args, **kwargs):
|
||||
# Cache the template string
|
||||
_part_full_name_template_string = template_string
|
||||
|
||||
env = Environment(
|
||||
autoescape=select_autoescape(default_for_string=False, default=False),
|
||||
variable_start_string='{{',
|
||||
variable_end_string='}}',
|
||||
)
|
||||
env = SandboxedEnvironment(variable_start_string='{{', variable_end_string='}}')
|
||||
|
||||
# Compile the template
|
||||
try:
|
||||
|
||||
@@ -210,6 +210,7 @@ class PluginInstall(CreateAPI):
|
||||
|
||||
queryset = PluginConfig.objects.none()
|
||||
serializer_class = PluginSerializers.PluginConfigInstallSerializer
|
||||
permission_classes = [InvenTree.permissions.IsSuperuserOrSuperScope]
|
||||
|
||||
def create(self, request, *args, **kwargs):
|
||||
"""Install a plugin via the API."""
|
||||
|
||||
@@ -236,8 +236,8 @@ def install_plugin(url=None, packagename=None, user=None, version=None):
|
||||
user: Optional user performing the installation
|
||||
version: Optional version specifier
|
||||
"""
|
||||
if user and not user.is_staff:
|
||||
raise ValidationError(_('Only staff users can administer plugins'))
|
||||
if user and not user.is_superuser:
|
||||
raise ValidationError(_('Only superuser accounts can administer plugins'))
|
||||
|
||||
if settings.PLUGINS_INSTALL_DISABLED:
|
||||
raise ValidationError(_('Plugin installation is disabled'))
|
||||
@@ -269,6 +269,13 @@ def install_plugin(url=None, packagename=None, user=None, version=None):
|
||||
if version:
|
||||
full_pkg = f'{full_pkg}=={version}'
|
||||
|
||||
if not full_pkg:
|
||||
raise ValidationError(_('No package name or URL provided for installation'))
|
||||
|
||||
# Sanitize the package name for installation
|
||||
if any(c in full_pkg for c in ';&|`$()'):
|
||||
raise ValidationError(_('Invalid characters in package name or URL'))
|
||||
|
||||
install_name.append(full_pkg)
|
||||
|
||||
ret = {}
|
||||
@@ -333,6 +340,9 @@ def uninstall_plugin(cfg: plugin.models.PluginConfig, user=None, delete_config=T
|
||||
"""
|
||||
from plugin.registry import registry
|
||||
|
||||
if user and not user.is_superuser:
|
||||
raise ValidationError(_('Only superuser accounts can administer plugins'))
|
||||
|
||||
if settings.PLUGINS_INSTALL_DISABLED:
|
||||
raise ValidationError(_('Plugin uninstalling is disabled'))
|
||||
|
||||
|
||||
@@ -165,6 +165,9 @@ class PluginConfigInstallSerializer(serializers.Serializer):
|
||||
version = data.get('version', None)
|
||||
user = self.context['request'].user
|
||||
|
||||
if not user or not user.is_superuser:
|
||||
raise ValidationError(_('Only superuser accounts can administer plugins'))
|
||||
|
||||
return install_plugin(
|
||||
url=url, packagename=packagename, version=version, user=user
|
||||
)
|
||||
@@ -266,10 +269,13 @@ class PluginUninstallSerializer(serializers.Serializer):
|
||||
"""Uninstall the specified plugin."""
|
||||
from plugin.installer import uninstall_plugin
|
||||
|
||||
user = self.context['request'].user
|
||||
|
||||
if not user or not user.is_superuser:
|
||||
raise ValidationError(_('Only superuser accounts can administer plugins'))
|
||||
|
||||
return uninstall_plugin(
|
||||
instance,
|
||||
user=self.context['request'].user,
|
||||
delete_config=validated_data.get('delete_config', True),
|
||||
instance, user=user, delete_config=validated_data.get('delete_config', True)
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -63,6 +63,21 @@ class PluginDetailAPITest(PluginMixin, InvenTreeAPITestCase):
|
||||
"""Test the plugin install command."""
|
||||
url = reverse('api-plugin-install')
|
||||
|
||||
# Requires superuser permissions
|
||||
self.user.is_superuser = False
|
||||
self.user.save()
|
||||
|
||||
self.post(
|
||||
url,
|
||||
{'confirm': True, 'packagename': self.PKG_NAME},
|
||||
expected_code=403,
|
||||
max_query_time=30,
|
||||
)
|
||||
|
||||
# Provide superuser permissions
|
||||
self.user.is_superuser = True
|
||||
self.user.save()
|
||||
|
||||
# invalid package name
|
||||
data = self.post(
|
||||
url,
|
||||
@@ -209,7 +224,7 @@ class PluginDetailAPITest(PluginMixin, InvenTreeAPITestCase):
|
||||
test_plg.refresh_from_db()
|
||||
self.assertTrue(test_plg.is_active())
|
||||
|
||||
def test_pluginCfg_delete(self):
|
||||
def test_plugin_config_delete(self):
|
||||
"""Test deleting a config."""
|
||||
test_plg = self.plugin_confs.first()
|
||||
assert test_plg is not None
|
||||
|
||||
@@ -11,11 +11,14 @@ from typing import Optional
|
||||
from unittest import mock
|
||||
from unittest.mock import patch
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
import plugin.templatetags.plugin_extras as plugin_tags
|
||||
from InvenTree.unit_test import PluginRegistryMixin, TestQueryMixin
|
||||
from plugin import InvenTreePlugin, PluginMixinEnum
|
||||
from plugin.installer import install_plugin
|
||||
from plugin.registry import registry
|
||||
from plugin.samples.integration.another_sample import (
|
||||
NoIntegrationPlugin,
|
||||
@@ -278,6 +281,9 @@ class RegistryTests(TestQueryMixin, PluginRegistryMixin, TestCase):
|
||||
|
||||
def test_broken_samples(self):
|
||||
"""Test that the broken samples trigger reloads."""
|
||||
# Reset the registry to a known state
|
||||
registry.errors = {}
|
||||
|
||||
# In the base setup there are no errors
|
||||
self.assertEqual(len(registry.errors), 0)
|
||||
|
||||
@@ -638,3 +644,40 @@ class RegistryTests(TestQueryMixin, PluginRegistryMixin, TestCase):
|
||||
self.assertTrue(cfg.is_builtin())
|
||||
self.assertFalse(cfg.is_package())
|
||||
self.assertFalse(cfg.is_sample())
|
||||
|
||||
|
||||
class InstallerTests(TestCase):
|
||||
"""Tests for the plugin installer code."""
|
||||
|
||||
def test_plugin_install_errors(self):
|
||||
"""Test error handling for plugin installation."""
|
||||
# No data provided
|
||||
with self.assertRaises(ValidationError) as e:
|
||||
install_plugin()
|
||||
|
||||
self.assertIn(
|
||||
'No package name or URL provided for installation', str(e.exception)
|
||||
)
|
||||
|
||||
# Invalid package name
|
||||
for pkg in [
|
||||
'invalid;name',
|
||||
'invalid&name',
|
||||
'invalid|name',
|
||||
'invalid`name',
|
||||
'invalid$(name)',
|
||||
]:
|
||||
with self.assertRaises(ValidationError) as e:
|
||||
install_plugin(packagename=pkg)
|
||||
|
||||
self.assertIn('Invalid characters in package name or URL', str(e.exception))
|
||||
|
||||
# Non superuser account
|
||||
user = User.objects.create(username='my-user', is_superuser=False)
|
||||
|
||||
with self.assertRaises(ValidationError) as e:
|
||||
install_plugin(user=user, packagename='some-package')
|
||||
|
||||
self.assertIn(
|
||||
'Only superuser accounts can administer plugins', str(e.exception)
|
||||
)
|
||||
|
||||
@@ -238,8 +238,21 @@ class ApiTokenSerializer(InvenTreeModelSerializer):
|
||||
|
||||
def validate(self, data):
|
||||
"""Validate the data for the serializer."""
|
||||
request_user = self.context['request'].user
|
||||
if not request_user:
|
||||
raise serializers.ValidationError(
|
||||
_('User must be authenticated')
|
||||
) # pragma: no cover
|
||||
|
||||
if 'user' not in data:
|
||||
data['user'] = self.context['request'].user
|
||||
data['user'] = request_user
|
||||
|
||||
# Only superusers can create tokens for other users
|
||||
if data['user'] != request_user and not request_user.is_superuser:
|
||||
raise serializers.ValidationError(
|
||||
_('Only a superuser can create a token for another user')
|
||||
)
|
||||
|
||||
return super().validate(data)
|
||||
|
||||
user_detail = UserSerializer(source='user', read_only=True)
|
||||
@@ -380,6 +393,9 @@ class MeUserSerializer(ExtendedUserSerializer):
|
||||
but ensures that certain fields are read-only.
|
||||
"""
|
||||
|
||||
# Remove the 'group_ids' field, as this is not relevant for the 'me' endpoint
|
||||
fields = [f for f in ExtendedUserSerializer.Meta.fields if f != 'group_ids']
|
||||
|
||||
read_only_fields = [
|
||||
*ExtendedUserSerializer.Meta.read_only_fields,
|
||||
'is_active',
|
||||
@@ -389,6 +405,28 @@ class MeUserSerializer(ExtendedUserSerializer):
|
||||
|
||||
profile = UserProfileSerializer(many=False, read_only=True)
|
||||
|
||||
# Redefine the fields from ExtendedUserSerializer, to ensure they are marked as read-only
|
||||
is_staff = serializers.BooleanField(
|
||||
label=_('Staff'),
|
||||
help_text=_('Does this user have staff permissions'),
|
||||
required=False,
|
||||
read_only=True,
|
||||
)
|
||||
|
||||
is_superuser = serializers.BooleanField(
|
||||
label=_('Superuser'),
|
||||
help_text=_('Is this user a superuser'),
|
||||
required=False,
|
||||
read_only=True,
|
||||
)
|
||||
|
||||
is_active = serializers.BooleanField(
|
||||
label=_('Active'),
|
||||
help_text=_('Is this user account active'),
|
||||
required=False,
|
||||
read_only=True,
|
||||
)
|
||||
|
||||
|
||||
def make_random_password(length=14):
|
||||
"""Generate a random password of given length."""
|
||||
|
||||
@@ -258,6 +258,8 @@ class SuperuserAPITests(InvenTreeAPITestCase):
|
||||
class UserTokenTests(InvenTreeAPITestCase):
|
||||
"""Tests for user token functionality."""
|
||||
|
||||
fixtures = ['users']
|
||||
|
||||
def test_token_generation(self):
|
||||
"""Test user token generation."""
|
||||
url = reverse('api-token')
|
||||
@@ -396,6 +398,28 @@ class UserTokenTests(InvenTreeAPITestCase):
|
||||
self.client.logout()
|
||||
self.get(reverse('api-token'), expected_code=401)
|
||||
|
||||
def test_token_security(self):
|
||||
"""Test that token generation is only available to users with the correct permissions."""
|
||||
url = reverse('api-token-list')
|
||||
|
||||
# Try to generate a token for a different user (should fail)
|
||||
response = self.post(url, data={'name': 'test', 'user': 1}, expected_code=400)
|
||||
self.assertIn(
|
||||
'Only a superuser can create a token for another user', str(response.data)
|
||||
)
|
||||
|
||||
# there should be no tokens created
|
||||
self.assertEqual(ApiToken.objects.count(), 0)
|
||||
|
||||
# now with superuser permissions
|
||||
self.user.is_superuser = True
|
||||
self.user.save()
|
||||
|
||||
response = self.post(url, data={'name': 'test', 'user': 1}, expected_code=201)
|
||||
self.assertIn('token', response.data)
|
||||
|
||||
self.assertEqual(ApiToken.objects.count(), 1)
|
||||
|
||||
|
||||
class GroupDetialTests(InvenTreeAPITestCase):
|
||||
"""Tests for the GroupDetail API endpoint."""
|
||||
|
||||
@@ -220,7 +220,6 @@ export default function PluginListTable() {
|
||||
// Uninstall an installed plugin
|
||||
// Must be inactive, not a builtin, not a sample, and installed as a package
|
||||
hidden:
|
||||
!user.isSuperuser() ||
|
||||
record.active ||
|
||||
record.is_builtin ||
|
||||
record.is_mandatory ||
|
||||
@@ -244,8 +243,7 @@ export default function PluginListTable() {
|
||||
record.is_builtin ||
|
||||
record.is_mandatory ||
|
||||
record.is_sample ||
|
||||
record.is_installed ||
|
||||
!user.isSuperuser(),
|
||||
record.is_installed,
|
||||
title: t`Delete`,
|
||||
tooltip: t`Delete selected plugin configuration`,
|
||||
color: 'red',
|
||||
@@ -355,7 +353,12 @@ export default function PluginListTable() {
|
||||
|
||||
// Custom table actions
|
||||
const tableActions = useMemo(() => {
|
||||
if (!user.isSuperuser() || !server.plugins_enabled) {
|
||||
if (
|
||||
!user.isSuperuser() ||
|
||||
!server.plugins_enabled ||
|
||||
server.plugins_install_disabled
|
||||
) {
|
||||
// Prevent installation if plugins are disabled or user is not superuser
|
||||
return [];
|
||||
}
|
||||
|
||||
@@ -376,7 +379,6 @@ export default function PluginListTable() {
|
||||
setPluginPackage('');
|
||||
installPluginModal.open();
|
||||
}}
|
||||
disabled={server.plugins_install_disabled || false}
|
||||
/>
|
||||
];
|
||||
}, [user, server]);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { expect, test } from './baseFixtures.js';
|
||||
import { adminuser } from './defaults.js';
|
||||
import { activateTableView, loadTab } from './helpers.js';
|
||||
import { doCachedLogin } from './login.js';
|
||||
import { setPluginState } from './settings.js';
|
||||
@@ -6,7 +7,8 @@ import { setPluginState } from './settings.js';
|
||||
// Test for the label editing interface
|
||||
test('Printing - Label Editing', async ({ browser }) => {
|
||||
const page = await doCachedLogin(browser, {
|
||||
user: adminuser,
|
||||
username: adminuser.username,
|
||||
password: adminuser.password,
|
||||
url: 'settings/admin/labels'
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user