mirror of
				https://github.com/inventree/InvenTree.git
				synced 2025-10-31 13:15:43 +00:00 
			
		
		
		
	* bump vers * fix ssl? Added build dependencies for libbz2, libffi, and libssl. * try empty buildpack * clean up * fix ref * remove things we now do not need anymore * add 22.04 as a target * cleanup installer * add changelog entry * add dotenv * update skript * make task more robust for package usage * ensure we have a site-url set * fix style * fix syntax
		
			
				
	
	
		
			2005 lines
		
	
	
		
			59 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			2005 lines
		
	
	
		
			59 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Tasks for automating certain actions and interacting with InvenTree from the CLI."""
 | |
| 
 | |
| import json
 | |
| import os
 | |
| import pathlib
 | |
| import re
 | |
| import shutil
 | |
| import subprocess
 | |
| import sys
 | |
| from functools import wraps
 | |
| from pathlib import Path
 | |
| from platform import python_version
 | |
| from typing import Optional
 | |
| 
 | |
| import invoke
 | |
| from invoke import Collection, task
 | |
| from invoke.exceptions import UnexpectedExit
 | |
| 
 | |
| 
 | |
def is_true(x):
    """Shortcut function to determine if a value "looks" like a boolean."""
    truthy_values = {'1', 'y', 'yes', 't', 'true', 'on'}
    return str(x).strip().lower() in truthy_values
 | |
| 
 | |
| 
 | |
def is_devcontainer_environment():
    """Check if the InvenTree environment is running in a development container."""
    flag = os.environ.get('INVENTREE_DEVCONTAINER', 'False')
    return is_true(flag)
 | |
| 
 | |
| 
 | |
def is_docker_environment():
    """Check if the InvenTree environment is running in a Docker container."""
    flag = os.environ.get('INVENTREE_DOCKER', 'False')
    return is_true(flag)
 | |
| 
 | |
| 
 | |
def is_rtd_environment():
    """Check if the InvenTree environment is running on ReadTheDocs."""
    flag = os.environ.get('READTHEDOCS', 'False')
    return is_true(flag)
 | |
| 
 | |
| 
 | |
def is_debug_environment():
    """Check if the InvenTree environment is running in a debug environment.

    Either the InvenTree debug flag or the CI runner debug flag counts.
    """
    debug_flags = ('INVENTREE_DEBUG', 'RUNNER_DEBUG')
    return any(is_true(os.environ.get(flag, 'False')) for flag in debug_flags)
 | |
| 
 | |
| 
 | |
def get_version_vals():
    """Get values from the VERSION file.

    Returns an empty dict when the file is missing, or when the
    dotenv package is not available in the current environment.
    """
    version_file = local_dir().joinpath('VERSION')
    if not version_file.exists():
        return {}

    # dotenv is an optional dependency; fail gracefully if missing
    try:
        from dotenv import dotenv_values
    except ImportError:
        error(
            'ERROR: dotenv package not installed. You might not be running in the right environment.'
        )
        return {}

    return dotenv_values(version_file)
 | |
| 
 | |
| 
 | |
def is_pkg_installer(content: Optional[dict] = None, load_content: bool = False):
    """Check if the current environment is a package installer by VERSION/environment."""
    data = get_version_vals() if load_content else content
    return get_installer(data) == 'PKG'
 | |
| 
 | |
| 
 | |
def is_pkg_installer_by_path():
    """Check if the current environment is a package installer by checking the path."""
    argv = sys.argv
    if not argv:
        # Defensive: embedded interpreters may have an empty argv
        return False
    return argv[0].startswith('/opt/inventree/env/bin/invoke')
 | |
| 
 | |
| 
 | |
def get_installer(content: Optional[dict] = None):
    """Get the installer for the current environment or a content dict."""
    source = dict(os.environ) if content is None else content
    return source.get('INVENTREE_PKG_INSTALLER', None)
 | |
| 
 | |
| 
 | |
| # region execution logging helpers
 | |
def task_exception_handler(t, v, tb):
    """Handle exceptions raised by tasks.

    The intent here is to provide more 'useful' error messages when tasks fail.
    """
    sys.__excepthook__(t, v, tb)

    if t is not ModuleNotFoundError:
        return

    # The final word of the exception text is the quoted module name
    mod_name = str(v).split(' ')[-1].strip("'")

    error(f'Error importing required module: {mod_name}')
    for hint in (
        '- Ensure the correct Python virtual environment is active',
        '- Ensure that the invoke tool is installed in the active Python environment',
        "- Ensure all required packages are installed by running 'invoke install'",
    ):
        warning(hint)
 | |
| 
 | |
| 
 | |
# Route all uncaught exceptions through the custom handler defined above
sys.excepthook = task_exception_handler
 | |
| 
 | |
| 
 | |
def wrap_color(text: str, color: str) -> str:
    """Wrap text in a color code."""
    prefix = f'\033[{color}m'
    reset = '\033[0m'
    return prefix + text + reset
 | |
| 
 | |
| 
 | |
def success(*args):
    """Print a success message to the console."""
    text = ' '.join(str(a) for a in args)
    print(wrap_color(text, '92'))
 | |
| 
 | |
| 
 | |
def error(*args):
    """Print an error message to the console."""
    text = ' '.join(str(a) for a in args)
    print(wrap_color(text, '91'))
 | |
| 
 | |
| 
 | |
def warning(*args):
    """Print a warning message to the console."""
    text = ' '.join(str(a) for a in args)
    print(wrap_color(text, '93'))
 | |
| 
 | |
| 
 | |
def info(*args):
    """Print an informational message to the console."""
    text = ' '.join(str(a) for a in args)
    print(wrap_color(text, '94'))
 | |
| 
 | |
| 
 | |
def state_logger(fn=None, method_name=None):
    """Decorator to log state markers before/after function execution, optionally accepting arguments.

    Supports three usage forms:
      - ``@state_logger`` (bare): `fn` is the decorated function itself.
      - ``@state_logger('NAME')``: the positional argument is a string label.
      - ``@state_logger(method_name='NAME')``: explicit keyword form.
    Markers are only printed when running in a debug environment.
    """

    def decorator(func):
        # Label used in the start/done markers; falls back to the function name.
        # NOTE: `method_name` is read from the enclosing scope only when
        # `decorator` is applied, so the string-argument rebinding below
        # (which happens before `decorator` is returned) still takes effect.
        func.method_name = method_name or f'invoke task named `{func.__name__}`'

        @wraps(func)
        def wrapped(c, *args, **kwargs):
            # Only emit markers when a debug flag is set in the environment
            do_log = is_debug_environment()
            if do_log:
                info(f'# {func.method_name}| start')
            func(c, *args, **kwargs)
            if do_log:
                info(f'# {func.method_name}| done')

        return wrapped

    # Bare decorator form: `fn` is the function being decorated
    if fn and callable(fn):
        return decorator(fn)
    # String form: the positional argument is actually the marker label
    elif fn and isinstance(fn, str):
        method_name = fn
    return decorator
 | |
| 
 | |
| 
 | |
| # endregion
 | |
| 
 | |
| 
 | |
| # region environment checks
 | |
def envcheck_invoke_version():
    """Check that the installed invoke version meets minimum requirements."""
    MIN_INVOKE_VERSION: str = '2.0.0'

    def version_tuple(version: str):
        # Compare versions numerically, component by component
        return tuple(int(part) for part in version.split('.'))  # noqa: RUF048

    if version_tuple(invoke.__version__) < version_tuple(MIN_INVOKE_VERSION):
        error(f'The installed invoke version ({invoke.__version__}) is not supported!')
        error(f'InvenTree requires invoke version {MIN_INVOKE_VERSION} or above')
        sys.exit(1)
 | |
| 
 | |
| 
 | |
def envcheck_invoke_path():
    """Check that the path of the used invoke is correct."""
    # Docker / RTD environments manage their own invoke installation
    if is_docker_environment() or is_rtd_environment():
        return

    invoke_path = Path(invoke.__file__)
    env_path = Path(sys.prefix).resolve()
    loc_path = Path(__file__).parent.resolve()

    # Invoke must live inside this source tree, or inside the active environment
    in_source_tree = invoke_path.is_relative_to(loc_path)
    in_environment = invoke_path.is_relative_to(env_path)
    if in_source_tree or in_environment:
        return

    error('INVE-E2 - Wrong Invoke Path')
    error(
        f'The invoke tool `{invoke_path}` is not correctly located, ensure you are using the invoke installed in an environment in `{loc_path}` or `{env_path}`'
    )
    sys.exit(1)
 | |
| 
 | |
| 
 | |
def envcheck_python_version():
    """Check that the installed python version meets minimum requirements.

    If the python version is not sufficient, exits with a non-zero exit code.
    """
    REQ_MAJOR: int = 3
    REQ_MINOR: int = 9

    # Tuple comparison is equivalent to "major < 3 or (major == 3 and minor < 9)"
    if sys.version_info[:2] >= (REQ_MAJOR, REQ_MINOR):
        return

    version = sys.version.split(' ')[0]
    error(f'The installed python version ({version}) is not supported!')
    error(f'InvenTree requires Python {REQ_MAJOR}.{REQ_MINOR} or above')
    sys.exit(1)
 | |
| 
 | |
| 
 | |
def envcheck_invoke_cmd():
    """Check that the right invoke command for the current environment is used.

    Prints a warning (without exiting) when the detected command does not
    match the expected invocation for the environment.
    """
    # Strip the environment prefix so the command can be compared portably
    first_cmd = sys.argv[0].replace(sys.prefix, '')
    # Fixed typo: was previously misspelled 'intendded'
    intended = ['/bin/invoke', '/bin/inv']

    correct_cmd: Optional[str] = None
    if is_rtd_environment() or is_docker_environment() or is_devcontainer_environment():
        return  # Skip invoke command check for Docker/RTD/DevContainer environments
    elif is_pkg_installer(load_content=True) and not is_pkg_installer_by_path():
        correct_cmd = 'inventree run invoke'
    else:
        warning('Unknown environment, not checking used invoke command')

    if first_cmd not in intended:
        correct_cmd = correct_cmd if correct_cmd else 'invoke'
        error('INVE-W9 - Wrong Invoke Environment')
        error(
            f'The detected invoke command `{first_cmd}` is not the intended one for this environment, ensure you are using one of the following command(s) `{correct_cmd}`'
        )
 | |
| 
 | |
| 
 | |
def main():
    """Main function to check the execution environment."""
    # Order matters: version checks run before path/command checks
    checks = (
        envcheck_invoke_version,
        envcheck_python_version,
        envcheck_invoke_path,
        envcheck_invoke_cmd,
    )
    for check in checks:
        check()
 | |
| 
 | |
| 
 | |
| # endregion
 | |
| 
 | |
| 
 | |
def apps():
    """Returns a list of installed apps.

    Each app name appears exactly once (a duplicate 'machine' entry
    was previously included by mistake).
    """
    return [
        'build',
        'common',
        'company',
        'importer',
        'machine',
        'order',
        'part',
        'report',
        'stock',
        'users',
        'plugin',
        'InvenTree',
        'generic',
        'web',
    ]
 | |
| 
 | |
| 
 | |
def content_excludes(
    allow_auth: bool = True,
    allow_tokens: bool = True,
    allow_plugins: bool = True,
    allow_sso: bool = True,
    allow_session: bool = True,
):
    """Returns a list of content types to exclude from import / export.

    Arguments:
        allow_auth (bool): Allow user authentication data to be exported / imported
        allow_tokens (bool): Allow tokens to be exported / imported
        allow_plugins (bool): Allow plugin information to be exported / imported
        allow_sso (bool): Allow SSO tokens to be exported / imported
        allow_session (bool): Allow user session data to be exported / imported
    """
    # Content types which are always excluded
    excludes = [
        'contenttypes',
        'auth.permission',
        'error_report.error',
        'admin.logentry',
        'django_q.schedule',
        'django_q.task',
        'django_q.ormq',
        'exchange.rate',
        'exchange.exchangebackend',
        'common.dataoutput',
        'common.newsfeedentry',
        'common.notificationentry',
        'common.notificationmessage',
        'importer.dataimportsession',
        'importer.dataimportcolumnmap',
        'importer.dataimportrow',
    ]

    # Optionally exclude user auth data
    if not allow_auth:
        excludes += ['auth.group', 'auth.user']

    # Optionally exclude user token information
    if not allow_tokens:
        excludes += ['users.apitoken']

    # Optionally exclude plugin information
    if not allow_plugins:
        excludes += ['plugin.pluginconfig', 'plugin.pluginsetting']

    # Optionally exclude SSO application information
    if not allow_sso:
        excludes += ['socialaccount.socialapp', 'socialaccount.socialtoken']

    # Optionally exclude user session information
    if not allow_session:
        excludes += ['sessions.session', 'usersessions.usersession']

    return ' '.join(f'--exclude {e}' for e in excludes)
 | |
| 
 | |
| 
 | |
| # region file helpers
 | |
def local_dir() -> Path:
    """Returns the directory of *THIS* file.

    Used to ensure that the various scripts always run
    in the correct directory.
    """
    this_file = Path(__file__)
    return this_file.parent.resolve()
 | |
| 
 | |
| 
 | |
def manage_py_dir():
    """Returns the directory of the manage.py file."""
    return local_dir() / 'src' / 'backend' / 'InvenTree'
 | |
| 
 | |
| 
 | |
def manage_py_path():
    """Return the path of the manage.py file."""
    return manage_py_dir() / 'manage.py'
 | |
| 
 | |
| 
 | |
| # endregion
 | |
| 
 | |
# Run the environment checks when executed directly, or when imported
# as the `tasks` module by the invoke tool itself
if __name__ in ['__main__', 'tasks']:
    main()
 | |
| 
 | |
| 
 | |
def run(c, cmd, path: Optional[Path] = None, pty=False, env=None):
    """Runs a given command a given path.

    Args:
        c: Command line context.
        cmd: Command to run.
        path: Path to run the command in.
        pty (bool, optional): Run an interactive session. Defaults to False.
        env (dict, optional): Environment variables to pass to the command. Defaults to None.
    """
    if not env:
        env = {}
    if not path:
        path = local_dir()

    try:
        c.run(f'cd "{path}" && {cmd}', pty=pty, env=env)
    except UnexpectedExit as e:
        error(f"ERROR: InvenTree command failed: '{cmd}'")
        warning('- Refer to the error messages in the log above for more information')
        raise e
 | |
| 
 | |
| 
 | |
def manage(c, cmd, pty: bool = False, env=None):
    """Runs a given command against django's "manage.py" script.

    Args:
        c: Command line context.
        cmd: Django command to run.
        pty (bool, optional): Run an interactive session. Defaults to False.
        env (dict, optional): Environment variables to pass to the command. Defaults to None.
    """
    full_cmd = f'python3 manage.py {cmd}'
    run(c, full_cmd, path=manage_py_dir(), pty=pty, env=env)
 | |
| 
 | |
| 
 | |
def yarn(c, cmd):
    """Runs a given command against the yarn package manager.

    Args:
        c: Command line context.
        cmd: Yarn command to run.
    """
    frontend_path = local_dir().joinpath('src', 'frontend')
    run(c, cmd, frontend_path, False)
 | |
| 
 | |
| 
 | |
def node_available(versions: bool = False, bypass_yarn: bool = False):
    """Checks if the frontend environment (ie node and yarn in bash) is available.

    Args:
        versions (bool, optional): Also return the detected version strings. Defaults to False.
        bypass_yarn (bool, optional): Do not require yarn to be present. Defaults to False.

    Returns:
        The availability result, or a (result, node_version, yarn_version)
        tuple when versions=True.
    """

    def ret(val, val0=None, val1=None):
        if versions:
            return val, val0, val1
        return val

    def check(cmd):
        """Return the stripped output of a shell command, or None if it fails."""
        try:
            return str(
                subprocess.check_output([cmd], stderr=subprocess.STDOUT, shell=True),
                encoding='utf-8',
            ).strip()
        # Merged previously-duplicated except branches into a single clause:
        # tool missing (FileNotFoundError) or non-zero exit (CalledProcessError)
        except (subprocess.CalledProcessError, FileNotFoundError):
            return None

    yarn_version = check('yarn --version')
    node_version = check('node --version')

    # Either yarn is available or we don't care about yarn
    yarn_passes = bypass_yarn or yarn_version

    # Print a warning if node is available but yarn is not
    if node_version and not yarn_passes:
        warning(
            'Node is available but yarn is not. Install yarn if you wish to build the frontend.'
        )

    # Return the result
    return ret(yarn_passes and node_version, node_version, yarn_version)
 | |
| 
 | |
| 
 | |
@state_logger
def check_file_existence(filename: Path, overwrite: bool = False):
    """Checks if a file exists and asks the user if it should be overwritten.

    Exits with a non-zero code if the user declines to overwrite.

    Args:
        filename (Path): Path of the file to check (docstring previously said 'str',
            which did not match the annotation).
        overwrite (bool, optional): Overwrite the file without asking. Defaults to False.
    """
    if filename.is_file() and overwrite is False:
        response = input(
            'Warning: file already exists. Do you want to overwrite? [y/N]: '
        )
        response = str(response).strip().lower()

        if response not in ['y', 'yes']:
            error('Cancelled export operation')
            sys.exit(1)
 | |
| 
 | |
| 
 | |
| # Install tasks
 | |
| # region tasks
 | |
@task(help={'uv': 'Use UV (experimental package manager)'})
@state_logger('TASK01')
def plugins(c, uv=False):
    """Installs all plugins as specified in 'plugins.txt'."""
    from src.backend.InvenTree.InvenTree.config import (  # type: ignore[import]
        get_plugin_file,
    )

    plugin_file = get_plugin_file()

    info(f"Installing plugin packages from '{plugin_file}'")

    # Install the plugins (uv must first be bootstrapped via pip)
    if uv:
        run(c, 'pip3 install --no-cache-dir --disable-pip-version-check uv')
        run(c, f"uv pip install -r '{plugin_file}'")
    else:
        run(c, f"pip3 install --disable-pip-version-check -U -r '{plugin_file}'")

    # Collect plugin static files
    manage(c, 'collectplugins')
 | |
| 
 | |
| 
 | |
@task(
    help={
        'uv': 'Use UV package manager (experimental)',
        'skip_plugins': 'Skip plugin installation',
    }
)
@state_logger('TASK02')
def install(c, uv=False, skip_plugins=False):
    """Installs required python packages.

    Args:
        c: Command line context.
        uv (bool, optional): Install via the experimental 'uv' package manager. Defaults to False.
        skip_plugins (bool, optional): Skip installation of plugin packages. Defaults to False.
    """
    # Ensure path is relative to *this* directory
    INSTALL_FILE = local_dir().joinpath('src/backend/requirements.txt')

    info(f"Installing required python packages from '{INSTALL_FILE}'")

    if not Path(INSTALL_FILE).is_file():
        raise FileNotFoundError(f"Requirements file '{INSTALL_FILE}' not found")

    # Install required Python packages with PIP
    if not uv:
        run(
            c,
            'pip3 install --no-cache-dir --disable-pip-version-check -U pip setuptools',
        )
        run(
            c,
            f'pip3 install --no-cache-dir --disable-pip-version-check -U --require-hashes -r {INSTALL_FILE}',
        )
    else:
        # Bootstrap 'uv' itself via pip, then use it for the pinned requirements
        run(
            c,
            'pip3 install --no-cache-dir --disable-pip-version-check -U uv setuptools',
        )
        run(c, f'uv pip install -U --require-hashes  -r {INSTALL_FILE}')

    # Run plugins install
    if not skip_plugins:
        plugins(c, uv=uv)

    # Compile license information for all installed packages
    lic_path = manage_py_dir().joinpath('InvenTree', 'licenses.txt')
    run(
        c,
        f'pip-licenses --format=json --with-license-file --no-license-path > {lic_path}',
    )

    success('Dependency installation complete')
 | |
| 
 | |
| 
 | |
@task(help={'tests': 'Set up test dataset at the end'})
def setup_dev(c, tests=False):
    """Sets up everything needed for the dev environment.

    Installs the dev requirements, installs and updates the pre-commit
    hooks, and (optionally) loads the test dataset afterwards.
    """
    info("Installing required python packages from 'src/backend/requirements-dev.txt'")

    # Install required Python packages with PIP
    run(c, 'pip3 install -U --require-hashes -r src/backend/requirements-dev.txt')

    # Install pre-commit hook
    info('Installing pre-commit for checks before git commits...')
    run(c, 'pre-commit install')

    # Update all the hooks
    run(c, 'pre-commit autoupdate')

    success('pre-commit set up complete')

    # Set up test-data if flag is set
    if tests:
        setup_test(c)
 | |
| 
 | |
| 
 | |
| # Setup / maintenance tasks
 | |
| 
 | |
| 
 | |
@task
def shell(c):
    """Launch a Django shell."""
    # Interactive session, so a pty is required
    manage(c, cmd='shell', pty=True)
 | |
| 
 | |
| 
 | |
@task
def superuser(c):
    """Create a superuser/admin account for the database."""
    # Interactive prompt, so a pty is required
    manage(c, cmd='createsuperuser', pty=True)
 | |
| 
 | |
| 
 | |
@task
def rebuild_models(c):
    """Rebuild database models with MPTT structures."""
    info('Rebuilding internal database structures')
    manage(c, cmd='rebuild_models', pty=True)
 | |
| 
 | |
| 
 | |
@task
def rebuild_thumbnails(c):
    """Rebuild missing image thumbnails."""
    from src.backend.InvenTree.InvenTree.config import (  # type: ignore[import]
        get_media_dir,
    )

    media_dir = get_media_dir()
    info(f'Rebuilding image thumbnails in {media_dir}')
    manage(c, 'rebuild_thumbnails', pty=True)
 | |
| 
 | |
| 
 | |
@task
@state_logger('TASK09')
def clean_settings(c):
    """Clean the setting tables of old settings."""
    info('Cleaning old settings from the database')
    manage(c, cmd='clean_settings')
    success('Settings cleaned successfully')
 | |
| 
 | |
| 
 | |
@task(help={'mail': "mail of the user who's MFA should be disabled"})
def remove_mfa(c, mail=''):
    """Remove MFA for a user."""
    if mail:
        manage(c, f'remove_mfa {mail}')
    else:
        warning('You must provide a users mail')
 | |
| 
 | |
| 
 | |
@task(
    help={
        'frontend': 'Build the frontend',
        'clear': 'Remove existing static files',
        'skip_plugins': 'Ignore collection of plugin static files',
    }
)
@state_logger('TASK08')
def static(c, frontend=False, clear=True, skip_plugins=False):
    """Copies required static files to the STATIC_ROOT directory, as per Django requirements."""
    if frontend and node_available():
        frontend_compile(c)

    info('Collecting static files...')

    flags = ['--no-input', '--verbosity 0']
    if clear:
        flags.append('--clear')
    manage(c, 'collectstatic ' + ' '.join(flags))

    # Collect plugin static files
    if not skip_plugins:
        manage(c, 'collectplugins')

    success('Static files collected successfully')
 | |
| 
 | |
| 
 | |
@task
def translate(c, ignore_static=False, no_frontend=False):
    """Rebuild translation source files. Advanced use only!

    Note: This command should not be used on a local install,
    it is performed as part of the InvenTree translation toolchain.

    Args:
        c: Command line context.
        ignore_static (bool, optional): Skip the static file collection step. Defaults to False.
        no_frontend (bool, optional): Skip frontend message extraction. Defaults to False.
    """
    info('Building translation files')

    # Translate applicable .py / .html / .js files
    manage(c, 'makemessages --all -e py,html,js --no-wrap')
    manage(c, 'compilemessages')

    # Extract frontend strings too (requires a working node environment)
    if not no_frontend and node_available():
        frontend_compile(c, extract=True)

    # Update static files
    if not ignore_static:
        static(c)

    success('Translation files built successfully')
 | |
| 
 | |
| 
 | |
@task(
    help={
        'clean': 'Clean up old backup files',
        'compress': 'Compress the backup files',
        'encrypt': 'Encrypt the backup files (requires GPG recipient to be set)',
        'path': 'Specify path for generated backup files (leave blank for default path)',
        'quiet': 'Suppress informational output (only show errors)',
        'skip_db': 'Skip database backup step (only backup media files)',
        'skip_media': 'Skip media backup step (only backup database files)',
    }
)
@state_logger('TASK04')
def backup(
    c,
    clean: bool = False,
    compress: bool = True,
    encrypt: bool = False,
    path=None,
    quiet: bool = False,
    skip_db: bool = False,
    skip_media: bool = False,
):
    """Backup the database and media files.

    Args:
        c: Command line context.
        clean (bool): Remove old backup files after backing up.
        compress (bool): Compress the generated backup archives.
        encrypt (bool): Encrypt the backup files (requires GPG recipient configuration).
        path: Optional output directory; relative paths resolve against the repo root.
        quiet (bool): Suppress informational output.
        skip_db (bool): Skip the database backup step.
        skip_media (bool): Skip the media backup step.
    """
    cmd = '--noinput -v 2'

    if compress:
        cmd += ' --compress'

    if encrypt:
        cmd += ' --encrypt'

    # A path to the backup dir can be specified here
    # If not specified, the default backup dir is used
    if path:
        path = Path(path)
        # Use pathlib's own check rather than os.path.isabs on a Path object
        if not path.is_absolute():
            path = local_dir().joinpath(path).resolve()

        cmd += f' -O {path}'

    if clean:
        cmd += ' --clean'

    if quiet:
        cmd += ' --quiet'

    if skip_db:
        info('Skipping database backup...')
    else:
        info('Backing up InvenTree database...')
        manage(c, f'dbbackup {cmd}')

    if skip_media:
        info('Skipping media backup...')
    else:
        info('Backing up InvenTree media files...')
        manage(c, f'mediabackup {cmd}')

    if not skip_db or not skip_media:
        success('Backup completed successfully')
 | |
| 
 | |
| 
 | |
@task(
    help={
        'path': 'Specify path to locate backup files (leave blank for default path)',
        'db_file': 'Specify filename of compressed database archive (leave blank to use most recent backup)',
        'decrypt': 'Decrypt the backup files (requires GPG recipient to be set)',
        'media_file': 'Specify filename of compressed media archive (leave blank to use most recent backup)',
        'skip_db': 'Do not import database archive (media restore only)',
        'skip_media': 'Do not import media archive (database restore only)',
        'uncompress': 'Uncompress the backup files before restoring (default behavior)',
    }
)
def restore(
    c,
    path=None,
    db_file=None,
    media_file=None,
    decrypt: bool = False,
    skip_db: bool = False,
    skip_media: bool = False,
    uncompress: bool = True,
):
    """Restore the database and media files.

    Args:
        c: Command line context.
        path: Optional directory to locate backup files; relative paths resolve against the repo root.
        db_file: Specific database archive to restore (most recent backup if omitted).
        media_file: Specific media archive to restore (most recent backup if omitted).
        decrypt (bool): Decrypt the backup files (requires GPG recipient configuration).
        skip_db (bool): Skip the database restore step.
        skip_media (bool): Skip the media restore step.
        uncompress (bool): Uncompress the backup files before restoring.
    """
    base_cmd = '--noinput -v 2'

    if uncompress:
        base_cmd += ' --uncompress'

    if decrypt:
        base_cmd += ' --decrypt'

    if path:
        # Resolve the provided path
        path = Path(path)
        # Use pathlib's own check rather than os.path.isabs on a Path object
        if not path.is_absolute():
            path = local_dir().joinpath(path).resolve()

        base_cmd += f' -I {path}'

    if skip_db:
        info('Skipping database archive...')
    else:
        info('Restoring InvenTree database')
        cmd = f'dbrestore {base_cmd}'

        if db_file:
            cmd += f' -i {db_file}'

        manage(c, cmd)

    if skip_media:
        info('Skipping media restore...')
    else:
        info('Restoring InvenTree media files')
        cmd = f'mediarestore {base_cmd}'

        if media_file:
            cmd += f' -i {media_file}'

        manage(c, cmd)
 | |
| 
 | |
| 
 | |
@task()
@state_logger()
def listbackups(c):
    """List available backup files."""
    info('Finding available backup files...')
    manage(c, cmd='listbackups')
 | |
| 
 | |
| 
 | |
@task(post=[rebuild_models, rebuild_thumbnails])
@state_logger('TASK05')
def migrate(c):
    """Performs database migrations.

    This is a critical step if the database schema have been altered!
    """
    info('Running InvenTree database migrations...')

    # Run custom management command which wraps migrations in "maintenance mode"
    manage(c, 'makemigrations')
    manage(c, 'runmigrations', pty=True)
    # Sync any apps which do not use migrations
    manage(c, 'migrate --run-syncdb')
    # Purge content-type entries for models which no longer exist
    manage(c, 'remove_stale_contenttypes --include-stale-apps --no-input', pty=True)

    success('InvenTree database migrations completed')
 | |
| 
 | |
| 
 | |
@task(help={'app': 'Specify an app to show migrations for (leave blank for all apps)'})
def showmigrations(c, app=''):
    """Show the migration status of the database."""
    cmd = f'showmigrations {app}'
    manage(c, cmd, pty=True)
 | |
| 
 | |
| 
 | |
@task(
    post=[clean_settings],
    help={
        'skip_backup': 'Skip database backup step (advanced users)',
        'frontend': 'Force frontend compilation/download step (ignores INVENTREE_DOCKER)',
        'no_frontend': 'Skip frontend compilation/download step',
        'skip_static': 'Skip static file collection step',
        'uv': 'Use UV (experimental package manager)',
    },
)
@state_logger('TASK03')
def update(
    c,
    skip_backup: bool = False,
    frontend: bool = False,
    no_frontend: bool = False,
    skip_static: bool = False,
    uv: bool = False,
):
    """Update InvenTree installation.

    This command should be invoked after source code has been updated,
    e.g. downloading new code from GitHub.

    The following tasks are performed, in order:

    - install
    - backup (optional)
    - migrate
    - frontend_compile or frontend_download (optional)
    - static (optional)
    - clean_settings
    """
    info('Updating InvenTree installation...')

    # Ensure required components are installed
    install(c, uv=uv)

    if not skip_backup:
        backup(c)

    # Perform database migrations
    migrate(c)

    # Stop here if we are not building/downloading the frontend
    # If:
    # - INVENTREE_DOCKER is set (by the docker image eg.) and not overridden by `--frontend` flag
    # - `--no-frontend` flag is set
    if (is_docker_environment() and not frontend) or no_frontend:
        if no_frontend:
            info('Skipping frontend update (no_frontend flag set)')
        else:
            info('Skipping frontend update (INVENTREE_DOCKER flag set)')

        # NOTE: normalization only - neither flag is read again below
        frontend = False
        no_frontend = True
    else:
        info('Updating frontend...')
        # Decide if we should compile the frontend or try to download it
        if node_available(bypass_yarn=True):
            frontend_compile(c)
        else:
            frontend_download(c)

    if not skip_static:
        # Collect static files
        # Note: frontend has already been compiled if required
        static(c, frontend=False)

    success('InvenTree update complete!')
 | |
| 
 | |
| 
 | |
# Data tasks
 | |
@task(
    post=[clean_settings],
    help={
        'filename': "Output filename (default = 'data.json')",
        'overwrite': 'Overwrite existing files without asking first (default = False)',
        'include_permissions': 'Include user and group permissions in the output file (default = False)',
        'include_tokens': 'Include API tokens in the output file (default = False)',
        'exclude_plugins': 'Exclude plugin data from the output file (default = False)',
        'include_sso': 'Include SSO token data in the output file (default = False)',
        'include_session': 'Include user session data in the output file (default = False)',
        'retain_temp': 'Retain temporary files (containing permissions) at end of process (default = False)',
    },
)
def export_records(
    c,
    filename='data.json',
    overwrite=False,
    include_permissions=False,
    include_tokens=False,
    exclude_plugins=False,
    include_sso=False,
    include_session=False,
    retain_temp=False,
):
    """Export all database records to a file.

    Write data to the file defined by filename.

    - If --overwrite is not set, the user will be prompted before an existing
      file is overwritten.
    - If --include-permissions is not set, user / group permissions are
      stripped from the output file.
    - If --retain-temp is set, the intermediate dump file (which still
      contains permissions) is kept alongside the output, named
      '<filename>.tmp'; by default it is deleted at the end.

    Calling this function without any arguments thus produces a single
    'data.json' file with permissions removed.
    """
    # Resolve the output file relative to the base directory if a relative path was given
    target = Path(filename)
    if not target.is_absolute():
        target = local_dir().joinpath(filename).resolve()

    info(f"Exporting database records to file '{target}'")

    check_file_existence(target, overwrite)

    tmpfile = f'{target}.tmp'

    excludes = content_excludes(
        allow_tokens=include_tokens,
        allow_plugins=not exclude_plugins,
        allow_session=include_session,
        allow_sso=include_sso,
    )

    cmd = f"dumpdata --natural-foreign --indent 2 --output '{tmpfile}' {excludes}"

    # Dump data to temporary file
    manage(c, cmd, pty=True)

    info('Running data post-processing step...')

    # Post-process the file, to remove any "permissions" specified for a user or group
    with open(tmpfile, encoding='utf-8') as f_in:
        data = json.load(f_in)

    data_out = []

    for entry in data:
        model_name = entry.get('model', None)

        # Ignore any temporary settings (start with underscore)
        if model_name in ['common.inventreesetting', 'common.inventreeusersetting']:
            if entry['fields'].get('key', '').startswith('_'):
                continue

        if include_permissions is False:
            if model_name == 'auth.group':
                entry['fields']['permissions'] = []

            if model_name == 'auth.user':
                entry['fields']['user_permissions'] = []

        data_out.append(entry)

    # Write the processed data to file
    with open(target, 'w', encoding='utf-8') as f_out:
        f_out.write(json.dumps(data_out, indent=2))

    if not retain_temp:
        info('Removing temporary files')
        os.remove(tmpfile)

    success('Data export completed')
 | |
| 
 | |
| 
 | |
@task(
    help={
        'filename': 'Input filename',
        'clear': 'Clear existing data before import',
        'retain_temp': 'Retain temporary files at end of process (default = False)',
    },
    post=[rebuild_models, rebuild_thumbnails],
)
def import_records(
    c, filename='data.json', clear: bool = False, retain_temp: bool = False
):
    """Import database records from a file.

    The input file is split into two temporary files before loading:

    - '<filename>.auth.json': user / group ('auth.*') records, loaded first
    - '<filename>.data.json': all other records

    Auth data must be loaded first, because the users.owner model has a
    ContentType foreign key. Any permissions attached to users / groups are
    stripped before import, as permission IDs may not match the target
    database. Pass --retain-temp to keep the two intermediate files.
    """
    # Get an absolute path to the supplied filename
    target = Path(filename)
    if not target.is_absolute():
        target = local_dir().joinpath(filename)

    if not target.exists():
        error(f"ERROR: File '{target}' does not exist")
        sys.exit(1)

    if clear:
        delete_data(c, force=True, migrate=True)

    info(f"Importing database records from '{target}'")

    # We need to load 'auth' data (users / groups) *first*
    # This is due to the users.owner model, which has a ContentType foreign key
    authfile = f'{target}.auth.json'

    # Pre-process the data, to remove any "permissions" specified for a user or group
    datafile = f'{target}.data.json'

    with open(target, encoding='utf-8') as f_in:
        try:
            data = json.load(f_in)
        except json.JSONDecodeError as exc:
            error(f'ERROR: Failed to decode JSON file: {exc}')
            sys.exit(1)

    auth_data = []
    load_data = []

    for entry in data:
        if 'model' in entry:
            # Clear out any permissions specified for a group
            if entry['model'] == 'auth.group':
                entry['fields']['permissions'] = []

            # Clear out any permissions specified for a user
            if entry['model'] == 'auth.user':
                entry['fields']['user_permissions'] = []

            # Save auth data for later
            if entry['model'].startswith('auth.'):
                auth_data.append(entry)
            else:
                load_data.append(entry)
        else:
            warning('WARNING: Invalid entry in data file')
            print(entry)

    # Write the auth file data
    with open(authfile, 'w', encoding='utf-8') as f_out:
        f_out.write(json.dumps(auth_data, indent=2))

    # Write the processed data to the tmp file
    with open(datafile, 'w', encoding='utf-8') as f_out:
        f_out.write(json.dumps(load_data, indent=2))

    excludes = content_excludes(allow_auth=False)

    # Import auth models first
    info('Importing user auth data...')
    cmd = f"loaddata '{authfile}'"
    manage(c, cmd, pty=True)

    # Import everything else next
    info('Importing database records...')
    cmd = f"loaddata '{datafile}' -i {excludes}"

    manage(c, cmd, pty=True)

    if not retain_temp:
        info('Removing temporary files')
        os.remove(datafile)
        os.remove(authfile)

    info('Data import completed')
 | |
| 
 | |
| 
 | |
@task(
    help={
        'force': 'Force deletion of all data without confirmation',
        'migrate': 'Run migrations before deleting data (default = False)',
    }
)
def delete_data(c, force: bool = False, migrate: bool = False):
    """Delete all database records!

    Warning: This will REALLY delete all records in the database!!
    """
    info('Deleting all data from InvenTree database...')

    # Optionally bring the schema up to date before wiping the tables
    if migrate:
        manage(c, 'migrate --run-syncdb')

    # 'flush' empties the database; skip the confirmation prompt when forced
    command = 'flush --noinput' if force else 'flush'
    manage(c, command)
 | |
| 
 | |
| 
 | |
@task(post=[rebuild_models, rebuild_thumbnails])
def import_fixtures(c):
    """Import fixture data into the database.

    This command imports all existing test fixture data into the database.

    Warning:
        - Intended for testing / development only!
        - Running this command may overwrite existing database data!!
        - Don't say you were not warned...
    """
    # Fixtures, grouped by the app which provides them.
    # Insertion order is preserved, so the load order matches the grouping below.
    fixtures_by_app = {
        'build': ['build'],
        'common': ['settings'],
        'company': ['company', 'price_breaks', 'supplier_part'],
        'order': ['order'],
        'part': ['bom', 'category', 'params', 'part', 'test_templates'],
        'stock': ['location', 'stock_tests', 'stock'],
        'users': ['users'],
    }

    # Flatten the groups into a single ordered fixture list
    fixture_names = [
        name for names in fixtures_by_app.values() for name in names
    ]

    manage(c, 'loaddata ' + ' '.join(fixture_names), pty=True)
 | |
| 
 | |
| 
 | |
# Execution tasks
 | |
@task
@state_logger('TASK10')
def wait(c):
    """Block until the database accepts connections."""
    info('Waiting for database connection...')
    # Delegate to the 'wait_for_db' management command and propagate its result
    result = manage(c, 'wait_for_db')
    return result
 | |
| 
 | |
| 
 | |
@task(
    pre=[wait],
    help={
        'address': 'Server address:port (default=0.0.0.0:8000)',
        'workers': 'Specify number of worker threads (override config file)',
    },
)
def gunicorn(c, address='0.0.0.0:8000', workers=None):
    """Launch a gunicorn webserver.

    Note: This server will not auto-reload in response to code changes.
    """
    config_file = local_dir().joinpath('contrib', 'container', 'gunicorn.conf.py')

    # Assemble the gunicorn command line piece by piece
    parts = [
        f'gunicorn -c {config_file}',
        'InvenTree.wsgi',
        f'-b {address}',
        f'--chdir {manage_py_dir()}',
    ]

    # Worker count is optional - the config file provides the default
    if workers:
        parts.append(f'--workers={workers}')

    cmd = ' '.join(parts)
    info(f'Starting Gunicorn Server: {cmd}')
    run(c, cmd, pty=True)
 | |
| 
 | |
| 
 | |
@task(
    pre=[wait],
    help={
        'address': 'Server address:port (default=0.0.0.0:8000)',
        'no_reload': 'Do not automatically reload the server in response to code changes',
        'no_threading': 'Disable multi-threading for the development server',
    },
)
def server(c, address='0.0.0.0:8000', no_reload=False, no_threading=False):
    """Launch a (development) server using Django's in-built webserver.

    - This is *not* sufficient for a production installation.
    - The default address exposes the server on all network interfaces.
    """
    # Map optional runserver flags to their enabling conditions
    optional_flags = [(' --noreload', no_reload), (' --nothreading', no_threading)]

    cmd = f'runserver {address}' + ''.join(
        flag for flag, enabled in optional_flags if enabled
    )

    manage(c, cmd, pty=True)
 | |
| 
 | |
| 
 | |
@task(pre=[wait])
def worker(c):
    """Run the InvenTree background worker process."""
    # The 'qcluster' management command runs the background task worker
    manage(c, 'qcluster', pty=True)
 | |
| 
 | |
| 
 | |
@task(post=[static, server])
def test_translations(c):
    """Add a fictional language to test if each component is ready for translations.

    Generates a dummy 'xx' locale, then rewrites every msgstr in its .po file
    by replacing translatable characters with 'x' (leaving format placeholders,
    tags and escapes intact), compiles the catalogs, and finally sets the
    TEST_TRANSLATIONS environment flag for the post-tasks (static, server).
    """
    import django
    from django.conf import settings

    # setup django
    base_path = Path.cwd()
    new_base_path = pathlib.Path('InvenTree').resolve()
    sys.path.append(str(new_base_path))
    os.chdir(new_base_path)
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'InvenTree.settings')
    django.setup()

    # Add language
    info('Add dummy language...')
    manage(c, 'makemessages -e py,html,js --no-wrap -l xx')

    # change translation
    info('Fill in dummy translations...')

    # Rewrite the generated 'xx' catalog into a sibling '_new' file first
    file_path = pathlib.Path(settings.LOCALE_PATHS[0], 'xx', 'LC_MESSAGES', 'django.po')
    new_file_path = Path(str(file_path) + '_new')

    # compile regex
    reg = re.compile(
        r'[a-zA-Z0-9]{1}'  # match any single letter and number
        + r'(?![^{\(\<]*[}\)\>])'  # that is not inside curly brackets, brackets or a tag
        + r'(?<![^\%][^\(][)][a-z])'  # that is not a specially formatted variable with singles
        + r'(?![^\\][\n])'  # that is not a newline
    )
    # Accumulates a (possibly multi-line) msgid until its msgstr line is reached
    last_string = ''

    # loop through input file lines
    with open(file_path, encoding='utf-8') as file_org:
        with open(new_file_path, 'w', encoding='utf-8') as file_new:
            for line in file_org:
                if line.startswith('msgstr "'):
                    # write output -> replace regex matches with x in the read in (multi)string
                    # [7:-2] strips the leading 'msgid "' and the trailing '"\n'
                    file_new.write(f'msgstr "{reg.sub("x", last_string[7:-2])}"\n')
                    last_string = ''  # reset (multi)string
                elif line.startswith('msgid "'):
                    last_string = (
                        last_string + line
                    )  # a new translatable string starts -> start append
                    file_new.write(line)
                else:
                    if last_string:
                        last_string = (
                            last_string + line
                        )  # a string is being read in -> continue appending
                    file_new.write(line)

    # change out translation files
    file_path.rename(str(file_path) + '_old')
    new_file_path.rename(file_path)

    # compile languages
    info('Compile languages ...')
    manage(c, 'compilemessages')

    # reset cwd
    os.chdir(base_path)

    # set env flag
    os.environ['TEST_TRANSLATIONS'] = 'True'
 | |
| 
 | |
| 
 | |
@task(
    help={
        'check': 'Run sanity check on the django install (default = False)',
        'disable_pty': 'Disable PTY',
        'runtest': 'Specify which tests to run, in format <module>.<file>.<class>.<method>',
        'migrations': 'Run migration unit tests',
        'report': 'Display a report of slow tests',
        'coverage': 'Run code coverage analysis (requires coverage package)',
        'translations': 'Compile translations before running tests',
        'keepdb': 'Keep the test database after running tests (default = False)',
    }
)
def test(
    c,
    check=False,
    disable_pty=False,
    runtest='',
    migrations=False,
    report=False,
    coverage=False,
    translations=False,
    keepdb=False,
):
    """Run unit-tests for InvenTree codebase.

    To run only certain test, use the argument --runtest.
    This can filter all the way down to:
        <module>.<file>.<class>.<method>

    Example:
        test --runtest=company.test_api
    will run tests in the company/test_api.py file.
    """
    # Optional sanity check of the Django installation first
    if check:
        manage(c, 'check')

    # Optionally compile translations before running tests (best effort)
    if translations:
        try:
            manage(c, 'compilemessages', pty=True)
        except Exception:
            warning('Failed to compile translations')

    pty = not disable_pty

    tested_apps = ' '.join(apps())

    # Either run the requested subset, or the full set of InvenTree apps
    target = runtest if runtest else tested_apps
    cmd = f'test {target}'

    if report:
        cmd += ' --slowreport'

    if keepdb:
        cmd += ' --keepdb'

    # Migration tests are tagged, and only run when explicitly requested
    cmd += ' --tag migration_test' if migrations else ' --exclude-tag migration_test'

    if coverage:
        # Run tests within coverage environment, and generate report
        run(c, f'coverage run {manage_py_path()} {cmd}')
        run(c, 'coverage xml -i')
    else:
        # Run simple test runner, without coverage
        manage(c, cmd, pty=pty)
 | |
| 
 | |
| 
 | |
@task(
    help={
        'dev': 'Set up development environment at the end',
        'validate_files': 'Validate media files are correctly copied',
        'use_ssh': 'Use SSH protocol for cloning the demo dataset (requires SSH key)',
    }
)
def setup_test(
    c,
    ignore_update=False,
    dev=False,
    validate_files=False,
    use_ssh=False,
    path='inventree-demo-dataset',
):
    """Setup a testing environment.

    Clones the demo dataset, loads its records into the database, and copies
    the associated media files into place.
    """
    from src.backend.InvenTree.InvenTree.config import (  # type: ignore[import]
        get_media_dir,
    )

    if not ignore_update:
        update(c)

    template_dir = local_dir().joinpath(path)

    # Remove any previously cloned dataset first
    if template_dir.exists():
        info('Removing old data ...')
        run(c, f'rm {template_dir} -r')

    # Choose HTTPS (default) or SSH transport for cloning the demo dataset
    URL = (
        'git@github.com:inventree/demo-dataset.git'
        if use_ssh
        else 'https://github.com/inventree/demo-dataset'
    )

    # Get test data
    info('Cloning demo dataset ...')
    run(c, f'git clone {URL} {template_dir} -v --depth=1')

    # Make sure migrations are done - might have just deleted sqlite database
    if not ignore_update:
        migrate(c)

    # Load data
    info('Loading database records ...')
    import_records(c, filename=template_dir.joinpath('inventree_data.json'), clear=True)

    # Copy media files
    src = template_dir.joinpath('media')
    dst = get_media_dir()
    info(f'Copying media files - "{src}" to "{dst}"')
    shutil.copytree(src, dst, dirs_exist_ok=True)

    if validate_files:
        info(' - Validating media files')
        missing_items = []

        # Walk the source tree and confirm every directory / file exists in the destination
        for dirpath, _dirnames, filenames in os.walk(src):
            rel_path = os.path.relpath(dirpath, src)
            dst_path = os.path.join(dst, rel_path)

            if not os.path.exists(dst_path):
                error(f' - Missing directory: {dst_path}')
                missing_items.append(dst_path)
                continue

            for filename in filenames:
                dst_file = os.path.join(dst_path, filename)
                if not os.path.exists(dst_file):
                    error(f' - Missing file: {dst_file}')
                    missing_items.append(dst_file)

        if missing_items:
            raise FileNotFoundError('Media files not correctly copied')
        success(' - All media files correctly copied')

    info('Done setting up test environment...')

    # Set up development setup if flag is set
    if dev:
        setup_dev(c)
 | |
| 
 | |
| 
 | |
@task(
    help={
        'filename': "Output filename (default = 'schema.yml')",
        'overwrite': 'Overwrite existing files without asking first (default = off/False)',
        'ignore_warnings': 'Do not treat schema validation warnings as errors (default = off/False)',
        'no_default': 'Do not use default settings for schema (default = off/False)',
    }
)
@state_logger('TASK11')
def schema(
    c, filename='schema.yml', overwrite=False, ignore_warnings=False, no_default=False
):
    """Export current API schema.

    Writes the generated API schema to the specified file. Unless no_default
    is set, the site URL, plugin state and currency codes are pinned via
    environment variables so the generated schema output is stable.
    """
    filename = Path(filename).resolve()
    check_file_existence(filename, overwrite)

    info(f"Exporting schema file to '{filename}'")

    # BUGFIX: the output filename was corrupted to the literal '(unknown)';
    # the resolved filename must be interpolated into the command
    cmd = f'schema --file {filename} --validate --color'

    # Treat schema validation warnings as errors, unless explicitly ignored
    if not ignore_warnings:
        cmd += ' --fail-on-warn'

    envs = {}
    if not no_default:
        envs['INVENTREE_SITE_URL'] = (
            'http://localhost:8000'  # Default site URL - to ensure server field is stable
        )
        envs['INVENTREE_PLUGINS_ENABLED'] = (
            'False'  # Disable plugins to ensure they are kept out of schema
        )
        envs['INVENTREE_CURRENCY_CODES'] = (
            'AUD,CNY,EUR,USD'  # Default currency codes to ensure they are stable
        )

    manage(c, cmd, pty=True, env=envs)

    assert filename.exists()

    success(f'Schema export completed: {filename}')
 | |
| 
 | |
| 
 | |
@task
def export_settings_definitions(c, filename='inventree_settings.json', overwrite=False):
    """Export settings definition to a JSON file.

    Arguments:
        c: Context variable
        filename: Output filename (default = 'inventree_settings.json')
        overwrite: Overwrite an existing file without asking first (default = False)
    """
    filename = Path(filename).resolve()
    check_file_existence(filename, overwrite)

    info(f"Exporting settings definition to '{filename}'...")
    # BUGFIX: the output filename was corrupted to the literal '(unknown)';
    # the resolved filename must be passed to the management command
    manage(c, f'export_settings_definitions {filename}', pty=True)
 | |
| 
 | |
| 
 | |
@task(help={'basedir': 'Export to a base directory (default = False)'})
def export_definitions(c, basedir: str = ''):
    """Export various definitions."""
    # Normalize the base directory so it always ends with a separator
    if basedir and not basedir.endswith('/'):
        basedir += '/'
    base_path = Path(basedir, 'generated').resolve()

    settings_file = base_path.joinpath('inventree_settings.json')

    # Remaining definition files, paired with the management command which exports each
    exports = [
        ('export_tags', base_path.joinpath('inventree_tags.yml')),
        ('export_filters', base_path.joinpath('inventree_filters.yml')),
        ('export_report_context', base_path.joinpath('inventree_report_context.json')),
    ]

    info('Exporting definitions...')
    export_settings_definitions(c, overwrite=True, filename=settings_file)

    for command, output in exports:
        check_file_existence(output, overwrite=True)
        manage(c, f'{command} {output}', pty=True)

    info('Exporting definitions complete')
 | |
| 
 | |
| 
 | |
@task(default=True)
def version(c):
    """Show the current version of InvenTree.

    Prints a summary of python paths, installation paths, component versions
    and the detected runtime environment to the console.
    """
    import src.backend.InvenTree.InvenTree.version as InvenTreeVersion  # type: ignore[import]
    from src.backend.InvenTree.InvenTree.config import (  # type: ignore[import]
        get_backup_dir,
        get_config_file,
        get_media_dir,
        get_plugin_file,
        get_static_dir,
    )

    # Gather frontend version information
    _, node, yarn = node_available(versions=True)

    invoke_path = Path(invoke.__file__).resolve()

    # Special output messages (ANSI-colored via wrap_color)
    NOT_SPECIFIED = wrap_color('NOT SPECIFIED', '91')
    NA = wrap_color('N/A', '93')

    # Detect which environment this installation is running in
    platform = NOT_SPECIFIED

    if is_pkg_installer():
        platform = 'Package Installer'
    elif is_docker_environment():
        platform = 'Docker'
    elif is_devcontainer_environment():
        platform = 'Devcontainer'
    elif is_rtd_environment():
        platform = 'ReadTheDocs'

    print(
        f"""
InvenTree - inventree.org
The Open-Source Inventory Management System\n

Python paths:
Executable  {sys.executable}
Environment {sys.prefix}
Invoke Tool {invoke_path}

Installation paths:
Base        {local_dir()}
Config      {get_config_file()}
Plugin File {get_plugin_file() or NOT_SPECIFIED}
Media       {get_media_dir(error=False) or NOT_SPECIFIED}
Static      {get_static_dir(error=False) or NOT_SPECIFIED}
Backup      {get_backup_dir(error=False) or NOT_SPECIFIED}

Versions:
InvenTree   {InvenTreeVersion.inventreeVersion()}
API         {InvenTreeVersion.inventreeApiVersion()}
Python      {python_version()}
Django      {InvenTreeVersion.inventreeDjangoVersion()}
Node        {node if node else NA}
Yarn        {yarn if yarn else NA}

Environment:
Platform    {platform}
Debug       {is_debug_environment()}

Commit hash: {InvenTreeVersion.inventreeCommitHash()}
Commit date: {InvenTreeVersion.inventreeCommitDate()}"""
    )
    # Extra hint when running under the package / single-line installer
    if is_pkg_installer_by_path():
        print(
            """
You are probably running the package installer / single-line installer. Please mention this in any bug reports!

Use '--list' for a list of available commands
Use '--help' for help on a specific command"""
        )
 | |
| 
 | |
| 
 | |
@task()
def frontend_check(c):
    """Check if frontend is available."""
    # Echo the node/yarn availability status to the console
    print(node_available())
 | |
| 
 | |
| 
 | |
@task(help={'extract': 'Extract translation strings. Default: False'})
@state_logger('TASK06')
def frontend_compile(c, extract: bool = False):
    """Generate react frontend.

    Arguments:
        c: Context variable
        extract (bool): Whether to extract translations from source code. Defaults to False.
    """
    info('Compiling frontend code...')

    # Install dependencies, process translations, then build the bundle
    frontend_install(c)
    frontend_trans(c, extract=extract)
    frontend_build(c)

    success('Frontend compilation complete')
 | |
| 
 | |
| 
 | |
@task
def frontend_install(c):
    """Install frontend requirements.

    Args:
        c: Context variable
    """
    info('Installing frontend dependencies')
    # Delegate to yarn for dependency installation
    yarn(c, 'yarn install')
 | |
| 
 | |
| 
 | |
@task(help={'extract': 'Extract translations (changes sourcecode), default: True'})
def frontend_trans(c, extract: bool = True):
    """Compile frontend translations.

    Args:
        c: Context variable
        extract (bool): Whether to extract translations from source code. Defaults to True.
    """
    info('Compiling frontend translations')
    # Optionally regenerate the message catalogs before compiling them
    if extract:
        yarn(c, 'yarn run extract')
    yarn(c, 'yarn run compile')
 | |
| 
 | |
| 
 | |
@task
def frontend_build(c):
    """Build frontend.

    Args:
        c: Context variable
    """
    info('Building frontend')
    # Produce the production frontend build via yarn
    yarn(c, 'yarn run build')
 | |
| 
 | |
| 
 | |
@task
def frontend_server(c):
    """Start frontend development server.

    Args:
        c: Context variable
    """
    info('Starting frontend development server')
    # Compile translations first, then launch the dev server exposed on all hosts
    yarn(c, 'yarn run compile')
    yarn(c, 'yarn run dev --host')
 | |
| 
 | |
| 
 | |
@task
def frontend_test(c, host: str = '0.0.0.0'):
    """Start the playwright test runner for the frontend code."""
    info('Starting frontend test runner')

    frontend_path = local_dir().joinpath('src', 'frontend').resolve()

    # Build the playwright UI command, optionally binding the UI to a host
    cmd = 'npx playwright test --ui'
    if host:
        cmd = f'{cmd} --ui-host={host}'

    run(c, cmd, path=frontend_path)
 | |
| 
 | |
| 
 | |
| @task(
 | |
|     help={
 | |
|         'ref': 'git ref, default: current git ref',
 | |
|         'tag': 'git tag to look for release',
 | |
|         'file': 'destination to frontend-build.zip file',
 | |
|         'repo': 'GitHub repository, default: InvenTree/inventree',
 | |
|         'extract': 'Also extract and place at the correct destination, default: True',
 | |
|         'clean': 'Delete old files from InvenTree/web/static/web first, default: True',
 | |
|     }
 | |
| )
 | |
| @state_logger('TASK07')
 | |
def frontend_download(
    c,
    ref=None,
    tag=None,
    file=None,
    repo='InvenTree/inventree',
    extract=True,
    clean=True,
):
    """Download a pre-built frontend from GitHub if you don't want to install nodejs on your machine.

    There are 3 possibilities to install the frontend:
    1. invoke frontend-download --ref 01f2aa5f746a36706e9a5e588c4242b7bf1996d5
       if ref is omitted, it tries to auto detect the current git ref via `git rev-parse HEAD`.
       Note: GitHub doesn't allow workflow artifacts to be downloaded from anonymous users, so
             this will output a link where you can download the frontend with a signed in browser
             and then continue with option 3
    2. invoke frontend-download --tag 0.13.0
       Downloads the frontend build from the releases.
    3. invoke frontend-download --file /home/vscode/Downloads/frontend-build.zip
       This will extract your zip file and place the contents at the correct destination
    """
    import functools
    import subprocess
    from tempfile import NamedTemporaryFile
    from zipfile import ZipFile

    import requests

    info('Downloading frontend...')

    # Headers sent with every GitHub API request below
    default_headers = {'Accept': 'application/vnd.github.v3+json'}

    # helper functions
    def find_resource(resource, key, value):
        """Return the first object in `resource` where obj[key] == value, else None."""
        for obj in resource:
            if obj[key] == value:
                return obj
        return None

    def handle_extract(file):
        """Extract the downloaded zip into the static web directory (honors `extract`/`clean`)."""
        # if no extract is requested, exit here
        if not extract:
            return

        dest_path = manage_py_dir().joinpath('web', 'static', 'web')

        # if clean, delete static/web directory
        if clean:
            shutil.rmtree(dest_path, ignore_errors=True)
            dest_path.mkdir(parents=True, exist_ok=True)
            info(f'Cleaned directory: {dest_path}')

        # unzip build to static folder
        with ZipFile(file, 'r') as zip_ref:
            zip_ref.extractall(dest_path)

        info(f'Unzipped downloaded frontend build to: {dest_path}')

    def handle_download(url):
        """Stream `url` into a temporary zip file and extract it."""
        # download frontend-build.zip to temporary file
        with (
            requests.get(
                url, headers=default_headers, stream=True, allow_redirects=True
            ) as response,
            NamedTemporaryFile(suffix='.zip') as dst,
        ):
            response.raise_for_status()

            # auto decode the gzipped raw data
            response.raw.read = functools.partial(
                response.raw.read, decode_content=True
            )
            with open(dst.name, 'wb') as f:
                shutil.copyfileobj(response.raw, f)
            info(f'Downloaded frontend build to temporary file: {dst.name}')

            handle_extract(dst.name)

    def check_already_current(tag=None, sha=None):
        """Check if the currently available frontend is already the requested one."""
        ref = 'tag' if tag else 'commit'

        if tag:
            current = manage_py_dir().joinpath(
                'web', 'static', 'web', '.vite', 'tag.txt'
            )
        elif sha:
            current = manage_py_dir().joinpath(
                'web', 'static', 'web', '.vite', 'sha.txt'
            )
        else:
            raise ValueError('Either tag or sha needs to be set')

        if not current.exists():
            warning(
                f'Current frontend information for {ref} is not available in {current!s} - this is expected in some cases'
            )
            return False

        current_content = current.read_text().strip()
        ref_value = tag or sha
        if current_content == ref_value:
            info(f'Frontend {ref} is already `{ref_value}`')
            return True
        else:
            info(
                f'Frontend {ref} is not expected `{ref_value}` but `{current_content}` - new version will be downloaded'
            )
            return False

    # if zip file is specified, try to extract it directly
    if file:
        handle_extract(file)
        static(c, frontend=False, skip_plugins=True)
        return

    # check arguments
    if ref is not None and tag is not None:
        error('ERROR: Do not set ref and tag.')
        return

    if ref is None and tag is None:
        try:
            ref = subprocess.check_output(
                ['git', 'rev-parse', 'HEAD'], encoding='utf-8'
            ).strip()
        except Exception:
            # .deb Packages contain extra information in the VERSION file
            content: dict = get_version_vals()
            if is_pkg_installer(content):
                # May still be None if the VERSION file lacks the SHA entry;
                # handled by the guard below
                ref = content.get('INVENTREE_COMMIT_SHA')
                info(
                    f'[INFO] Running in package environment, got commit "{ref}" from VERSION file'
                )
            else:
                error("ERROR: Cannot get current ref via 'git rev-parse HEAD'")
                return

    if ref is None and tag is None:
        error('ERROR: Either ref or tag needs to be set.')
        # FIX: previously fell through without returning, exiting silently
        # after printing the error message
        return

    if tag:
        tag = tag.lstrip('v')
        try:
            if check_already_current(tag=tag):
                return
            handle_download(
                f'https://github.com/{repo}/releases/download/{tag}/frontend-build.zip'
            )
        except Exception as e:
            # Only HTTP errors are translated into a friendly hint; anything
            # else is re-raised unchanged
            if not isinstance(e, requests.HTTPError):
                raise e
            error(
                f"""ERROR: An Error occurred. Unable to download frontend build, release or build does not exist,
