mirror of
https://github.com/inventree/InvenTree.git
synced 2025-04-28 11:36:44 +00:00
* Updated type hints * Fix tooltip bug * Check user when sending notification * Fix test * Update unit test * More unit test fixes * Tweak playwright tests
This commit is contained in:
parent
b1a264bf2a
commit
cf60d809da
@ -190,26 +190,41 @@ class InvenTreeTaskTests(TestCase):
|
||||
from common.models import NotificationEntry, NotificationMessage
|
||||
|
||||
# Create a staff user (to ensure notifications are sent)
|
||||
User.objects.create_user(username='staff', password='staffpass', is_staff=True)
|
||||
user = User.objects.create_user(
|
||||
username='staff', password='staffpass', is_staff=True
|
||||
)
|
||||
|
||||
n_tasks = Task.objects.count()
|
||||
n_entries = NotificationEntry.objects.count()
|
||||
n_messages = NotificationMessage.objects.count()
|
||||
|
||||
test_data = {
|
||||
'name': 'failed_task',
|
||||
'func': 'InvenTree.tasks.failed_task',
|
||||
'group': 'test',
|
||||
'success': False,
|
||||
'started': timezone.now(),
|
||||
'stopped': timezone.now(),
|
||||
'attempt_count': 10,
|
||||
}
|
||||
|
||||
# Create a 'failed' task in the database
|
||||
# Note: The 'attempt count' is set to 10 to ensure that the task is properly marked as 'failed'
|
||||
Task.objects.create(
|
||||
id=n_tasks + 1,
|
||||
name='failed_task',
|
||||
func='InvenTree.tasks.failed_task',
|
||||
group='test',
|
||||
success=False,
|
||||
started=timezone.now(),
|
||||
stopped=timezone.now(),
|
||||
attempt_count=10,
|
||||
)
|
||||
Task.objects.create(id=n_tasks + 1, **test_data)
|
||||
|
||||
# A new notification entry should be created
|
||||
# A new notification entry should NOT be created (yet) - due to lack of permission for the user
|
||||
self.assertEqual(NotificationEntry.objects.count(), n_entries + 0)
|
||||
self.assertEqual(NotificationMessage.objects.count(), n_messages + 0)
|
||||
|
||||
# Give them all the permissions
|
||||
user.is_superuser = True
|
||||
user.save()
|
||||
|
||||
# Create a 'failed' task in the database
|
||||
# Note: The 'attempt count' is set to 10 to ensure that the task is properly marked as 'failed'
|
||||
Task.objects.create(id=n_tasks + 2, **test_data)
|
||||
|
||||
# A new notification entry should be created (as the user now has permission to see it)
|
||||
self.assertEqual(NotificationEntry.objects.count(), n_entries + 1)
|
||||
self.assertEqual(NotificationMessage.objects.count(), n_messages + 1)
|
||||
|
||||
|
@ -9,7 +9,7 @@ from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import Group, Permission
|
||||
from django.contrib.auth.models import Group, Permission, User
|
||||
from django.db import connections, models
|
||||
from django.http.response import StreamingHttpResponse
|
||||
from django.test import TestCase
|
||||
@ -23,16 +23,23 @@ from plugin import registry
|
||||
from plugin.models import PluginConfig
|
||||
|
||||
|
||||
def addUserPermission(user, permission):
|
||||
"""Shortcut function for adding a certain permission to a user."""
|
||||
perm = Permission.objects.get(codename=permission)
|
||||
user.user_permissions.add(perm)
|
||||
def addUserPermission(user: User, app_name: str, model_name: str, perm: str) -> None:
|
||||
"""Add a specific permission for the provided user.
|
||||
|
||||
Arguments:
|
||||
user: The user to add the permission to
|
||||
app_name: The name of the app (e.g. 'part')
|
||||
model_name: The name of the model (e.g. 'location')
|
||||
perm: The permission to add (e.g. 'add', 'change', 'delete', 'view')
|
||||
"""
|
||||
# Get the permission object
|
||||
permission = Permission.objects.get(
|
||||
content_type__model=model_name, codename=f'{perm}_{model_name}'
|
||||
)
|
||||
|
||||
def addUserPermissions(user, permissions):
|
||||
"""Shortcut function for adding multiple permissions to a user."""
|
||||
for permission in permissions:
|
||||
addUserPermission(user, permission)
|
||||
# Add the permission to the user
|
||||
user.user_permissions.add(permission)
|
||||
user.save()
|
||||
|
||||
|
||||
def getMigrationFileNames(app):
|
||||
|
@ -6,6 +6,7 @@ from datetime import timedelta
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import Group
|
||||
from django.db.models import Model
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
import common.models
|
||||
@ -13,7 +14,7 @@ import InvenTree.helpers
|
||||
from InvenTree.ready import isImportingData, isRebuildingData
|
||||
from plugin import registry
|
||||
from plugin.models import NotificationUserSetting, PluginConfig
|
||||
from users.models import Owner
|
||||
from users.models import Owner, check_user_permission
|
||||
|
||||
logger = logging.getLogger('inventree')
|
||||
|
||||
@ -29,7 +30,7 @@ class NotificationMethod:
|
||||
GLOBAL_SETTING = None
|
||||
USER_SETTING = None
|
||||
|
||||
def __init__(self, obj, category, targets, context) -> None:
|
||||
def __init__(self, obj: Model, category: str, targets: list, context) -> None:
|
||||
"""Check that the method is read.
|
||||
|
||||
This checks that:
|
||||
@ -355,8 +356,19 @@ class InvenTreeNotificationBodies:
|
||||
)
|
||||
|
||||
|
||||
def trigger_notification(obj, category=None, obj_ref='pk', **kwargs):
|
||||
"""Send out a notification."""
|
||||
def trigger_notification(obj: Model, category, obj_ref: str = 'pk', **kwargs):
|
||||
"""Send out a notification.
|
||||
|
||||
Args:
|
||||
obj: The object (model instance) that is triggering the notification
|
||||
category: The category (label) for the notification
|
||||
obj_ref: The reference to the object that should be used for the notification
|
||||
kwargs: Additional arguments to pass to the notification method
|
||||
"""
|
||||
# Check if data is importing currently
|
||||
if isImportingData() or isRebuildingData():
|
||||
return
|
||||
|
||||
targets = kwargs.get('targets')
|
||||
target_fnc = kwargs.get('target_fnc')
|
||||
target_args = kwargs.get('target_args', [])
|
||||
@ -365,10 +377,6 @@ def trigger_notification(obj, category=None, obj_ref='pk', **kwargs):
|
||||
context = kwargs.get('context', {})
|
||||
delivery_methods = kwargs.get('delivery_methods')
|
||||
|
||||
# Check if data is importing currently
|
||||
if isImportingData() or isRebuildingData():
|
||||
return
|
||||
|
||||
# Resolve object reference
|
||||
refs = [obj_ref, 'pk', 'id', 'uid']
|
||||
|
||||
@ -436,26 +444,40 @@ def trigger_notification(obj, category=None, obj_ref='pk', **kwargs):
|
||||
)
|
||||
|
||||
if target_users:
|
||||
logger.info("Sending notification '%s' for '%s'", category, str(obj))
|
||||
# Filter out any users who are inactive, or do not have the required model permissions
|
||||
valid_users = list(
|
||||
filter(
|
||||
lambda u: u and u.is_active and check_user_permission(u, obj, 'view'),
|
||||
list(target_users),
|
||||
)
|
||||
)
|
||||
|
||||
# Collect possible methods
|
||||
if delivery_methods is None:
|
||||
delivery_methods = storage.methods or []
|
||||
else:
|
||||
delivery_methods = delivery_methods - IGNORED_NOTIFICATION_CLS
|
||||
if len(valid_users) > 0:
|
||||
logger.info(
|
||||
"Sending notification '%s' for '%s' to %s users",
|
||||
category,
|
||||
str(obj),
|
||||
len(valid_users),
|
||||
)
|
||||
|
||||
for method in delivery_methods:
|
||||
logger.info("Triggering notification method '%s'", method.METHOD_NAME)
|
||||
try:
|
||||
deliver_notification(method, obj, category, target_users, context)
|
||||
except NotImplementedError as error:
|
||||
# Allow any single notification method to fail, without failing the others
|
||||
logger.error(error)
|
||||
except Exception as error:
|
||||
logger.error(error)
|
||||
# Collect possible methods
|
||||
if delivery_methods is None:
|
||||
delivery_methods = storage.methods or []
|
||||
else:
|
||||
delivery_methods = delivery_methods - IGNORED_NOTIFICATION_CLS
|
||||
|
||||
# Set delivery flag
|
||||
common.models.NotificationEntry.notify(category, obj_ref_value)
|
||||
for method in delivery_methods:
|
||||
logger.info("Triggering notification method '%s'", method.METHOD_NAME)
|
||||
try:
|
||||
deliver_notification(method, obj, category, valid_users, context)
|
||||
except NotImplementedError as error:
|
||||
# Allow any single notification method to fail, without failing the others
|
||||
logger.error(error)
|
||||
except Exception as error:
|
||||
logger.error(error)
|
||||
|
||||
# Set delivery flag
|
||||
common.models.NotificationEntry.notify(category, obj_ref_value)
|
||||
else:
|
||||
logger.info("No possible users for notification '%s'", category)
|
||||
|
||||
@ -479,12 +501,18 @@ def trigger_superuser_notification(plugin: PluginConfig, msg: str):
|
||||
|
||||
|
||||
def deliver_notification(
|
||||
cls: NotificationMethod, obj, category: str, targets, context: dict
|
||||
cls: NotificationMethod, obj: Model, category: str, targets: list, context: dict
|
||||
):
|
||||
"""Send notification with the provided class.
|
||||
|
||||
This:
|
||||
- Intis the method
|
||||
Arguments:
|
||||
cls: The class that should be used to send the notification
|
||||
obj: The object (model instance) that triggered the notification
|
||||
category: The category (label) for the notification
|
||||
targets: List of users that should receive the notification
|
||||
context: Context dictionary with additional information for the notification
|
||||
|
||||
- Initializes the method
|
||||
- Checks that there are valid targets
|
||||
- Runs the delivery setup
|
||||
- Sends notifications either via `send_bulk` or send`
|
||||
|
@ -28,6 +28,7 @@ from InvenTree.unit_test import (
|
||||
InvenTreeAPITestCase,
|
||||
InvenTreeTestCase,
|
||||
PluginMixin,
|
||||
addUserPermission,
|
||||
)
|
||||
from part.models import Part, PartParameterTemplate
|
||||
from plugin import registry
|
||||
@ -1075,6 +1076,9 @@ class NotificationTest(InvenTreeAPITestCase):
|
||||
"""Tests for bulk deletion of user notifications."""
|
||||
from error_report.models import Error
|
||||
|
||||
# Ensure *this* user has permission to view error reports
|
||||
addUserPermission(self.user, 'error_report', 'error', 'view')
|
||||
|
||||
# Create some notification messages by throwing errors
|
||||
for _ii in range(10):
|
||||
Error.objects.create()
|
||||
@ -1086,7 +1090,7 @@ class NotificationTest(InvenTreeAPITestCase):
|
||||
# However, one user is marked as inactive
|
||||
self.assertEqual(messages.count(), 20)
|
||||
|
||||
# Only 10 messages related to *this* user
|
||||
# Only messages related to *this* user
|
||||
my_notifications = messages.filter(user=self.user)
|
||||
self.assertEqual(my_notifications.count(), 10)
|
||||
|
||||
|
@ -11,6 +11,7 @@ import order.tasks
|
||||
from common.models import InvenTreeSetting, NotificationMessage
|
||||
from company.models import Company
|
||||
from InvenTree import status_codes as status
|
||||
from InvenTree.unit_test import addUserPermission
|
||||
from order.models import (
|
||||
SalesOrder,
|
||||
SalesOrderAllocation,
|
||||
@ -318,7 +319,13 @@ class SalesOrderTest(TestCase):
|
||||
|
||||
def test_overdue_notification(self):
|
||||
"""Test overdue sales order notification."""
|
||||
self.order.created_by = get_user_model().objects.get(pk=3)
|
||||
user = get_user_model().objects.get(pk=3)
|
||||
|
||||
addUserPermission(user, 'order', 'salesorder', 'view')
|
||||
user.is_active = True
|
||||
user.save()
|
||||
|
||||
self.order.created_by = user
|
||||
self.order.responsible = Owner.create(obj=Group.objects.get(pk=2))
|
||||
self.order.target_date = datetime.now().date() - timedelta(days=1)
|
||||
self.order.save()
|
||||
|
@ -14,6 +14,7 @@ from djmoney.money import Money
|
||||
import common.models
|
||||
import order.tasks
|
||||
from company.models import Company, SupplierPart
|
||||
from InvenTree.unit_test import addUserPermission
|
||||
from order.status_codes import PurchaseOrderStatus
|
||||
from part.models import Part
|
||||
from stock.models import StockItem, StockLocation
|
||||
@ -320,6 +321,13 @@ class OrderTest(TestCase):
|
||||
"""
|
||||
po = PurchaseOrder.objects.get(pk=1)
|
||||
|
||||
# Ensure that the right users have the right permissions
|
||||
for user_id in [2, 4]:
|
||||
user = get_user_model().objects.get(pk=user_id)
|
||||
addUserPermission(user, 'order', 'purchaseorder', 'view')
|
||||
user.is_active = True
|
||||
user.save()
|
||||
|
||||
# Created by 'sam'
|
||||
po.created_by = get_user_model().objects.get(pk=4)
|
||||
|
||||
|
@ -21,7 +21,7 @@ from common.notifications import UIMessageNotification, storage
|
||||
from common.settings import get_global_setting, set_global_setting
|
||||
from InvenTree import version
|
||||
from InvenTree.templatetags import inventree_extras
|
||||
from InvenTree.unit_test import InvenTreeTestCase
|
||||
from InvenTree.unit_test import InvenTreeTestCase, addUserPermission
|
||||
|
||||
from .models import (
|
||||
Part,
|
||||
@ -807,6 +807,9 @@ class BaseNotificationIntegrationTest(InvenTreeTestCase):
|
||||
self.assertEqual(NotificationEntry.objects.all().count(), 0)
|
||||
|
||||
# Subscribe and run again
|
||||
addUserPermission(self.user, 'part', 'part', 'view')
|
||||
self.user.is_active = True
|
||||
self.user.save()
|
||||
self.part.set_starred(self.user, True)
|
||||
self.part.save()
|
||||
|
||||
|
@ -681,7 +681,7 @@ def clear_user_role_cache(user: User):
|
||||
cache.delete(key)
|
||||
|
||||
|
||||
def check_user_permission(user: User, model, permission):
|
||||
def check_user_permission(user: User, model: models.Model, permission: str) -> bool:
|
||||
"""Check if the user has a particular permission against a given model type.
|
||||
|
||||
Arguments:
|
||||
@ -696,7 +696,7 @@ def check_user_permission(user: User, model, permission):
|
||||
return user.has_perm(permission_name)
|
||||
|
||||
|
||||
def check_user_role(user: User, role, permission):
|
||||
def check_user_role(user: User, role: str, permission: str) -> bool:
|
||||
"""Check if a user has a particular role:permission combination.
|
||||
|
||||
If the user is a superuser, this will return True
|
||||
|
Loading…
x
Reference in New Issue
Block a user