Skip to content
Snippets Groups Projects
Commit a6ff3426 authored by Jonathan Weth's avatar Jonathan Weth :keyboard:
Browse files

Merge branch 'member-of-field' into 'master'

Stuff for Schild course imports

Closes #44

See merge request AlekSIS/official/AlekSIS-App-CSVImport!88
parents 2a0ae5d0 c5cd6cd6
Branches
Tags
No related merge requests found
......@@ -14,6 +14,8 @@ Added
* ZIP files with multiple CSVs and accompanying photos can now be imported
* Field types can now provide values for arbitrary alternative DB fields
* Virtual fields can be generated from literal fields using Django templates
* Fields for Group.parent_groups and Person.member_of
Fixed
~~~~~
......
......@@ -27,6 +27,7 @@ class FieldType:
db_field: str = ""
converter: Optional[Union[str, Sequence[str]]] = None
alternative_db_fields: Optional[str] = None
template: str = ""
args: Optional[dict] = None
@classmethod
......@@ -40,7 +41,7 @@ class FieldType:
converters_post = self.get_args().get("converter_post", [])
if isinstance(converters_post, str):
converters_post = [converters_post]
converters = self.converter
converters = self.get_args().get("converter") or self.converter
if converters is None:
converters = []
elif isinstance(converters, str):
......@@ -72,17 +73,55 @@ class FieldType:
def get_column_name(self) -> str:
"""Get column name for use in Pandas structures."""
if self.get_args().get("column_name"):
return self.get_args()["column_name"]
return self.column_name
def __init__(self, school_term: SchoolTerm, base_path: str):
def get_template(self) -> str:
if self.get_args().get("template"):
return self.get_args()["template"]
return self.template or ""
def __init__(self, school_term: SchoolTerm, base_path: str, **kwargs):
self.school_term = school_term
self.base_path = os.path.realpath(base_path)
self.column_name = f"col_{uuid4()}"
self.args = kwargs
class FieldTypeRegistry:
def __init__(self):
self.field_types = {}
def register(self, field_type: Type[FieldType]):
"""Add new `FieldType` to registry.
Can be used as decorator, too.
"""
if field_type.name in self.field_types:
raise ValueError(f"The field type {field_type.name} is already registered.")
self.field_types[field_type.name] = field_type
return field_type
def get_from_name(self, name: str) -> FieldType:
"""Get `FieldType` by its name."""
return self.field_types[name]
@property
def choices(self) -> Sequence[Tuple[str, str]]:
"""Return choices in Django format."""
return [(f.name, f.verbose_name) for f in self.field_types.values()]
field_type_registry = FieldTypeRegistry()
@field_type_registry.register
class MatchFieldType(FieldType):
"""Field type for getting an instance."""
name: str = "match"
priority: int = 1
def get_priority(self):
......@@ -117,34 +156,6 @@ class RegExFieldType(ProcessFieldType):
instance.save()
class FieldTypeRegistry:
def __init__(self):
self.field_types = {}
def register(self, field_type: Type[FieldType]):
"""Add new `FieldType` to registry.
Can be used as decorator, too.
"""
if field_type.name in self.field_types:
raise ValueError(f"The field type {field_type.name} is already registered.")
self.field_types[field_type.name] = field_type
return field_type
def get_from_name(self, name: str) -> FieldType:
"""Get `FieldType` by its name."""
return self.field_types[name]
@property
def choices(self) -> Sequence[Tuple[str, str]]:
"""Return choices in Django format."""
return [(f.name, f.verbose_name) for f in self.field_types.values()]
field_type_registry = FieldTypeRegistry()
@field_type_registry.register
class UniqueReferenceFieldType(MatchFieldType):
name = "unique_reference"
......@@ -345,12 +356,12 @@ class ClassRangeFieldType(ProcessFieldType):
name = "class_range"
verbose_name = _("Class range (e. g. 7a-d)")
def __init__(self, school_term: SchoolTerm, base_path: str):
def __init__(self, school_term: SchoolTerm, base_path: str, **kwargs):
# Prefetch class groups
self.classes_per_short_name = get_classes_per_short_name(school_term)
self.classes_per_grade = get_classes_per_grade(self.classes_per_short_name.keys())
super().__init__(school_term, base_path)
super().__init__(school_term, base_path, **kwargs)
def process(self, instance: Model, value):
classes = parse_class_range(
......@@ -415,6 +426,30 @@ class GroupMembershipByShortNameFieldType(ProcessFieldType):
pass
@field_type_registry.register
class ParentGroupByShortNameFieldType(ProcessFieldType):
name = "parent_group_short_name"
verbose_name = _("Short name of the group's parent group")
def process(self, instance: Model, value):
group, __ = Group.objects.get_or_create(
short_name=value, school_term=self.school_term, defaults={"name": value}
)
instance.parent_groups.add(group)
instance.save()
@field_type_registry.register
class MemberOfByNameFieldType(ProcessFieldType):
name = "member_of_by_name"
verbose_name = _("Name of a group the person is a member of")
def process(self, instance: Model, value):
group, __ = Group.objects.get_or_create(name=value, school_term=self.school_term)
instance.member_of.add(group)
instance.save()
@field_type_registry.register
class ChildByUniqueReference(ProcessFieldType):
name = "child_by_unique_reference"
......
# Generated by Django 3.2.11 on 2022-01-28 19:36
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('csv_import', '0006_from_default_field'),
]
operations = [
migrations.AddField(
model_name='importtemplatefield',
name='virtual_tmpl',
field=models.TextField(blank=True, null=True, verbose_name='Django template to generate virtual field from'),
),
migrations.AddField(
model_name='importtemplatefield',
name='virtual',
field=models.BooleanField(default=False, verbose_name='Virtual field'),
),
]
......@@ -76,7 +76,7 @@ class ImportTemplate(ExtensibleModel):
name: str,
verbose_name: str,
extra_args: dict,
fields: Sequence[Tuple[FieldType, Dict[str, Any]]],
fields: Sequence[Tuple[FieldType, Dict[str, Any], bool, str]],
):
"""Update or create an import template in database."""
ct = ContentType.objects.get_for_model(model)
......@@ -87,11 +87,16 @@ class ImportTemplate(ExtensibleModel):
i = 0
for i, field in enumerate(fields):
field_type, args = field
field_type, args, virtual, virtual_tmpl = field
ImportTemplateField.objects.update_or_create(
template=template,
index=i,
defaults={"field_type": field_type.name, "args": args},
defaults={
"field_type": field_type.name,
"args": args,
"virtual": virtual,
"virtual_tmpl": virtual_tmpl,
},
)
ImportTemplateField.objects.filter(template=template, index__gt=i).delete()
......@@ -108,14 +113,18 @@ class ImportTemplate(ExtensibleModel):
for field_definition in defs["fields"]:
if isinstance(field_definition, str):
field_type = field_type_registry.get_from_name(field_definition)
virtual = False
template = ""
args = {}
else:
field_type = field_type_registry.get_from_name(
field_definition.pop("field_type")
)
virtual = field_definition.pop("virtual", False)
template = field_definition.pop("template", "")
args = field_definition
fields.append((field_type, args))
fields.append((field_type, args, virtual, template))
extra_args = defs.get("extra_args", {})
extra_args["from_default"] = from_default
......@@ -157,10 +166,14 @@ class ImportTemplateField(ExtensibleModel):
)
args = models.JSONField(verbose_name=_("Optional arguments passed to field type"), default={})
virtual = models.BooleanField(verbose_name=_("Virtual field"), default=False, null=False)
virtual_tmpl = models.TextField(
verbose_name=_("Django template to generate virtual field from"), blank=True
)
@property
def field_type_class(self):
field_type = field_type_registry.get_from_name(self.field_type)
field_type.args = self.args
return field_type
def clean(self):
......
import pytest
from aleksis.apps.csv_import.util.import_helpers import bulk_get_or_create
from aleksis.core.models import Person
pytestmark = pytest.mark.django_db
def test_bulk_get_or_create_person():
short_names = ["FOO", "BAR", "BAZ"]
# with pytest.raises(Ex)
r = bulk_get_or_create(Person, short_names, "short_name")
assert sorted([x.short_name for x in r]) == sorted(short_names)
r = bulk_get_or_create(Person, short_names, "short_name")
assert sorted([x.short_name for x in r]) == sorted(short_names)
def test_bulk_get_or_create_person_default_attrs():
short_names = ["FOO", "BAR", "BAZ"]
r = bulk_get_or_create(Person, short_names, "short_name", default_attrs="last_name")
for person in r:
assert person.short_name in short_names
assert person.short_name == person.last_name
def test_bulk_get_or_create_person_default_attrs_list():
short_names = ["FOO", "BAR", "BAZ"]
r = bulk_get_or_create(
Person, short_names, "short_name", default_attrs=["first_name", "last_name"]
)
for person in r:
assert person.short_name == person.last_name
assert person.short_name == person.first_name
def test_bulk_get_or_create_person_defaults():
short_names = ["FOO", "BAR", "BAZ"]
defaults = {"first_name": "foo", "last_name": "bar"}
r = bulk_get_or_create(Person, short_names, "short_name", defaults=defaults)
for person in r:
assert person.first_name == "foo"
assert person.last_name == "bar"
def test_bulk_get_or_create_person_defaults_default_attrs():
short_names = ["FOO", "BAR", "BAZ"]
defaults = {"last_name": "foo"}
r = bulk_get_or_create(
Person,
short_names,
"short_name",
default_attrs=["first_name"],
defaults=defaults,
)
for person in r:
assert person.short_name == person.first_name
assert person.last_name == "foo"
......@@ -8,6 +8,7 @@ from zipfile import ZipFile
from django.contrib import messages
from django.core.exceptions import ValidationError
from django.db import transaction
from django.template import Context, Template
from django.utils.translation import gettext as _
import chardet
......@@ -46,16 +47,22 @@ def import_csv(
data_types = {}
converters = {}
field_types = {}
virtual_fields = []
for field in template.fields.all():
# Get field type and prepare for import
field_type = field.field_type_class(school_term, temp_dir)
field_type = field.field_type_class(school_term, temp_dir, **field.args)
column_name = field_type.get_column_name()
# Get data type and conversion rules, if any,
# to be passed to Pandas
data_types[column_name] = field_type.get_data_type()
if field_type.get_converter():
converters[column_name] = field_type.get_converter()
if not field.virtual:
# Get data type and conversion rules, if any,
# to be passed to Pandas
data_types[column_name] = field_type.get_data_type()
if field_type.get_converter():
converters[column_name] = field_type.get_converter()
else:
# Mark field as virtual so as to not handle with Pandas
virtual_fields.append(column_name)
field_type.template = field.virtual_tmpl
field_types[column_name] = field_type
......@@ -123,6 +130,21 @@ def import_csv(
iterator = recorder.iterate(data_as_dict) if recorder else tqdm(data_as_dict)
for row in iterator:
# Generate virtual field data
for column_name in virtual_fields:
field_type = field_types[column_name]
# Generate field using a Django template string, and the row as context
tmpl = Template(field_type.get_template())
ctx = Context(row)
data = tmpl.render(ctx).strip()
# Post-process field using converter
data = field_type.get_converter()(data)
# Store
row[column_name] = data
# Build dict with all fields that should be directly updated
update_dict = {}
for key, value in row.items():
......
......@@ -72,7 +72,7 @@ class ImportTemplateUploadView(PermissionRequiredMixin, FormView):
ImportTemplate.update_or_create_templates(template_defs)
except Exception as e:
messages.error(
self.request, _("The import of the import templates failed: \n {e}").format(e)
self.request, _("The import of the import templates failed: \n {}").format(e)
)
return self.form_invalid(form)
messages.success(self.request, _("The import of the import templates was successful."))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment