mirror of
https://github.com/inventree/InvenTree.git
synced 2025-06-15 11:35:41 +00:00
Improve check for plugins file
This commit is contained in:
@ -1,7 +1,6 @@
|
||||
"""Install a plugin into the python virtual environment."""
|
||||
|
||||
import logging
|
||||
import pathlib
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
@ -12,7 +11,6 @@ from django.utils.translation import gettext_lazy as _
|
||||
|
||||
import plugin.models
|
||||
import plugin.staticfiles
|
||||
from InvenTree.config import get_plugin_dir
|
||||
from InvenTree.exceptions import log_error
|
||||
|
||||
logger = logging.getLogger('inventree')
|
||||
@ -68,91 +66,41 @@ def handle_pip_error(error, path: str) -> list:
|
||||
raise ValidationError(errors[0])
|
||||
|
||||
|
||||
def check_plugins_path(packagename: str) -> bool:
|
||||
"""Determine if the package is installed in the plugins directory."""
|
||||
# Remove version information
|
||||
for c in '<>=! ':
|
||||
packagename = packagename.split(c)[0]
|
||||
def get_install_info(packagename: str) -> str:
|
||||
"""Determine the install information for a particular package.
|
||||
|
||||
plugin_dir = get_plugin_dir()
|
||||
|
||||
if not plugin_dir:
|
||||
return False
|
||||
|
||||
plugin_dir_path = pathlib.Path(plugin_dir)
|
||||
|
||||
if not plugin_dir_path.exists():
|
||||
return False
|
||||
|
||||
result = pip_command('freeze', '--path', plugin_dir_path.absolute())
|
||||
output = result.decode('utf-8').split('\n')
|
||||
|
||||
# Check if the package is installed in the plugins directory
|
||||
return any(re.match(rf'^{packagename}[\s=@]', line.strip()) for line in output)
|
||||
|
||||
|
||||
def check_package_path(packagename: str) -> str:
|
||||
"""Determine the install path of a particular package.
|
||||
|
||||
- If installed, return the installation path
|
||||
- If not installed, return an empty string
|
||||
- Uses 'pip show' to determine the install location of a package.
|
||||
"""
|
||||
logger.debug('check_package_path: %s', packagename)
|
||||
|
||||
# First check if the package is installed in the plugins directory
|
||||
if check_plugins_path(packagename):
|
||||
return f'plugins/{packagename}'
|
||||
logger.debug('get_install_info: %s', packagename)
|
||||
|
||||
# Remove version information
|
||||
for c in '<>=! ':
|
||||
for c in '<>=!@ ':
|
||||
packagename = packagename.split(c)[0]
|
||||
|
||||
info = {}
|
||||
|
||||
try:
|
||||
result = pip_command('show', packagename)
|
||||
|
||||
output = result.decode('utf-8').split('\n')
|
||||
|
||||
for line in output:
|
||||
# Check if line matches pattern "Location: ..."
|
||||
match = re.match(r'^Location:\s+(.+)$', line.strip())
|
||||
parts = line.split(':')
|
||||
|
||||
if match:
|
||||
return match.group(1)
|
||||
if len(parts) >= 2:
|
||||
key = str(parts[0].strip().lower().replace('-', '_'))
|
||||
value = str(parts[1].strip())
|
||||
|
||||
info[key] = value
|
||||
|
||||
except subprocess.CalledProcessError as error:
|
||||
log_error('check_package_path')
|
||||
log_error('get_install_info')
|
||||
|
||||
output = error.output.decode('utf-8')
|
||||
info['error'] = output
|
||||
logger.exception('Plugin lookup failed: %s', str(output))
|
||||
return False
|
||||
|
||||
# If we get here, the package is not installed
|
||||
return ''
|
||||
|
||||
|
||||
def plugins_dir():
|
||||
"""Return the path to the InvenTree custom plugins director.
|
||||
|
||||
Returns:
|
||||
pathlib.Path: Path to the custom plugins directory
|
||||
|
||||
Raises:
|
||||
ValidationError: If the plugins directory is not specified, or does not exist
|
||||
"""
|
||||
pd = get_plugin_dir()
|
||||
|
||||
if not pd:
|
||||
raise ValidationError(_('Plugins directory not specified'))
|
||||
|
||||
pd = pathlib.Path(pd)
|
||||
|
||||
if not pd.exists():
|
||||
try:
|
||||
pd.mkdir(parents=True, exist_ok=True)
|
||||
except Exception:
|
||||
raise ValidationError(_('Failed to create plugin directory'))
|
||||
|
||||
return pd.absolute()
|
||||
return info
|
||||
|
||||
|
||||
def install_plugins_file():
|
||||
@ -165,9 +113,7 @@ def install_plugins_file():
|
||||
logger.warning('Plugin file %s does not exist', str(pf))
|
||||
return
|
||||
|
||||
plugin_dir = plugins_dir()
|
||||
|
||||
cmd = ['install', '-U', '--target', str(plugin_dir), '-r', str(pf)]
|
||||
cmd = ['install', '-U', '-r', str(pf)]
|
||||
|
||||
try:
|
||||
pip_command(*cmd)
|
||||
@ -189,9 +135,18 @@ def install_plugins_file():
|
||||
return True
|
||||
|
||||
|
||||
def update_plugins_file(install_name, remove=False):
|
||||
def update_plugins_file(install_name, full_package=None, version=None, remove=False):
|
||||
"""Add a plugin to the plugins file."""
|
||||
logger.info('Adding plugin to plugins file: %s', install_name)
|
||||
if remove:
|
||||
logger.info('Removing plugin from plugins file: %s', install_name)
|
||||
else:
|
||||
logger.info('Adding plugin to plugins file: %s', install_name)
|
||||
|
||||
# If a full package name is provided, use that instead
|
||||
if full_package and full_package != install_name:
|
||||
new_value = full_package
|
||||
else:
|
||||
new_value = f'{install_name}=={version}' if version else install_name
|
||||
|
||||
pf = settings.PLUGIN_FILE
|
||||
|
||||
@ -201,7 +156,7 @@ def update_plugins_file(install_name, remove=False):
|
||||
|
||||
def compare_line(line: str):
|
||||
"""Check if a line in the file matches the installname."""
|
||||
return line.strip().split('==')[0] == install_name.split('==')[0]
|
||||
return re.match(rf'^{install_name}[\s=@]', line.strip())
|
||||
|
||||
# First, read in existing plugin file
|
||||
try:
|
||||
@ -227,13 +182,13 @@ def update_plugins_file(install_name, remove=False):
|
||||
found = True
|
||||
if not remove:
|
||||
# Replace line with new install name
|
||||
output.append(install_name)
|
||||
output.append(new_value)
|
||||
else:
|
||||
output.append(line)
|
||||
|
||||
# Append plugin to file
|
||||
if not found and not remove:
|
||||
output.append(install_name)
|
||||
output.append(new_value)
|
||||
|
||||
# Write file back to disk
|
||||
try:
|
||||
@ -264,10 +219,8 @@ def install_plugin(url=None, packagename=None, user=None, version=None):
|
||||
|
||||
logger.info('install_plugin: %s, %s', url, packagename)
|
||||
|
||||
plugin_dir = plugins_dir()
|
||||
|
||||
# build up the command
|
||||
install_name = ['install', '-U', '--target', str(plugin_dir)]
|
||||
install_name = ['install', '-U']
|
||||
|
||||
full_pkg = ''
|
||||
|
||||
@ -302,23 +255,25 @@ def install_plugin(url=None, packagename=None, user=None, version=None):
|
||||
ret['result'] = ret['success'] = _('Installed plugin successfully')
|
||||
ret['output'] = str(result, 'utf-8')
|
||||
|
||||
if packagename and (path := check_package_path(packagename)):
|
||||
# Override result information
|
||||
ret['result'] = _(f'Installed plugin into {path}')
|
||||
if packagename and (info := get_install_info(packagename)):
|
||||
if path := info.get('location'):
|
||||
ret['result'] = _(f'Installed plugin into {path}')
|
||||
ret['version'] = info.get('version')
|
||||
|
||||
except subprocess.CalledProcessError as error:
|
||||
handle_pip_error(error, 'plugin_install')
|
||||
|
||||
# Save plugin to plugins file
|
||||
update_plugins_file(full_pkg)
|
||||
if version := ret.get('version'):
|
||||
# Save plugin to plugins file
|
||||
update_plugins_file(packagename, full_package=full_pkg, version=version)
|
||||
|
||||
# Reload the plugin registry, to discover the new plugin
|
||||
from plugin.registry import registry
|
||||
# Reload the plugin registry, to discover the new plugin
|
||||
from plugin.registry import registry
|
||||
|
||||
registry.reload_plugins(full_reload=True, force_reload=True, collect=True)
|
||||
registry.reload_plugins(full_reload=True, force_reload=True, collect=True)
|
||||
|
||||
# Update static files
|
||||
plugin.staticfiles.collect_plugins_static_files()
|
||||
# Update static files
|
||||
plugin.staticfiles.collect_plugins_static_files()
|
||||
|
||||
return ret
|
||||
|
||||
@ -364,10 +319,12 @@ def uninstall_plugin(cfg: plugin.models.PluginConfig, user=None, delete_config=T
|
||||
|
||||
validate_package_plugin(cfg, user)
|
||||
package_name = cfg.package_name
|
||||
logger.info('Uninstalling plugin: %s', package_name)
|
||||
|
||||
if check_package_path(package_name):
|
||||
pkg_info = get_install_info(package_name)
|
||||
|
||||
if path := pkg_info.get('location'):
|
||||
# Uninstall the plugin using pip
|
||||
logger.info('Uninstalling plugin: %s from %s', package_name, path)
|
||||
try:
|
||||
pip_command('uninstall', '-y', package_name)
|
||||
except subprocess.CalledProcessError as error:
|
||||
|
Reference in New Issue
Block a user