mirror of
https://github.com/inventree/InvenTree.git
synced 2025-11-13 11:26:42 +00:00
* fix(backend): auth check middleware for specific media access (#10784)
* simplify
* fix return type
* handle token (app access)
* reduce lookup amount
* add positive test again
* add poisitive test
* move out settings
* add tests for Check2FAMiddleware
* add test for auth_request
* add a reverse name for auth_request
* auth tests refactors
* move test
* disable check for things that do not trigger
* fix typing for python 3.9
* make names clearer and add comments
* finish tests
* fix call
* re-enable mfa test without the timing component
* cleanup helper
* ignore easy out
* ignore scenario that can not happen
(cherry picked from commit f3e8482469)
* fix merge
This commit is contained in:
22
src/backend/InvenTree/InvenTree/helpers_mfa.py
Normal file
22
src/backend/InvenTree/InvenTree/helpers_mfa.py
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
"""Helper functions for allauth MFA testing."""
|
||||||
|
|
||||||
|
from allauth.mfa.recovery_codes.internal.auth import RecoveryCodes
|
||||||
|
from allauth.mfa.totp.internal import auth as allauth_totp_auth
|
||||||
|
|
||||||
|
|
||||||
|
def get_codes(user):
|
||||||
|
"""Generate active TOTP and recovery codes for a user.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user: User instance
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (TOTP Authenticator instance, list of recovery codes, TOTP secret)
|
||||||
|
"""
|
||||||
|
secret = allauth_totp_auth.generate_totp_secret()
|
||||||
|
totp_auth = allauth_totp_auth.TOTP.activate(user, secret).instance
|
||||||
|
rc_auth = RecoveryCodes.activate(user).instance
|
||||||
|
|
||||||
|
# Get usable codes
|
||||||
|
rc_codes = rc_auth.wrap().get_unused_codes()
|
||||||
|
return totp_auth, rc_codes, secret
|
||||||
@@ -1,21 +1,22 @@
|
|||||||
"""Middleware for InvenTree."""
|
"""Middleware for InvenTree."""
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
from typing import Optional
|
||||||
from urllib.parse import urlsplit
|
from urllib.parse import urlsplit
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.auth.middleware import PersistentRemoteUserMiddleware
|
from django.contrib.auth.middleware import PersistentRemoteUserMiddleware
|
||||||
from django.http import HttpResponse
|
from django.http import HttpRequest, HttpResponse, JsonResponse
|
||||||
from django.shortcuts import redirect, render
|
from django.shortcuts import redirect, render
|
||||||
from django.urls import resolve, reverse_lazy
|
from django.urls import resolve, reverse, reverse_lazy
|
||||||
from django.utils.deprecation import MiddlewareMixin
|
from django.utils.deprecation import MiddlewareMixin
|
||||||
from django.utils.http import is_same_domain
|
from django.utils.http import is_same_domain
|
||||||
|
from django.utils.translation import gettext_lazy as _
|
||||||
|
|
||||||
import structlog
|
import structlog
|
||||||
from error_report.middleware import ExceptionProcessor
|
from error_report.middleware import ExceptionProcessor
|
||||||
|
|
||||||
from common.settings import get_global_setting
|
from common.settings import get_global_setting
|
||||||
from InvenTree.AllUserRequire2FAMiddleware import AllUserRequire2FAMiddleware
|
|
||||||
from InvenTree.cache import create_session_cache, delete_session_cache
|
from InvenTree.cache import create_session_cache, delete_session_cache
|
||||||
from InvenTree.config import CONFIG_LOOKUPS, inventreeInstaller
|
from InvenTree.config import CONFIG_LOOKUPS, inventreeInstaller
|
||||||
from users.models import ApiToken
|
from users.models import ApiToken
|
||||||
@@ -40,6 +41,15 @@ def get_token_from_request(request):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_slashes(path: str):
|
||||||
|
"""Ensure that slashes are suroudning the passed path."""
|
||||||
|
if not path.startswith('/'):
|
||||||
|
path = f'/{path}'
|
||||||
|
if not path.endswith('/'):
|
||||||
|
path = f'{path}/'
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
# List of target URL endpoints where *do not* want to redirect to
|
# List of target URL endpoints where *do not* want to redirect to
|
||||||
urls = [
|
urls = [
|
||||||
reverse_lazy('account_login'),
|
reverse_lazy('account_login'),
|
||||||
@@ -47,8 +57,46 @@ urls = [
|
|||||||
reverse_lazy('admin:logout'),
|
reverse_lazy('admin:logout'),
|
||||||
]
|
]
|
||||||
|
|
||||||
# Do not redirect requests to any of these paths
|
paths_ignore_handling = [
|
||||||
paths_ignore = ['/api/', '/auth/', settings.MEDIA_URL, settings.STATIC_URL]
|
'/api/',
|
||||||
|
reverse('auth-check'),
|
||||||
|
settings.MEDIA_URL,
|
||||||
|
settings.STATIC_URL,
|
||||||
|
]
|
||||||
|
"""Paths that should not use InvenTrees own auth rejection behaviour, no host checking or redirecting. Security
|
||||||
|
are still enforced."""
|
||||||
|
paths_own_security = [
|
||||||
|
'/api/', # DRF handles API
|
||||||
|
'/o/', # oAuth2 library - has its own auth model
|
||||||
|
'/anymail/', # Mails - wehbhooks etc
|
||||||
|
'/accounts/', # allauth account management - has its own auth model
|
||||||
|
'/assets/', # Web assets - only used for testing, no security model needed
|
||||||
|
ensure_slashes(
|
||||||
|
settings.STATIC_URL
|
||||||
|
), # Static files - static files are considered safe to serve
|
||||||
|
ensure_slashes(
|
||||||
|
settings.FRONTEND_URL_BASE
|
||||||
|
), # Frontend files - frontend paths have their own security model
|
||||||
|
]
|
||||||
|
"""Paths that handle their own security model."""
|
||||||
|
pages_mfa_bypass = [
|
||||||
|
'api-user-meta',
|
||||||
|
'api-user-me',
|
||||||
|
'api-user-roles',
|
||||||
|
'api-inventree-info',
|
||||||
|
'api-token',
|
||||||
|
# web platform urls
|
||||||
|
'password_reset_confirm',
|
||||||
|
'index',
|
||||||
|
'web',
|
||||||
|
'web-wildcard',
|
||||||
|
'web-assets',
|
||||||
|
]
|
||||||
|
"""Exact page names that bypass MFA enforcement - normal security model is still enforced."""
|
||||||
|
apps_mfa_bypass = [
|
||||||
|
'headless' # Headless allauth app - has its own security model
|
||||||
|
]
|
||||||
|
"""App namespaces that bypass MFA enforcement - normal security model is still enforced."""
|
||||||
|
|
||||||
|
|
||||||
class AuthRequiredMiddleware:
|
class AuthRequiredMiddleware:
|
||||||
@@ -61,6 +109,7 @@ class AuthRequiredMiddleware:
|
|||||||
def check_token(self, request) -> bool:
|
def check_token(self, request) -> bool:
|
||||||
"""Check if the user is authenticated via token."""
|
"""Check if the user is authenticated via token."""
|
||||||
if token := get_token_from_request(request):
|
if token := get_token_from_request(request):
|
||||||
|
request.token = token
|
||||||
# Does the provided token match a valid user?
|
# Does the provided token match a valid user?
|
||||||
try:
|
try:
|
||||||
token = ApiToken.objects.get(key=token)
|
token = ApiToken.objects.get(key=token)
|
||||||
@@ -69,8 +118,10 @@ class AuthRequiredMiddleware:
|
|||||||
# Provide the user information to the request
|
# Provide the user information to the request
|
||||||
request.user = token.user
|
request.user = token.user
|
||||||
return True
|
return True
|
||||||
except ApiToken.DoesNotExist:
|
except ApiToken.DoesNotExist: # pragma: no cover
|
||||||
logger.warning('Access denied for unknown token %s', token)
|
logger.warning(
|
||||||
|
'Access denied for unknown token %s', token
|
||||||
|
) # pragma: no cover
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -79,79 +130,113 @@ class AuthRequiredMiddleware:
|
|||||||
|
|
||||||
Redirects to login if not authenticated.
|
Redirects to login if not authenticated.
|
||||||
"""
|
"""
|
||||||
|
path: str = request.path_info
|
||||||
# Code to be executed for each request before
|
# Code to be executed for each request before
|
||||||
# the view (and later middleware) are called.
|
# the view (and later middleware) are called.
|
||||||
|
|
||||||
assert hasattr(request, 'user')
|
assert hasattr(request, 'user')
|
||||||
|
|
||||||
# API requests are handled by the DRF library
|
# API requests that are handled elsewhere
|
||||||
if request.path_info.startswith('/api/'):
|
if any(path.startswith(a) for a in paths_own_security):
|
||||||
return self.get_response(request)
|
|
||||||
|
|
||||||
# oAuth2 requests are handled by the oAuth2 library
|
|
||||||
if request.path_info.startswith('/o/'):
|
|
||||||
return self.get_response(request)
|
|
||||||
|
|
||||||
# anymail requests are handled by the anymail library
|
|
||||||
if request.path_info.startswith('/anymail/'):
|
|
||||||
return self.get_response(request)
|
return self.get_response(request)
|
||||||
|
|
||||||
# Is the function exempt from auth requirements?
|
# Is the function exempt from auth requirements?
|
||||||
path_func = resolve(request.path).func
|
path_func = resolve(request.path).func
|
||||||
|
|
||||||
if getattr(path_func, 'auth_exempt', False) is True:
|
if getattr(path_func, 'auth_exempt', False) is True:
|
||||||
return self.get_response(request)
|
return self.get_response(request)
|
||||||
|
|
||||||
if not request.user.is_authenticated:
|
if not request.user.is_authenticated and not (
|
||||||
|
path == f'/{settings.FRONTEND_URL_BASE}' or self.check_token(request)
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Normally, a web-based session would use csrftoken based authentication.
|
Normally, a web-based session would use csrftoken based authentication.
|
||||||
|
|
||||||
However when running an external application (e.g. the InvenTree app or Python library),
|
However when running an external application (e.g. the InvenTree app or Python library),
|
||||||
we must validate the user token manually.
|
we must validate the user token manually.
|
||||||
"""
|
"""
|
||||||
|
if path not in urls and not any(
|
||||||
authorized = False
|
path.startswith(p) for p in paths_ignore_handling
|
||||||
|
|
||||||
# Allow static files to be accessed without auth
|
|
||||||
# Important for e.g. login page
|
|
||||||
if (
|
|
||||||
request.path_info.startswith('/static/')
|
|
||||||
or request.path_info.startswith('/accounts/')
|
|
||||||
or (
|
|
||||||
request.path_info.startswith(f'/{settings.FRONTEND_URL_BASE}/')
|
|
||||||
or request.path_info.startswith('/assets/')
|
|
||||||
or request.path_info == f'/{settings.FRONTEND_URL_BASE}'
|
|
||||||
)
|
|
||||||
or self.check_token(request)
|
|
||||||
):
|
):
|
||||||
authorized = True
|
# Save the 'next' parameter to pass through to the login view
|
||||||
|
|
||||||
# No authorization was found for the request
|
return redirect(f'{reverse_lazy("account_login")}?next={request.path}')
|
||||||
if not authorized:
|
# Return a 401 (Unauthorized) response code for this request
|
||||||
path = request.path_info
|
return HttpResponse('Unauthorized', status=401)
|
||||||
|
|
||||||
if path not in urls and not any(
|
|
||||||
path.startswith(p) for p in paths_ignore
|
|
||||||
):
|
|
||||||
# Save the 'next' parameter to pass through to the login view
|
|
||||||
|
|
||||||
return redirect(
|
|
||||||
f'{reverse_lazy("account_login")}?next={request.path}'
|
|
||||||
)
|
|
||||||
# Return a 401 (Unauthorized) response code for this request
|
|
||||||
return HttpResponse('Unauthorized', status=401)
|
|
||||||
|
|
||||||
response = self.get_response(request)
|
response = self.get_response(request)
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
class Check2FAMiddleware(AllUserRequire2FAMiddleware):
|
class Check2FAMiddleware(MiddlewareMixin):
|
||||||
"""Ensure that mfa is enforced if set so."""
|
"""Ensure that users have two-factor authentication enabled before they have access restricted endpoints.
|
||||||
|
|
||||||
|
Adapted from https://github.com/pennersr/django-allauth/issues/3649
|
||||||
|
"""
|
||||||
|
|
||||||
|
require_2fa_message = _(
|
||||||
|
'You must enable two-factor authentication before doing anything else.'
|
||||||
|
)
|
||||||
|
|
||||||
|
def on_require_2fa(self, request: HttpRequest) -> HttpResponse:
|
||||||
|
"""Force user to mfa activation."""
|
||||||
|
return JsonResponse(
|
||||||
|
{'id': 'mfa_register', 'error': self.require_2fa_message}, status=401
|
||||||
|
)
|
||||||
|
|
||||||
|
def is_allowed_page(self, request: HttpRequest) -> bool:
|
||||||
|
"""Check if the current page can be accessed without mfa."""
|
||||||
|
match = request.resolver_match
|
||||||
|
return (
|
||||||
|
False
|
||||||
|
if match is None
|
||||||
|
else any(ref in apps_mfa_bypass for ref in match.app_names)
|
||||||
|
or match.url_name in pages_mfa_bypass
|
||||||
|
or match.route == 'favicon.ico'
|
||||||
|
)
|
||||||
|
|
||||||
|
def is_multifactor_logged_in(self, request: HttpRequest) -> bool:
|
||||||
|
"""Check if the user is logged in with multifactor authentication."""
|
||||||
|
from allauth.account.authentication import get_authentication_records
|
||||||
|
from allauth.mfa.utils import is_mfa_enabled
|
||||||
|
from allauth.mfa.webauthn.internal.flows import did_use_passwordless_login
|
||||||
|
|
||||||
|
authns = get_authentication_records(request)
|
||||||
|
|
||||||
|
return is_mfa_enabled(request.user) and (
|
||||||
|
did_use_passwordless_login(request)
|
||||||
|
or any(record.get('method') == 'mfa' for record in authns)
|
||||||
|
)
|
||||||
|
|
||||||
|
def process_view(
|
||||||
|
self, request: HttpRequest, view_func, view_args, view_kwargs
|
||||||
|
) -> Optional[HttpResponse]:
|
||||||
|
"""Determine if the server is set up enforce 2fa registration."""
|
||||||
|
from django.conf import settings
|
||||||
|
|
||||||
|
# Exit early if MFA is not enabled
|
||||||
|
if not settings.MFA_ENABLED:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if request.user.is_anonymous:
|
||||||
|
return None
|
||||||
|
if self.is_allowed_page(request):
|
||||||
|
return None
|
||||||
|
if self.is_multifactor_logged_in(request):
|
||||||
|
return None
|
||||||
|
if getattr(
|
||||||
|
request, 'token', get_token_from_request(request)
|
||||||
|
): # Token based login can not do MFA
|
||||||
|
return None
|
||||||
|
|
||||||
|
if self.enforce_2fa(request):
|
||||||
|
return self.on_require_2fa(request)
|
||||||
|
return None
|
||||||
|
|
||||||
def enforce_2fa(self, request):
|
def enforce_2fa(self, request):
|
||||||
"""Use setting to check if MFA should be enforced."""
|
"""Use setting to check if MFA should be enforced."""
|
||||||
return get_global_setting('LOGIN_ENFORCE_MFA')
|
return get_global_setting(
|
||||||
|
'LOGIN_ENFORCE_MFA', None, 'INVENTREE_LOGIN_ENFORCE_MFA'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class InvenTreeRemoteUserMiddleware(PersistentRemoteUserMiddleware):
|
class InvenTreeRemoteUserMiddleware(PersistentRemoteUserMiddleware):
|
||||||
@@ -232,7 +317,7 @@ class InvenTreeHostSettingsMiddleware(MiddlewareMixin):
|
|||||||
|
|
||||||
# Handle commonly ignored paths that might also work without a correct setup (api, auth)
|
# Handle commonly ignored paths that might also work without a correct setup (api, auth)
|
||||||
path = request.path_info
|
path = request.path_info
|
||||||
if path in urls or any(path.startswith(p) for p in paths_ignore):
|
if path in urls or any(path.startswith(p) for p in paths_ignore_handling):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# treat the accessed scheme and host
|
# treat the accessed scheme and host
|
||||||
|
|||||||
@@ -108,6 +108,13 @@ class ApiAccessTests(InvenTreeAPITestCase):
|
|||||||
self.tokenAuth()
|
self.tokenAuth()
|
||||||
self.assertIsNotNone(self.token)
|
self.assertIsNotNone(self.token)
|
||||||
|
|
||||||
|
# Run explicit test with token auth
|
||||||
|
url = reverse('api-license')
|
||||||
|
response = self.get(
|
||||||
|
url, headers={'Authorization': f'Token {self.token}'}, expected_code=200
|
||||||
|
)
|
||||||
|
self.assertIn('backend', response.json())
|
||||||
|
|
||||||
def test_role_view(self):
|
def test_role_view(self):
|
||||||
"""Test that we can access the 'roles' view for the logged in user.
|
"""Test that we can access the 'roles' view for the logged in user.
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ from django.contrib.auth.models import Group, User
|
|||||||
from django.core.exceptions import ValidationError
|
from django.core.exceptions import ValidationError
|
||||||
from django.test import override_settings
|
from django.test import override_settings
|
||||||
from django.test.testcases import TransactionTestCase
|
from django.test.testcases import TransactionTestCase
|
||||||
|
from django.urls import reverse
|
||||||
|
|
||||||
from allauth.socialaccount.models import SocialAccount, SocialLogin
|
from allauth.socialaccount.models import SocialAccount, SocialLogin
|
||||||
|
|
||||||
@@ -139,13 +140,15 @@ class TestAuth(InvenTreeAPITestCase):
|
|||||||
"""Test authentication functionality."""
|
"""Test authentication functionality."""
|
||||||
|
|
||||||
reg_url = '/api/auth/v1/auth/signup'
|
reg_url = '/api/auth/v1/auth/signup'
|
||||||
|
login_url = '/api/auth/v1/auth/login'
|
||||||
test_email = 'tester@example.com'
|
test_email = 'tester@example.com'
|
||||||
|
|
||||||
def test_buildin_token(self):
|
def test_buildin_token(self):
|
||||||
"""Test the built-in token authentication."""
|
"""Test the built-in token authentication."""
|
||||||
self.logout()
|
self.logout()
|
||||||
|
|
||||||
response = self.post(
|
response = self.post(
|
||||||
'/api/auth/v1/auth/login',
|
self.login_url,
|
||||||
{'username': self.username, 'password': self.password},
|
{'username': self.username, 'password': self.password},
|
||||||
expected_code=200,
|
expected_code=200,
|
||||||
)
|
)
|
||||||
@@ -155,7 +158,7 @@ class TestAuth(InvenTreeAPITestCase):
|
|||||||
|
|
||||||
# Test for conflicting login
|
# Test for conflicting login
|
||||||
self.post(
|
self.post(
|
||||||
'/api/auth/v1/auth/login',
|
self.login_url,
|
||||||
{'username': self.username, 'password': self.password},
|
{'username': self.username, 'password': self.password},
|
||||||
expected_code=409,
|
expected_code=409,
|
||||||
)
|
)
|
||||||
@@ -222,3 +225,22 @@ class TestAuth(InvenTreeAPITestCase):
|
|||||||
):
|
):
|
||||||
resp = self.post(self.reg_url, self.email_args(), expected_code=200)
|
resp = self.post(self.reg_url, self.email_args(), expected_code=200)
|
||||||
self.assertEqual(resp.json()['data']['user']['email'], self.test_email)
|
self.assertEqual(resp.json()['data']['user']['email'], self.test_email)
|
||||||
|
|
||||||
|
def test_auth_request(self):
|
||||||
|
"""Test the auth_request view."""
|
||||||
|
url = reverse('auth-check')
|
||||||
|
|
||||||
|
# Logged in user
|
||||||
|
self.get(url)
|
||||||
|
|
||||||
|
# Inactive user
|
||||||
|
# TODO @matmair - this part of auth_request is not triggering currently
|
||||||
|
# self.user.is_active = False
|
||||||
|
# self.user.save()
|
||||||
|
# self.get(url, expected_code=403)
|
||||||
|
# self.user.is_active = True
|
||||||
|
# self.user.save()
|
||||||
|
|
||||||
|
# Logged out user
|
||||||
|
self.client.logout()
|
||||||
|
self.get(url, expected_code=401)
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
"""Tests for middleware functions."""
|
"""Tests for middleware functions."""
|
||||||
|
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.http import Http404
|
from django.http import Http404
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
@@ -7,17 +9,19 @@ from django.urls import reverse
|
|||||||
from error_report.models import Error
|
from error_report.models import Error
|
||||||
|
|
||||||
from InvenTree.exceptions import log_error
|
from InvenTree.exceptions import log_error
|
||||||
|
from InvenTree.helpers_mfa import get_codes
|
||||||
from InvenTree.unit_test import InvenTreeTestCase
|
from InvenTree.unit_test import InvenTreeTestCase
|
||||||
|
|
||||||
|
|
||||||
class MiddlewareTests(InvenTreeTestCase):
|
class MiddlewareTests(InvenTreeTestCase):
|
||||||
"""Test for middleware functions."""
|
"""Test for middleware functions."""
|
||||||
|
|
||||||
def check_path(self, url, code=200, **kwargs):
|
def check_path(self, url, code=200, auth_header=None, **kwargs):
|
||||||
"""Helper function to run a request."""
|
"""Helper function to run a request."""
|
||||||
response = self.client.get(
|
headers = {'accept': 'application/json'}
|
||||||
url, headers={'accept': 'application/json'}, **kwargs
|
if auth_header:
|
||||||
)
|
headers['Authorization'] = auth_header
|
||||||
|
response = self.client.get(url, headers=headers, **kwargs)
|
||||||
self.assertEqual(response.status_code, code)
|
self.assertEqual(response.status_code, code)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@@ -36,13 +40,62 @@ class MiddlewareTests(InvenTreeTestCase):
|
|||||||
response = self.check_path(reverse('index'), 302)
|
response = self.check_path(reverse('index'), 302)
|
||||||
self.assertEqual(response.url, '/accounts/login/?next=/')
|
self.assertEqual(response.url, '/accounts/login/?next=/')
|
||||||
|
|
||||||
|
def test_Check2FAMiddleware(self):
|
||||||
|
"""Test the 2FA middleware."""
|
||||||
|
url = reverse('api-part-list')
|
||||||
|
|
||||||
|
self.assignRole(role='part.view', group=self.group)
|
||||||
|
# Ensure that normal access works with mfa enabled
|
||||||
|
with self.settings(MFA_ENABLED=True):
|
||||||
|
self.check_path(url)
|
||||||
|
# Ensure that normal access works with mfa disabled
|
||||||
|
with self.settings(MFA_ENABLED=False):
|
||||||
|
self.check_path(url)
|
||||||
|
|
||||||
|
# Now enforce MFA for the user
|
||||||
|
with self.settings(MFA_ENABLED=True) and patch.dict(
|
||||||
|
'os.environ', {'INVENTREE_LOGIN_ENFORCE_MFA': 'True'}
|
||||||
|
):
|
||||||
|
# Enforced but not logged in via mfa -> should give 403
|
||||||
|
response = self.check_path(url, 401)
|
||||||
|
self.assertContains(
|
||||||
|
response,
|
||||||
|
'You must enable two-factor authentication before doing anything else.',
|
||||||
|
status_code=401,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Register a token and try again
|
||||||
|
rc_codes = get_codes(self.user)[1]
|
||||||
|
self.client.logout()
|
||||||
|
# Login step 1
|
||||||
|
self.client.post(
|
||||||
|
reverse('browser:account:login'),
|
||||||
|
{'username': self.username, 'password': self.password},
|
||||||
|
content_type='application/json',
|
||||||
|
)
|
||||||
|
# Login step 2
|
||||||
|
self.client.post(
|
||||||
|
reverse('browser:mfa:authenticate'),
|
||||||
|
{'code': rc_codes[0]},
|
||||||
|
expected_code=401,
|
||||||
|
content_type='application/json',
|
||||||
|
)
|
||||||
|
rsp3 = self.client.post(
|
||||||
|
reverse('browser:mfa:trust'),
|
||||||
|
{'trust': False},
|
||||||
|
expected_code=200,
|
||||||
|
content_type='application/json',
|
||||||
|
)
|
||||||
|
self.assertEqual(rsp3.status_code, 200)
|
||||||
|
self.check_path(url)
|
||||||
|
|
||||||
def test_token_auth(self):
|
def test_token_auth(self):
|
||||||
"""Test auth with token auth."""
|
"""Test auth with token auth."""
|
||||||
target = reverse('api-license')
|
target = reverse('api-license')
|
||||||
|
|
||||||
# get token
|
# get token
|
||||||
# response = self.client.get(reverse('api-token'), format='json', data={})
|
response = self.client.get(reverse('api-token'), format='json', data={})
|
||||||
# token = response.data['token']
|
token = response.data['token']
|
||||||
|
|
||||||
# logout
|
# logout
|
||||||
self.client.logout()
|
self.client.logout()
|
||||||
@@ -51,13 +104,16 @@ class MiddlewareTests(InvenTreeTestCase):
|
|||||||
self.check_path(target, 401)
|
self.check_path(target, 401)
|
||||||
|
|
||||||
# Request with broken token
|
# Request with broken token
|
||||||
self.check_path(target, 401, HTTP_Authorization='Token abcd123')
|
self.check_path(target, 401, auth_header='Token abcd123')
|
||||||
|
|
||||||
# should still fail without token
|
# should still fail without token
|
||||||
self.check_path(target, 401)
|
self.check_path(target, 401)
|
||||||
|
|
||||||
# request with token
|
# request with token - should work
|
||||||
# self.check_path(target, HTTP_Authorization=f'Token {token}')
|
self.check_path(target, auth_header=f'Token {token}')
|
||||||
|
|
||||||
|
# Request something that is not on the API - should still work
|
||||||
|
self.check_path(reverse('auth-check'), auth_header=f'Token {token}')
|
||||||
|
|
||||||
def test_error_exceptions(self):
|
def test_error_exceptions(self):
|
||||||
"""Test that ignored errors are not logged."""
|
"""Test that ignored errors are not logged."""
|
||||||
|
|||||||
@@ -188,7 +188,7 @@ class CorsTest(TestCase):
|
|||||||
Here, we are not authorized by default,
|
Here, we are not authorized by default,
|
||||||
but the CORS headers should still be included.
|
but the CORS headers should still be included.
|
||||||
"""
|
"""
|
||||||
url = '/auth/'
|
url = reverse('auth-check')
|
||||||
|
|
||||||
# First, a preflight request with a "valid" origin
|
# First, a preflight request with a "valid" origin
|
||||||
|
|
||||||
|
|||||||
@@ -130,7 +130,9 @@ backendpatterns = [
|
|||||||
path(
|
path(
|
||||||
'auth/', include('rest_framework.urls', namespace='rest_framework')
|
'auth/', include('rest_framework.urls', namespace='rest_framework')
|
||||||
), # Used for (DRF) browsable API auth
|
), # Used for (DRF) browsable API auth
|
||||||
path('auth/', auth_request), # Used for proxies to check if user is authenticated
|
path(
|
||||||
|
'auth/', auth_request, name='auth-check'
|
||||||
|
), # Used for proxies to check if user is authenticated
|
||||||
path('accounts/', include('allauth.urls')),
|
path('accounts/', include('allauth.urls')),
|
||||||
# OAuth2
|
# OAuth2
|
||||||
flagged_path('OIDC', 'o/', include(oauth2_urls)),
|
flagged_path('OIDC', 'o/', include(oauth2_urls)),
|
||||||
|
|||||||
@@ -12,12 +12,13 @@ def auth_request(request):
|
|||||||
|
|
||||||
Useful for (for example) redirecting authentication requests through django's permission framework.
|
Useful for (for example) redirecting authentication requests through django's permission framework.
|
||||||
"""
|
"""
|
||||||
if not request.user or not request.user.is_authenticated:
|
if (
|
||||||
return HttpResponse(status=403)
|
not request.user
|
||||||
|
or not request.user.is_authenticated
|
||||||
if not request.user.is_active:
|
or not request.user.is_active
|
||||||
# Reject requests from inactive users
|
):
|
||||||
return HttpResponse(status=403)
|
# This is very unlikely to be reached, as the middleware stack should intercept unauthenticated requests
|
||||||
|
return HttpResponse(status=403) # pragma: no cover
|
||||||
|
|
||||||
# User is authenticated and active
|
# User is authenticated and active
|
||||||
return HttpResponse(status=200)
|
return HttpResponse(status=200)
|
||||||
|
|||||||
@@ -5,9 +5,8 @@ from django.contrib.auth.models import Group
|
|||||||
from django.test import TestCase
|
from django.test import TestCase
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
|
|
||||||
from allauth.mfa.totp.internal import auth as totp_auth
|
|
||||||
|
|
||||||
from common.settings import set_global_setting
|
from common.settings import set_global_setting
|
||||||
|
from InvenTree.helpers_mfa import get_codes
|
||||||
from InvenTree.unit_test import AdminTestCase, InvenTreeAPITestCase, InvenTreeTestCase
|
from InvenTree.unit_test import AdminTestCase, InvenTreeAPITestCase, InvenTreeTestCase
|
||||||
from users.models import ApiToken, Owner
|
from users.models import ApiToken, Owner
|
||||||
from users.oauth2_scopes import _roles
|
from users.oauth2_scopes import _roles
|
||||||
@@ -334,8 +333,6 @@ class OwnerModelTest(InvenTreeTestCase):
|
|||||||
class MFALoginTest(InvenTreeAPITestCase):
|
class MFALoginTest(InvenTreeAPITestCase):
|
||||||
"""Some simplistic tests to ensure that MFA is working."""
|
"""Some simplistic tests to ensure that MFA is working."""
|
||||||
|
|
||||||
mfa_secret = None
|
|
||||||
|
|
||||||
def test_api(self):
|
def test_api(self):
|
||||||
"""Test that the API is working."""
|
"""Test that the API is working."""
|
||||||
auth_data = {'username': self.username, 'password': self.password}
|
auth_data = {'username': self.username, 'password': self.password}
|
||||||
@@ -349,13 +346,8 @@ class MFALoginTest(InvenTreeAPITestCase):
|
|||||||
response = self.post(login_url, auth_data, expected_code=200)
|
response = self.post(login_url, auth_data, expected_code=200)
|
||||||
self._helper_meta_val(response)
|
self._helper_meta_val(response)
|
||||||
|
|
||||||
return # TODO @matmair re-enable MFA tests once stable
|
|
||||||
# Add MFA - trying in a limited loop in case of timing issues
|
# Add MFA - trying in a limited loop in case of timing issues
|
||||||
response = self.post(
|
rc_code = get_codes(user=self.user)[1][0]
|
||||||
reverse('browser:mfa:manage_totp'),
|
|
||||||
{'code': self.get_topt()},
|
|
||||||
expected_code=200,
|
|
||||||
)
|
|
||||||
|
|
||||||
# There must be a TOTP device now - success
|
# There must be a TOTP device now - success
|
||||||
self.get(reverse('browser:mfa:manage_totp'), expected_code=200)
|
self.get(reverse('browser:mfa:manage_totp'), expected_code=200)
|
||||||
@@ -373,11 +365,9 @@ class MFALoginTest(InvenTreeAPITestCase):
|
|||||||
response = self.post(login_url, auth_data, expected_code=401)
|
response = self.post(login_url, auth_data, expected_code=401)
|
||||||
# MFA not finished - no access allowed
|
# MFA not finished - no access allowed
|
||||||
self.get(reverse('api-token'), expected_code=401)
|
self.get(reverse('api-token'), expected_code=401)
|
||||||
# Complete
|
# Complete MFA (with recovery code to avoid timing issues)
|
||||||
self.post(
|
self.post(
|
||||||
reverse('browser:mfa:authenticate'),
|
reverse('browser:mfa:authenticate'), {'code': rc_code}, expected_code=401
|
||||||
{'code': self.get_topt()},
|
|
||||||
expected_code=401,
|
|
||||||
)
|
)
|
||||||
self.post(reverse('browser:mfa:trust'), {'trust': False}, expected_code=200)
|
self.post(reverse('browser:mfa:trust'), {'trust': False}, expected_code=200)
|
||||||
# and run through trust
|
# and run through trust
|
||||||
@@ -405,15 +395,6 @@ class MFALoginTest(InvenTreeAPITestCase):
|
|||||||
flows = response.json()['data']['flows']
|
flows = response.json()['data']['flows']
|
||||||
return next(a for a in flows if a['id'] == flow_id)
|
return next(a for a in flows if a['id'] == flow_id)
|
||||||
|
|
||||||
def get_topt(self):
|
|
||||||
"""Helper to get a current totp code."""
|
|
||||||
if not self.mfa_secret:
|
|
||||||
mfa_init = self.get(reverse('browser:mfa:manage_totp'), expected_code=404)
|
|
||||||
self.mfa_secret = mfa_init.json()['meta']['secret']
|
|
||||||
return totp_auth.hotp_value(
|
|
||||||
self.mfa_secret, next(totp_auth.yield_hotp_counters_from_time())
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class AdminTest(AdminTestCase):
|
class AdminTest(AdminTestCase):
|
||||||
"""Tests for the admin interface integration."""
|
"""Tests for the admin interface integration."""
|
||||||
|
|||||||
Reference in New Issue
Block a user