Skip to content
Snippets Groups Projects
Commit a2dca83b authored by Sivert Kronen Hatteberg's avatar Sivert Kronen Hatteberg
Browse files

Merge branch 'GREG-153-guest-import' into 'master'

Add admin command for importing guest data from file

See merge request !241
parents ec24555f b0940a91
No related branches found
No related tags found
1 merge request!241Add admin command for importing guest data from file
Pipeline #109875 passed
"""
Import guestsdata from json.
"""
import datetime
import json
from typing import Optional
import structlog
# from django.conf import settings
from django.core.management.base import BaseCommand, CommandParser
from django.utils.timezone import make_aware
from greg.models import (
Identity,
OuIdentifier,
OrganizationalUnit,
Person,
Role,
RoleType,
Sponsor,
SponsorOrganizationalUnit,
)
logger = structlog.getLogger(__name__)
class Command(BaseCommand):
"""
Import guest data from json.
"""
help = __doc__
ID_TYPES = [
Identity.IdentityType.FEIDE_ID,
Identity.IdentityType.NORWEGIAN_NATIONAL_ID_NUMBER,
Identity.IdentityType.PASSPORT_NUMBER,
]
REQUIRED_IDS = [
Identity.IdentityType.NORWEGIAN_NATIONAL_ID_NUMBER,
Identity.IdentityType.PASSPORT_NUMBER,
]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._orgunit_id_type = "legacy_stedkode"
self._end_date = datetime.datetime.today() + datetime.timedelta(days=100)
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"file",
help="Path of import json file",
)
parser.add_argument(
"--orgunit_type",
default="legacy_stedkode",
help="Name of orgunit id type. Default: legacy_stedkode",
)
parser.add_argument(
"--end_date",
default=datetime.datetime.today() + datetime.timedelta(days=100),
type=datetime.date.fromisoformat,
help="End date of roles. Default: today + 100 days",
)
def _find_person_from_ids(self, ids: dict) -> Optional[Person]:
"""Match IDs to find person."""
for id_type in self.ID_TYPES:
matching_ids = [x for x in ids if x["id_type"] == id_type]
for matching_id in matching_ids:
try:
greg_id = Identity.objects.get(
type=id_type,
value=matching_id["external_id"],
source=matching_id["source_system"],
)
except Identity.DoesNotExist:
continue
else:
return greg_id.person
return None
def _find_orgunit_from_external_id(
self, external_id: str
) -> Optional[OrganizationalUnit]:
"""Find orgunit form an external id"""
try:
ou_id = OuIdentifier.objects.get(
name=self._orgunit_id_type,
value=external_id,
)
except OuIdentifier.DoesNotExist:
return None
return ou_id.orgunit
def _find_orgunit_sponsor(self, orgunit: OrganizationalUnit) -> Optional[Sponsor]:
"""
Find a sponsor associated with a unit.
If a unit is missing a sponsor, we try the parrent unit.
"""
iterations = 0
current_orgunit = orgunit
sponsor_ou_link = None
while iterations < 15:
sponsor_ou_links = SponsorOrganizationalUnit.objects.filter(
organizational_unit=current_orgunit
)
if sponsor_ou_links:
# If multiple sponsors exists, we use the first one.
sponsor_ou_link = sponsor_ou_links[0]
break
logger.info("orgunit_missing_sponsor", orgunit=orgunit)
if current_orgunit.parent:
current_orgunit = current_orgunit.parent
else:
logger.info("orgunit_sponsor_not_found", orgunit=orgunit)
return None
return sponsor_ou_link.sponsor if sponsor_ou_link else None
def _find_role_type_form_slug(self, slug: str) -> Optional[RoleType]:
"""
Find a role unit from a slug.
The role types used need to defined in the database for import.
"""
try:
role_type = RoleType.objects.get(
identifier=slug,
)
except RoleType.DoesNotExist:
return None
return role_type
def upsert_role(self, person: Person, role_data: dict) -> Optional[Role]:
"""Add or update a role"""
orgunit = self._find_orgunit_from_external_id(role_data["orgunit"])
if not orgunit:
logger.error(
"orgunit_missing", person=person, orgunit_id=role_data["orgunit"]
)
return None
role_type = self._find_role_type_form_slug(role_data["role_type"])
if not role_type:
logger.error(
"role_type_missing",
person=person,
role_type_slug=role_data["role_type"],
)
return None
try:
role = Role.objects.get(
orgunit=orgunit,
person=person,
type=role_type,
)
role.comments = role_data["comment"]
role.end_date = self._end_date
role.save()
logger.info("updating_existing_role", role=role, sponsor=role.sponsor)
except Role.DoesNotExist:
sponsor = self._find_orgunit_sponsor(orgunit)
if not sponsor:
logger.error("orgunit_missing_sponsor", person=person, orgunit=orgunit)
return None
role = Role.objects.create(
comments=role_data["comment"],
end_date=self._end_date,
orgunit=orgunit,
person=person,
sponsor=sponsor,
start_date=role_data["start_date"],
type=role_type,
)
logger.info("role_created", role=role, person=person, sponsor=sponsor)
return role
def upsert_identity(self, person: Person, id_data: dict) -> Identity:
"""Add or update identity"""
try:
identity = Identity.objects.get(
person=person,
type=id_data["id_type"],
value=id_data["external_id"],
source=id_data["source_system"],
verified="automatic",
)
except Identity.DoesNotExist:
identity = Identity.objects.create(
person=person,
type=id_data["id_type"],
value=id_data["external_id"],
source=id_data["source_system"],
verified="automatic",
verified_at=make_aware(datetime.datetime.now()),
)
logger.info(
"identity_added", identity=identity.id, identity_type=identity.type
)
return identity
def _has_required_id(self, id_data: dict) -> bool:
"""Check that we have at least one of the required ids"""
matching_ids = [x for x in id_data if x["id_type"] in self.REQUIRED_IDS]
return len(matching_ids) > 0
def upsert_person(self, external_id: str, data: dict) -> Optional[Person]:
"""Add or update person"""
if not self._has_required_id(data["ids"]):
logger.error("missing_required_id", external_id=external_id)
return None
person = self._find_person_from_ids(data["ids"])
if person:
# Update data
person.first_name = data["first_name"]
person.last_name = data["last_name"]
person.date_of_birth = data["date_of_birth"]
person.gender = data["gender"]
person.registration_completed_date = datetime.date.today()
person.save()
logger.info("person_updated", person=person)
else:
# Create new person
person = Person.objects.create(
first_name=data["first_name"],
last_name=data["last_name"],
date_of_birth=data["date_of_birth"],
gender=data["gender"],
registration_completed_date=datetime.date.today(),
)
logger.info("person_created", person=person)
for identity in data["ids"]:
self.upsert_identity(person, identity)
for role in data["role"]:
self.upsert_role(person, role)
return person
def handle(self, *args, **options):
"""Import of Sponsors from Cerebrum."""
active_persons = Person.objects.all()
logger.info("import_start", nr_of_persons=len(active_persons))
self._orgunit_id_type = options["orgunit_type"]
self._end_date = options["end_date"]
with open(options["file"], "r", encoding="UTF-8") as fp:
persons = json.load(fp)
for external_id, person in persons.items():
self.upsert_person(external_id, person)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment