mirror of
https://github.com/inventree/InvenTree.git
synced 2025-07-16 09:46:31 +00:00
fix(backend): missing scope (#10001)
* move permission and fix scope * add a way to only use scopes for doc purposes * add a check to stop this from happening again * bump api version
This commit is contained in:
@@ -1,12 +1,15 @@
|
||||
"""InvenTree API version information."""
|
||||
|
||||
# InvenTree API version
|
||||
INVENTREE_API_VERSION = 366
|
||||
INVENTREE_API_VERSION = 367
|
||||
|
||||
"""Increment this API version number whenever there is a significant change to the API that any clients need to know about."""
|
||||
|
||||
INVENTREE_API_TEXT = """
|
||||
|
||||
v367 -> 2025-07-10 : https://github.com/inventree/InvenTree/pull/10001
|
||||
- Adds OAuth2 scopes for importer sessions
|
||||
|
||||
v366 -> 2025-07-09 : https://github.com/inventree/InvenTree/pull/9987
|
||||
- Adds "category" filter to BomItem API endpoint
|
||||
|
||||
|
@@ -13,6 +13,7 @@ from users.oauth2_scopes import (
|
||||
DEFAULT_READ,
|
||||
DEFAULT_STAFF,
|
||||
DEFAULT_SUPERUSER,
|
||||
_roles,
|
||||
get_granular_scope,
|
||||
)
|
||||
|
||||
@@ -46,6 +47,7 @@ def map_scope(
|
||||
read_name=DEFAULT_READ,
|
||||
map_read: Optional[list[str]] = None,
|
||||
map_read_name=DEFAULT_READ,
|
||||
override_all_actions: Optional[str] = None,
|
||||
) -> dict:
|
||||
"""Generate the required scopes for OAS permission views.
|
||||
|
||||
@@ -55,6 +57,7 @@ def map_scope(
|
||||
read_name (str): The read scope name to use when `only_read` is True.
|
||||
map_read (Optional[list[str]]): A list of HTTP methods that should map to the default read scope (use if some actions requirea differing role).
|
||||
map_read_name (str): The read scope name to use for methods specified in `map_read` when `map_read` is specified.
|
||||
override_all_actions (Optional[str]): If specified, all actions will be overridden to use the provided action name instead of the default action names.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary mapping HTTP methods to their corresponding scopes.
|
||||
@@ -71,7 +74,7 @@ def map_scope(
|
||||
def get_scope(method, action):
|
||||
if map_read and method in map_read:
|
||||
return [[map_read_name]]
|
||||
return scope_name(action)
|
||||
return scope_name(override_all_actions if override_all_actions else action)
|
||||
|
||||
return {
|
||||
method: get_scope(method, action) if method != 'OPTIONS' else [[DEFAULT_READ]]
|
||||
@@ -92,8 +95,12 @@ for role, tables in roles.items():
|
||||
class OASTokenMixin:
|
||||
"""Mixin that combines the permissions of normal classes and token classes."""
|
||||
|
||||
ENFORCE_USER_PERMS: bool = False
|
||||
|
||||
def has_permission(self, request, view):
|
||||
"""Check if the user has the required scopes or was authenticated another way."""
|
||||
if self.ENFORCE_USER_PERMS:
|
||||
return super().has_permission(request, view)
|
||||
return self.check_oauth2_authentication(
|
||||
request, view
|
||||
) or super().has_permission(request, view)
|
||||
@@ -384,3 +391,42 @@ class GlobalSettingsPermissions(OASTokenMixin, permissions.BasePermission):
|
||||
return map_scope(
|
||||
only_read=True, read_name=DEFAULT_STAFF, map_read=permissions.SAFE_METHODS
|
||||
)
|
||||
|
||||
|
||||
class DataImporterPermission(OASTokenMixin, permissions.BasePermission):
|
||||
"""Mixin class for determining if the user has correct permissions."""
|
||||
|
||||
ENFORCE_USER_PERMS = True
|
||||
|
||||
def has_permission(self, request, view):
|
||||
"""Class level permission checks are handled via InvenTree.permissions.IsAuthenticatedOrReadScope."""
|
||||
return request.user and request.user.is_authenticated
|
||||
|
||||
def get_required_alternate_scopes(self, request, view):
|
||||
"""Return the required scopes for the current request."""
|
||||
return map_scope(
|
||||
roles=_roles,
|
||||
map_read=permissions.SAFE_METHODS,
|
||||
override_all_actions='change', # this is done to match the custom has_object_permission method
|
||||
)
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
"""Check if the user has permission to access the imported object."""
|
||||
import importer.models
|
||||
|
||||
# For safe methods (GET, HEAD, OPTIONS), allow access
|
||||
if request.method in permissions.SAFE_METHODS:
|
||||
return True
|
||||
|
||||
if isinstance(obj, importer.models.DataImportSession):
|
||||
session = obj
|
||||
else:
|
||||
session = getattr(obj, 'session', None)
|
||||
|
||||
if session:
|
||||
if model_class := session.model_class:
|
||||
return users.permissions.check_user_permission(
|
||||
request.user, model_class, 'change'
|
||||
)
|
||||
|
||||
return True
|
||||
|
@@ -3,6 +3,8 @@
|
||||
from itertools import chain
|
||||
from typing import Optional
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from drf_spectacular.contrib.django_oauth_toolkit import DjangoOAuthToolkitScheme
|
||||
from drf_spectacular.drainage import warn
|
||||
from drf_spectacular.openapi import AutoSchema
|
||||
@@ -174,4 +176,14 @@ def postprocess_print_stats(result, generator, request, public):
|
||||
if scope not in oauth2_scopes:
|
||||
warn(f'unknown scope `{scope}` in {len(paths)} paths')
|
||||
|
||||
# Raise error if the paths missing scopes are not specifically excluded from oauth2
|
||||
wrong_url = [
|
||||
path for path in no_oauth2_wa if path not in settings.OAUTH2_CHECK_EXCLUDED
|
||||
]
|
||||
if len(wrong_url) > 0:
|
||||
warn(
|
||||
f'Found {len(wrong_url)} paths without oauth2 that are not excluded:\n{", ".join(wrong_url)}. '
|
||||
'\n\nPlease check the schema and add oauth2 scopes where necessary.'
|
||||
)
|
||||
|
||||
return result
|
||||
|
@@ -540,15 +540,6 @@ TEMPLATES = [
|
||||
}
|
||||
]
|
||||
|
||||
OAUTH2_PROVIDER = {
|
||||
# default scopes
|
||||
'SCOPES': oauth2_scopes,
|
||||
# OIDC
|
||||
'OIDC_ENABLED': True,
|
||||
'OIDC_RSA_PRIVATE_KEY': get_oidc_private_key(),
|
||||
'PKCE_REQUIRED': False,
|
||||
}
|
||||
|
||||
REST_FRAMEWORK = {
|
||||
'EXCEPTION_HANDLER': 'InvenTree.exceptions.exception_handler',
|
||||
'DATETIME_FORMAT': '%Y-%m-%d %H:%M',
|
||||
@@ -1480,7 +1471,7 @@ if CUSTOM_FLAGS:
|
||||
SESAME_MAX_AGE = 300
|
||||
LOGIN_REDIRECT_URL = '/api/auth/login-redirect/'
|
||||
|
||||
# Configuration for API schema generation
|
||||
# Configuration for API schema generation / oAuth2
|
||||
SPECTACULAR_SETTINGS = {
|
||||
'TITLE': 'InvenTree API',
|
||||
'DESCRIPTION': 'API for InvenTree - the intuitive open source inventory management system',
|
||||
@@ -1515,6 +1506,18 @@ SPECTACULAR_SETTINGS = {
|
||||
'OAUTH2_TOKEN_URL': '/o/token/',
|
||||
'OAUTH2_REFRESH_URL': '/o/revoke_token/',
|
||||
}
|
||||
OAUTH2_PROVIDER = {
|
||||
# default scopes
|
||||
'SCOPES': oauth2_scopes,
|
||||
# OIDC
|
||||
'OIDC_ENABLED': True,
|
||||
'OIDC_RSA_PRIVATE_KEY': get_oidc_private_key(),
|
||||
'PKCE_REQUIRED': False,
|
||||
}
|
||||
OAUTH2_CHECK_EXCLUDED = [ # This setting mutes schema checks for these rule/method combinations
|
||||
'/api/email/generate/:post',
|
||||
'/api/webhook/{endpoint}/:post',
|
||||
]
|
||||
|
||||
if SITE_URL and not TESTING:
|
||||
SPECTACULAR_SETTINGS['SERVERS'] = [{'url': SITE_URL}]
|
||||
|
@@ -4,7 +4,7 @@ from django.shortcuts import get_object_or_404
|
||||
from django.urls import include, path
|
||||
|
||||
from drf_spectacular.utils import extend_schema
|
||||
from rest_framework import permissions, serializers
|
||||
from rest_framework import serializers
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
from rest_framework.response import Response
|
||||
from rest_framework.views import APIView
|
||||
@@ -25,38 +25,13 @@ from InvenTree.mixins import (
|
||||
from users.permissions import check_user_permission
|
||||
|
||||
|
||||
class DataImporterPermission(permissions.BasePermission):
|
||||
"""Mixin class for determining if the user has correct permissions."""
|
||||
|
||||
def has_permission(self, request, view):
|
||||
"""Class level permission checks are handled via InvenTree.permissions.IsAuthenticatedOrReadScope."""
|
||||
return request.user and request.user.is_authenticated
|
||||
|
||||
def has_object_permission(self, request, view, obj):
|
||||
"""Check if the user has permission to access the imported object."""
|
||||
# For safe methods (GET, HEAD, OPTIONS), allow access
|
||||
if request.method in permissions.SAFE_METHODS:
|
||||
return True
|
||||
|
||||
if isinstance(obj, importer.models.DataImportSession):
|
||||
session = obj
|
||||
else:
|
||||
session = getattr(obj, 'session', None)
|
||||
|
||||
if session:
|
||||
if model_class := session.model_class:
|
||||
return check_user_permission(request.user, model_class, 'change')
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class DataImporterPermissionMixin:
|
||||
"""Mixin class for checking permissions on DataImporter objects."""
|
||||
|
||||
# Default permissions: User must be authenticated
|
||||
permission_classes = [
|
||||
InvenTree.permissions.IsAuthenticatedOrReadScope,
|
||||
DataImporterPermission,
|
||||
InvenTree.permissions.DataImporterPermission,
|
||||
]
|
||||
|
||||
|
||||
@@ -96,7 +71,7 @@ class DataImportSessionMixin:
|
||||
|
||||
queryset = importer.models.DataImportSession.objects.all()
|
||||
serializer_class = importer.serializers.DataImportSessionSerializer
|
||||
permission_classes = [DataImporterPermission]
|
||||
permission_classes = [InvenTree.permissions.DataImporterPermission]
|
||||
|
||||
|
||||
class DataImportSessionList(BulkDeleteMixin, DataImportSessionMixin, ListCreateAPI):
|
||||
|
Reference in New Issue
Block a user