2
0
mirror of https://github.com/inventree/InvenTree.git synced 2025-04-28 03:26:45 +00:00

chore(backend): increase coverage (#9039)

* move version tests

* factor out read_license_file

* add testing for license file

* ignore special case when we create the schema

* extent no found api tests

* extend info view tests

* try fixing test?

* fix?

* test user create api

* measure impact of removing bom import

* remove dead code

* Revert "measure impact of removing bom import"

This reverts commit bb31db05e3dedb09871c3fa0a46d34ce959a309e.

* remove dead code

* remove plugin tags that were made for CUI

* add testing for filters

* add test for config delete

* add more api tests

* adjust tests

* fix test

* use superuser

* adapt error code

* Add test for https://github.com/inventree/InvenTree/pull/9077

* add mixin_available mixin

* make check_reload more observable

* test check_reload too

* test clean_barcode

* reset after testing

* extend datamatrix testing

* debug print

* fix assertation
This commit is contained in:
Matthias Mair 2025-02-17 01:21:58 +01:00 committed by GitHub
parent ed4240a54c
commit 4a9138cc3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 284 additions and 239 deletions

View File

@ -35,6 +35,45 @@ from .version import inventreeApiText
logger = structlog.get_logger('inventree')
def read_license_file(path: Path) -> list:
"""Extract license information from the provided file.
Arguments:
path: Path to the license file
Returns: A list of items containing the license information
"""
# Check if the file exists
if not path.exists():
logger.error("License file not found at '%s'", path)
return []
try:
data = json.loads(path.read_text())
except Exception as e:
logger.exception("Failed to parse license file '%s': %s", path, e)
return []
output = []
names = set()
# Ensure we do not have any duplicate 'name' values in the list
for entry in data:
name = None
for key in entry:
if key.lower() == 'name':
name = entry[key]
break
if name is None or name in names:
continue
names.add(name)
output.append({key.lower(): value for key, value in entry.items()})
return output
class LicenseViewSerializer(serializers.Serializer):
"""Serializer for license information."""
@ -49,47 +88,6 @@ class LicenseView(APIView):
permission_classes = [permissions.IsAuthenticated]
def read_license_file(self, path: Path) -> list:
"""Extract license information from the provided file.
Arguments:
path: Path to the license file
Returns: A list of items containing the license information
"""
# Check if the file exists
if not path.exists():
logger.error("License file not found at '%s'", path)
return []
try:
data = json.loads(path.read_text())
except json.JSONDecodeError as e:
logger.exception("Failed to parse license file '%s': %s", path, e)
return []
except Exception as e:
logger.exception("Exception while reading license file '%s': %s", path, e)
return []
output = []
names = set()
# Ensure we do not have any duplicate 'name' values in the list
for entry in data:
name = None
for key in entry:
if key.lower() == 'name':
name = entry[key]
break
if name is None or name in names:
continue
names.add(name)
output.append({key.lower(): value for key, value in entry.items()})
return output
@extend_schema(responses={200: OpenApiResponse(response=LicenseViewSerializer)})
def get(self, request, *args, **kwargs):
"""Return information about the InvenTree server."""
@ -98,8 +96,8 @@ class LicenseView(APIView):
'web/static/web/.vite/dependencies.json'
)
return JsonResponse({
'backend': self.read_license_file(backend),
'frontend': self.read_license_file(frontend),
'backend': read_license_file(backend),
'frontend': read_license_file(frontend),
})
@ -595,7 +593,7 @@ class MetadataView(RetrieveUpdateAPI):
if model is None:
raise ValidationError(
f"MetadataView called without '{self.MODEL_REF}' parameter"
)
) # pragma: no cover
return model
@ -611,5 +609,5 @@ class MetadataView(RetrieveUpdateAPI):
"""Return MetadataSerializer instance."""
# Detect if we are currently generating the OpenAPI schema
if 'spectacular' in sys.argv:
return MetadataSerializer(Part, *args, **kwargs)
return MetadataSerializer(Part, *args, **kwargs) # pragma: no cover
return MetadataSerializer(self.get_model_type(), *args, **kwargs)

View File

