2
0
mirror of https://github.com/inventree/InvenTree.git synced 2025-11-13 19:36:46 +00:00

fix(backend): auth check middleware for specific media access (#10784)

* simplify

* fix return type

* handle token (app access)

* reduce lookup amount

* add positive test again

* add poisitive test

* move out settings

* add tests for Check2FAMiddleware

* add test for auth_request

* add a reverse name for auth_request

* auth tests refactors

* move test

* disable check for things that do not trigger

* fix typing for python 3.9

* make names clearer and add comments

* finish tests

* fix call

* re-enable mfa test without the timing component

* cleanup helper

* ignore easy out

* ignore scenario that can not happen
This commit is contained in:
Matthias Mair
2025-11-09 22:58:58 +01:00
committed by GitHub
parent 726e852b7b
commit f3e8482469
9 changed files with 213 additions and 109 deletions

View File

@@ -0,0 +1,22 @@
"""Helper functions for allauth MFA testing."""
from allauth.mfa.recovery_codes.internal.auth import RecoveryCodes
from allauth.mfa.totp.internal import auth as allauth_totp_auth
def get_codes(user):
"""Generate active TOTP and recovery codes for a user.
Args:
user: User instance
Returns:
Tuple of (TOTP Authenticator instance, list of recovery codes, TOTP secret)
"""
secret = allauth_totp_auth.generate_totp_secret()
totp_auth = allauth_totp_auth.TOTP.activate(user, secret).instance
rc_auth = RecoveryCodes.activate(user).instance
# Get usable codes
rc_codes = rc_auth.wrap().get_unused_codes()
return totp_auth, rc_codes, secret

View File

@@ -1,13 +1,14 @@
"""Middleware for InvenTree.""" """Middleware for InvenTree."""
import sys import sys
from typing import Optional
from urllib.parse import urlsplit from urllib.parse import urlsplit
from django.conf import settings from django.conf import settings
from django.contrib.auth.middleware import PersistentRemoteUserMiddleware from django.contrib.auth.middleware import PersistentRemoteUserMiddleware
from django.http import HttpRequest, HttpResponse, JsonResponse from django.http import HttpRequest, HttpResponse, JsonResponse
from django.shortcuts import redirect, render from django.shortcuts import redirect, render
from django.urls import resolve, reverse_lazy from django.urls import resolve, reverse, reverse_lazy
from django.utils.deprecation import MiddlewareMixin from django.utils.deprecation import MiddlewareMixin
from django.utils.http import is_same_domain from django.utils.http import is_same_domain
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
@@ -40,6 +41,15 @@ def get_token_from_request(request):
return None return None
def ensure_slashes(path: str):
"""Ensure that slashes are suroudning the passed path."""
if not path.startswith('/'):
path = f'/{path}'
if not path.endswith('/'):
path = f'{path}/'
return path
# List of target URL endpoints where *do not* want to redirect to # List of target URL endpoints where *do not* want to redirect to
urls = [ urls = [
reverse_lazy('account_login'), reverse_lazy('account_login'),
@@ -47,8 +57,46 @@ urls = [
reverse_lazy('admin:logout'), reverse_lazy('admin:logout'),
] ]
# Do not redirect requests to any of these paths paths_ignore_handling = [
paths_ignore = ['/api/', '/auth/', settings.MEDIA_URL, settings.STATIC_URL] '/api/',
reverse('auth-check'),
settings.MEDIA_URL,
settings.STATIC_URL,
]
"""Paths that should not use InvenTrees own auth rejection behaviour, no host checking or redirecting. Security
are still enforced."""
paths_own_security = [
'/api/', # DRF handles API
'/o/', # oAuth2 library - has its own auth model
'/anymail/', # Mails - wehbhooks etc
'/accounts/', # allauth account management - has its own auth model
'/assets/', # Web assets - only used for testing, no security model needed
ensure_slashes(
settings.STATIC_URL
), # Static files - static files are considered safe to serve
ensure_slashes(
settings.FRONTEND_URL_BASE
), # Frontend files - frontend paths have their own security model
]
"""Paths that handle their own security model."""
pages_mfa_bypass = [
'api-user-meta',
'api-user-me',
'api-user-roles',
'api-inventree-info',
'api-token',
# web platform urls
'password_reset_confirm',
'index',
'web',
'web-wildcard',
'web-assets',
]
"""Exact page names that bypass MFA enforcement - normal security model is still enforced."""
apps_mfa_bypass = [
'headless' # Headless allauth app - has its own security model
]
"""App namespaces that bypass MFA enforcement - normal security model is still enforced."""
class AuthRequiredMiddleware: class AuthRequiredMiddleware:
@@ -61,6 +109,7 @@ class AuthRequiredMiddleware:
def check_token(self, request) -> bool: def check_token(self, request) -> bool:
"""Check if the user is authenticated via token.""" """Check if the user is authenticated via token."""
if token := get_token_from_request(request): if token := get_token_from_request(request):
request.token = token
# Does the provided token match a valid user? # Does the provided token match a valid user?
try: try:
token = ApiToken.objects.get(key=token) token = ApiToken.objects.get(key=token)
@@ -69,8 +118,10 @@ class AuthRequiredMiddleware:
# Provide the user information to the request # Provide the user information to the request
request.user = token.user request.user = token.user
return True return True
except ApiToken.DoesNotExist: except ApiToken.DoesNotExist: # pragma: no cover
logger.warning('Access denied for unknown token %s', token) logger.warning(
'Access denied for unknown token %s', token
) # pragma: no cover
return False return False
@@ -79,70 +130,40 @@ class AuthRequiredMiddleware:
Redirects to login if not authenticated. Redirects to login if not authenticated.
""" """
path: str = request.path_info
# Code to be executed for each request before # Code to be executed for each request before
# the view (and later middleware) are called. # the view (and later middleware) are called.
assert hasattr(request, 'user') assert hasattr(request, 'user')
# API requests are handled by the DRF library # API requests that are handled elsewhere
if request.path_info.startswith('/api/'): if any(path.startswith(a) for a in paths_own_security):
return self.get_response(request)
# oAuth2 requests are handled by the oAuth2 library
if request.path_info.startswith('/o/'):
return self.get_response(request)
# anymail requests are handled by the anymail library
if request.path_info.startswith('/anymail/'):
return self.get_response(request) return self.get_response(request)
# Is the function exempt from auth requirements? # Is the function exempt from auth requirements?
path_func = resolve(request.path).func path_func = resolve(request.path).func
if getattr(path_func, 'auth_exempt', False) is True: if getattr(path_func, 'auth_exempt', False) is True:
return self.get_response(request) return self.get_response(request)
if not request.user.is_authenticated: if not request.user.is_authenticated and not (
path == f'/{settings.FRONTEND_URL_BASE}' or self.check_token(request)
):
""" """
Normally, a web-based session would use csrftoken based authentication. Normally, a web-based session would use csrftoken based authentication.
However when running an external application (e.g. the InvenTree app or Python library), However when running an external application (e.g. the InvenTree app or Python library),
we must validate the user token manually. we must validate the user token manually.
""" """
if path not in urls and not any(
authorized = False path.startswith(p) for p in paths_ignore_handling
# Allow static files to be accessed without auth
# Important for e.g. login page
if (
request.path_info.startswith('/static/')
or request.path_info.startswith('/accounts/')
or (
request.path_info.startswith(f'/{settings.FRONTEND_URL_BASE}/')
or request.path_info.startswith('/assets/')
or request.path_info == f'/{settings.FRONTEND_URL_BASE}'
)
or self.check_token(request)
): ):
authorized = True # Save the 'next' parameter to pass through to the login view
# No authorization was found for the request return redirect(f'{reverse_lazy("account_login")}?next={request.path}')
if not authorized: # Return a 401 (Unauthorized) response code for this request
path = request.path_info return HttpResponse('Unauthorized', status=401)
if path not in urls and not any(
path.startswith(p) for p in paths_ignore
):
# Save the 'next' parameter to pass through to the login view
return redirect(
f'{reverse_lazy("account_login")}?next={request.path}'
)
# Return a 401 (Unauthorized) response code for this request
return HttpResponse('Unauthorized', status=401)
response = self.get_response(request) response = self.get_response(request)
return response return response
@@ -152,20 +173,6 @@ class Check2FAMiddleware(MiddlewareMixin):
Adapted from https://github.com/pennersr/django-allauth/issues/3649 Adapted from https://github.com/pennersr/django-allauth/issues/3649
""" """
allowed_pages = [
'api-user-meta',
'api-user-me',
'api-user-roles',
'api-inventree-info',
'api-token',
# web platform urls
'password_reset_confirm',
'index',
'web',
'web-wildcard',
'web-assets',
]
app_names = ['headless']
require_2fa_message = _( require_2fa_message = _(
'You must enable two-factor authentication before doing anything else.' 'You must enable two-factor authentication before doing anything else.'
) )
@@ -180,10 +187,10 @@ class Check2FAMiddleware(MiddlewareMixin):
"""Check if the current page can be accessed without mfa.""" """Check if the current page can be accessed without mfa."""
match = request.resolver_match match = request.resolver_match
return ( return (
None False
if match is None if match is None
else any(ref in self.app_names for ref in match.app_names) else any(ref in apps_mfa_bypass for ref in match.app_names)
or match.url_name in self.allowed_pages or match.url_name in pages_mfa_bypass
or match.route == 'favicon.ico' or match.route == 'favicon.ico'
) )
@@ -202,7 +209,7 @@ class Check2FAMiddleware(MiddlewareMixin):
def process_view( def process_view(
self, request: HttpRequest, view_func, view_args, view_kwargs self, request: HttpRequest, view_func, view_args, view_kwargs
) -> HttpResponse: ) -> Optional[HttpResponse]:
"""Determine if the server is set up enforce 2fa registration.""" """Determine if the server is set up enforce 2fa registration."""
from django.conf import settings from django.conf import settings
@@ -216,6 +223,10 @@ class Check2FAMiddleware(MiddlewareMixin):
return None return None
if self.is_multifactor_logged_in(request): if self.is_multifactor_logged_in(request):
return None return None
if getattr(
request, 'token', get_token_from_request(request)
): # Token based login can not do MFA
return None
if self.enforce_2fa(request): if self.enforce_2fa(request):
return self.on_require_2fa(request) return self.on_require_2fa(request)
@@ -223,7 +234,9 @@ class Check2FAMiddleware(MiddlewareMixin):
def enforce_2fa(self, request): def enforce_2fa(self, request):
"""Use setting to check if MFA should be enforced.""" """Use setting to check if MFA should be enforced."""
return get_global_setting('LOGIN_ENFORCE_MFA') return get_global_setting(
'LOGIN_ENFORCE_MFA', None, 'INVENTREE_LOGIN_ENFORCE_MFA'
)
class InvenTreeRemoteUserMiddleware(PersistentRemoteUserMiddleware): class InvenTreeRemoteUserMiddleware(PersistentRemoteUserMiddleware):
@@ -304,7 +317,7 @@ class InvenTreeHostSettingsMiddleware(MiddlewareMixin):
# Handle commonly ignored paths that might also work without a correct setup (api, auth) # Handle commonly ignored paths that might also work without a correct setup (api, auth)
path = request.path_info path = request.path_info
if path in urls or any(path.startswith(p) for p in paths_ignore): if path in urls or any(path.startswith(p) for p in paths_ignore_handling):
return None return None
# treat the accessed scheme and host # treat the accessed scheme and host

View File

@@ -108,6 +108,13 @@ class ApiAccessTests(InvenTreeAPITestCase):
self.tokenAuth() self.tokenAuth()
self.assertIsNotNone(self.token) self.assertIsNotNone(self.token)
# Run explicit test with token auth
url = reverse('api-license')
response = self.get(
url, headers={'Authorization': f'Token {self.token}'}, expected_code=200
)
self.assertIn('backend', response.json())
def test_role_view(self): def test_role_view(self):
"""Test that we can access the 'roles' view for the logged in user. """Test that we can access the 'roles' view for the logged in user.

View File

@@ -4,6 +4,7 @@ from django.contrib.auth.models import Group, User
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.test import override_settings from django.test import override_settings
from django.test.testcases import TransactionTestCase from django.test.testcases import TransactionTestCase
from django.urls import reverse
from allauth.socialaccount.models import SocialAccount, SocialLogin from allauth.socialaccount.models import SocialAccount, SocialLogin
@@ -139,13 +140,15 @@ class TestAuth(InvenTreeAPITestCase):
"""Test authentication functionality.""" """Test authentication functionality."""
reg_url = '/api/auth/v1/auth/signup' reg_url = '/api/auth/v1/auth/signup'
login_url = '/api/auth/v1/auth/login'
test_email = 'tester@example.com' test_email = 'tester@example.com'
def test_buildin_token(self): def test_buildin_token(self):
"""Test the built-in token authentication.""" """Test the built-in token authentication."""
self.logout() self.logout()
response = self.post( response = self.post(
'/api/auth/v1/auth/login', self.login_url,
{'username': self.username, 'password': self.password}, {'username': self.username, 'password': self.password},
expected_code=200, expected_code=200,
) )
@@ -155,7 +158,7 @@ class TestAuth(InvenTreeAPITestCase):
# Test for conflicting login # Test for conflicting login
self.post( self.post(
'/api/auth/v1/auth/login', self.login_url,
{'username': self.username, 'password': self.password}, {'username': self.username, 'password': self.password},
expected_code=409, expected_code=409,
) )
@@ -222,3 +225,22 @@ class TestAuth(InvenTreeAPITestCase):
): ):
resp = self.post(self.reg_url, self.email_args(), expected_code=200) resp = self.post(self.reg_url, self.email_args(), expected_code=200)
self.assertEqual(resp.json()['data']['user']['email'], self.test_email) self.assertEqual(resp.json()['data']['user']['email'], self.test_email)
def test_auth_request(self):
"""Test the auth_request view."""
url = reverse('auth-check')
# Logged in user
self.get(url)
# Inactive user
# TODO @matmair - this part of auth_request is not triggering currently
# self.user.is_active = False
# self.user.save()
# self.get(url, expected_code=403)
# self.user.is_active = True
# self.user.save()
# Logged out user
self.client.logout()
self.get(url, expected_code=401)

