2
0
mirror of https://github.com/inventree/InvenTree.git synced 2025-09-13 22:21:37 +00:00

Import update (#10188)

* Add field to "update" existing records

* Ensure the ID is first

* Prevent editing of "ID" field

* Extract db instance

* Bump API version

* Prevent edit of "id" field

* Refactoring

* Enhanced playwright tests for data importing

* Update docs

* Update src/backend/InvenTree/importer/models.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update src/frontend/src/forms/ImporterForms.tsx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix silly AI mistake

* Fix for table pagination

- Ensure page does not exceed available records

* Bug fix for playwright test

* Add end-to-end API testing

* Fix unit tests

* Adjust table page logic

* Ensure sensible page size

* Simplify playwright test

* Simplify test again

* Tweak unit test

- Importing has invalidated the BOM?

* Adjust playwright tests

* Further playwright fixes

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Oliver
2025-08-20 15:34:49 +10:00
committed by GitHub
parent e44008f528
commit 885ec81a08
21 changed files with 351 additions and 51 deletions

View File

@@ -1,11 +1,14 @@
"""InvenTree API version information."""
# InvenTree API version
INVENTREE_API_VERSION = 386
INVENTREE_API_VERSION = 387
"""Increment this API version number whenever there is a significant change to the API that any clients need to know about."""
INVENTREE_API_TEXT = """
v387 -> 2025-08-19 : https://github.com/inventree/InvenTree/pull/10188
- Adds "update_records" field to the DataImportSession API
v386 -> 2025-08-11 : https://github.com/inventree/InvenTree/pull/8191
- Adds "consumed" field to the BuildItem API
- Adds API endpoint to consume stock against a BuildOrder

View File

@@ -0,0 +1,22 @@
# Generated by Django 4.2.23 on 2025-08-18 13:57
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("importer", "0004_alter_dataimportsession_model_type"),
]
operations = [
migrations.AddField(
model_name="dataimportsession",
name="update_records",
field=models.BooleanField(
default=False,
help_text="If enabled, existing records will be updated with new data",
verbose_name="Update Existing Records",
),
),
]

View File

@@ -1,6 +1,7 @@
"""Model definitions for the 'importer' app."""
import json
from collections import OrderedDict
from typing import Optional
from django.contrib.auth.models import User
@@ -39,6 +40,8 @@ class DataImportSession(models.Model):
field_filters: JSONField for field filter values - optional field API filters
"""
ID_FIELD_LABEL = 'id'
class ModelChoices(RenderChoices):
"""Model choices for data import sessions."""
@@ -118,6 +121,12 @@ class DataImportSession(models.Model):
validators=[importer.validators.validate_field_defaults],
)
update_records = models.BooleanField(
default=False,
verbose_name=_('Update Existing Records'),
help_text=_('If enabled, existing records will be updated with new data'),
)
@property
def field_mapping(self) -> dict:
"""Construct a dict of field mappings for this import session.
@@ -351,13 +360,25 @@ class DataImportSession(models.Model):
metadata = InvenTreeMetadata()
fields = OrderedDict()
if self.update_records:
# If we are updating records, ensure the ID field is included
fields[self.ID_FIELD_LABEL] = {
'label': _('ID'),
'help_text': _('Existing database identifier for the record'),
'type': 'integer',
'required': True,
'read_only': False,
}
if serializer_class := self.serializer_class:
serializer = serializer_class(data={}, importing=True)
fields = metadata.get_serializer_info(serializer)
else:
fields = {}
fields.update(metadata.get_serializer_info(serializer))
# Cache the available fields against this instance
self._available_fields = fields
return fields
def required_fields(self) -> dict:
@@ -370,6 +391,10 @@ class DataImportSession(models.Model):
if info.get('required', False):
required[field] = info
elif self.update_records and field == self.ID_FIELD_LABEL:
# If we are updating records, the ID field is required
required[field] = info
return required
@@ -630,11 +655,13 @@ class DataImportRow(models.Model):
return data
def construct_serializer(self, request=None):
def construct_serializer(self, instance=None, request=None):
"""Construct a serializer object for this row."""
if serializer_class := self.session.serializer_class:
return serializer_class(
data=self.serializer_data(), context={'request': request}
instance=instance,
data=self.serializer_data(),
context={'request': request},
)
def validate(self, commit=False, request=None) -> bool:
@@ -654,7 +681,26 @@ class DataImportRow(models.Model):
# Row has already been completed
return True
serializer = self.construct_serializer(request=request)
if self.session.update_records:
# Extract the ID field from the data
instance_id = self.data.get(self.session.ID_FIELD_LABEL, None)
if not instance_id:
raise DjangoValidationError(
_('ID is required for updating existing records.')
)
try:
instance = self.session.model_class.objects.get(pk=instance_id)
except self.session.model_class.DoesNotExist:
raise DjangoValidationError(_('No record found with the provided ID.'))
except ValueError:
raise DjangoValidationError(_('Invalid ID format provided.'))
serializer = self.construct_serializer(instance=instance, request=request)
else:
serializer = self.construct_serializer(request=request)
if not serializer:
self.errors = {

View File

@@ -41,6 +41,7 @@ class DataImportSessionSerializer(InvenTreeModelSerializer):
'pk',
'timestamp',
'data_file',
'update_records',
'model_type',
'available_fields',
'status',

View File

@@ -0,0 +1,6 @@
ID,Name,Description,Default Location,Default keywords,Level,Parent Category,Parts,Subcategories,Path,Starred,Structural,Icon,Parent default location
23,Category 0,"Part category, level 1",,,0,,0,5,Category 0,False,False,,
1,Electronics,Electronic components and systems,,,0,,135,12,Electronics,False,False,,
17,Furniture,Furniture and associated things,,,0,,22,2,Furniture,False,False,,
2,Mechanical,Mechanical components,,,0,,263,3,Mechanical,False,False,,
20,Paint,"Paints, inks, etc",,,0,,5,0,Paint,False,False,,
1 ID Name Description Default Location Default keywords Level Parent Category Parts Subcategories Path Starred Structural Icon Parent default location
2 23 Category 0 Part category, level 1 0 0 5 Category 0 False False
3 1 Electronics Electronic components and systems 0 135 12 Electronics False False
4 17 Furniture Furniture and associated things 0 22 2 Furniture False False
5 2 Mechanical Mechanical components 0 263 3 Mechanical False False
6 20 Paint Paints, inks, etc 0 5 0 Paint False False

View File

@@ -3,25 +3,23 @@
import os
from django.core.files.base import ContentFile
from django.urls import reverse
from importer.models import DataImportRow, DataImportSession
from InvenTree.unit_test import AdminTestCase, InvenTreeTestCase
from InvenTree.unit_test import AdminTestCase, InvenTreeAPITestCase, InvenTreeTestCase
class ImporterMixin:
"""Helpers for import tests."""
def helper_file(self):
def helper_file(self, fn: str) -> ContentFile:
"""Return test data."""
fn = os.path.join(os.path.dirname(__file__), 'test_data', 'companies.csv')
file_path = os.path.join(os.path.dirname(__file__), 'test_data', fn)
with open(fn, encoding='utf-8') as input_file:
with open(file_path, encoding='utf-8') as input_file:
data = input_file.read()
return data
def helper_content(self):
"""Return content file."""
return ContentFile(self.helper_file(), 'companies.csv')
return ContentFile(data, fn)
class ImporterTest(ImporterMixin, InvenTreeTestCase):
@@ -33,8 +31,10 @@ class ImporterTest(ImporterMixin, InvenTreeTestCase):
n = Company.objects.count()
data_file = self.helper_file('companies.csv')
session = DataImportSession.objects.create(
data_file=self.helper_content(), model_type='company'
data_file=data_file, model_type='company'
)
session.extract_columns()
@@ -74,13 +74,116 @@ class ImporterTest(ImporterMixin, InvenTreeTestCase):
"""Test default field values."""
class ImportAPITest(ImporterMixin, InvenTreeAPITestCase):
"""End-to-end tests for the importer API."""
def test_import(self):
"""Test full import process via the API."""
from part.models import PartCategory
N = PartCategory.objects.count()
url = reverse('api-importer-session-list')
# Load data file
data_file = self.helper_file('part_categories.csv')
data = self.post(
url,
{'model_type': 'partcategory', 'data_file': data_file},
format='multipart',
).data
self.assertFalse(data['update_records'])
self.assertEqual(data['model_type'], 'partcategory')
# No data has been imported yet
self.assertEqual(data['row_count'], 0)
self.assertEqual(data['completed_row_count'], 0)
field_names = data['available_fields'].keys()
for fn in ['name', 'default_location', 'description']:
self.assertIn(fn, field_names)
self.assertEqual(len(data['columns']), 14)
for col in ['Name', 'Parent Category', 'Path']:
self.assertIn(col, data['columns'])
session_id = data['pk']
# Accept the field mappings
url = reverse('api-import-session-accept-fields', kwargs={'pk': session_id})
# Initially the user does not have the right permissions
self.post(url, expected_code=403)
# Assign correct permission to user
self.assignRole('part_category.add')
self.post(url, expected_code=200)
session = self.get(
reverse('api-import-session-detail', kwargs={'pk': session_id})
).data
self.assertEqual(session['row_count'], 5)
self.assertEqual(session['completed_row_count'], 0)
# Fetch each row, and validate it
rows = self.get(
reverse('api-importer-row-list'), data={'session': session_id}
).data
self.assertEqual(len(rows), 5)
row_ids = []
for row in rows:
row_ids.append(row['pk'])
self.assertEqual(row['session'], session_id)
self.assertTrue(row['valid'])
self.assertFalse(row['complete'])
# Validate the rows
url = reverse('api-import-session-accept-rows', kwargs={'pk': session_id})
self.post(
url,
{
'rows': row_ids[1:] # Validate all but the first row
},
)
# Update session information
session = self.get(
reverse('api-import-session-detail', kwargs={'pk': session_id})
).data
self.assertEqual(session['row_count'], 5)
self.assertEqual(session['completed_row_count'], 4)
for idx, row in enumerate(row_ids):
detail = self.get(
reverse('api-importer-row-detail', kwargs={'pk': row})
).data
self.assertEqual(detail['session'], session_id)
self.assertEqual(detail['complete'], idx > 0)
# Check that there are new database records
self.assertEqual(PartCategory.objects.count(), N + 4)
class AdminTest(ImporterMixin, AdminTestCase):
"""Tests for the admin interface integration."""
def test_admin(self):
"""Test the admin URL."""
data_file = self.helper_file('companies.csv')
session = self.helper(
model=DataImportSession,
model_kwargs={'data_file': self.helper_content(), 'model_type': 'company'},
model_kwargs={'data_file': data_file, 'model_type': 'company'},
)
self.helper(model=DataImportRow, model_kwargs={'session_id': session.id})