try downloading the frontend-build.zip yourself via: https://github.com/{repo}/releases
Then try continuing by running: invoke frontend-download --file <path-to-downloaded-zip-file>"""
            )

        return

    if ref:
        if check_already_current(sha=ref):
            return
        # get workflow run from all workflow runs on that particular ref
        workflow_runs = requests.get(
            f'https://api.github.com/repos/{repo}/actions/runs?head_sha={ref}',
            headers=default_headers,
        ).json()

        if not (qc_run := find_resource(workflow_runs['workflow_runs'], 'name', 'QC')):
            error(f'ERROR: Cannot find any workflow runs for current SHA {ref}')
            return

        info(
            f'Found workflow {qc_run["name"]} (run {qc_run["run_number"]}-{qc_run["run_attempt"]})'
        )

        # get frontend-build artifact from all artifacts available for this workflow run
        artifacts = requests.get(
            qc_run['artifacts_url'], headers=default_headers
        ).json()
        if not (
            frontend_artifact := find_resource(
                artifacts['artifacts'], 'name', 'frontend-build'
            )
        ):
            error('[ERROR] Cannot find frontend-build.zip attachment for current sha')
            return

        info(
            f'Found artifact {frontend_artifact["name"]} with id {frontend_artifact["id"]} ({frontend_artifact["size_in_bytes"] / 1e6:.2f}MB).'
        )

        print(
            f"""
