diff --git a/docs/docs/plugins/install.md b/docs/docs/plugins/install.md index 3d18d58850..6f2f9f5fd6 100644 --- a/docs/docs/plugins/install.md +++ b/docs/docs/plugins/install.md @@ -74,6 +74,9 @@ Enter the package name into the form as shown below. You can add a path and a ve {{ image("plugin/plugin_install_txt.png", "Plugin.txt file") }} +!!! info "Superuser Required" + Only users with superuser privileges can manage plugins via the web interface. + #### Local Directory Custom plugins can be placed in the `data/plugins/` directory, where they will be automatically discovered. This can be useful for developing and testing plugins, but can prove more difficult in production (e.g. when using Docker). diff --git a/src/backend/InvenTree/InvenTree/helpers_model.py b/src/backend/InvenTree/InvenTree/helpers_model.py index 5fa5ae3cea..28fec74a6f 100644 --- a/src/backend/InvenTree/InvenTree/helpers_model.py +++ b/src/backend/InvenTree/InvenTree/helpers_model.py @@ -1,9 +1,11 @@ """Provides helper functions used throughout the InvenTree project that access the database.""" import io +import ipaddress +import socket from decimal import Decimal from typing import Optional, cast -from urllib.parse import urljoin +from urllib.parse import urljoin, urlparse from django.conf import settings from django.core.exceptions import ValidationError @@ -88,6 +90,36 @@ def construct_absolute_url(*arg, base_url=None, request=None): return urljoin(base_url, relative_url) +def validate_url_no_ssrf(url): + """Validate that a URL does not point to a private/internal network address. + + Resolves the hostname to an IP address and checks it against private, + loopback, link-local, and reserved IP ranges to prevent SSRF attacks. + + Arguments: + url: The URL to validate + + Raises: + ValueError: If the URL resolves to a private or reserved IP address + """ + parsed = urlparse(url) + hostname = parsed.hostname + + if not hostname: + raise ValueError(_('Invalid URL: no hostname')) + + try: + addrinfo = socket.getaddrinfo(hostname, None) + except socket.gaierror: + raise ValueError(_('Invalid URL: hostname could not be resolved')) + + for _family, _type, _proto, _canonname, sockaddr in addrinfo: + ip = ipaddress.ip_address(sockaddr[0]) + + if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved: + raise ValueError(_('URL points to a private or reserved IP address')) + + def download_image_from_url(remote_url, timeout=2.5): """Download an image file from a remote URL. @@ -115,6 +147,9 @@ def download_image_from_url(remote_url, timeout=2.5): validator = URLValidator() validator(remote_url) + # SSRF protection: validate the resolved IP is not private/internal + validate_url_no_ssrf(remote_url) + # Calculate maximum allowable image size (in bytes) max_size = ( int(get_global_setting('INVENTREE_DOWNLOAD_IMAGE_MAX_SIZE')) * 1024 * 1024 @@ -129,10 +164,36 @@ def download_image_from_url(remote_url, timeout=2.5): response = requests.get( remote_url, timeout=timeout, - allow_redirects=True, + allow_redirects=False, stream=True, headers=headers, ) + + # Handle redirects manually to validate each destination + max_redirects = 5 + redirect_count = 0 + + while response.is_redirect and redirect_count < max_redirects: + redirect_url = response.headers.get('Location') + if not redirect_url: + break + + # Validate the redirect destination against SSRF + validator(redirect_url) + validate_url_no_ssrf(redirect_url) + + redirect_count += 1 + response = requests.get( + redirect_url, + timeout=timeout, + allow_redirects=False, + stream=True, + headers=headers, + ) + + if redirect_count >= max_redirects: + raise ValueError(_('Too many redirects')) + # Throw an error if anything goes wrong response.raise_for_status() except requests.exceptions.ConnectionError as exc: @@ -143,6 +204,8 @@ def download_image_from_url(remote_url, timeout=2.5): raise requests.exceptions.HTTPError( _('Server responded with invalid status code') + f': {response.status_code}' ) + except ValueError: + raise except Exception as exc: raise Exception(_('Exception occurred') + f': {exc!s}') diff --git a/src/backend/InvenTree/InvenTree/settings.py b/src/backend/InvenTree/InvenTree/settings.py index e86062783c..80cd434952 100644 --- a/src/backend/InvenTree/InvenTree/settings.py +++ b/src/backend/InvenTree/InvenTree/settings.py @@ -198,6 +198,11 @@ PLUGINS_INSTALL_DISABLED = get_boolean_setting( 'INVENTREE_PLUGIN_NOINSTALL', 'plugin_noinstall', False ) +if not PLUGINS_ENABLED: + PLUGINS_INSTALL_DISABLED = ( + True # If plugins are disabled, also disable installation + ) + PLUGIN_FILE = config.get_plugin_file() # Plugin test settings diff --git a/src/backend/InvenTree/common/setting/system.py b/src/backend/InvenTree/common/setting/system.py index 156e6e61b3..3895303d3c 100644 --- a/src/backend/InvenTree/common/setting/system.py +++ b/src/backend/InvenTree/common/setting/system.py @@ -49,7 +49,8 @@ def validate_part_name_format(value): }) # Attempt to render the template with a dummy Part instance - p = Part(name='test part', description='some test part') + # Use pk=1 to ensure conditional checks like {% if part.pk %} are evaluated + p = Part(pk=1, name='test part', description='some test part') try: SandboxedEnvironment().from_string(value).render({'part': p}) diff --git a/src/backend/InvenTree/part/helpers.py b/src/backend/InvenTree/part/helpers.py index d07e3cfc13..7c55c9810e 100644 --- a/src/backend/InvenTree/part/helpers.py +++ b/src/backend/InvenTree/part/helpers.py @@ -5,7 +5,7 @@ import os from django.conf import settings import structlog -from jinja2 import Environment, select_autoescape +from jinja2.sandbox import SandboxedEnvironment from common.settings import get_global_setting @@ -37,11 +37,7 @@ def compile_full_name_template(*args, **kwargs): # Cache the template string _part_full_name_template_string = template_string - env = Environment( - autoescape=select_autoescape(default_for_string=False, default=False), - variable_start_string='{{', - variable_end_string='}}', - ) + env = SandboxedEnvironment(variable_start_string='{{', variable_end_string='}}') # Compile the template try: diff --git a/src/backend/InvenTree/plugin/api.py b/src/backend/InvenTree/plugin/api.py index 91580062b3..9093d6e7f4 100644 --- a/src/backend/InvenTree/plugin/api.py +++ b/src/backend/InvenTree/plugin/api.py @@ -210,6 +210,7 @@ class PluginInstall(CreateAPI): queryset = PluginConfig.objects.none() serializer_class = PluginSerializers.PluginConfigInstallSerializer + permission_classes = [InvenTree.permissions.IsSuperuserOrSuperScope] def create(self, request, *args, **kwargs): """Install a plugin via the API.""" diff --git a/src/backend/InvenTree/plugin/installer.py b/src/backend/InvenTree/plugin/installer.py index b1fb2de03f..acf6584a8d 100644 --- a/src/backend/InvenTree/plugin/installer.py +++ b/src/backend/InvenTree/plugin/installer.py @@ -236,8 +236,8 @@ def install_plugin(url=None, packagename=None, user=None, version=None): user: Optional user performing the installation version: Optional version specifier """ - if user and not user.is_staff: - raise ValidationError(_('Only staff users can administer plugins')) + if user and not user.is_superuser: + raise ValidationError(_('Only superuser accounts can administer plugins')) if settings.PLUGINS_INSTALL_DISABLED: raise ValidationError(_('Plugin installation is disabled')) @@ -269,6 +269,13 @@ def install_plugin(url=None, packagename=None, user=None, version=None): if version: full_pkg = f'{full_pkg}=={version}' + if not full_pkg: + raise ValidationError(_('No package name or URL provided for installation')) + + # Sanitize the package name for installation + if any(c in full_pkg for c in ';&|`$()'): + raise ValidationError(_('Invalid characters in package name or URL')) + install_name.append(full_pkg) ret = {} @@ -333,6 +340,9 @@ def uninstall_plugin(cfg: plugin.models.PluginConfig, user=None, delete_config=T """ from plugin.registry import registry + if user and not user.is_superuser: + raise ValidationError(_('Only superuser accounts can administer plugins')) + if settings.PLUGINS_INSTALL_DISABLED: raise ValidationError(_('Plugin uninstalling is disabled')) diff --git a/src/backend/InvenTree/plugin/serializers.py b/src/backend/InvenTree/plugin/serializers.py index 5fbff385b4..b12d0d3590 100644 --- a/src/backend/InvenTree/plugin/serializers.py +++ b/src/backend/InvenTree/plugin/serializers.py @@ -165,6 +165,9 @@ class PluginConfigInstallSerializer(serializers.Serializer): version = data.get('version', None) user = self.context['request'].user + if not user or not user.is_superuser: + raise ValidationError(_('Only superuser accounts can administer plugins')) + return install_plugin( url=url, packagename=packagename, version=version, user=user ) @@ -266,10 +269,13 @@ class PluginUninstallSerializer(serializers.Serializer): """Uninstall the specified plugin.""" from plugin.installer import uninstall_plugin + user = self.context['request'].user + + if not user or not user.is_superuser: + raise ValidationError(_('Only superuser accounts can administer plugins')) + return uninstall_plugin( - instance, - user=self.context['request'].user, - delete_config=validated_data.get('delete_config', True), + instance, user=user, delete_config=validated_data.get('delete_config', True) ) diff --git a/src/backend/InvenTree/plugin/test_api.py b/src/backend/InvenTree/plugin/test_api.py index ca6c312670..820c5c96fa 100644 --- a/src/backend/InvenTree/plugin/test_api.py +++ b/src/backend/InvenTree/plugin/test_api.py @@ -63,6 +63,21 @@ class PluginDetailAPITest(PluginMixin, InvenTreeAPITestCase): """Test the plugin install command.""" url = reverse('api-plugin-install') + # Requires superuser permissions + self.user.is_superuser = False + self.user.save() + + self.post( + url, + {'confirm': True, 'packagename': self.PKG_NAME}, + expected_code=403, + max_query_time=30, + ) + + # Provide superuser permissions + self.user.is_superuser = True + self.user.save() + # invalid package name data = self.post( url, @@ -209,7 +224,7 @@ class PluginDetailAPITest(PluginMixin, InvenTreeAPITestCase): test_plg.refresh_from_db() self.assertTrue(test_plg.is_active()) - def test_pluginCfg_delete(self): + def test_plugin_config_delete(self): """Test deleting a config.""" test_plg = self.plugin_confs.first() assert test_plg is not None diff --git a/src/backend/InvenTree/plugin/test_plugin.py b/src/backend/InvenTree/plugin/test_plugin.py index cdabd46477..0626860c1d 100644 --- a/src/backend/InvenTree/plugin/test_plugin.py +++ b/src/backend/InvenTree/plugin/test_plugin.py @@ -11,11 +11,14 @@ from typing import Optional from unittest import mock from unittest.mock import patch +from django.contrib.auth.models import User +from django.core.exceptions import ValidationError from django.test import TestCase, override_settings import plugin.templatetags.plugin_extras as plugin_tags from InvenTree.unit_test import PluginRegistryMixin, TestQueryMixin from plugin import InvenTreePlugin, PluginMixinEnum +from plugin.installer import install_plugin from plugin.registry import registry from plugin.samples.integration.another_sample import ( NoIntegrationPlugin, @@ -278,6 +281,9 @@ class RegistryTests(TestQueryMixin, PluginRegistryMixin, TestCase): def test_broken_samples(self): """Test that the broken samples trigger reloads.""" + # Reset the registry to a known state + registry.errors = {} + # In the base setup there are no errors self.assertEqual(len(registry.errors), 0) @@ -638,3 +644,40 @@ class RegistryTests(TestQueryMixin, PluginRegistryMixin, TestCase): self.assertTrue(cfg.is_builtin()) self.assertFalse(cfg.is_package()) self.assertFalse(cfg.is_sample()) + + +class InstallerTests(TestCase): + """Tests for the plugin installer code.""" + + def test_plugin_install_errors(self): + """Test error handling for plugin installation.""" + # No data provided + with self.assertRaises(ValidationError) as e: + install_plugin() + + self.assertIn( + 'No package name or URL provided for installation', str(e.exception) + ) + + # Invalid package name + for pkg in [ + 'invalid;name', + 'invalid&name', + 'invalid|name', + 'invalid`name', + 'invalid$(name)', + ]: + with self.assertRaises(ValidationError) as e: + install_plugin(packagename=pkg) + + self.assertIn('Invalid characters in package name or URL', str(e.exception)) + + # Non superuser account + user = User.objects.create(username='my-user', is_superuser=False) + + with self.assertRaises(ValidationError) as e: + install_plugin(user=user, packagename='some-package') + + self.assertIn( + 'Only superuser accounts can administer plugins', str(e.exception) + ) diff --git a/src/backend/InvenTree/users/serializers.py b/src/backend/InvenTree/users/serializers.py index 0fdd481137..4358094290 100644 --- a/src/backend/InvenTree/users/serializers.py +++ b/src/backend/InvenTree/users/serializers.py @@ -238,8 +238,21 @@ class ApiTokenSerializer(InvenTreeModelSerializer): def validate(self, data): """Validate the data for the serializer.""" + request_user = self.context['request'].user + if not request_user: + raise serializers.ValidationError( + _('User must be authenticated') + ) # pragma: no cover + if 'user' not in data: - data['user'] = self.context['request'].user + data['user'] = request_user + + # Only superusers can create tokens for other users + if data['user'] != request_user and not request_user.is_superuser: + raise serializers.ValidationError( + _('Only a superuser can create a token for another user') + ) + return super().validate(data) user_detail = UserSerializer(source='user', read_only=True) @@ -380,6 +393,9 @@ class MeUserSerializer(ExtendedUserSerializer): but ensures that certain fields are read-only. """ + # Remove the 'group_ids' field, as this is not relevant for the 'me' endpoint + fields = [f for f in ExtendedUserSerializer.Meta.fields if f != 'group_ids'] + read_only_fields = [ *ExtendedUserSerializer.Meta.read_only_fields, 'is_active', @@ -389,6 +405,28 @@ class MeUserSerializer(ExtendedUserSerializer): profile = UserProfileSerializer(many=False, read_only=True) + # Redefine the fields from ExtendedUserSerializer, to ensure they are marked as read-only + is_staff = serializers.BooleanField( + label=_('Staff'), + help_text=_('Does this user have staff permissions'), + required=False, + read_only=True, + ) + + is_superuser = serializers.BooleanField( + label=_('Superuser'), + help_text=_('Is this user a superuser'), + required=False, + read_only=True, + ) + + is_active = serializers.BooleanField( + label=_('Active'), + help_text=_('Is this user account active'), + required=False, + read_only=True, + ) + def make_random_password(length=14): """Generate a random password of given length.""" diff --git a/src/backend/InvenTree/users/test_api.py b/src/backend/InvenTree/users/test_api.py index 473b70946b..19219c6b93 100644 --- a/src/backend/InvenTree/users/test_api.py +++ b/src/backend/InvenTree/users/test_api.py @@ -258,6 +258,8 @@ class SuperuserAPITests(InvenTreeAPITestCase): class UserTokenTests(InvenTreeAPITestCase): """Tests for user token functionality.""" + fixtures = ['users'] + def test_token_generation(self): """Test user token generation.""" url = reverse('api-token') @@ -396,6 +398,28 @@ class UserTokenTests(InvenTreeAPITestCase): self.client.logout() self.get(reverse('api-token'), expected_code=401) + def test_token_security(self): + """Test that token generation is only available to users with the correct permissions.""" + url = reverse('api-token-list') + + # Try to generate a token for a different user (should fail) + response = self.post(url, data={'name': 'test', 'user': 1}, expected_code=400) + self.assertIn( + 'Only a superuser can create a token for another user', str(response.data) + ) + + # there should be no tokens created + self.assertEqual(ApiToken.objects.count(), 0) + + # now with superuser permissions + self.user.is_superuser = True + self.user.save() + + response = self.post(url, data={'name': 'test', 'user': 1}, expected_code=201) + self.assertIn('token', response.data) + + self.assertEqual(ApiToken.objects.count(), 1) + class GroupDetialTests(InvenTreeAPITestCase): """Tests for the GroupDetail API endpoint.""" diff --git a/src/frontend/src/tables/plugin/PluginListTable.tsx b/src/frontend/src/tables/plugin/PluginListTable.tsx index 711d61f987..580414ffaa 100644 --- a/src/frontend/src/tables/plugin/PluginListTable.tsx +++ b/src/frontend/src/tables/plugin/PluginListTable.tsx @@ -220,7 +220,6 @@ export default function PluginListTable() { // Uninstall an installed plugin // Must be inactive, not a builtin, not a sample, and installed as a package hidden: - !user.isSuperuser() || record.active || record.is_builtin || record.is_mandatory || @@ -244,8 +243,7 @@ export default function PluginListTable() { record.is_builtin || record.is_mandatory || record.is_sample || - record.is_installed || - !user.isSuperuser(), + record.is_installed, title: t`Delete`, tooltip: t`Delete selected plugin configuration`, color: 'red', @@ -355,7 +353,12 @@ export default function PluginListTable() { // Custom table actions const tableActions = useMemo(() => { - if (!user.isSuperuser() || !server.plugins_enabled) { + if ( + !user.isSuperuser() || + !server.plugins_enabled || + server.plugins_install_disabled + ) { + // Prevent installation if plugins are disabled or user is not superuser return []; } @@ -376,7 +379,6 @@ export default function PluginListTable() { setPluginPackage(''); installPluginModal.open(); }} - disabled={server.plugins_install_disabled || false} /> ]; }, [user, server]); diff --git a/src/frontend/tests/pui_printing.spec.ts b/src/frontend/tests/pui_printing.spec.ts index f37c32cbff..bde9082dbe 100644 --- a/src/frontend/tests/pui_printing.spec.ts +++ b/src/frontend/tests/pui_printing.spec.ts @@ -1,4 +1,5 @@ import { expect, test } from './baseFixtures.js'; +import { adminuser } from './defaults.js'; import { activateTableView, loadTab } from './helpers.js'; import { doCachedLogin } from './login.js'; import { setPluginState } from './settings.js'; @@ -6,7 +7,8 @@ import { setPluginState } from './settings.js'; // Test for the label editing interface test('Printing - Label Editing', async ({ browser }) => { const page = await doCachedLogin(browser, { - user: adminuser, + username: adminuser.username, + password: adminuser.password, url: 'settings/admin/labels' });