2
0
mirror of https://github.com/inventree/InvenTree.git synced 2025-04-28 11:36:44 +00:00

Invoke error handling (#8454)

* Add exception handler in tasks.py

- Catch UnexpectedExit
- Print clear message on the command that failed
- Re-throw exception

* Change c.run to run(c)

- Pass all invoke commands through the exception handler

* Add more info to version command

* Pretty colors

* Remove dummy error

* Improve invoke task output:

- Add colors for different message types
- Add success messages for critical tasks

* Tweaks

* Improve output commands
This commit is contained in:
Oliver 2024-11-09 09:16:17 +11:00 committed by GitHub
parent 9fb93b899c
commit ad39d3fd95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

233
tasks.py
View File

@ -12,6 +12,31 @@ from platform import python_version
from typing import Optional from typing import Optional
from invoke import Collection, task from invoke import Collection, task
from invoke.exceptions import UnexpectedExit
def success(*args):
"""Print a success message to the console."""
msg = ' '.join(map(str, args))
print(f'\033[92m{msg}\033[0m')
def error(*args):
"""Print an error message to the console."""
msg = ' '.join(map(str, args))
print(f'\033[91m{msg}\033[0m')
def warning(*args):
"""Print a warning message to the console."""
msg = ' '.join(map(str, args))
print(f'\033[93m{msg}\033[0m')
def info(*args):
"""Print an informational message to the console."""
msg = ' '.join(map(str, args))
print(f'\033[94m{msg}\033[0m')
def checkPythonVersion(): def checkPythonVersion():
@ -147,7 +172,13 @@ def run(c, cmd, path: Optional[Path] = None, pty=False, env=None):
""" """
env = env or {} env = env or {}
path = path or localDir() path = path or localDir()
c.run(f'cd "{path}" && {cmd}', pty=pty, env=env)
try:
c.run(f'cd "{path}" && {cmd}', pty=pty, env=env)
except UnexpectedExit as e:
error(f"ERROR: InvenTree command failed: '{cmd}'")
warning('- Refer to the error messages in the log above for more information')
raise e
def manage(c, cmd, pty: bool = False, env=None): def manage(c, cmd, pty: bool = False, env=None):
@ -234,46 +265,54 @@ def plugins(c, uv=False):
plugin_file = get_plugin_file() plugin_file = get_plugin_file()
print(f"Installing plugin packages from '{plugin_file}'") info(f"Installing plugin packages from '{plugin_file}'")
# Install the plugins # Install the plugins
if not uv: if not uv:
run(c, f"pip3 install --disable-pip-version-check -U -r '{plugin_file}'") run(c, f"pip3 install --disable-pip-version-check -U -r '{plugin_file}'")
else: else:
c.run('pip3 install --no-cache-dir --disable-pip-version-check uv') run(c, 'pip3 install --no-cache-dir --disable-pip-version-check uv')
run(c, f"uv pip install -r '{plugin_file}'") run(c, f"uv pip install -r '{plugin_file}'")
# Collect plugin static files # Collect plugin static files
manage(c, 'collectplugins') manage(c, 'collectplugins')
@task(help={'uv': 'Use UV package manager (experimental)'}) @task(
def install(c, uv=False): help={
'uv': 'Use UV package manager (experimental)',
'skip_plugins': 'Skip plugin installation',
}
)
def install(c, uv=False, skip_plugins=False):
"""Installs required python packages.""" """Installs required python packages."""
INSTALL_FILE = 'src/backend/requirements.txt' INSTALL_FILE = 'src/backend/requirements.txt'
print(f"Installing required python packages from '{INSTALL_FILE}'") info(f"Installing required python packages from '{INSTALL_FILE}'")
if not Path(INSTALL_FILE).is_file(): if not Path(INSTALL_FILE).is_file():
raise FileNotFoundError(f"Requirements file '{INSTALL_FILE}' not found") raise FileNotFoundError(f"Requirements file '{INSTALL_FILE}' not found")
# Install required Python packages with PIP # Install required Python packages with PIP
if not uv: if not uv:
c.run( run(
'pip3 install --no-cache-dir --disable-pip-version-check -U pip setuptools' c,
'pip3 install --no-cache-dir --disable-pip-version-check -U pip setuptools',
) )
run( run(
c, c,
f'pip3 install --no-cache-dir --disable-pip-version-check -U --require-hashes -r {INSTALL_FILE}', f'pip3 install --no-cache-dir --disable-pip-version-check -U --require-hashes -r {INSTALL_FILE}',
) )
else: else:
c.run( run(
'pip3 install --no-cache-dir --disable-pip-version-check -U uv setuptools' c,
'pip3 install --no-cache-dir --disable-pip-version-check -U uv setuptools',
) )
run(c, f'uv pip install -U --require-hashes -r {INSTALL_FILE}') run(c, f'uv pip install -U --require-hashes -r {INSTALL_FILE}')
# Run plugins install # Run plugins install
plugins(c, uv=uv) if not skip_plugins:
plugins(c, uv=uv)
# Compile license information # Compile license information
lic_path = managePyDir().joinpath('InvenTree', 'licenses.txt') lic_path = managePyDir().joinpath('InvenTree', 'licenses.txt')
@ -282,22 +321,25 @@ def install(c, uv=False):
f'pip-licenses --format=json --with-license-file --no-license-path > {lic_path}', f'pip-licenses --format=json --with-license-file --no-license-path > {lic_path}',
) )
success('Dependency installation complete')
@task(help={'tests': 'Set up test dataset at the end'}) @task(help={'tests': 'Set up test dataset at the end'})
def setup_dev(c, tests=False): def setup_dev(c, tests=False):
"""Sets up everything needed for the dev environment.""" """Sets up everything needed for the dev environment."""
print("Installing required python packages from 'src/backend/requirements-dev.txt'") info("Installing required python packages from 'src/backend/requirements-dev.txt'")
# Install required Python packages with PIP # Install required Python packages with PIP
run(c, 'pip3 install -U --require-hashes -r src/backend/requirements-dev.txt') run(c, 'pip3 install -U --require-hashes -r src/backend/requirements-dev.txt')
# Install pre-commit hook # Install pre-commit hook
print('Installing pre-commit for checks before git commits...') info('Installing pre-commit for checks before git commits...')
run(c, 'pre-commit install') run(c, 'pre-commit install')
# Update all the hooks # Update all the hooks
run(c, 'pre-commit autoupdate') run(c, 'pre-commit autoupdate')
print('pre-commit set up is done...')
success('pre-commit set up complete')
# Set up test-data if flag is set # Set up test-data if flag is set
if tests: if tests:
@ -314,18 +356,21 @@ def superuser(c):
@task @task
def rebuild_models(c): def rebuild_models(c):
"""Rebuild database models with MPTT structures.""" """Rebuild database models with MPTT structures."""
info('Rebuilding internal database structures')
manage(c, 'rebuild_models', pty=True) manage(c, 'rebuild_models', pty=True)
@task @task
def rebuild_thumbnails(c): def rebuild_thumbnails(c):
"""Rebuild missing image thumbnails.""" """Rebuild missing image thumbnails."""
info('Rebuilding image thumbnails')
manage(c, 'rebuild_thumbnails', pty=True) manage(c, 'rebuild_thumbnails', pty=True)
@task @task
def clean_settings(c): def clean_settings(c):
"""Clean the setting tables of old settings.""" """Clean the setting tables of old settings."""
info('Cleaning old settings from the database')
manage(c, 'clean_settings') manage(c, 'clean_settings')
@ -333,13 +378,20 @@ def clean_settings(c):
def remove_mfa(c, mail=''): def remove_mfa(c, mail=''):
"""Remove MFA for a user.""" """Remove MFA for a user."""
if not mail: if not mail:
print('You must provide a users mail') warning('You must provide a users mail')
return
manage(c, f'remove_mfa {mail}') manage(c, f'remove_mfa {mail}')
@task(help={'frontend': 'Build the frontend', 'clear': 'Remove existing static files'}) @task(
def static(c, frontend=False, clear=True): help={
'frontend': 'Build the frontend',
'clear': 'Remove existing static files',
'skip_plugins': 'Ignore collection of plugin static files',
}
)
def static(c, frontend=False, clear=True, skip_plugins=False):
"""Copies required static files to the STATIC_ROOT directory, as per Django requirements.""" """Copies required static files to the STATIC_ROOT directory, as per Django requirements."""
manage(c, 'prerender') manage(c, 'prerender')
@ -347,7 +399,7 @@ def static(c, frontend=False, clear=True):
frontend_trans(c) frontend_trans(c)
frontend_build(c) frontend_build(c)
print('Collecting static files...') info('Collecting static files...')
cmd = 'collectstatic --no-input --verbosity 0' cmd = 'collectstatic --no-input --verbosity 0'
@ -357,7 +409,10 @@ def static(c, frontend=False, clear=True):
manage(c, cmd) manage(c, cmd)
# Collect plugin static files # Collect plugin static files
manage(c, 'collectplugins') if not skip_plugins:
manage(c, 'collectplugins')
success('Static files collected successfully')
@task @task
@ -371,10 +426,10 @@ def translate_stats(c):
try: try:
manage(c, 'compilemessages', pty=True) manage(c, 'compilemessages', pty=True)
except Exception: except Exception:
print('WARNING: Translation files could not be compiled:') warning('WARNING: Translation files could not be compiled:')
path = managePyDir().joinpath('script', 'translation_stats.py') path = managePyDir().joinpath('script', 'translation_stats.py')
c.run(f'python3 {path}') run(c, f'python3 {path}')
@task(post=[translate_stats]) @task(post=[translate_stats])
@ -384,6 +439,8 @@ def translate(c, ignore_static=False, no_frontend=False):
Note: This command should not be used on a local install, Note: This command should not be used on a local install,
it is performed as part of the InvenTree translation toolchain. it is performed as part of the InvenTree translation toolchain.
""" """
info('Building translation files')
# Translate applicable .py / .html / .js files # Translate applicable .py / .html / .js files
manage(c, 'makemessages --all -e py,html,js --no-wrap') manage(c, 'makemessages --all -e py,html,js --no-wrap')
manage(c, 'compilemessages') manage(c, 'compilemessages')
@ -397,6 +454,8 @@ def translate(c, ignore_static=False, no_frontend=False):
if not ignore_static: if not ignore_static:
static(c) static(c)
success('Translation files built successfully')
@task( @task(
help={ help={
@ -406,7 +465,7 @@ def translate(c, ignore_static=False, no_frontend=False):
) )
def backup(c, clean=False, path=None): def backup(c, clean=False, path=None):
"""Backup the database and media files.""" """Backup the database and media files."""
print('Backing up InvenTree database...') info('Backing up InvenTree database...')
cmd = '--noinput --compress -v 2' cmd = '--noinput --compress -v 2'
@ -422,9 +481,11 @@ def backup(c, clean=False, path=None):
cmd += ' --clean' cmd += ' --clean'
manage(c, f'dbbackup {cmd}') manage(c, f'dbbackup {cmd}')
print('Backing up InvenTree media files...') info('Backing up InvenTree media files...')
manage(c, f'mediabackup {cmd}') manage(c, f'mediabackup {cmd}')
success('Backup completed successfully')
@task( @task(
help={ help={
@ -457,7 +518,7 @@ def restore(
if ignore_database: if ignore_database:
print('Skipping database archive...') print('Skipping database archive...')
else: else:
print('Restoring InvenTree database') info('Restoring InvenTree database')
cmd = f'dbrestore {base_cmd}' cmd = f'dbrestore {base_cmd}'
if db_file: if db_file:
@ -468,7 +529,7 @@ def restore(
if ignore_media: if ignore_media:
print('Skipping media restore...') print('Skipping media restore...')
else: else:
print('Restoring InvenTree media files') info('Restoring InvenTree media files')
cmd = f'mediarestore {base_cmd}' cmd = f'mediarestore {base_cmd}'
if media_file: if media_file:
@ -483,8 +544,7 @@ def migrate(c):
This is a critical step if the database schema have been altered! This is a critical step if the database schema have been altered!
""" """
print('Running InvenTree database migrations...') info('Running InvenTree database migrations...')
print('========================================')
# Run custom management command which wraps migrations in "maintenance mode" # Run custom management command which wraps migrations in "maintenance mode"
manage(c, 'makemigrations') manage(c, 'makemigrations')
@ -492,8 +552,7 @@ def migrate(c):
manage(c, 'migrate --run-syncdb') manage(c, 'migrate --run-syncdb')
manage(c, 'remove_stale_contenttypes --include-stale-apps --no-input', pty=True) manage(c, 'remove_stale_contenttypes --include-stale-apps --no-input', pty=True)
print('========================================') success('InvenTree database migrations completed')
print('InvenTree database migrations completed!')
@task(help={'app': 'Specify an app to show migrations for (leave blank for all apps)'}) @task(help={'app': 'Specify an app to show migrations for (leave blank for all apps)'})
@ -535,6 +594,8 @@ def update(
- clean_settings - clean_settings
- translate_stats - translate_stats
""" """
info('Updating InvenTree installation...')
# Ensure required components are installed # Ensure required components are installed
install(c, uv=uv) install(c, uv=uv)
@ -553,7 +614,7 @@ def update(
frontend = False frontend = False
no_frontend = True no_frontend = True
else: else:
print('Updating frontend...') info('Updating frontend...')
# Decide if we should compile the frontend or try to download it # Decide if we should compile the frontend or try to download it
if node_available(bypass_yarn=True): if node_available(bypass_yarn=True):
frontend_compile(c) frontend_compile(c)
@ -608,7 +669,7 @@ def export_records(
if not target.is_absolute(): if not target.is_absolute():
target = localDir().joinpath(filename).resolve() target = localDir().joinpath(filename).resolve()
print(f"Exporting database records to file '{target}'") info(f"Exporting database records to file '{target}'")
check_file_existance(target, overwrite) check_file_existance(target, overwrite)
@ -625,7 +686,7 @@ def export_records(
# Dump data to temporary file # Dump data to temporary file
manage(c, cmd, pty=True) manage(c, cmd, pty=True)
print('Running data post-processing step...') info('Running data post-processing step...')
# Post-process the file, to remove any "permissions" specified for a user or group # Post-process the file, to remove any "permissions" specified for a user or group
with open(tmpfile, encoding='utf-8') as f_in: with open(tmpfile, encoding='utf-8') as f_in:
@ -654,12 +715,12 @@ def export_records(
with open(target, 'w', encoding='utf-8') as f_out: with open(target, 'w', encoding='utf-8') as f_out:
f_out.write(json.dumps(data_out, indent=2)) f_out.write(json.dumps(data_out, indent=2))
print('Data export completed')
if not retain_temp: if not retain_temp:
print('Removing temporary files') print('Removing temporary files')
os.remove(tmpfile) os.remove(tmpfile)
success('Data export completed')
@task( @task(
help={ help={
@ -679,13 +740,13 @@ def import_records(
target = localDir().joinpath(filename) target = localDir().joinpath(filename)
if not target.exists(): if not target.exists():
print(f"Error: File '{target}' does not exist") error(f"ERROR: File '{target}' does not exist")
sys.exit(1) sys.exit(1)
if clear: if clear:
delete_data(c, force=True) delete_data(c, force=True)
print(f"Importing database records from '{target}'") info(f"Importing database records from '{target}'")
# We need to load 'auth' data (users / groups) *first* # We need to load 'auth' data (users / groups) *first*
# This is due to the users.owner model, which has a ContentType foreign key # This is due to the users.owner model, which has a ContentType foreign key
@ -698,7 +759,7 @@ def import_records(
try: try:
data = json.loads(f_in.read()) data = json.loads(f_in.read())
except json.JSONDecodeError as exc: except json.JSONDecodeError as exc:
print(f'Error: Failed to decode JSON file: {exc}') error(f'ERROR: Failed to decode JSON file: {exc}')
sys.exit(1) sys.exit(1)
auth_data = [] auth_data = []
@ -720,7 +781,7 @@ def import_records(
else: else:
load_data.append(entry) load_data.append(entry)
else: else:
print('Warning: Invalid entry in data file') warning('WARNING: Invalid entry in data file')
print(entry) print(entry)
# Write the auth file data # Write the auth file data
@ -734,22 +795,22 @@ def import_records(
excludes = content_excludes(allow_auth=False) excludes = content_excludes(allow_auth=False)
# Import auth models first # Import auth models first
print('Importing user auth data...') info('Importing user auth data...')
cmd = f"loaddata '{authfile}'" cmd = f"loaddata '{authfile}'"
manage(c, cmd, pty=True) manage(c, cmd, pty=True)
# Import everything else next # Import everything else next
print('Importing database records...') info('Importing database records...')
cmd = f"loaddata '{datafile}' -i {excludes}" cmd = f"loaddata '{datafile}' -i {excludes}"
manage(c, cmd, pty=True) manage(c, cmd, pty=True)
if not retain_temp: if not retain_temp:
print('Removing temporary files') info('Removing temporary files')
os.remove(datafile) os.remove(datafile)
os.remove(authfile) os.remove(authfile)
print('Data import completed') info('Data import completed')
@task @task
@ -758,7 +819,7 @@ def delete_data(c, force=False):
Warning: This will REALLY delete all records in the database!! Warning: This will REALLY delete all records in the database!!
""" """
print('Deleting all data from InvenTree database...') info('Deleting all data from InvenTree database...')
if force: if force:
manage(c, 'flush --noinput') manage(c, 'flush --noinput')
@ -811,6 +872,7 @@ def import_fixtures(c):
@task @task
def wait(c): def wait(c):
"""Wait until the database connection is ready.""" """Wait until the database connection is ready."""
info('Waiting for database connection...')
return manage(c, 'wait_for_db') return manage(c, 'wait_for_db')
@ -834,10 +896,8 @@ def gunicorn(c, address='0.0.0.0:8000', workers=None):
if workers: if workers:
cmd += f' --workers={workers}' cmd += f' --workers={workers}'
print('Starting Gunicorn Server:') info(f'Starting Gunicorn Server: {cmd}')
print(cmd) run(c, cmd, pty=True)
c.run(cmd, pty=True)
@task(pre=[wait], help={'address': 'Server address:port (default=127.0.0.1:8000)'}) @task(pre=[wait], help={'address': 'Server address:port (default=127.0.0.1:8000)'})
@ -877,13 +937,11 @@ def test_translations(c):
django.setup() django.setup()
# Add language # Add language
print('Add dummy language...') info('Add dummy language...')
print('========================================')
manage(c, 'makemessages -e py,html,js --no-wrap -l xx') manage(c, 'makemessages -e py,html,js --no-wrap -l xx')
# change translation # change translation
print('Fill in dummy translations...') info('Fill in dummy translations...')
print('========================================')
file_path = pathlib.Path(settings.LOCALE_PATHS[0], 'xx', 'LC_MESSAGES', 'django.po') file_path = pathlib.Path(settings.LOCALE_PATHS[0], 'xx', 'LC_MESSAGES', 'django.po')
new_file_path = str(file_path) + '_new' new_file_path = str(file_path) + '_new'
@ -922,8 +980,7 @@ def test_translations(c):
new_file_path.rename(file_path) new_file_path.rename(file_path)
# compile languages # compile languages
print('Compile languages ...') info('Compile languages ...')
print('========================================')
manage(c, 'compilemessages') manage(c, 'compilemessages')
# reset cwd # reset cwd
@ -991,8 +1048,8 @@ def test(
if coverage: if coverage:
# Run tests within coverage environment, and generate report # Run tests within coverage environment, and generate report
c.run(f'coverage run {managePyPath()} {cmd}') run(c, f'coverage run {managePyPath()} {cmd}')
c.run('coverage xml -i') run(c, 'coverage xml -i')
else: else:
# Run simple test runner, without coverage # Run simple test runner, without coverage
manage(c, cmd, pty=pty) manage(c, cmd, pty=pty)
@ -1010,34 +1067,33 @@ def setup_test(c, ignore_update=False, dev=False, path='inventree-demo-dataset')
# Remove old data directory # Remove old data directory
if template_dir.exists(): if template_dir.exists():
print('Removing old data ...') info('Removing old data ...')
c.run(f'rm {template_dir} -r') run(c, f'rm {template_dir} -r')
# Get test data # Get test data
print('Cloning demo dataset ...') info('Cloning demo dataset ...')
c.run( run(
f'git clone https://github.com/inventree/demo-dataset {template_dir} -v --depth=1' c,
f'git clone https://github.com/inventree/demo-dataset {template_dir} -v --depth=1',
) )
print('========================================')
# Make sure migrations are done - might have just deleted sqlite database # Make sure migrations are done - might have just deleted sqlite database
if not ignore_update: if not ignore_update:
migrate(c) migrate(c)
# Load data # Load data
print('Loading database records ...') info('Loading database records ...')
import_records(c, filename=template_dir.joinpath('inventree_data.json'), clear=True) import_records(c, filename=template_dir.joinpath('inventree_data.json'), clear=True)
# Copy media files # Copy media files
print('Copying media files ...') info('Copying media files ...')
src = template_dir.joinpath('media') src = template_dir.joinpath('media')
dst = get_media_dir() dst = get_media_dir()
print(f'Copying media files - "{src}" to "{dst}"') info(f'Copying media files - "{src}" to "{dst}"')
shutil.copytree(src, dst, dirs_exist_ok=True) shutil.copytree(src, dst, dirs_exist_ok=True)
print('Done setting up test environment...') info('Done setting up test environment...')
print('========================================')
# Set up development setup if flag is set # Set up development setup if flag is set
if dev: if dev:
@ -1058,7 +1114,7 @@ def schema(
filename = Path(filename).resolve() filename = Path(filename).resolve()
check_file_existance(filename, overwrite) check_file_existance(filename, overwrite)
print(f"Exporting schema file to '{filename}'") info(f"Exporting schema file to '{filename}'")
cmd = f'spectacular --file {filename} --validate --color' cmd = f'spectacular --file {filename} --validate --color'
@ -1081,7 +1137,7 @@ def schema(
assert filename.exists() assert filename.exists()
print('Schema export completed:', filename) success(f'Schema export completed: {filename}')
@task @task
@ -1090,7 +1146,7 @@ def export_settings_definitions(c, filename='inventree_settings.json', overwrite
filename = Path(filename).resolve() filename = Path(filename).resolve()
check_file_existance(filename, overwrite) check_file_existance(filename, overwrite)
print(f"Exporting settings definition to '{filename}'...") info(f"Exporting settings definition to '{filename}'...")
manage(c, f'export_settings_definitions {filename}', pty=True) manage(c, f'export_settings_definitions {filename}', pty=True)
@ -1112,6 +1168,10 @@ def version(c):
InvenTree - inventree.org InvenTree - inventree.org
The Open-Source Inventory Management System\n The Open-Source Inventory Management System\n
Python paths:
Executable {sys.executable}
Environment {sys.prefix}
Installation paths: Installation paths:
Base {localDir()} Base {localDir()}
Config {get_config_file()} Config {get_config_file()}
@ -1152,11 +1212,11 @@ def frontend_compile(c):
Args: Args:
c: Context variable c: Context variable
""" """
print('Compiling frontend code...') info('Compiling frontend code...')
frontend_install(c) frontend_install(c)
frontend_trans(c) frontend_trans(c)
frontend_build(c) frontend_build(c)
success('Frontend compilation complete')
@task @task
@ -1166,7 +1226,7 @@ def frontend_install(c):
Args: Args:
c: Context variable c: Context variable
""" """
print('Installing frontend dependencies') info('Installing frontend dependencies')
yarn(c, 'yarn install') yarn(c, 'yarn install')
@ -1177,7 +1237,7 @@ def frontend_trans(c):
Args: Args:
c: Context variable c: Context variable
""" """
print('Compiling frontend translations') info('Compiling frontend translations')
yarn(c, 'yarn run extract') yarn(c, 'yarn run extract')
yarn(c, 'yarn run compile') yarn(c, 'yarn run compile')
@ -1189,7 +1249,7 @@ def frontend_build(c):
Args: Args:
c: Context variable c: Context variable
""" """
print('Building frontend') info('Building frontend')
yarn(c, 'yarn run build --emptyOutDir') yarn(c, 'yarn run build --emptyOutDir')
@ -1200,7 +1260,7 @@ def frontend_server(c):
Args: Args:
c: Context variable c: Context variable
""" """
print('Starting frontend development server') info('Starting frontend development server')
yarn(c, 'yarn run compile') yarn(c, 'yarn run compile')
yarn(c, 'yarn run dev') yarn(c, 'yarn run dev')
@ -1244,7 +1304,7 @@ def frontend_download(
import requests import requests
print('Downloading frontend...') info('Downloading frontend...')
# globals # globals
default_headers = {'Accept': 'application/vnd.github.v3+json'} default_headers = {'Accept': 'application/vnd.github.v3+json'}
@ -1267,13 +1327,13 @@ def frontend_download(
if clean: if clean:
shutil.rmtree(dest_path, ignore_errors=True) shutil.rmtree(dest_path, ignore_errors=True)
dest_path.mkdir() dest_path.mkdir()
print(f'Cleaned directory: {dest_path}') info(f'Cleaned directory: {dest_path}')
# unzip build to static folder # unzip build to static folder
with ZipFile(file, 'r') as zip_ref: with ZipFile(file, 'r') as zip_ref:
zip_ref.extractall(dest_path) zip_ref.extractall(dest_path)
print(f'Unzipped downloaded frontend build to: {dest_path}') info(f'Unzipped downloaded frontend build to: {dest_path}')
def handle_download(url): def handle_download(url):
# download frontend-build.zip to temporary file # download frontend-build.zip to temporary file
@ -1288,7 +1348,7 @@ def frontend_download(
) )
with open(dst.name, 'wb') as f: with open(dst.name, 'wb') as f:
shutil.copyfileobj(response.raw, f) shutil.copyfileobj(response.raw, f)
print(f'Downloaded frontend build to temporary file: {dst.name}') info(f'Downloaded frontend build to temporary file: {dst.name}')
handle_extract(dst.name) handle_extract(dst.name)
@ -1304,7 +1364,7 @@ def frontend_download(
raise ValueError('Either tag or sha needs to be set') raise ValueError('Either tag or sha needs to be set')
if not current.exists(): if not current.exists():
print( warning(
f'Current frontend information for {ref} is not available - this is expected in some cases' f'Current frontend information for {ref} is not available - this is expected in some cases'
) )
return False return False
@ -1327,7 +1387,7 @@ def frontend_download(
# check arguments # check arguments
if ref is not None and tag is not None: if ref is not None and tag is not None:
print('[ERROR] Do not set ref and tag.') error('ERROR: Do not set ref and tag.')
return return
if ref is None and tag is None: if ref is None and tag is None:
@ -1352,11 +1412,11 @@ def frontend_download(
f'[INFO] Running in package environment, got commit "{ref}" from VERSION file' f'[INFO] Running in package environment, got commit "{ref}" from VERSION file'
) )
else: else:
print("[ERROR] Cannot get current ref via 'git rev-parse HEAD'") error("ERROR: Cannot get current ref via 'git rev-parse HEAD'")
return return
if ref is None and tag is None: if ref is None and tag is None:
print('[ERROR] Either ref or tag needs to be set.') error('ERROR: Either ref or tag needs to be set.')
if tag: if tag:
tag = tag.lstrip('v') tag = tag.lstrip('v')
@ -1369,8 +1429,8 @@ def frontend_download(
except Exception as e: except Exception as e:
if not isinstance(e, requests.HTTPError): if not isinstance(e, requests.HTTPError):
raise e raise e
print( error(
f"""[ERROR] An Error occurred. Unable to download frontend build, release or build does not exist, f"""ERROR: An Error occurred. Unable to download frontend build, release or build does not exist,
try downloading the frontend-build.zip yourself via: https://github.com/{repo}/releases try downloading the frontend-build.zip yourself via: https://github.com/{repo}/releases
Then try continuing by running: invoke frontend-download --file <path-to-downloaded-zip-file>""" Then try continuing by running: invoke frontend-download --file <path-to-downloaded-zip-file>"""
) )
@ -1387,7 +1447,7 @@ Then try continuing by running: invoke frontend-download --file <path-to-downloa
).json() ).json()
if not (qc_run := find_resource(workflow_runs['workflow_runs'], 'name', 'QC')): if not (qc_run := find_resource(workflow_runs['workflow_runs'], 'name', 'QC')):
print('[ERROR] Cannot find any workflow runs for current sha') error('ERROR: Cannot find any workflow runs for current SHA')
return return
print( print(
f"Found workflow {qc_run['name']} (run {qc_run['run_number']}-{qc_run['run_attempt']})" f"Found workflow {qc_run['name']} (run {qc_run['run_number']}-{qc_run['run_attempt']})"
@ -1445,6 +1505,7 @@ def clear_generated(c):
# pyc/pyo files # pyc/pyo files
run(c, 'find src -name "*.pyc" -exec rm -f {} +') run(c, 'find src -name "*.pyc" -exec rm -f {} +')
run(c, 'find src -name "*.pyo" -exec rm -f {} +') run(c, 'find src -name "*.pyo" -exec rm -f {} +')
# cache folders # cache folders
run(c, 'find src -name "__pycache__" -exec rm -rf {} +') run(c, 'find src -name "__pycache__" -exec rm -rf {} +')