@ -8,15 +8,14 @@ import os
import os.path
import re
from decimal import Decimal, InvalidOperation
from pathlib import Path
from typing import Optional, TypeVar, Union
from typing import Optional, TypeVar
from wsgiref.util import FileWrapper
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from django.conf import settings
from django.contrib.staticfiles.storage import StaticFilesStorage
from django.core.exceptions import FieldError, ValidationError
from django.core.files.storage import Storage, default_storage
from django.core.files.storage import default_storage
from django.http import StreamingHttpResponse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
@ -235,22 +234,6 @@ def str2bool(text, test=True):
return str(text).lower() in ['0', 'n', 'no', 'none', 'f', 'false', 'off']
def str2int(text, default=None):
"""Convert a string to int if possible.
Args:
text: Int like string
default: Return value if str is no int like
Returns:
Converted int value
"""
try:
return int(text)
except Exception:
return default
def is_bool(text):
"""Determine if a string value 'looks' like a boolean."""
return str2bool(text, True) or str2bool(text, False)
@ -902,16 +885,6 @@ def hash_barcode(barcode_data: str) -> str:
return str(barcode_hash.hexdigest())
def hash_file(filename: Union[str, Path], storage: Union[Storage, None] = None):
"""Return the MD5 hash of a file."""
content = (
open(filename, 'rb').read() # noqa: SIM115
if storage is None
else storage.open(str(filename), 'rb').read()
)
return hashlib.md5(content).hexdigest()
def current_time(local=True):
"""Return the current date and time as a datetime object.

View File

@ -205,6 +205,7 @@ PLUGIN_TESTING_SETUP = get_setting(
PLUGIN_TESTING_EVENTS = False # Flag if events are tested right now
PLUGIN_TESTING_EVENTS_ASYNC = False # Flag if events are tested asynchronously
PLUGIN_TESTING_RELOAD = False # Flag if plugin reloading is in testing (check_reload)
PLUGIN_RETRY = get_setting(
'INVENTREE_PLUGIN_RETRY', 'PLUGIN_RETRY', 3, typecast=int

View File

@ -1,12 +1,17 @@
"""Low level tests for the InvenTree API."""
from base64 import b64encode
from pathlib import Path
from tempfile import TemporaryDirectory
from django.urls import reverse
from rest_framework import status
from InvenTree.api import read_license_file
from InvenTree.api_version import INVENTREE_API_VERSION
from InvenTree.unit_test import InvenTreeAPITestCase, InvenTreeTestCase
from InvenTree.version import inventreeApiText, parse_version_text
from users.models import RuleSet, update_group_roles
@ -53,13 +58,15 @@ class HTMLAPITests(InvenTreeTestCase):
self.assertEqual(response.status_code, 200)
def test_not_found(self):
"""Test that the NotFoundView is working."""
response = self.client.get('/api/anc')
self.assertEqual(response.status_code, 404)
"""Test that the NotFoundView is working with all available methods."""
methods = ['options', 'get', 'post', 'patch', 'put', 'delete']
for method in methods:
response = getattr(self.client, method)('/api/anc')
self.assertEqual(response.status_code, 404)
class APITests(InvenTreeAPITestCase):
"""Tests for the InvenTree API."""
class ApiAccessTests(InvenTreeAPITestCase):
"""Tests for various access scenarios with the InvenTree API."""
fixtures = ['location', 'category', 'part', 'stock']
roles = ['part.view']
@ -100,19 +107,6 @@ class APITests(InvenTreeAPITestCase):
self.tokenAuth()
self.assertIsNotNone(self.token)
def test_info_view(self):
"""Test that we can read the 'info-view' endpoint."""
url = reverse('api-inventree-info')
response = self.get(url)
data = response.json()
self.assertIn('server', data)
self.assertIn('version', data)
self.assertIn('instance', data)
self.assertEqual('InvenTree', data['server'])
def test_role_view(self):
"""Test that we can access the 'roles' view for the logged in user.
@ -421,3 +415,91 @@ class SearchTests(InvenTreeAPITestCase):
self.assertEqual(
result['error'], 'User does not have permission to view this model'
)
class GeneralApiTests(InvenTreeAPITestCase):
"""Tests for various api endpoints."""
def test_api_version(self):
"""Test that the API text is correct."""
url = reverse('api-version-text')
response = self.get(url, format='json')
data = response.json()
self.assertEqual(len(data), 10)
response = self.get(reverse('api-version')).json()
self.assertIn('version', response)
self.assertIn('dev', response)
self.assertIn('up_to_date', response)
def test_inventree_api_text_fnc(self):
"""Test that the inventreeApiText function works expected."""
# Normal run
resp = inventreeApiText()
self.assertEqual(len(resp), 10)
# More responses
resp = inventreeApiText(20)
self.assertEqual(len(resp), 20)
# Specific version
resp = inventreeApiText(start_version=5)
self.assertEqual(list(resp)[0], 'v5')
def test_parse_version_text_fnc(self):
"""Test that api version text is correctly parsed."""
resp = parse_version_text()
# Check that all texts are parsed
self.assertEqual(len(resp), INVENTREE_API_VERSION - 1)
def test_api_license(self):
"""Test that the license endpoint is working."""
response = self.get(reverse('api-license')).json()
self.assertIn('backend', response)
self.assertIn('frontend', response)
# Various problem cases
# File does not exist
with self.assertLogs(logger='inventree', level='ERROR') as log:
respo = read_license_file(Path('does not exsist'))
self.assertEqual(respo, [])
self.assertIn('License file not found at', str(log.output))
with TemporaryDirectory() as tmp:
sample_file = Path(tmp, 'temp.txt')
sample_file.write_text('abc')
# File is not a json
with self.assertLogs(logger='inventree', level='ERROR') as log:
respo = read_license_file(sample_file)
self.assertEqual(respo, [])
self.assertIn('Failed to parse license file', str(log.output))
def test_info_view(self):
"""Test that we can read the 'info-view' endpoint."""
url = reverse('api-inventree-info')
response = self.get(url)
data = response.json()
self.assertIn('server', data)
self.assertIn('version', data)
self.assertIn('instance', data)
self.assertEqual('InvenTree', data['server'])
# Test with token
token = self.get(url=reverse('api-token')).data['token']
self.client.logout()
# Anon
response = self.get(url)
self.assertEqual(response.json()['database'], None)
# Staff
response = self.get(url, headers={'Authorization': f'Token {token}'})
self.assertGreater(len(response.json()['database']), 4)

