2
0
mirror of https://github.com/inventree/InvenTree.git synced 2025-04-28 11:36:44 +00:00

Fix user role caching issues (#8973)

* Fix user role caching issues

* Handle null user case

* Fix typo

* More spelling fixes
This commit is contained in:
Oliver 2025-01-28 20:39:05 +11:00 committed by GitHub
parent cd19a8e508
commit c67e80b50e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 32 additions and 47 deletions

View File

@ -321,7 +321,7 @@ class StockLocationFilter(rest_filters.FilterSet):
def filter_parent(self, queryset, name, value):
"""Filter by parent location.
Note that the filtering behaviour here varies,
Note that the filtering behavior here varies,
depending on whether the 'cascade' value is set.
So, we have to check the "cascade" value here.

View File

@ -328,12 +328,12 @@ def default_delete_on_deplete():
"""Return a default value for the 'delete_on_deplete' field.
Prior to 2022-12-24, this field was set to True by default.
Now, there is a user-configurable setting to govern default behaviour.
Now, there is a user-configurable setting to govern default behavior.
"""
try:
return get_global_setting('STOCK_DELETE_DEPLETED_DEFAULT', True)
except (IntegrityError, OperationalError):
# Revert to original default behaviour
# Revert to original default behavior
return True

View File

@ -561,7 +561,7 @@ class StockItemListTest(StockAPITestCase):
self.assertEqual(response.status_code, status.HTTP_200_OK)
# Return JSON-ified data
# Return JSON data
return response.data
def test_top_level_filtering(self):
@ -1305,7 +1305,7 @@ class StockItemTest(StockAPITestCase):
expected_code=201,
)
def test_stock_item_create_withsupplierpart(self):
def test_stock_item_create_with_supplier_part(self):
"""Test creation of a StockItem via the API, including SupplierPart data."""
# POST with non-existent supplier part
response = self.post(
@ -1543,7 +1543,7 @@ class StockItemTest(StockAPITestCase):
self.assertEqual(data['purchase_price_currency'], 'NZD')
def test_install(self):
"""Test that stock item can be installed into antoher item, via the API."""
"""Test that stock item can be installed into another item, via the API."""
# Select the "parent" stock item
parent_part = part.models.Part.objects.get(pk=100)

View File

@ -8,7 +8,6 @@ from django.contrib.auth import get_user_model
from django.contrib.auth.models import Group, Permission, User
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.core.validators import MinLengthValidator
from django.db import models
from django.db.models import Q, UniqueConstraint
@ -21,6 +20,7 @@ from django.utils.translation import gettext_lazy as _
import structlog
from rest_framework.authtoken.models import Token as AuthToken
import InvenTree.cache
import InvenTree.helpers
import InvenTree.models
from common.settings import get_global_setting
@ -666,28 +666,20 @@ def update_group_roles(group, debug=False):
)
def clear_user_role_cache(user: User):
"""Remove user role permission information from the cache.
- This function is called whenever the user / group is updated
Args:
user: The User object to be expunged from the cache
"""
for role in RuleSet.get_ruleset_models():
for perm in ['add', 'change', 'view', 'delete']:
key = f'role_{user.pk}_{role}_{perm}'
cache.delete(key)
def check_user_permission(user: User, model, permission):
def check_user_permission(user: User, model, permission) -> bool:
"""Check if the user has a particular permission against a given model type.
Arguments:
user: The user object to check
model: The model class to check (e.g. Part)
model: The model class to check (e.g. 'part')
permission: The permission to check (e.g. 'view' / 'delete')
Returns:
bool: True if the user has the specified permission
"""
if not user:
return False
if user.is_superuser:
return True
@ -695,21 +687,26 @@ def check_user_permission(user: User, model, permission):
return user.has_perm(permission_name)
def check_user_role(user: User, role, permission):
def check_user_role(user: User, role, permission) -> bool:
"""Check if a user has a particular role:permission combination.
If the user is a superuser, this will return True
Arguments:
user: The user object to check
role: The role to check (e.g. 'part' / 'stock')
permission: The permission to check (e.g. 'view' / 'delete')
Returns:
bool: True if the user has the specified role:permission combination
"""
if not user:
return False
if user.is_superuser:
return True
# First, check the cache
key = f'role_{user.pk}_{role}_{permission}'
try:
result = cache.get(key)
except Exception: # pragma: no cover
result = None
# First, check the session cache
cache_key = f'role_{user.pk}_{role}_{permission}'
result = InvenTree.cache.get_session_cache(cache_key)
if result is not None:
return result
@ -736,11 +733,8 @@ def check_user_role(user: User, role, permission):
result = True
break
# Save result to cache
try:
cache.set(key, result, timeout=3600)
except Exception: # pragma: no cover
pass
# Save result to session-cache
InvenTree.cache.set_session_cache(cache_key, result)
return result
@ -923,12 +917,6 @@ def delete_owner(sender, instance, **kwargs):
owner.delete()
@receiver(post_save, sender=get_user_model(), dispatch_uid='clear_user_cache')
def clear_user_cache(sender, instance, **kwargs):
"""Callback function when a user object is saved."""
clear_user_role_cache(instance)
@receiver(post_save, sender=Group, dispatch_uid='create_missing_rule_sets')
def create_missing_rule_sets(sender, instance, **kwargs):
"""Called *after* a Group object is saved.
@ -936,6 +924,3 @@ def create_missing_rule_sets(sender, instance, **kwargs):
As the linked RuleSet instances are saved *before* the Group, then we can now use these RuleSet values to update the group permissions.
"""
update_group_roles(instance)
for user in get_user_model().objects.filter(groups__name=instance.name):
clear_user_role_cache(user)

View File

@ -206,7 +206,7 @@ class UserTokenTests(InvenTreeAPITestCase):
self.client.get(me, expected_code=200)
def test_buildin_token(self):
def test_builtin_token(self):
"""Test the built-in token authentication."""
response = self.post(
reverse('rest_login'),