GitHub doesn't allow artifact downloads from anonymous users. Either download the following file
via your signed in browser, or consider using a point release download via invoke frontend-download --tag <git-tag>

    Download: https://github.com/{repo}/suites/{qc_run['check_suite_id']}/artifacts/{frontend_artifact['id']} manually and
    continue by running: invoke frontend-download --file <path-to-downloaded-zip-file>"""
        )
 | |
| 
 | |
| 
 | |
def doc_schema(c):
    """Generate schema documentation for the API."""
    # Export the API schema first, then post-process it for the docs pages
    schema_file = 'docs/generated/schema.yml'
    schema(c, ignore_warnings=True, overwrite=True, filename=schema_file)
    run(c, f'python docs/extract_schema.py {schema_file}')
 | |
| 
 | |
| 
 | |
@task(
    help={
        'address': 'Host and port to run the server on (default: localhost:8080)',
        'compile_schema': 'Compile the schema documentation first (default: False)',
    }
)
def docs_server(c, address='localhost:8080', compile_schema=False):
    """Run a local mkdocs development server for viewing the documentation."""
    # Settings definitions must be exported before the docs can render them
    export_definitions(c, basedir='docs')

    # Optionally (re)generate the API schema documentation first
    if compile_schema:
        doc_schema(c)

    serve_command = f'mkdocs serve -a {address} -f docs/mkdocs.yml'
    run(c, serve_command)
 | |
| 
 | |
| 
 | |
@task(
    help={'mkdocs': 'Build the documentation using mkdocs at the end (default: False)'}
)
def build_docs(c, mkdocs=False):
    """Prepare all documents required for the docs build; optionally run mkdocs afterwards."""
    # The docs build needs migrations applied, settings definitions exported,
    # and the API schema generated
    migrate(c)
    export_definitions(c, basedir='docs')
    doc_schema(c)

    # Guard clause: stop here unless an actual mkdocs build was requested
    if not mkdocs:
        info('Documentation build complete, but mkdocs not requested')
        return

    run(c, 'mkdocs build  -f docs/mkdocs.yml')
    info('Documentation build complete')
 | |
| 
 | |
| 
 | |
@task
def clear_generated(c):
    """Clear generated files from `invoke update`."""
    # Compiled bytecode files
    for pattern in ('*.pyc', '*.pyo'):
        run(c, f'find src -name "{pattern}" -exec rm -f {{}} +')

    # Bytecode cache directories (directories need a recursive delete)
    run(c, 'find src -name "__pycache__" -exec rm -rf {} +')

    # Compiled translation catalogues
    for pattern in ('django.mo', 'messages.mo'):
        run(c, f'find src -name "{pattern}" -exec rm -f {{}} +')
 | |
| 
 | |
| 
 | |
@task(pre=[wait])
def monitor(c):
    """Monitor the worker performance."""
    # Run the 'qmonitor' management command; pty=True is required for its
    # interactive terminal display. NOTE(review): presumably the django-q
    # queue monitor - confirm against the project's worker backend.
    manage(c, 'qmonitor', pty=True)
 | |
| 
 | |
| 
 | |
# endregion tasks

# Collection sorting
# The invoke task namespace is split into three groups:
#   - `development`: tasks aimed at developers (servers, tests, translations)
#   - `internal`: low-level helper tasks, not normally run directly
#   - `ns` (root): everyday administration tasks, available without a prefix

# Developer-facing tasks, exposed under the `dev.` prefix (see below)
development = Collection(
    delete_data,
    docs_server,
    frontend_server,
    frontend_test,
    gunicorn,
    import_fixtures,
    schema,
    server,
    setup_dev,
    setup_test,
    shell,
    test,
    test_translations,
    translate,
)

# Internal helper tasks, exposed under the `int.` prefix (see below)
internal = Collection(
    clean_settings,
    clear_generated,
    export_settings_definitions,
    export_definitions,
    frontend_build,
    frontend_check,
    frontend_compile,
    frontend_install,
    frontend_trans,
    rebuild_models,
    rebuild_thumbnails,
    showmigrations,
)

# Root namespace: administration tasks callable without a prefix
ns = Collection(
    backup,
    export_records,
    frontend_download,
    import_records,
    install,
    listbackups,
    migrate,
    plugins,
    remove_mfa,
    restore,
    static,
    superuser,
    update,
    version,
    wait,
    worker,
    monitor,
    build_docs,
)

# Register the sub-collections with their short namespace prefixes
ns.add_collection(development, 'dev')
ns.add_collection(internal, 'int')
 |