View File

@ -1,45 +0,0 @@
"""Tests for api_version."""
from django.urls import reverse
from InvenTree.api_version import INVENTREE_API_VERSION
from InvenTree.unit_test import InvenTreeAPITestCase
from InvenTree.version import inventreeApiText, parse_version_text
class ApiVersionTests(InvenTreeAPITestCase):
"""Tests for api_version functions and APIs."""
def test_api(self):
"""Test that the API text is correct."""
url = reverse('api-version-text')
response = self.get(url, format='json')
data = response.json()
self.assertEqual(len(data), 10)
response = self.get(reverse('api-version')).json()
self.assertIn('version', response)
self.assertIn('dev', response)
self.assertIn('up_to_date', response)
def test_inventree_api_text(self):
"""Test that the inventreeApiText function works expected."""
# Normal run
resp = inventreeApiText()
self.assertEqual(len(resp), 10)
# More responses
resp = inventreeApiText(20)
self.assertEqual(len(resp), 20)
# Specific version
resp = inventreeApiText(start_version=5)
self.assertEqual(list(resp)[0], 'v5')
def test_parse_version_text(self):
"""Test that api version text is correctly parsed."""
resp = parse_version_text()
# Check that all texts are parsed
self.assertEqual(len(resp), INVENTREE_API_VERSION - 1)

View File

