mirror of
https://github.com/inventree/InvenTree.git
synced 2025-06-17 04:25:42 +00:00
Match field step is now managed through form
This commit is contained in:
@ -56,7 +56,7 @@ class FileManager:
|
||||
raw_data = file.read().decode('utf-8')
|
||||
# Reset stream position to beginning of file
|
||||
file.seek(0)
|
||||
elif ext in ['.xls', '.xlsx']:
|
||||
elif ext in ['.xls', '.xlsx', '.json', '.yaml', ]:
|
||||
raw_data = file.read()
|
||||
# Reset stream position to beginning of file
|
||||
file.seek(0)
|
||||
|
@ -5,8 +5,12 @@ Django forms for interacting with common objects
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django import forms
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from InvenTree.forms import HelperForm
|
||||
|
||||
from .files import FileManager
|
||||
from .models import InvenTreeSetting
|
||||
|
||||
|
||||
@ -21,3 +25,77 @@ class SettingEditForm(HelperForm):
|
||||
fields = [
|
||||
'value'
|
||||
]
|
||||
|
||||
|
||||
class UploadFile(forms.Form):
|
||||
""" Step 1 of FileManagementFormView """
|
||||
|
||||
file = forms.FileField(
|
||||
label=_('File'),
|
||||
help_text=_('Select file to upload'),
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
""" Update label and help_text """
|
||||
|
||||
# Get file name
|
||||
name = None
|
||||
if 'name' in kwargs:
|
||||
name = kwargs.pop('name')
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
if name:
|
||||
# Update label and help_text with file name
|
||||
self.fields['file'].label = _(f'{name.title()} File')
|
||||
self.fields['file'].help_text = _(f'Select {name} file to upload')
|
||||
|
||||
def clean_file(self):
|
||||
"""
|
||||
Run tabular file validation.
|
||||
If anything is wrong with the file, it will raise ValidationError
|
||||
"""
|
||||
|
||||
file = self.cleaned_data['file']
|
||||
|
||||
# Validate file using FileManager class - will perform initial data validation
|
||||
# (and raise a ValidationError if there is something wrong with the file)
|
||||
FileManager.validate(file)
|
||||
|
||||
return file
|
||||
|
||||
|
||||
class MatchField(forms.Form):
|
||||
""" Step 2 of FileManagementFormView """
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
||||
# Get FileManager
|
||||
file_manager = None
|
||||
if 'file_manager' in kwargs:
|
||||
file_manager = kwargs.pop('file_manager')
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Setup headers
|
||||
file_manager.setup()
|
||||
columns = file_manager.columns()
|
||||
# Find headers choices
|
||||
headers_choices = [(header, header) for header in file_manager.HEADERS]
|
||||
|
||||
# Create column fields
|
||||
for col in columns:
|
||||
field_name = col['name']
|
||||
self.fields[field_name] = forms.ChoiceField(
|
||||
choices=[('', '-' * 10)] + headers_choices,
|
||||
required=False,
|
||||
)
|
||||
if col['guess']:
|
||||
self.fields[field_name].initial = col['guess']
|
||||
|
||||
|
||||
class MatchItem(forms.Form):
|
||||
""" Step 3 of FileManagementFormView """
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -19,6 +19,7 @@ from InvenTree.helpers import str2bool
|
||||
|
||||
from . import models
|
||||
from . import forms
|
||||
from .files import FileManager
|
||||
|
||||
|
||||
class SettingEdit(AjaxUpdateView):
|
||||
@ -164,3 +165,283 @@ class MultiStepFormView(SessionWizardView):
|
||||
context.update({'description': description})
|
||||
|
||||
return context
|
||||
|
||||
|
||||
class FileManagementFormView(MultiStepFormView):
|
||||
""" Setup form wizard to perform the following steps:
|
||||
1. Upload tabular data file
|
||||
2. Match headers to InvenTree fields
|
||||
3. Edit row data and match InvenTree items
|
||||
"""
|
||||
|
||||
name = None
|
||||
form_list = [
|
||||
('upload', forms.UploadFile),
|
||||
('fields', forms.MatchField),
|
||||
('items', forms.MatchItem),
|
||||
]
|
||||
form_steps_description = [
|
||||
_("Upload File"),
|
||||
_("Match Fields"),
|
||||
_("Match Items"),
|
||||
]
|
||||
media_folder = 'file_upload/'
|
||||
extra_context_data = {}
|
||||
|
||||
def get_context_data(self, form, **kwargs):
|
||||
context = super().get_context_data(form=form, **kwargs)
|
||||
|
||||
if self.steps.current == 'fields':
|
||||
# Get columns and row data
|
||||
columns = self.file_manager.columns()
|
||||
rows = self.file_manager.rows()
|
||||
# Optimize for template
|
||||
for row in rows:
|
||||
row_data = row['data']
|
||||
|
||||
data = []
|
||||
|
||||
for idx, item in enumerate(row_data):
|
||||
data.append({
|
||||
'cell': item,
|
||||
'idx': idx,
|
||||
'column': columns[idx]
|
||||
})
|
||||
|
||||
row['data'] = data
|
||||
|
||||
context.update({'rows': rows})
|
||||
|
||||
# Load extra context data
|
||||
print(f'{self.extra_context_data=}')
|
||||
for key, items in self.extra_context_data.items():
|
||||
context.update({key: items})
|
||||
|
||||
return context
|
||||
|
||||
def getFileManager(self, step=None, form=None):
|
||||
""" Get FileManager instance from uploaded file """
|
||||
|
||||
if self.file_manager:
|
||||
return
|
||||
|
||||
if step is not None:
|
||||
# Retrieve stored files from upload step
|
||||
upload_files = self.storage.get_step_files('upload')
|
||||
if upload_files:
|
||||
# Get file
|
||||
file = upload_files.get('upload-file', None)
|
||||
if file:
|
||||
self.file_manager = FileManager(file=file, name=self.name)
|
||||
|
||||
def get_form_kwargs(self, step=None):
|
||||
""" Update kwargs to dynamically build forms """
|
||||
|
||||
print(f'[STEP] {step}')
|
||||
|
||||
# Always retrieve FileManager instance from uploaded file
|
||||
self.getFileManager(step)
|
||||
|
||||
if step == 'upload':
|
||||
if self.name:
|
||||
# Dynamically build upload form
|
||||
kwargs = {
|
||||
'name': self.name
|
||||
}
|
||||
return kwargs
|
||||
elif step == 'fields':
|
||||
if self.file_manager:
|
||||
# Dynamically build match field form
|
||||
kwargs = {
|
||||
'file_manager': self.file_manager
|
||||
}
|
||||
return kwargs
|
||||
|
||||
return super().get_form_kwargs()
|
||||
|
||||
def getFormTableData(self, form_data):
|
||||
""" Extract table cell data from form data.
|
||||
These data are used to maintain state between sessions.
|
||||
|
||||
Table data keys are as follows:
|
||||
|
||||
col_name_<idx> - Column name at idx as provided in the uploaded file
|
||||
col_guess_<idx> - Column guess at idx as selected
|
||||
row_<x>_col<y> - Cell data as provided in the uploaded file
|
||||
|
||||
"""
|
||||
|
||||
# Store extra context data
|
||||
self.extra_context_data = {}
|
||||
|
||||
# Map the columns
|
||||
self.column_names = {}
|
||||
self.column_selections = {}
|
||||
|
||||
self.row_data = {}
|
||||
|
||||
for item in form_data:
|
||||
# print(f'{item} | {form_data[item]}')
|
||||
value = form_data[item]
|
||||
|
||||
# Column names as passed as col_name_<idx> where idx is an integer
|
||||
|
||||
# Extract the column names
|
||||
if item.startswith('col_name_'):
|
||||
try:
|
||||
col_id = int(item.replace('col_name_', ''))
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
self.column_names[value] = col_id
|
||||
|
||||
# Extract the column selections (in the 'select fields' view)
|
||||
if item.startswith('fields-'):
|
||||
|
||||
try:
|
||||
col_name = item.replace('fields-', '')
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
self.column_selections[col_name] = value
|
||||
|
||||
# Extract the row data
|
||||
if item.startswith('row_'):
|
||||
# Item should be of the format row_<r>_col_<c>
|
||||
s = item.split('_')
|
||||
|
||||
if len(s) < 4:
|
||||
continue
|
||||
|
||||
# Ignore row/col IDs which are not correct numeric values
|
||||
try:
|
||||
row_id = int(s[1])
|
||||
col_id = int(s[3])
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if row_id not in self.row_data:
|
||||
self.row_data[row_id] = {}
|
||||
|
||||
self.row_data[row_id][col_id] = value
|
||||
|
||||
# self.col_ids = sorted(self.column_names.keys())
|
||||
|
||||
# Re-construct the data table
|
||||
self.rows = []
|
||||
|
||||
for row_idx in sorted(self.row_data.keys()):
|
||||
row = self.row_data[row_idx]
|
||||
items = []
|
||||
|
||||
for col_idx in sorted(row.keys()):
|
||||
|
||||
value = row[col_idx]
|
||||
items.append(value)
|
||||
|
||||
self.rows.append({
|
||||
'index': row_idx,
|
||||
'data': items,
|
||||
'errors': {},
|
||||
})
|
||||
|
||||
# Construct the column data
|
||||
self.columns = []
|
||||
|
||||
# Track any duplicate column selections
|
||||
duplicates = []
|
||||
|
||||
for col in self.column_names:
|
||||
|
||||
if col in self.column_selections:
|
||||
guess = self.column_selections[col]
|
||||
else:
|
||||
guess = None
|
||||
|
||||
header = ({
|
||||
'name': self.column_names[col],
|
||||
'guess': guess
|
||||
})
|
||||
|
||||
if guess:
|
||||
n = list(self.column_selections.values()).count(self.column_selections[col])
|
||||
if n > 1:
|
||||
header['duplicate'] = True
|
||||
duplicates.append(col)
|
||||
|
||||
self.columns.append(header)
|
||||
|
||||
# Are there any missing columns?
|
||||
missing_columns = []
|
||||
|
||||
# Check that all required fields are present
|
||||
for col in self.file_manager.REQUIRED_HEADERS:
|
||||
if col not in self.column_selections.values():
|
||||
missing_columns.append(col)
|
||||
|
||||
# Check that at least one of the part match field is present
|
||||
part_match_found = False
|
||||
for col in self.file_manager.PART_MATCH_HEADERS:
|
||||
if col in self.column_selections.values():
|
||||
part_match_found = True
|
||||
break
|
||||
|
||||
# If not, notify user
|
||||
if not part_match_found:
|
||||
for col in self.file_manager.PART_MATCH_HEADERS:
|
||||
missing_columns.append(col)
|
||||
|
||||
# Store extra context data
|
||||
self.extra_context_data['missing_columns'] = missing_columns
|
||||
self.extra_context_data['duplicates'] = duplicates
|
||||
|
||||
def checkFieldSelection(self, form):
|
||||
""" Check field matching """
|
||||
|
||||
# Extract form data
|
||||
self.getFormTableData(form.data)
|
||||
|
||||
valid = len(self.extra_context_data.get('missing_columns', [])) == 0 and not self.extra_context_data.get('duplicates', [])
|
||||
|
||||
return valid
|
||||
|
||||
def validate(self, step, form):
|
||||
""" Validate forms """
|
||||
|
||||
valid = False
|
||||
|
||||
# Process steps
|
||||
if step == 'upload':
|
||||
# Validation is done during POST
|
||||
valid = True
|
||||
elif step == 'fields':
|
||||
# Validate user form data
|
||||
valid = self.checkFieldSelection(form)
|
||||
|
||||
if not valid:
|
||||
form.add_error(None, 'Fields matching failed')
|
||||
|
||||
elif step == 'items':
|
||||
# valid = self.checkPartSelection(form)
|
||||
|
||||
# if not valid:
|
||||
# form.add_error(None, 'Items matching failed')
|
||||
pass
|
||||
|
||||
return valid
|
||||
|
||||
def post(self, request, *args, **kwargs):
|
||||
""" Perform validations before posting data """
|
||||
|
||||
wizard_goto_step = self.request.POST.get('wizard_goto_step', None)
|
||||
|
||||
form = self.get_form(data=self.request.POST, files=self.request.FILES)
|
||||
|
||||
form_valid = self.validate(self.steps.current, form)
|
||||
|
||||
if not form_valid and not wizard_goto_step:
|
||||
# Re-render same step
|
||||
return self.render(form)
|
||||
|
||||
print('\nPosting... ')
|
||||
return super().post(*args, **kwargs)
|
||||
|
Reference in New Issue
Block a user