From 967c18942e0673b8d1b2d181fc0a62d7c9f013e0 Mon Sep 17 00:00:00 2001 From: Andreas Ellewsen <andreas@ellewsen.no> Date: Tue, 26 Apr 2022 11:24:42 +0200 Subject: [PATCH] Make import_guests command more lenient on source The database constraint on the Identity model demands only one value per id type for each person. This means that a person with a pass number already in greg from a different source than the one we are trying to import will trigger an integrity error when we try to create it. The logic now instead checks for identities with other sources as well, and if a match is found from another source system, the source is updated to the one in the data we are importing. This will also mean that a previously entered manual entry, will be converted to one marked as automatic. --- greg/management/commands/import_guests.py | 55 ++++++++++++++--------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/greg/management/commands/import_guests.py b/greg/management/commands/import_guests.py index fc766f0d..3af001a5 100644 --- a/greg/management/commands/import_guests.py +++ b/greg/management/commands/import_guests.py @@ -68,20 +68,32 @@ class Command(BaseCommand): ) def _find_person_from_ids(self, ids: dict) -> Optional[Person]: - """Match IDs to find person.""" + """ + Match IDs to find person. + + Tries to find a perfect match, returns matches for identifiers + of same type with same value from different source if present. + """ for id_type in self.ID_TYPES: matching_ids = [x for x in ids if x["id_type"] == id_type] for matching_id in matching_ids: - try: - greg_id = Identity.objects.get( - type=id_type, - value=matching_id["external_id"], - source=matching_id["source_system"], - ) - except Identity.DoesNotExist: + # Check without correct source + matches = Identity.objects.filter( + type=id_type, + value=matching_id["external_id"], + ) + if not matches: + # No match, check next id continue - else: - return greg_id.person + # Found one or more matches, check if a perfect match exists + perfect = matches.filter( + source=matching_id["source_system"], + ).first() + if perfect: + # Perfect match! Return it + return perfect.person + # No perfect match, return first match from other source + return matches.first().person # type: ignore return None def _find_orgunit_from_external_id( @@ -188,15 +200,12 @@ class Command(BaseCommand): def upsert_identity(self, person: Person, id_data: dict) -> Identity: """Add or update identity""" - try: - identity = Identity.objects.get( - person=person, - type=id_data["id_type"], - value=id_data["external_id"], - source=id_data["source_system"], - verified="automatic", - ) - except Identity.DoesNotExist: + match = Identity.objects.filter( + person=person, + type=id_data["id_type"], + value=id_data["external_id"], + ).first() + if not match: identity = Identity.objects.create( person=person, type=id_data["id_type"], @@ -208,7 +217,13 @@ class Command(BaseCommand): logger.info( "identity_added", identity=identity.id, identity_type=identity.type ) - + else: + identity = match + if not identity.verified_at: + identity.source = id_data["source_system"] + identity.verified = "automatic" + identity.verified_at = make_aware(datetime.datetime.now()) + identity.save() return identity def _has_required_id(self, id_data: dict) -> bool: -- GitLab