@ -1,14 +1,11 @@
"""Tests for basic notification methods and functions in InvenTree."""
import plugin.templatetags.plugin_extras as plugin_tags
from common.notifications import (
BulkNotificationMethod,
NotificationMethod,
SingleNotificationMethod,
storage,
)
from part.test_part import BaseNotificationIntegrationTest
from plugin.models import NotificationUserSetting
class BaseNotificationTests(BaseNotificationIntegrationTest):
@ -150,21 +147,3 @@ class NotificationUserSettingTests(BaseNotificationIntegrationTest):
# run through notification
self._notification_run(SampleImplementation)
# make sure the array fits
array = storage.get_usersettings(self.user)
setting = NotificationUserSetting.objects.all().first()
# assertions for settings
self.assertEqual(setting.name, 'Enable test notifications')
self.assertEqual(setting.default_value, True)
self.assertEqual(
setting.description, 'Allow sending of test for event notifications'
)
self.assertEqual(setting.units, 'alpha')
# test tag and array
self.assertEqual(
plugin_tags.notification_settings_list({'user': self.user}), array
)
self.assertEqual(array[0]['key'], 'NOTIFICATION_METHOD_TEST')
self.assertEqual(array[0]['method'], 'test')

View File

@ -783,6 +783,9 @@ class PluginSettingsApiTest(PluginMixin, InvenTreeAPITestCase):
# Request with filter
self.get(url, expected_code=200, data={'mixin': 'settings'})
self.get(url, expected_code=200, data={'builtin': True})
self.get(url, expected_code=200, data={'sample': True})
self.get(url, expected_code=200, data={'installed': True})
def test_api_list(self):
"""Test list URL."""

View File

@ -1004,6 +1004,19 @@ class PartAPITest(PartAPITestBase):
self.assertIn(v.pk, id_values)
def test_filter_is_variant(self):
"""Test the is_variant filter."""
url = reverse('api-part-list')
all_count = Part.objects.all().count()
no_var_count = Part.objects.filter(variant_of__isnull=True).count()
response = self.get(url, {'is_variant': False}, expected_code=200)
self.assertEqual(no_var_count, len(response.data))
response = self.get(url, {'is_variant': True}, expected_code=200)
self.assertEqual(all_count - no_var_count, len(response.data))
def test_include_children(self):
"""Test the special 'include_child_categories' flag.

View File

@ -828,18 +828,23 @@ class PluginsRegistry:
return str(data.hexdigest())
def check_reload(self):
"""Determine if the registry needs to be reloaded."""
"""Determine if the registry needs to be reloaded.
Returns True if the registry has changed and was reloaded.
"""
if settings.TESTING:
# Skip if running during unit testing
return
return False
if not canAppAccessDatabase(allow_shell=True):
if not canAppAccessDatabase(
allow_shell=True, allow_test=settings.PLUGIN_TESTING_RELOAD
):
# Skip check if database cannot be accessed
return
return False
if InvenTree.cache.get_session_cache('plugin_registry_checked'):
# Return early if the registry has already been checked (for this request)
return
return False
InvenTree.cache.set_session_cache('plugin_registry_checked', True)
@ -853,11 +858,13 @@ class PluginsRegistry:
reg_hash = get_global_setting('_PLUGIN_REGISTRY_HASH', '', create=False)
except Exception as exc:
logger.exception('Failed to retrieve plugin registry hash: %s', str(exc))
return
return False
if reg_hash and reg_hash != self.registry_hash:
logger.info('Plugin registry hash has changed - reloading')
self.reload_plugins(full_reload=True, force_reload=True, collect=True)
return True
return False
# endregion

View File

@ -1,12 +1,8 @@
"""This module provides template tags for handling plugins."""
from django import template
from django.conf import settings as djangosettings
from django.templatetags.static import static
from django.urls import reverse
from common.notifications import storage
from common.settings import get_global_setting
from plugin.registry import registry
register = template.Library()
@ -30,15 +26,6 @@ def plugin_settings(plugin, *args, **kwargs):
return registry.mixins_settings.get(plugin)
@register.simple_tag(takes_context=True)
def plugin_settings_content(context, plugin, *args, **kwargs):
"""Get the settings content for the plugin."""
plg = registry.get_plugin(plugin)
if hasattr(plg, 'get_settings_content'):
return plg.get_settings_content(context.request)
return None
@register.simple_tag()
def mixin_enabled(plugin, key, *args, **kwargs):
"""Is the mixin registered and configured in the plugin?"""
@ -51,14 +38,6 @@ def mixin_available(mixin, *args, **kwargs):
return len(registry.with_mixin(mixin)) > 0
@register.simple_tag()
def navigation_enabled(*args, **kwargs):
"""Is plugin navigation enabled?"""
if djangosettings.PLUGIN_TESTING:
return True
return get_global_setting('ENABLE_PLUGINS_NAVIGATION') # pragma: no cover
@register.simple_tag()
def safe_url(view_name, *args, **kwargs):
"""Safe lookup fnc for URLs.
@ -69,52 +48,3 @@ def safe_url(view_name, *args, **kwargs):
return reverse(view_name, args=args, kwargs=kwargs)
except Exception:
return None
@register.simple_tag()
def plugin_errors(*args, **kwargs):
"""All plugin errors in the current session."""
return registry.errors
@register.simple_tag(takes_context=True)
def notification_settings_list(context, *args, **kwargs):
"""List of all user notification settings."""
return storage.get_usersettings(user=context.get('user', None))
@register.simple_tag(takes_context=True)
def notification_list(context, *args, **kwargs):
"""List of all notification methods."""
return [
{
'slug': a.METHOD_NAME,
'icon': a.METHOD_ICON,
'setting': a.GLOBAL_SETTING,
'plugin': a.plugin,
'description': a.__doc__,
'name': a.__name__,
}
for a in storage.methods
]
@register.simple_tag(takes_context=True)
def plugin_static(context, file: str, **kwargs):
"""Return the URL for a static file within a plugin.
Arguments:
file: The path to the file within the plugin static directory
Keyword Arguments:
plugin: The plugin slug (optional, will be inferred from the context if not provided)
"""
plugin = context.get('plugin', None)
plugin = plugin.slug if plugin else kwargs.get('plugin')
if not plugin:
return file
return static(f'plugins/{plugin}/{file}')

View File

@ -135,6 +135,21 @@ class PluginDetailAPITest(PluginMixin, InvenTreeAPITestCase):
self.assertEqual(response.status_code, 200)
assert_plugin_active(self, True)
def test_pluginCfg_delete(self):
"""Test deleting a config."""
test_plg = self.plugin_confs.first()
assert test_plg is not None
self.user.is_superuser = True
self.user.save()
url = reverse('api-plugin-detail', kwargs={'plugin': test_plg.key})
response = self.delete(url, {}, expected_code=400)
self.assertIn(
'Plugin cannot be deleted as it is currently active',
str(response.data['detail']),
)
def test_admin_action(self):
"""Test the PluginConfig action commands."""
url = reverse('admin:plugin_pluginconfig_changelist')
@ -302,3 +317,24 @@ class PluginDetailAPITest(PluginMixin, InvenTreeAPITestCase):
url = reverse('api-plugin-metadata', kwargs={'plugin': cfg.key})
self.get(url, expected_code=200)
def test_settings(self):
"""Test settings endpoint for plugin."""
from plugin.registry import registry
registry.set_plugin_state('sample', True)
url = reverse('api-plugin-settings', kwargs={'plugin': 'sample'})
self.get(url, expected_code=200)
def test_registry(self):
"""Test registry endpoint for plugin."""
url = reverse('api-plugin-registry-status')
self.get(url, expected_code=403)
self.user.is_superuser = True
self.user.save()
self.get(url, expected_code=200)
self.user.is_superuser = False
self.user.save()

View File

@ -10,6 +10,7 @@ from pathlib import Path
from unittest import mock
from unittest.mock import patch
from django.conf import settings
from django.test import TestCase, override_settings
import plugin.templatetags.plugin_extras as plugin_tags
@ -58,6 +59,11 @@ class PluginTagTests(TestCase):
# mixin not existing
self.assertEqual(plugin_tags.mixin_enabled(self.plugin_no, key), False)
def test_mixin_available(self):
"""Check that mixin_available works."""
self.assertEqual(plugin_tags.mixin_available('barcode'), True)
self.assertEqual(plugin_tags.mixin_available('wrong'), False)
def test_tag_safe_url(self):
"""Test that the safe url tag works expected."""
# right url
@ -67,10 +73,6 @@ class PluginTagTests(TestCase):
# wrong url
self.assertEqual(plugin_tags.safe_url('indexas'), None)
def test_tag_plugin_errors(self):
"""Test that all errors are listed."""
self.assertEqual(plugin_tags.plugin_errors(), registry.errors)
class InvenTreePluginTests(TestCase):
"""Tests for InvenTreePlugin."""
@ -400,3 +402,22 @@ class RegistryTests(TestCase):
# Finally, ensure that the plugin file is removed after testing
os.remove(dummy_file)
def test_check_reload(self):
"""Test that check_reload works as expected."""
# Check that the registry is not reloaded
self.assertFalse(registry.check_reload())
settings.TESTING = False
settings.PLUGIN_TESTING_RELOAD = True
# Check that the registry is reloaded
registry.reload_plugins(full_reload=True, collect=True, force_reload=True)
self.assertFalse(registry.check_reload())
# Check that changed hashes run through
registry.registry_hash = 'abc'
self.assertTrue(registry.check_reload())
settings.TESTING = True
settings.PLUGIN_TESTING_RELOAD = False