View File

@@ -1,5 +1,7 @@
"""Tests for middleware functions.""" """Tests for middleware functions."""
from unittest.mock import patch
from django.conf import settings from django.conf import settings
from django.http import Http404 from django.http import Http404
from django.urls import reverse from django.urls import reverse
@@ -7,17 +9,19 @@ from django.urls import reverse
from error_report.models import Error from error_report.models import Error
from InvenTree.exceptions import log_error from InvenTree.exceptions import log_error
from InvenTree.helpers_mfa import get_codes
from InvenTree.unit_test import InvenTreeTestCase from InvenTree.unit_test import InvenTreeTestCase
class MiddlewareTests(InvenTreeTestCase): class MiddlewareTests(InvenTreeTestCase):
"""Test for middleware functions.""" """Test for middleware functions."""
def check_path(self, url, code=200, **kwargs): def check_path(self, url, code=200, auth_header=None, **kwargs):
"""Helper function to run a request.""" """Helper function to run a request."""
response = self.client.get( headers = {'accept': 'application/json'}
url, headers={'accept': 'application/json'}, **kwargs if auth_header:
) headers['Authorization'] = auth_header
response = self.client.get(url, headers=headers, **kwargs)
self.assertEqual(response.status_code, code) self.assertEqual(response.status_code, code)
return response return response
@@ -36,13 +40,62 @@ class MiddlewareTests(InvenTreeTestCase):
response = self.check_path(reverse('index'), 302) response = self.check_path(reverse('index'), 302)
self.assertEqual(response.url, '/accounts/login/?next=/') self.assertEqual(response.url, '/accounts/login/?next=/')
def test_Check2FAMiddleware(self):
"""Test the 2FA middleware."""
url = reverse('api-part-list')
self.assignRole(role='part.view', group=self.group)
# Ensure that normal access works with mfa enabled
with self.settings(MFA_ENABLED=True):
self.check_path(url)
# Ensure that normal access works with mfa disabled
with self.settings(MFA_ENABLED=False):
self.check_path(url)
# Now enforce MFA for the user
with self.settings(MFA_ENABLED=True) and patch.dict(
'os.environ', {'INVENTREE_LOGIN_ENFORCE_MFA': 'True'}
):
# Enforced but not logged in via mfa -> should give 403
response = self.check_path(url, 401)
self.assertContains(
response,
'You must enable two-factor authentication before doing anything else.',
status_code=401,
)
# Register a token and try again
rc_codes = get_codes(self.user)[1]
self.client.logout()
# Login step 1
self.client.post(
reverse('browser:account:login'),
{'username': self.username, 'password': self.password},
content_type='application/json',
)
# Login step 2
self.client.post(
reverse('browser:mfa:authenticate'),
{'code': rc_codes[0]},
expected_code=401,
content_type='application/json',
)
rsp3 = self.client.post(
reverse('browser:mfa:trust'),
{'trust': False},
expected_code=200,
content_type='application/json',
)
self.assertEqual(rsp3.status_code, 200)
self.check_path(url)
def test_token_auth(self): def test_token_auth(self):
"""Test auth with token auth.""" """Test auth with token auth."""
target = reverse('api-license') target = reverse('api-license')
# get token # get token
# response = self.client.get(reverse('api-token'), format='json', data={}) response = self.client.get(reverse('api-token'), format='json', data={})
# token = response.data['token'] token = response.data['token']
# logout # logout
self.client.logout() self.client.logout()
@@ -51,13 +104,16 @@ class MiddlewareTests(InvenTreeTestCase):
self.check_path(target, 401) self.check_path(target, 401)
# Request with broken token # Request with broken token
self.check_path(target, 401, HTTP_Authorization='Token abcd123') self.check_path(target, 401, auth_header='Token abcd123')
# should still fail without token # should still fail without token
self.check_path(target, 401) self.check_path(target, 401)
# request with token # request with token - should work
# self.check_path(target, HTTP_Authorization=f'Token {token}') self.check_path(target, auth_header=f'Token {token}')
# Request something that is not on the API - should still work
self.check_path(reverse('auth-check'), auth_header=f'Token {token}')
def test_error_exceptions(self): def test_error_exceptions(self):
"""Test that ignored errors are not logged.""" """Test that ignored errors are not logged."""

