mirror of
https://github.com/inventree/InvenTree.git
synced 2026-01-28 00:53:35 +00:00
feat(backend): enable reseting mfa via username from the cli (#11133)
* feat(backend): enable reseting mfa via username * fix tests * extend testing saveguards to username cli
This commit is contained in:
@@ -13,33 +13,59 @@ class Command(BaseCommand):
|
||||
|
||||
def add_arguments(self, parser):
|
||||
"""Add the arguments."""
|
||||
parser.add_argument('mail', type=str)
|
||||
parser.add_argument('--mail', type=str, nargs='?')
|
||||
parser.add_argument('--username', type=str, nargs='?')
|
||||
|
||||
def handle(self, *args, mail, **kwargs):
|
||||
"""Remove MFA for the supplied user (by mail)."""
|
||||
def handle(self, *args, mail, username, **kwargs):
|
||||
"""Remove MFA for the supplied user (by mail or username)."""
|
||||
user = get_user_model()
|
||||
mfa_user = [
|
||||
*set(
|
||||
user.objects.filter(email=mail)
|
||||
| user.objects.filter(emailaddress__email=mail)
|
||||
)
|
||||
]
|
||||
mfa_user = []
|
||||
success = False
|
||||
|
||||
if len(mfa_user) == 0:
|
||||
logger.warning('No user with this mail associated')
|
||||
elif len(mfa_user) > 1:
|
||||
emails_list = ', '.join(
|
||||
sorted(
|
||||
{b.email for a in mfa_user for b in a.emailaddress_set.all()}
|
||||
| {a.email for a in mfa_user}
|
||||
if mail is not None:
|
||||
mfa_user = [
|
||||
*set(
|
||||
user.objects.filter(email=mail)
|
||||
| user.objects.filter(emailaddress__email=mail)
|
||||
)
|
||||
)
|
||||
usernames_list = ', '.join(sorted({a.username for a in mfa_user}))
|
||||
logger.error(
|
||||
f"Multiple users found with the provided email; Usernames: '{usernames_list}', Emails: '{emails_list}'"
|
||||
)
|
||||
]
|
||||
if len(mfa_user) == 0:
|
||||
logger.warning('No user with this mail associated')
|
||||
elif len(mfa_user) > 1:
|
||||
emails_list = ', '.join(
|
||||
sorted(
|
||||
{b.email for a in mfa_user for b in a.emailaddress_set.all()}
|
||||
| {a.email for a in mfa_user}
|
||||
)
|
||||
)
|
||||
usernames_list = ', '.join(sorted({a.username for a in mfa_user}))
|
||||
logger.error(
|
||||
f"Multiple users found with the provided email; Usernames: '{usernames_list}', Emails: '{emails_list}'"
|
||||
)
|
||||
else:
|
||||
# found exactly one user
|
||||
success = True
|
||||
|
||||
elif username is not None:
|
||||
mfa_user = user.objects.filter(username=username)
|
||||
if len(mfa_user) == 0:
|
||||
logger.warning('No user with this username associated')
|
||||
elif (
|
||||
len(mfa_user) > 1
|
||||
): # pragma: no cover # Should not be possible due to unique constraint
|
||||
logger.error('Multiple users found with the provided username')
|
||||
else:
|
||||
# found exactly one user
|
||||
success = True
|
||||
|
||||
else:
|
||||
# and clean out all MFA methods
|
||||
logger.error('No mail or username provided')
|
||||
raise ValueError(
|
||||
'Error: one of the following arguments is required: mail, username'
|
||||
)
|
||||
|
||||
# Clean out all MFA methods
|
||||
if success:
|
||||
auths = mfa_user[0].authenticator_set.all()
|
||||
length = len(auths)
|
||||
auths.delete()
|
||||
|
||||
@@ -19,40 +19,50 @@ class CommandTestCase(TestCase):
|
||||
|
||||
def test_remove_mfa(self):
|
||||
"""Test the remove_mfa command."""
|
||||
|
||||
def get_dummyuser(uname='admin'):
|
||||
admin = User.objects.create_user(
|
||||
username=uname, email=f'{uname}@example.org'
|
||||
)
|
||||
admin.authenticator_set.create(type='TOTP', data={})
|
||||
self.assertEqual(admin.authenticator_set.all().count(), 1)
|
||||
return admin
|
||||
|
||||
# missing arg
|
||||
with self.assertRaises(Exception) as cm:
|
||||
call_command('remove_mfa', verbosity=0)
|
||||
self.assertEqual(
|
||||
'Error: the following arguments are required: mail', str(cm.exception)
|
||||
'Error: one of the following arguments is required: mail, username',
|
||||
str(cm.exception),
|
||||
)
|
||||
|
||||
# no user
|
||||
with self.assertLogs('inventree') as cm:
|
||||
self.assertFalse(
|
||||
call_command('remove_mfa', 'admin@example.org', verbosity=0)
|
||||
call_command('remove_mfa', mail='admin@example.org', verbosity=0)
|
||||
)
|
||||
self.assertIn('No user with this mail associated', str(cm[1]))
|
||||
|
||||
# correct removal
|
||||
my_admin1 = User.objects.create_user(
|
||||
username='admin', email='admin@example.org'
|
||||
)
|
||||
my_admin1.authenticator_set.create(type='TOTP', data={})
|
||||
self.assertEqual(my_admin1.authenticator_set.all().count(), 1)
|
||||
output = call_command('remove_mfa', 'admin@example.org', verbosity=0)
|
||||
my_admin1 = get_dummyuser()
|
||||
output = call_command('remove_mfa', mail=my_admin1.email, verbosity=0)
|
||||
self.assertEqual(output, 'done')
|
||||
self.assertEqual(my_admin1.authenticator_set.all().count(), 0)
|
||||
|
||||
# two users with same email
|
||||
my_admin2 = User.objects.create_user(
|
||||
username='admin2', email='admin@example.org'
|
||||
)
|
||||
my_admin2 = User.objects.create_user(username='admin2', email=my_admin1.email)
|
||||
my_admin2.emailaddress_set.create(email='456')
|
||||
my_admin2.emailaddress_set.create(email='123')
|
||||
with self.assertLogs('inventree') as cm:
|
||||
self.assertFalse(
|
||||
call_command('remove_mfa', 'admin@example.org', verbosity=0)
|
||||
call_command('remove_mfa', mail=my_admin1.email, verbosity=0)
|
||||
)
|
||||
self.assertIn('Multiple users found with the provided email', str(cm[1]))
|
||||
self.assertIn('admin, admin2', str(cm[1]))
|
||||
self.assertIn('123, 456, admin@example.org', str(cm[1]))
|
||||
self.assertIn(f'123, 456, {my_admin1.email}', str(cm[1]))
|
||||
|
||||
# correct removal by username
|
||||
my_admin3 = get_dummyuser('admin3')
|
||||
output = call_command('remove_mfa', username=my_admin3.username, verbosity=0)
|
||||
self.assertEqual(output, 'done')
|
||||
self.assertEqual(my_admin3.authenticator_set.all().count(), 0)
|
||||
|
||||
15
tasks.py
15
tasks.py
@@ -619,14 +619,19 @@ def clean_settings(c):
|
||||
success('Settings cleaned successfully')
|
||||
|
||||
|
||||
@task(help={'mail': "mail of the user who's MFA should be disabled"})
|
||||
def remove_mfa(c, mail=''):
|
||||
@task(
|
||||
help={
|
||||
'mail': "mail of the user who's MFA should be disabled",
|
||||
'username': "username of the user who's MFA should be disabled",
|
||||
}
|
||||
)
|
||||
def remove_mfa(c, mail='', username=''):
|
||||
"""Remove MFA for a user."""
|
||||
if not mail:
|
||||
warning('You must provide a users mail')
|
||||
if not mail and not username:
|
||||
warning('You must provide a users mail or username')
|
||||
return
|
||||
|
||||
manage(c, f'remove_mfa {mail}')
|
||||
manage(c, f'remove_mfa --mail {mail} --username {username}')
|
||||
|
||||
|
||||
@task(
|
||||
|
||||
Reference in New Issue
Block a user