View File

@ -3,6 +3,7 @@
from zoneinfo import ZoneInfo
from django.conf import settings
from django.core.exceptions import ValidationError
from django.test import TestCase, override_settings
from django.utils import timezone
from django.utils.safestring import SafeString
@ -431,6 +432,16 @@ class BarcodeTagTest(TestCase):
with self.assertRaises(ValueError):
barcode_tags.qrcode('')
def test_clean_barcode(self):
"""Test clean_barcode tag."""
self.assertEqual(barcode_tags.clean_barcode('hello world'), 'hello world')
self.assertEqual(barcode_tags.clean_barcode('`hello world`'), 'hello world')
with self.assertRaises(ValidationError):
self.assertEqual(
barcode_tags.clean_barcode('<b>hello world</b>'), 'hello world'
)
def test_datamatrix(self):
"""Test the datamatrix generation tag."""
# Test with default settings
@ -451,3 +462,12 @@ class BarcodeTagTest(TestCase):
# Test empty tag
with self.assertRaises(ValueError):
barcode_tags.datamatrix('')
# Failure cases with wrong args
datamatrix = barcode_tags.datamatrix(
'hello world', border='abc', fill_color='aaaaaaa', back_color='aaaaaaa'
)
self.assertEqual(
datamatrix,
'data:image/png;charset=utf-8;base64,iVBORw0KGgoAAAANSUhEUgAAABIAAAASCAIAAADZrBkAAAAAlElEQVR4nJ1TQQ7AIAgri///cncw6wroEseBgEFbCgZJnNsFICKOPAAIjeSM5T11IznK5f5WRMgnkhP9JfCcTC/MxFZ5hxLOgqrn3o/z/OqtsNpdSL31Iu9W4Dq8Sulu+q5Nuqa3XYOdnuidlICPpXhZVBruyzAKSZehT+yNlzvZQcq6JiW7Ni592swf/43kdlDfdgMk1eOtR7kWpAAAAABJRU5ErkJggg==',
)

View File

@ -40,7 +40,8 @@ class UserAPITests(InvenTreeAPITestCase):
def test_user_api(self):
"""Tests for User API endpoints."""
response = self.get(reverse('api-user-list'), expected_code=200)
url = reverse('api-user-list')
response = self.get(url, expected_code=200)
# Check the correct number of results was returned
self.assertEqual(len(response.data), User.objects.count())
@ -58,6 +59,32 @@ class UserAPITests(InvenTreeAPITestCase):
self.assertIn('pk', response.data)
self.assertIn('username', response.data)
# Test create user
response = self.post(url, expected_code=403)
self.assertIn(
'You do not have permission to perform this action.', str(response.data)
)
self.user.is_superuser = True
self.user.save()
response = self.post(
url,
data={
'username': 'test',
'first_name': 'Test',
'last_name': 'User',
'email': 'aa@example.org',
},
expected_code=201,
)
self.assertEqual(response.data['username'], 'test')
self.assertEqual(response.data['first_name'], 'Test')
self.assertEqual(response.data['last_name'], 'User')
self.assertEqual(response.data['is_staff'], False)
self.assertEqual(response.data['is_superuser'], False)
self.assertEqual(response.data['is_active'], True)
def test_group_api(self):
"""Tests for the Group API endpoints."""
response = self.get(reverse('api-group-list'), expected_code=200)