View File

@@ -188,7 +188,7 @@ class CorsTest(TestCase):
Here, we are not authorized by default, Here, we are not authorized by default,
but the CORS headers should still be included. but the CORS headers should still be included.
""" """
url = '/auth/' url = reverse('auth-check')
# First, a preflight request with a "valid" origin # First, a preflight request with a "valid" origin

View File

@@ -130,7 +130,9 @@ backendpatterns = [
path( path(
'auth/', include('rest_framework.urls', namespace='rest_framework') 'auth/', include('rest_framework.urls', namespace='rest_framework')
), # Used for (DRF) browsable API auth ), # Used for (DRF) browsable API auth
path('auth/', auth_request), # Used for proxies to check if user is authenticated path(
'auth/', auth_request, name='auth-check'
), # Used for proxies to check if user is authenticated
path('accounts/', include('allauth.urls')), path('accounts/', include('allauth.urls')),
# OAuth2 # OAuth2
flagged_path('OIDC', 'o/', include(oauth2_urls)), flagged_path('OIDC', 'o/', include(oauth2_urls)),

View File

@@ -12,12 +12,13 @@ def auth_request(request):
Useful for (for example) redirecting authentication requests through django's permission framework. Useful for (for example) redirecting authentication requests through django's permission framework.
""" """
if not request.user or not request.user.is_authenticated: if (
return HttpResponse(status=403) not request.user
or not request.user.is_authenticated
if not request.user.is_active: or not request.user.is_active
# Reject requests from inactive users ):
return HttpResponse(status=403) # This is very unlikely to be reached, as the middleware stack should intercept unauthenticated requests
return HttpResponse(status=403) # pragma: no cover
# User is authenticated and active # User is authenticated and active
return HttpResponse(status=200) return HttpResponse(status=200)

View File

@@ -5,9 +5,8 @@ from django.contrib.auth.models import Group
from django.test import TestCase from django.test import TestCase
from django.urls import reverse from django.urls import reverse
from allauth.mfa.totp.internal import auth as totp_auth
from common.settings import set_global_setting from common.settings import set_global_setting
from InvenTree.helpers_mfa import get_codes
from InvenTree.unit_test import AdminTestCase, InvenTreeAPITestCase, InvenTreeTestCase from InvenTree.unit_test import AdminTestCase, InvenTreeAPITestCase, InvenTreeTestCase
from users.models import ApiToken, Owner from users.models import ApiToken, Owner
from users.oauth2_scopes import _roles from users.oauth2_scopes import _roles
@@ -334,8 +333,6 @@ class OwnerModelTest(InvenTreeTestCase):
class MFALoginTest(InvenTreeAPITestCase): class MFALoginTest(InvenTreeAPITestCase):
"""Some simplistic tests to ensure that MFA is working.""" """Some simplistic tests to ensure that MFA is working."""
mfa_secret = None
def test_api(self): def test_api(self):
"""Test that the API is working.""" """Test that the API is working."""
auth_data = {'username': self.username, 'password': self.password} auth_data = {'username': self.username, 'password': self.password}
@@ -349,13 +346,8 @@ class MFALoginTest(InvenTreeAPITestCase):
response = self.post(login_url, auth_data, expected_code=200) response = self.post(login_url, auth_data, expected_code=200)
self._helper_meta_val(response) self._helper_meta_val(response)
return # TODO @matmair re-enable MFA tests once stable
# Add MFA - trying in a limited loop in case of timing issues # Add MFA - trying in a limited loop in case of timing issues
response = self.post( rc_code = get_codes(user=self.user)[1][0]
reverse('browser:mfa:manage_totp'),
{'code': self.get_topt()},
expected_code=200,
)
# There must be a TOTP device now - success # There must be a TOTP device now - success
self.get(reverse('browser:mfa:manage_totp'), expected_code=200) self.get(reverse('browser:mfa:manage_totp'), expected_code=200)
@@ -373,11 +365,9 @@ class MFALoginTest(InvenTreeAPITestCase):
response = self.post(login_url, auth_data, expected_code=401) response = self.post(login_url, auth_data, expected_code=401)
# MFA not finished - no access allowed # MFA not finished - no access allowed
self.get(reverse('api-token'), expected_code=401) self.get(reverse('api-token'), expected_code=401)
# Complete # Complete MFA (with recovery code to avoid timing issues)
self.post( self.post(
reverse('browser:mfa:authenticate'), reverse('browser:mfa:authenticate'), {'code': rc_code}, expected_code=401
{'code': self.get_topt()},
expected_code=401,
) )
self.post(reverse('browser:mfa:trust'), {'trust': False}, expected_code=200) self.post(reverse('browser:mfa:trust'), {'trust': False}, expected_code=200)
# and run through trust # and run through trust
@@ -405,15 +395,6 @@ class MFALoginTest(InvenTreeAPITestCase):
flows = response.json()['data']['flows'] flows = response.json()['data']['flows']
return next(a for a in flows if a['id'] == flow_id) return next(a for a in flows if a['id'] == flow_id)
def get_topt(self):
"""Helper to get a current totp code."""
if not self.mfa_secret:
mfa_init = self.get(reverse('browser:mfa:manage_totp'), expected_code=404)
self.mfa_secret = mfa_init.json()['meta']['secret']
return totp_auth.hotp_value(
self.mfa_secret, next(totp_auth.yield_hotp_counters_from_time())
)
class AdminTest(AdminTestCase): class AdminTest(AdminTestCase):
"""Tests for the admin interface integration.""" """Tests for the admin interface integration."""