diff --git a/greg/management/commands/start_notification_publisher.py b/greg/management/commands/start_notification_publisher.py index b73d26fc78034753b04423f595041b2887eed2e5..c7257c948ae2ad522055f326539ddb97e2fb817c 100644 --- a/greg/management/commands/start_notification_publisher.py +++ b/greg/management/commands/start_notification_publisher.py @@ -10,8 +10,8 @@ from django.conf import settings from django.core.management.base import BaseCommand from pika_context_manager import PCM -from greg.api.serializers import get_serializer from greg.models import Notification +from greg.utils import camel_to_snake logging.config.dictConfig(settings.LOGGING) logger = logging.getLogger() @@ -24,21 +24,61 @@ def exception_handler(ex_cls, ex, tb): sys.excepthook = exception_handler -def generate_routing_key(system_name, n): - return "{system_name}.{object_type}.{operation}".format( - system_name=system_name, **n - ) - - def get_notifications(limit): return Notification.objects.all()[:limit] +def generate_event_type(n: Notification) -> str: + """ + Generate CloudEvent compliant type field. + + This can be used as routing key as well so that a future change to amqp where + routing key is removed results in minor changes to the code. + """ + object_type = camel_to_snake(n.object_type) + return f"{settings.INTERNAL_RK_PREFIX}.{object_type}.{n.operation}" + + +def create_cloud_event_payload(n: Notification) -> str: + """ + Produce a notification payload from a Notification object that matches CloudEvents + spec. + + This ensures that the combination of source and id is unique for all messages + published from this instance of greg. If you mix messages from multiple instances of + greg this will fail. + + Using the object_type and the identifier gives us which api endpoint to use and + which id to look up. + + specversion specifies version of CloudEvents spec, in our case 1.0. Note we're using + only major and minor versions. This function was written at the time of specversion + 1.0.1. + + The type describes which sort of event has happened to the object identifier by id + and source, i.e no.local.greg.person.add + """ + + object_type = camel_to_snake(n.object_type) + return json.dumps( + { + "id": str(n.id), + "source": f"urn:greg:{settings.INSTANCE_NAME}:{object_type}:{n.identifier}", + "specversion": "1.0", + "type": generate_event_type(n), + } + ) + + def handle_one_notification(notification: Notification, pcm: PCM, exchange: str): - serializer = get_serializer(notification) - representation = serializer().to_representation(notification) - payload = json.dumps(representation) - routing_key = generate_routing_key("greg", representation) + """ + Publish a message to an exchange with the information contained in a Notification + and delete the Notification afterwards. + """ + payload = create_cloud_event_payload(notification) + # Use event_type for routing_key so that a future switch to amqp 1.0 that does not + # have routing keys is easier + routing_key = generate_event_type(notification) if pcm.publish( exchange,