diff --git a/greg/management/commands/import_guests.py b/greg/management/commands/import_guests.py new file mode 100644 index 0000000000000000000000000000000000000000..fc766f0d385e341fc3804a55a457152108621cfd --- /dev/null +++ b/greg/management/commands/import_guests.py @@ -0,0 +1,265 @@ +""" +Import guestsdata from json. +""" + +import datetime +import json + +from typing import Optional + +import structlog + +# from django.conf import settings +from django.core.management.base import BaseCommand, CommandParser +from django.utils.timezone import make_aware + +from greg.models import ( + Identity, + OuIdentifier, + OrganizationalUnit, + Person, + Role, + RoleType, + Sponsor, + SponsorOrganizationalUnit, +) + +logger = structlog.getLogger(__name__) + + +class Command(BaseCommand): + """ + Import guest data from json. + """ + + help = __doc__ + + ID_TYPES = [ + Identity.IdentityType.FEIDE_ID, + Identity.IdentityType.NORWEGIAN_NATIONAL_ID_NUMBER, + Identity.IdentityType.PASSPORT_NUMBER, + ] + + REQUIRED_IDS = [ + Identity.IdentityType.NORWEGIAN_NATIONAL_ID_NUMBER, + Identity.IdentityType.PASSPORT_NUMBER, + ] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._orgunit_id_type = "legacy_stedkode" + self._end_date = datetime.datetime.today() + datetime.timedelta(days=100) + + def add_arguments(self, parser: CommandParser) -> None: + parser.add_argument( + "file", + help="Path of import json file", + ) + parser.add_argument( + "--orgunit_type", + default="legacy_stedkode", + help="Name of orgunit id type. Default: legacy_stedkode", + ) + parser.add_argument( + "--end_date", + default=datetime.datetime.today() + datetime.timedelta(days=100), + type=datetime.date.fromisoformat, + help="End date of roles. Default: today + 100 days", + ) + + def _find_person_from_ids(self, ids: dict) -> Optional[Person]: + """Match IDs to find person.""" + for id_type in self.ID_TYPES: + matching_ids = [x for x in ids if x["id_type"] == id_type] + for matching_id in matching_ids: + try: + greg_id = Identity.objects.get( + type=id_type, + value=matching_id["external_id"], + source=matching_id["source_system"], + ) + except Identity.DoesNotExist: + continue + else: + return greg_id.person + return None + + def _find_orgunit_from_external_id( + self, external_id: str + ) -> Optional[OrganizationalUnit]: + """Find orgunit form an external id""" + try: + ou_id = OuIdentifier.objects.get( + name=self._orgunit_id_type, + value=external_id, + ) + except OuIdentifier.DoesNotExist: + return None + return ou_id.orgunit + + def _find_orgunit_sponsor(self, orgunit: OrganizationalUnit) -> Optional[Sponsor]: + """ + Find a sponsor associated with a unit. + + If a unit is missing a sponsor, we try the parrent unit. + """ + iterations = 0 + current_orgunit = orgunit + sponsor_ou_link = None + while iterations < 15: + sponsor_ou_links = SponsorOrganizationalUnit.objects.filter( + organizational_unit=current_orgunit + ) + + if sponsor_ou_links: + # If multiple sponsors exists, we use the first one. + sponsor_ou_link = sponsor_ou_links[0] + break + + logger.info("orgunit_missing_sponsor", orgunit=orgunit) + if current_orgunit.parent: + current_orgunit = current_orgunit.parent + else: + logger.info("orgunit_sponsor_not_found", orgunit=orgunit) + return None + + return sponsor_ou_link.sponsor if sponsor_ou_link else None + + def _find_role_type_form_slug(self, slug: str) -> Optional[RoleType]: + """ + Find a role unit from a slug. + + The role types used need to defined in the database for import. + """ + try: + role_type = RoleType.objects.get( + identifier=slug, + ) + except RoleType.DoesNotExist: + return None + return role_type + + def upsert_role(self, person: Person, role_data: dict) -> Optional[Role]: + """Add or update a role""" + orgunit = self._find_orgunit_from_external_id(role_data["orgunit"]) + if not orgunit: + logger.error( + "orgunit_missing", person=person, orgunit_id=role_data["orgunit"] + ) + return None + + role_type = self._find_role_type_form_slug(role_data["role_type"]) + if not role_type: + logger.error( + "role_type_missing", + person=person, + role_type_slug=role_data["role_type"], + ) + return None + + try: + role = Role.objects.get( + orgunit=orgunit, + person=person, + type=role_type, + ) + role.comments = role_data["comment"] + role.end_date = self._end_date + role.save() + logger.info("updating_existing_role", role=role, sponsor=role.sponsor) + except Role.DoesNotExist: + sponsor = self._find_orgunit_sponsor(orgunit) + + if not sponsor: + logger.error("orgunit_missing_sponsor", person=person, orgunit=orgunit) + return None + + role = Role.objects.create( + comments=role_data["comment"], + end_date=self._end_date, + orgunit=orgunit, + person=person, + sponsor=sponsor, + start_date=role_data["start_date"], + type=role_type, + ) + logger.info("role_created", role=role, person=person, sponsor=sponsor) + return role + + def upsert_identity(self, person: Person, id_data: dict) -> Identity: + """Add or update identity""" + try: + identity = Identity.objects.get( + person=person, + type=id_data["id_type"], + value=id_data["external_id"], + source=id_data["source_system"], + verified="automatic", + ) + except Identity.DoesNotExist: + identity = Identity.objects.create( + person=person, + type=id_data["id_type"], + value=id_data["external_id"], + source=id_data["source_system"], + verified="automatic", + verified_at=make_aware(datetime.datetime.now()), + ) + logger.info( + "identity_added", identity=identity.id, identity_type=identity.type + ) + + return identity + + def _has_required_id(self, id_data: dict) -> bool: + """Check that we have at least one of the required ids""" + matching_ids = [x for x in id_data if x["id_type"] in self.REQUIRED_IDS] + return len(matching_ids) > 0 + + def upsert_person(self, external_id: str, data: dict) -> Optional[Person]: + """Add or update person""" + + if not self._has_required_id(data["ids"]): + logger.error("missing_required_id", external_id=external_id) + return None + + person = self._find_person_from_ids(data["ids"]) + if person: + # Update data + person.first_name = data["first_name"] + person.last_name = data["last_name"] + person.date_of_birth = data["date_of_birth"] + person.gender = data["gender"] + person.registration_completed_date = datetime.date.today() + person.save() + logger.info("person_updated", person=person) + else: + # Create new person + person = Person.objects.create( + first_name=data["first_name"], + last_name=data["last_name"], + date_of_birth=data["date_of_birth"], + gender=data["gender"], + registration_completed_date=datetime.date.today(), + ) + logger.info("person_created", person=person) + + for identity in data["ids"]: + self.upsert_identity(person, identity) + for role in data["role"]: + self.upsert_role(person, role) + return person + + def handle(self, *args, **options): + """Import of Sponsors from Cerebrum.""" + active_persons = Person.objects.all() + logger.info("import_start", nr_of_persons=len(active_persons)) + + self._orgunit_id_type = options["orgunit_type"] + self._end_date = options["end_date"] + + with open(options["file"], "r", encoding="UTF-8") as fp: + persons = json.load(fp) + + for external_id, person in persons.items(): + self.upsert_person(external_id, person)