mirror of
https://github.com/inventree/InvenTree.git
synced 2025-05-01 21:16:46 +00:00
Merge pull request #1267 from SchrodingersGat/migration-fixes
Migration fixes
This commit is contained in:
commit
79ddea50f5
@ -4,7 +4,6 @@ import os
|
|||||||
from rapidfuzz import fuzz
|
from rapidfuzz import fuzz
|
||||||
|
|
||||||
from django.db import migrations, connection
|
from django.db import migrations, connection
|
||||||
from company.models import Company, SupplierPart
|
|
||||||
from django.db.utils import OperationalError, ProgrammingError
|
from django.db.utils import OperationalError, ProgrammingError
|
||||||
|
|
||||||
|
|
||||||
@ -21,25 +20,29 @@ def reverse_association(apps, schema_editor):
|
|||||||
into the 'manufacturer_name' field.
|
into the 'manufacturer_name' field.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
cursor = connection.cursor()
|
||||||
|
|
||||||
|
response = cursor.execute('select id, "MPN" from part_supplierpart;')
|
||||||
|
supplier_parts = cursor.fetchall()
|
||||||
|
|
||||||
# Exit if there are no SupplierPart objects
|
# Exit if there are no SupplierPart objects
|
||||||
# This crucial otherwise the unit test suite fails!
|
# This crucial otherwise the unit test suite fails!
|
||||||
if SupplierPart.objects.count() == 0:
|
if len(supplier_parts) == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
print("Reversing migration for manufacturer association")
|
print("Reversing migration for manufacturer association")
|
||||||
|
|
||||||
for part in SupplierPart.objects.all():
|
for (index, row) in enumerate(supplier_parts):
|
||||||
|
supplier_part_id, MPN = row
|
||||||
|
|
||||||
print("Checking part [{pk}]:".format(pk=part.pk))
|
print(f"Checking SupplierPart [{supplier_part_id}]:")
|
||||||
|
|
||||||
cursor = connection.cursor()
|
|
||||||
|
|
||||||
# Grab the manufacturer ID from the part
|
# Grab the manufacturer ID from the part
|
||||||
response = cursor.execute('SELECT manufacturer_id FROM part_supplierpart WHERE id={ID};'.format(ID=part.id))
|
response = cursor.execute(f"SELECT manufacturer_id FROM part_supplierpart WHERE id={supplier_part_id};")
|
||||||
|
|
||||||
manufacturer_id = None
|
manufacturer_id = None
|
||||||
|
|
||||||
row = response.fetchone()
|
row = cursor.fetchone()
|
||||||
|
|
||||||
if len(row) > 0:
|
if len(row) > 0:
|
||||||
try:
|
try:
|
||||||
@ -54,15 +57,15 @@ def reverse_association(apps, schema_editor):
|
|||||||
print(" - Manufacturer ID: [{id}]".format(id=manufacturer_id))
|
print(" - Manufacturer ID: [{id}]".format(id=manufacturer_id))
|
||||||
|
|
||||||
# Now extract the "name" for the manufacturer
|
# Now extract the "name" for the manufacturer
|
||||||
response = cursor.execute('SELECT name from company_company where id={ID};'.format(ID=manufacturer_id))
|
response = cursor.execute(f"SELECT name from company_company where id={manufacturer_id};")
|
||||||
|
|
||||||
row = response.fetchone()
|
row = cursor.fetchone()
|
||||||
|
|
||||||
name = row[0]
|
name = row[0]
|
||||||
|
|
||||||
print(" - Manufacturer name: '{name}'".format(name=name))
|
print(" - Manufacturer name: '{name}'".format(name=name))
|
||||||
|
|
||||||
response = cursor.execute("UPDATE part_supplierpart SET manufacturer_name='{name}' WHERE id={ID};".format(name=name, ID=part.id))
|
response = cursor.execute("UPDATE part_supplierpart SET manufacturer_name='{name}' WHERE id={ID};".format(name=name, ID=supplier_part_id))
|
||||||
|
|
||||||
def associate_manufacturers(apps, schema_editor):
|
def associate_manufacturers(apps, schema_editor):
|
||||||
"""
|
"""
|
||||||
@ -100,10 +103,14 @@ def associate_manufacturers(apps, schema_editor):
|
|||||||
return row[0]
|
return row[0]
|
||||||
return ''
|
return ''
|
||||||
|
|
||||||
|
cursor = connection.cursor()
|
||||||
|
|
||||||
|
response = cursor.execute(f'select id, "MPN" from part_supplierpart;')
|
||||||
|
supplier_parts = cursor.fetchall()
|
||||||
|
|
||||||
# Exit if there are no SupplierPart objects
|
# Exit if there are no SupplierPart objects
|
||||||
# This crucial otherwise the unit test suite fails!
|
# This crucial otherwise the unit test suite fails!
|
||||||
if SupplierPart.objects.count() == 0:
|
if len(supplier_parts) == 0:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Link a 'manufacturer_name' to a 'Company'
|
# Link a 'manufacturer_name' to a 'Company'
|
||||||
@ -112,50 +119,66 @@ def associate_manufacturers(apps, schema_editor):
|
|||||||
# Map company names to company objects
|
# Map company names to company objects
|
||||||
companies = {}
|
companies = {}
|
||||||
|
|
||||||
for company in Company.objects.all():
|
# Iterate through each company object
|
||||||
companies[company.name] = company
|
response = cursor.execute("select id, name from company_company;")
|
||||||
|
results = cursor.fetchall()
|
||||||
|
|
||||||
def link_part(part, name):
|
for index, row in enumerate(results):
|
||||||
|
pk, name = row
|
||||||
|
|
||||||
|
companies[name] = pk
|
||||||
|
|
||||||
|
def link_part(part_id, name):
|
||||||
""" Attempt to link Part to an existing Company """
|
""" Attempt to link Part to an existing Company """
|
||||||
|
|
||||||
# Matches a company name directly
|
# Matches a company name directly
|
||||||
if name in companies.keys():
|
if name in companies.keys():
|
||||||
print(" - Part[{pk}]: '{n}' maps to existing manufacturer".format(pk=part.pk, n=name))
|
print(" - Part[{pk}]: '{n}' maps to existing manufacturer".format(pk=part_id, n=name))
|
||||||
part.manufacturer = companies[name]
|
|
||||||
part.save()
|
manufacturer_id = companies[name]
|
||||||
|
|
||||||
|
query = f"update part_supplierpart set manufacturer_id={manufacturer_id} where id={part_id};"
|
||||||
|
result = cursor.execute(query)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Have we already mapped this
|
# Have we already mapped this
|
||||||
if name in links.keys():
|
if name in links.keys():
|
||||||
print(" - Part[{pk}]: Mapped '{n}' - '{c}'".format(pk=part.pk, n=name, c=links[name].name))
|
print(" - Part[{pk}]: Mapped '{n}' - '{c}'".format(pk=part_id, n=name, c=links[name].name))
|
||||||
part.manufacturer = links[name]
|
|
||||||
part.save()
|
query = f"update part_supplierpart set manufacturer_id={manufacturer_id} where id={part_id};"
|
||||||
|
result = query.execute()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Mapping not possible
|
# Mapping not possible
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def create_manufacturer(part, input_name, company_name):
|
def create_manufacturer(part_id, input_name, company_name):
|
||||||
""" Create a new manufacturer """
|
""" Create a new manufacturer """
|
||||||
|
|
||||||
company = Company(name=company_name, description=company_name, is_manufacturer=True)
|
# Manually create a new database row
|
||||||
|
# Note: Have to fill out all empty string values!
|
||||||
|
new_manufacturer_query = f"insert into company_company ('name', 'description', 'is_customer', 'is_supplier', 'is_manufacturer', 'address', 'website', 'phone', 'email', 'contact', 'link', 'notes') values ('{company_name}', '{company_name}', false, false, true, '', '', '', '', '', '', '');"
|
||||||
|
|
||||||
company.is_manufacturer = True
|
cursor = connection.cursor()
|
||||||
|
|
||||||
# Save the company BEFORE we associate the part, otherwise the PK does not exist
|
cursor.execute(new_manufacturer_query)
|
||||||
company.save()
|
|
||||||
|
# Extract the company back from the database
|
||||||
|
response = cursor.execute(f"select id from company_company where name='{company_name}';")
|
||||||
|
row = cursor.fetchone()
|
||||||
|
manufacturer_id = int(row[0])
|
||||||
|
|
||||||
# Map both names to the same company
|
# Map both names to the same company
|
||||||
links[input_name] = company
|
links[input_name] = manufacturer_id
|
||||||
links[company_name] = company
|
links[company_name] = manufacturer_id
|
||||||
|
|
||||||
companies[company_name] = company
|
companies[company_name] = manufacturer_id
|
||||||
|
|
||||||
print(" - Part[{pk}]: Created new manufacturer: '{name}'".format(pk=part.pk, name=company_name))
|
print(" - Part[{pk}]: Created new manufacturer: '{name}'".format(pk=part_id, name=company_name))
|
||||||
|
|
||||||
# Save the manufacturer reference link
|
# Update SupplierPart object in the database
|
||||||
part.manufacturer = company
|
cursor.execute(f"update part_supplierpart set manufacturer_id={manufacturer_id} where id={part_id};")
|
||||||
part.save()
|
|
||||||
|
|
||||||
def find_matches(text, threshold=65):
|
def find_matches(text, threshold=65):
|
||||||
"""
|
"""
|
||||||
@ -178,17 +201,19 @@ def associate_manufacturers(apps, schema_editor):
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
def map_part_to_manufacturer(part, idx, total):
|
def map_part_to_manufacturer(part_id, idx, total):
|
||||||
|
|
||||||
name = get_manufacturer_name(part.id)
|
cursor = connection.cursor()
|
||||||
|
|
||||||
|
name = get_manufacturer_name(part_id)
|
||||||
|
|
||||||
# Skip empty names
|
# Skip empty names
|
||||||
if not name or len(name) == 0:
|
if not name or len(name) == 0:
|
||||||
print(" - Part[{pk}]: No manufacturer_name provided, skipping".format(pk=part.pk))
|
print(" - Part[{pk}]: No manufacturer_name provided, skipping".format(pk=part_id))
|
||||||
return
|
return
|
||||||
|
|
||||||
# Can be linked to an existing manufacturer
|
# Can be linked to an existing manufacturer
|
||||||
if link_part(part, name):
|
if link_part(part_id, name):
|
||||||
return
|
return
|
||||||
|
|
||||||
# Find a list of potential matches
|
# Find a list of potential matches
|
||||||
@ -198,7 +223,7 @@ def associate_manufacturers(apps, schema_editor):
|
|||||||
|
|
||||||
# Present a list of options
|
# Present a list of options
|
||||||
print("----------------------------------")
|
print("----------------------------------")
|
||||||
print("Checking part [{pk}] ({idx} of {total})".format(pk=part.pk, idx=idx+1, total=total))
|
print("Checking part [{pk}] ({idx} of {total})".format(pk=part_id, idx=idx+1, total=total))
|
||||||
print("Manufacturer name: '{n}'".format(n=name))
|
print("Manufacturer name: '{n}'".format(n=name))
|
||||||
print("----------------------------------")
|
print("----------------------------------")
|
||||||
print("Select an option from the list below:")
|
print("Select an option from the list below:")
|
||||||
@ -222,7 +247,7 @@ def associate_manufacturers(apps, schema_editor):
|
|||||||
# Option 0) is to create a new manufacturer with the current name
|
# Option 0) is to create a new manufacturer with the current name
|
||||||
if n == 0:
|
if n == 0:
|
||||||
|
|
||||||
create_manufacturer(part, name, name)
|
create_manufacturer(part_id, name, name)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Options 1) - n) select an existing manufacturer
|
# Options 1) - n) select an existing manufacturer
|
||||||
@ -232,21 +257,19 @@ def associate_manufacturers(apps, schema_editor):
|
|||||||
if n < len(matches):
|
if n < len(matches):
|
||||||
# Get the company which matches the selected options
|
# Get the company which matches the selected options
|
||||||
company_name = matches[n]
|
company_name = matches[n]
|
||||||
company = companies[company_name]
|
company_id = companies[company_name]
|
||||||
|
|
||||||
# Ensure the company is designated as a manufacturer
|
# Ensure the company is designated as a manufacturer
|
||||||
company.is_manufacturer = True
|
cursor.execute(f"update company_company set is_manufacturer=true where id={company_id};")
|
||||||
company.save()
|
|
||||||
|
|
||||||
# Link the company to the part
|
# Link the company to the part
|
||||||
part.manufacturer = company
|
cursor.execute(f"update part_supplierpart set manufacturer_id={company_id} where id={part_id};")
|
||||||
part.save()
|
|
||||||
|
|
||||||
# Link the name to the company
|
# Link the name to the company
|
||||||
links[name] = company
|
links[name] = company_id
|
||||||
links[company_name] = company
|
links[company_name] = company_id
|
||||||
|
|
||||||
print(" - Part[{pk}]: Linked '{n}' to manufacturer '{m}'".format(pk=part.pk, n=name, m=company_name))
|
print(" - Part[{pk}]: Linked '{n}' to manufacturer '{m}'".format(pk=part_id, n=name, m=company_name))
|
||||||
|
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
@ -270,7 +293,7 @@ def associate_manufacturers(apps, schema_editor):
|
|||||||
|
|
||||||
# No match, create a new manufacturer
|
# No match, create a new manufacturer
|
||||||
else:
|
else:
|
||||||
create_manufacturer(part, name, response)
|
create_manufacturer(part_id, name, response)
|
||||||
return
|
return
|
||||||
|
|
||||||
clear()
|
clear()
|
||||||
@ -292,16 +315,22 @@ def associate_manufacturers(apps, schema_editor):
|
|||||||
|
|
||||||
clear()
|
clear()
|
||||||
|
|
||||||
part_count = SupplierPart.objects.count()
|
# Extract all SupplierPart objects from the database
|
||||||
|
cursor = connection.cursor()
|
||||||
|
response = cursor.execute('select id, "MPN", "SKU", manufacturer_id, manufacturer_name from part_supplierpart;')
|
||||||
|
results = cursor.fetchall()
|
||||||
|
|
||||||
|
part_count = len(results)
|
||||||
|
|
||||||
# Create a unique set of manufacturer names
|
# Create a unique set of manufacturer names
|
||||||
for idx, part in enumerate(SupplierPart.objects.all()):
|
for index, row in enumerate(results):
|
||||||
|
pk, MPN, SKU, manufacturer_id, manufacturer_name = row
|
||||||
|
|
||||||
if part.manufacturer is not None:
|
if manufacturer_id is not None:
|
||||||
print(" - Part '{p}' already has a manufacturer associated (skipping)".format(p=part))
|
print(f" - SupplierPart <{pk}> already has a manufacturer associated (skipping)")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
map_part_to_manufacturer(part, idx, part_count)
|
map_part_to_manufacturer(pk, index, part_count)
|
||||||
|
|
||||||
print("Done!")
|
print("Done!")
|
||||||
|
|
||||||
|
@ -1,6 +1,14 @@
|
|||||||
from django.db import migrations, models
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
def reverse_empty_email(apps, schema_editor):
|
||||||
|
Company = apps.get_model('company', 'Company')
|
||||||
|
for company in Company.objects.all():
|
||||||
|
if company.email == None:
|
||||||
|
company.email = ''
|
||||||
|
company.save()
|
||||||
|
|
||||||
|
|
||||||
def make_empty_email_field_null(apps, schema_editor):
|
def make_empty_email_field_null(apps, schema_editor):
|
||||||
Company = apps.get_model('company', 'Company')
|
Company = apps.get_model('company', 'Company')
|
||||||
for company in Company.objects.all():
|
for company in Company.objects.all():
|
||||||
@ -23,7 +31,7 @@ class Migration(migrations.Migration):
|
|||||||
field=models.EmailField(blank=True, help_text='Contact email address', max_length=254, null=True, unique=False, verbose_name='Email'),
|
field=models.EmailField(blank=True, help_text='Contact email address', max_length=254, null=True, unique=False, verbose_name='Email'),
|
||||||
),
|
),
|
||||||
# Convert empty email string to NULL
|
# Convert empty email string to NULL
|
||||||
migrations.RunPython(make_empty_email_field_null),
|
migrations.RunPython(make_empty_email_field_null, reverse_code=reverse_empty_email),
|
||||||
# Remove unique constraint on name field
|
# Remove unique constraint on name field
|
||||||
migrations.AlterField(
|
migrations.AlterField(
|
||||||
model_name='company',
|
model_name='company',
|
||||||
|
@ -106,7 +106,7 @@ def reverse_currencies(apps, schema_editor):
|
|||||||
# For each currency code in use, check if we have a matching Currency object
|
# For each currency code in use, check if we have a matching Currency object
|
||||||
for code in codes_in_use:
|
for code in codes_in_use:
|
||||||
response = cursor.execute(f"SELECT id, suffix from common_currency where suffix='{code}';")
|
response = cursor.execute(f"SELECT id, suffix from common_currency where suffix='{code}';")
|
||||||
row = response.fetchone()
|
row = cursor.fetchone()
|
||||||
|
|
||||||
if row is not None:
|
if row is not None:
|
||||||
# A match exists!
|
# A match exists!
|
||||||
|
@ -106,7 +106,7 @@ def reverse_currencies(apps, schema_editor):
|
|||||||
# For each currency code in use, check if we have a matching Currency object
|
# For each currency code in use, check if we have a matching Currency object
|
||||||
for code in codes_in_use:
|
for code in codes_in_use:
|
||||||
response = cursor.execute(f"SELECT id, suffix from common_currency where suffix='{code}';")
|
response = cursor.execute(f"SELECT id, suffix from common_currency where suffix='{code}';")
|
||||||
row = response.fetchone()
|
row = cursor.fetchone()
|
||||||
|
|
||||||
if row is not None:
|
if row is not None:
|
||||||
# A match exists!
|
# A match exists!
|
||||||
|
Loading…
x
Reference in New Issue
Block a user