mirror of
https://github.com/inventree/InvenTree.git
synced 2026-07-04 14:10:52 +00:00
[import] specify fk lookup field (#12180)
* Raise error on multiple matches * add new field to handle lookup_field selection * Add unit tests * Update frontend * Bump API version * Ensure string-iness of lookup field
This commit is contained in:
@@ -1,11 +1,14 @@
|
||||
"""InvenTree API version information."""
|
||||
|
||||
# InvenTree API version
|
||||
INVENTREE_API_VERSION = 506
|
||||
INVENTREE_API_VERSION = 507
|
||||
"""Increment this API version number whenever there is a significant change to the API that any clients need to know about."""
|
||||
|
||||
INVENTREE_API_TEXT = """
|
||||
|
||||
v507 -> 2026-06-16 : https://github.com/inventree/InvenTree/pull/12180
|
||||
- Adds "lookup_field" parameter to the DataImportSessionSerializer, which allows for more flexible lookup of related objects during data import operations
|
||||
|
||||
v506 -> 2026-06-15 : https://github.com/inventree/InvenTree/pull/12168
|
||||
- Reduce permissions scope for a number of API endpoints, to improve security and ensure that users only have access to the data they need
|
||||
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
# Generated migration
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("importer", "0005_dataimportsession_update_records"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="dataimportcolumnmap",
|
||||
name="lookup_field",
|
||||
field=models.CharField(
|
||||
blank=True,
|
||||
null=True,
|
||||
max_length=100,
|
||||
verbose_name="Lookup Field",
|
||||
help_text="Database field to use for foreign-key lookup. Leave blank for automatic lookup.",
|
||||
),
|
||||
),
|
||||
]
|
||||
@@ -152,6 +152,36 @@ class DataImportSession(models.Model):
|
||||
|
||||
return supported_models().get(self.model_type, None)
|
||||
|
||||
def get_lookup_fields_for_field(self, field_name: str) -> list:
|
||||
"""Return the valid lookup fields for a given related (FK) field.
|
||||
|
||||
Returns a list of field names that can be used as a lookup key,
|
||||
consisting of 'pk' plus any fields defined in IMPORT_ID_FIELDS on the related model.
|
||||
"""
|
||||
model = self.get_related_model(field_name)
|
||||
|
||||
if not model:
|
||||
return ['pk']
|
||||
|
||||
id_fields = ['pk']
|
||||
|
||||
if custom_fields := getattr(model, 'IMPORT_ID_FIELDS', None):
|
||||
id_fields += custom_fields
|
||||
|
||||
return id_fields
|
||||
|
||||
@property
|
||||
def field_lookup_mapping(self) -> dict:
|
||||
"""Return a dict of field -> lookup_field mappings for this import session.
|
||||
|
||||
Only entries where lookup_field is explicitly set are included.
|
||||
"""
|
||||
return {
|
||||
mapping.field: mapping.lookup_field
|
||||
for mapping in self.column_mappings.all()
|
||||
if mapping.lookup_field
|
||||
}
|
||||
|
||||
def get_related_model(self, field_name: str) -> Optional[models.Model]:
|
||||
"""Return the related model for a given field name.
|
||||
|
||||
@@ -408,6 +438,11 @@ class DataImportSession(models.Model):
|
||||
if field.get('read_only', False):
|
||||
continue
|
||||
|
||||
if field.get('type') == 'related field':
|
||||
field['lookup_fields'] = self.get_lookup_fields_for_field(
|
||||
field_name
|
||||
)
|
||||
|
||||
fields[field_name] = field
|
||||
|
||||
# Cache the available fields against this instance
|
||||
@@ -495,6 +530,22 @@ class DataImportColumnMap(models.Model):
|
||||
if field_def.get('read_only', False):
|
||||
raise DjangoValidationError({'field': _('Selected field is read-only')})
|
||||
|
||||
if self.lookup_field:
|
||||
if field_def.get('type') != 'related field':
|
||||
raise DjangoValidationError({
|
||||
'lookup_field': _(
|
||||
'Lookup field can only be set for related (foreign-key) fields'
|
||||
)
|
||||
})
|
||||
|
||||
valid_lookup_fields = self.session.get_lookup_fields_for_field(self.field)
|
||||
if self.lookup_field not in valid_lookup_fields:
|
||||
raise DjangoValidationError({
|
||||
'lookup_field': _(
|
||||
'Invalid lookup field. Valid options are: {options}'
|
||||
).format(options=', '.join(valid_lookup_fields))
|
||||
})
|
||||
|
||||
session = models.ForeignKey(
|
||||
DataImportSession,
|
||||
on_delete=models.CASCADE,
|
||||
@@ -506,6 +557,16 @@ class DataImportColumnMap(models.Model):
|
||||
|
||||
column = models.CharField(blank=True, max_length=100, verbose_name=_('Column'))
|
||||
|
||||
lookup_field = models.CharField(
|
||||
blank=True,
|
||||
null=True,
|
||||
max_length=100,
|
||||
verbose_name=_('Lookup Field'),
|
||||
help_text=_(
|
||||
'Database field to use for foreign-key lookup. Leave blank for automatic lookup.'
|
||||
),
|
||||
)
|
||||
|
||||
@property
|
||||
def available_fields(self):
|
||||
"""Return a list of available fields for this import session.
|
||||
@@ -638,9 +699,12 @@ class DataImportRow(models.Model):
|
||||
default_values = self.default_values
|
||||
|
||||
data = {}
|
||||
extract_errors = {}
|
||||
|
||||
self.related_field_map = {}
|
||||
|
||||
field_lookup_mapping = self.session.field_lookup_mapping
|
||||
|
||||
# We have mapped column (file) to field (serializer) already
|
||||
for field, col in field_mapping.items():
|
||||
# Data override (force value and skip any further checks)
|
||||
@@ -668,7 +732,13 @@ class DataImportRow(models.Model):
|
||||
elif field_type == 'date':
|
||||
value = self.convert_date_field(value)
|
||||
elif field_type == 'related field':
|
||||
value = self.lookup_related_field(field, value)
|
||||
try:
|
||||
value = self.lookup_related_field(
|
||||
field, value, lookup_field=field_lookup_mapping.get(field)
|
||||
)
|
||||
except DjangoValidationError as exc:
|
||||
extract_errors[field] = exc.message
|
||||
continue
|
||||
|
||||
# Use the default value, if provided
|
||||
if value is None and field in default_values:
|
||||
@@ -710,6 +780,9 @@ class DataImportRow(models.Model):
|
||||
|
||||
self.data = data
|
||||
|
||||
if extract_errors:
|
||||
self.errors = extract_errors
|
||||
|
||||
if commit:
|
||||
self.save()
|
||||
|
||||
@@ -733,7 +806,9 @@ class DataImportRow(models.Model):
|
||||
# If none of the formats matched, return the original value
|
||||
return value
|
||||
|
||||
def lookup_related_field(self, field_name: str, value: str) -> Optional[int]:
|
||||
def lookup_related_field(
|
||||
self, field_name: str, value: str, lookup_field: Optional[str] = None
|
||||
) -> Optional[int]:
|
||||
"""Try to perform lookup against a related field.
|
||||
|
||||
- This is used to convert a human-readable value (e.g. a supplier name) into a database reference (e.g. supplier ID).
|
||||
@@ -742,6 +817,7 @@ class DataImportRow(models.Model):
|
||||
Arguments:
|
||||
field_name: The name of the field to perform the lookup against
|
||||
value: The value to be looked up
|
||||
lookup_field: If provided, only query this specific model field (skips auto-lookup)
|
||||
|
||||
Returns:
|
||||
A primary key value
|
||||
@@ -765,21 +841,35 @@ class DataImportRow(models.Model):
|
||||
'session': f'No related model found for field: {field_name}'
|
||||
})
|
||||
|
||||
valid_items = set()
|
||||
|
||||
base_filters = (
|
||||
self.session.field_filters.get(field_name, {})
|
||||
if self.session.field_filters
|
||||
else {}
|
||||
)
|
||||
|
||||
# First priority is the PK (primary key) field
|
||||
if lookup_field and type(lookup_field) is str:
|
||||
# A specific lookup field has been chosen by the user — query only that field
|
||||
try:
|
||||
queryset = model.objects.filter(**{lookup_field: value}, **base_filters)
|
||||
except ValueError:
|
||||
return value
|
||||
|
||||
results = list(queryset[:2])
|
||||
|
||||
if len(results) == 1:
|
||||
return results[0].pk
|
||||
|
||||
# Zero or multiple results — return raw value and let serializer report the error
|
||||
return value
|
||||
|
||||
# Auto-lookup: try pk first, then any model-defined IMPORT_ID_FIELDS
|
||||
id_fields = ['pk']
|
||||
|
||||
if custom_id_fields := getattr(model, 'IMPORT_ID_FIELDS', None):
|
||||
id_fields += custom_id_fields
|
||||
|
||||
# Iterate through the provided list - if any of the values match, we can perform the lookup
|
||||
valid_items = set()
|
||||
|
||||
for id_field in id_fields:
|
||||
try:
|
||||
queryset = model.objects.filter(**{id_field: value}, **base_filters)
|
||||
@@ -789,15 +879,19 @@ class DataImportRow(models.Model):
|
||||
# Evaluate at most two results to determine if there is exactly one match
|
||||
results = list(queryset[:2])
|
||||
if len(results) == 1:
|
||||
# We have a single match against this field
|
||||
valid_items.add(results[0].pk)
|
||||
|
||||
if len(valid_items) == 1:
|
||||
# We found a single valid match against the related model - return this value
|
||||
return valid_items.pop()
|
||||
|
||||
# We found either zero or multiple values matching against the related model
|
||||
# Return the original value and let the serializer validation handle any errors against this field
|
||||
if len(valid_items) > 1:
|
||||
raise DjangoValidationError(
|
||||
_(
|
||||
'Multiple matches found for value - please ensure the value is unique, or select a specific lookup field'
|
||||
)
|
||||
)
|
||||
|
||||
# No match found - return the original value and let the serializer validation handle it
|
||||
return value
|
||||
|
||||
def serializer_data(self):
|
||||
@@ -844,6 +938,10 @@ class DataImportRow(models.Model):
|
||||
# Row has already been completed
|
||||
return True
|
||||
|
||||
if self.errors:
|
||||
# Errors were set during data extraction (e.g. ambiguous FK lookup)
|
||||
return False
|
||||
|
||||
if self.session.update_records:
|
||||
# Extract the ID field from the data
|
||||
instance_id = self.data.get(self.session.ID_FIELD_LABEL, None)
|
||||
|
||||
@@ -23,7 +23,15 @@ class DataImportColumnMapSerializer(InvenTreeModelSerializer):
|
||||
"""Meta class options for the serializer."""
|
||||
|
||||
model = importer.models.DataImportColumnMap
|
||||
fields = ['pk', 'session', 'column', 'field', 'label', 'description']
|
||||
fields = [
|
||||
'pk',
|
||||
'session',
|
||||
'column',
|
||||
'field',
|
||||
'label',
|
||||
'description',
|
||||
'lookup_field',
|
||||
]
|
||||
read_only_fields = ['field', 'session']
|
||||
|
||||
label = serializers.CharField(read_only=True)
|
||||
|
||||
@@ -80,6 +80,72 @@ class ImporterTest(ImporterMixin, InvenTreeTestCase):
|
||||
def test_field_defaults(self):
|
||||
"""Test default field values."""
|
||||
|
||||
def test_lookup_field_ambiguous_match(self):
|
||||
"""Test the behavior of lookup_related_field for ambiguous and pinned matches."""
|
||||
from django.core.exceptions import ValidationError as DjangoValidationError
|
||||
|
||||
from part.models import Part, PartCategory
|
||||
|
||||
category = PartCategory.objects.create(
|
||||
name='Test Category', description='Test category'
|
||||
)
|
||||
|
||||
# Two parts which collide under different IMPORT_ID_FIELDS ('IPN' and 'name')
|
||||
part_a = Part.objects.create(
|
||||
category=category, name='Widget', description='desc', IPN='AMBIG-001'
|
||||
)
|
||||
Part.objects.create(
|
||||
category=category, name='AMBIG-001', description='desc', IPN='WIDGET-002'
|
||||
)
|
||||
|
||||
data_file = self.helper_file('companies.csv')
|
||||
session = DataImportSession.objects.create(
|
||||
data_file=data_file, model_type='stockitem'
|
||||
)
|
||||
|
||||
row = DataImportRow(session=session)
|
||||
row.related_field_map = {}
|
||||
|
||||
# Auto-lookup (no pinned lookup field) raises, as the value matches two different parts
|
||||
with self.assertRaises(DjangoValidationError):
|
||||
row.lookup_related_field('part', 'AMBIG-001')
|
||||
|
||||
# Pinning the lookup field to 'IPN' resolves the match unambiguously
|
||||
result = row.lookup_related_field('part', 'AMBIG-001', lookup_field='IPN')
|
||||
self.assertEqual(result, part_a.pk)
|
||||
|
||||
# Pinning the lookup field to 'name' resolves to the *other* part
|
||||
result = row.lookup_related_field('part', 'AMBIG-001', lookup_field='name')
|
||||
self.assertNotEqual(result, part_a.pk)
|
||||
|
||||
def test_lookup_field_validation(self):
|
||||
"""Test that DataImportColumnMap.clean() validates the lookup_field value."""
|
||||
from django.core.exceptions import ValidationError as DjangoValidationError
|
||||
|
||||
data_file = self.helper_file('companies.csv')
|
||||
session = DataImportSession.objects.create(
|
||||
data_file=data_file, model_type='stockitem'
|
||||
)
|
||||
|
||||
part_mapping = session.column_mappings.get(field='part')
|
||||
quantity_mapping = session.column_mappings.get(field='quantity')
|
||||
|
||||
# Valid lookup field for a related (FK) field
|
||||
part_mapping.lookup_field = 'IPN'
|
||||
part_mapping.save()
|
||||
part_mapping.refresh_from_db()
|
||||
self.assertEqual(part_mapping.lookup_field, 'IPN')
|
||||
|
||||
# Invalid lookup field (not a valid IMPORT_ID_FIELDS option)
|
||||
part_mapping.lookup_field = 'not_a_real_field'
|
||||
with self.assertRaises(DjangoValidationError):
|
||||
part_mapping.save()
|
||||
|
||||
# lookup_field cannot be set against a non-related field
|
||||
quantity_mapping.lookup_field = 'IPN'
|
||||
with self.assertRaises(DjangoValidationError):
|
||||
quantity_mapping.save()
|
||||
|
||||
|
||||
class ImportAPITest(ImporterMixin, InvenTreeAPITestCase):
|
||||
"""End-to-end tests for the importer API."""
|
||||
@@ -181,6 +247,58 @@ class ImportAPITest(ImporterMixin, InvenTreeAPITestCase):
|
||||
# Check that there are new database records
|
||||
self.assertEqual(PartCategory.objects.count(), N + 4)
|
||||
|
||||
def test_column_mapping_lookup_field(self):
|
||||
"""Test that the 'lookup_field' option can be specified via the column-mapping API."""
|
||||
f = self.helper_file('companies.csv')
|
||||
|
||||
session = DataImportSession.objects.create(
|
||||
data_file=f, model_type='stockitem', user=self.user
|
||||
)
|
||||
|
||||
self.assignRole('stock.change')
|
||||
|
||||
# available_fields should expose the valid lookup field options for the 'part' FK field
|
||||
session_detail = self.get(
|
||||
reverse('api-import-session-detail', kwargs={'pk': session.pk})
|
||||
).data
|
||||
part_field_info = session_detail['available_fields']['part']
|
||||
self.assertIn('lookup_fields', part_field_info)
|
||||
self.assertIn('IPN', part_field_info['lookup_fields'])
|
||||
self.assertIn('name', part_field_info['lookup_fields'])
|
||||
self.assertIn('pk', part_field_info['lookup_fields'])
|
||||
|
||||
part_mapping = session.column_mappings.get(field='part')
|
||||
quantity_mapping = session.column_mappings.get(field='quantity')
|
||||
|
||||
mapping_url = reverse(
|
||||
'api-importer-mapping-detail', kwargs={'pk': part_mapping.pk}
|
||||
)
|
||||
|
||||
# Initially, no lookup field is set (defaults to 'auto')
|
||||
data = self.get(mapping_url).data
|
||||
self.assertIn(data['lookup_field'], [None, ''])
|
||||
|
||||
# Set a valid lookup field
|
||||
data = self.patch(mapping_url, {'lookup_field': 'IPN'}, expected_code=200).data
|
||||
self.assertEqual(data['lookup_field'], 'IPN')
|
||||
|
||||
# Confirm it persists
|
||||
data = self.get(mapping_url).data
|
||||
self.assertEqual(data['lookup_field'], 'IPN')
|
||||
|
||||
# An invalid lookup field is rejected
|
||||
self.patch(mapping_url, {'lookup_field': 'not_a_real_field'}, expected_code=400)
|
||||
|
||||
# Clear the lookup field (revert to 'auto')
|
||||
data = self.patch(mapping_url, {'lookup_field': None}, expected_code=200).data
|
||||
self.assertIn(data['lookup_field'], [None, ''])
|
||||
|
||||
# lookup_field cannot be set against a non-related field
|
||||
quantity_url = reverse(
|
||||
'api-importer-mapping-detail', kwargs={'pk': quantity_mapping.pk}
|
||||
)
|
||||
self.patch(quantity_url, {'lookup_field': 'IPN'}, expected_code=400)
|
||||
|
||||
def test_session_list(self):
|
||||
"""Test API endpoint which details the list of import sessions."""
|
||||
url = reverse('api-importer-session-list')
|
||||
|
||||
@@ -179,6 +179,55 @@ function ImporterDefaultField({
|
||||
);
|
||||
}
|
||||
|
||||
function ImporterLookupFieldSelector({
|
||||
column,
|
||||
session
|
||||
}: Readonly<{ column: any; session: ImportSessionState }>) {
|
||||
const api = useApi();
|
||||
|
||||
const fieldDef = session.availableFields[column.field];
|
||||
const lookupFields: string[] = fieldDef?.lookup_fields ?? [];
|
||||
|
||||
const [selected, setSelected] = useState<string>(column.lookup_field ?? '');
|
||||
|
||||
useEffect(() => {
|
||||
setSelected(column.lookup_field ?? '');
|
||||
}, [column.lookup_field]);
|
||||
|
||||
if (lookupFields.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const options = [
|
||||
{ value: '', label: t`Auto` },
|
||||
...lookupFields.map((f: string) => ({ value: f, label: f }))
|
||||
];
|
||||
|
||||
const onChange = useCallback(
|
||||
(value: string | null) => {
|
||||
const next = value ?? '';
|
||||
api
|
||||
.patch(
|
||||
apiUrl(ApiEndpoints.import_session_column_mapping_list, column.pk),
|
||||
{ lookup_field: next || null }
|
||||
)
|
||||
.then(() => setSelected(next))
|
||||
.catch(() => {});
|
||||
},
|
||||
[column.pk]
|
||||
);
|
||||
|
||||
return (
|
||||
<Select
|
||||
aria-label={`import-lookup-field-${column.field}`}
|
||||
data={options}
|
||||
value={selected}
|
||||
onChange={onChange}
|
||||
size='sm'
|
||||
/>
|
||||
);
|
||||
}
|
||||
|
||||
function ImporterColumnTableRow({
|
||||
session,
|
||||
column,
|
||||
@@ -210,6 +259,9 @@ function ImporterColumnTableRow({
|
||||
<Table.Td>
|
||||
<ImporterColumn column={column} options={options} />
|
||||
</Table.Td>
|
||||
<Table.Td>
|
||||
<ImporterLookupFieldSelector column={column} session={session} />
|
||||
</Table.Td>
|
||||
<Table.Td>
|
||||
<ImporterDefaultField
|
||||
fieldName={column.field}
|
||||
@@ -285,6 +337,7 @@ export default function ImporterColumnSelector({
|
||||
<Table.Th>{t`Database Field`}</Table.Th>
|
||||
<Table.Th>{t`Field Description`}</Table.Th>
|
||||
<Table.Th>{t`Imported Column`}</Table.Th>
|
||||
<Table.Th>{t`Lookup Field`}</Table.Th>
|
||||
<Table.Th>{t`Default Value`}</Table.Th>
|
||||
</Table.Tr>
|
||||
</Table.Thead>
|
||||
|
||||
Reference in New Issue
Block a user