From b33ace81b08942cf0e805b034f1d682aafef59b1 Mon Sep 17 00:00:00 2001
From: Andreas Ellewsen <andreas@ellewsen.no>
Date: Thu, 2 Sep 2021 16:00:46 +0200
Subject: [PATCH] Use CloudEvents format for message publishing

---
 .../commands/start_notification_publisher.py  | 62 +++++++++++++++----
 1 file changed, 51 insertions(+), 11 deletions(-)

diff --git a/greg/management/commands/start_notification_publisher.py b/greg/management/commands/start_notification_publisher.py
index b73d26fc..c7257c94 100644
--- a/greg/management/commands/start_notification_publisher.py
+++ b/greg/management/commands/start_notification_publisher.py
@@ -10,8 +10,8 @@ from django.conf import settings
 from django.core.management.base import BaseCommand
 from pika_context_manager import PCM
 
-from greg.api.serializers import get_serializer
 from greg.models import Notification
+from greg.utils import camel_to_snake
 
 logging.config.dictConfig(settings.LOGGING)
 logger = logging.getLogger()
@@ -24,21 +24,61 @@ def exception_handler(ex_cls, ex, tb):
 sys.excepthook = exception_handler
 
 
-def generate_routing_key(system_name, n):
-    return "{system_name}.{object_type}.{operation}".format(
-        system_name=system_name, **n
-    )
-
-
 def get_notifications(limit):
     return Notification.objects.all()[:limit]
 
 
+def generate_event_type(n: Notification) -> str:
+    """
+    Generate CloudEvent compliant type field.
+
+    This can be used as routing key as well so that a future change to amqp where
+    routing key is removed results in minor changes to the code.
+    """
+    object_type = camel_to_snake(n.object_type)
+    return f"{settings.INTERNAL_RK_PREFIX}.{object_type}.{n.operation}"
+
+
+def create_cloud_event_payload(n: Notification) -> str:
+    """
+    Produce a notification payload from a Notification object that matches CloudEvents
+    spec.
+
+    This ensures that the combination of source and id is unique for all messages
+    published from this instance of greg. If you mix messages from multiple instances of
+    greg this will fail.
+
+    Using the object_type and the identifier gives us which api endpoint to use and
+    which id to look up.
+
+    specversion specifies version of CloudEvents spec, in our case 1.0. Note we're using
+    only major and minor versions. This function was written at the time of specversion
+    1.0.1.
+
+    The type describes which sort of event has happened to the object identifier by id
+    and source, i.e no.local.greg.person.add
+    """
+
+    object_type = camel_to_snake(n.object_type)
+    return json.dumps(
+        {
+            "id": str(n.id),
+            "source": f"urn:greg:{settings.INSTANCE_NAME}:{object_type}:{n.identifier}",
+            "specversion": "1.0",
+            "type": generate_event_type(n),
+        }
+    )
+
+
 def handle_one_notification(notification: Notification, pcm: PCM, exchange: str):
-    serializer = get_serializer(notification)
-    representation = serializer().to_representation(notification)
-    payload = json.dumps(representation)
-    routing_key = generate_routing_key("greg", representation)
+    """
+    Publish a message to an exchange with the information contained in a Notification
+    and delete the Notification afterwards.
+    """
+    payload = create_cloud_event_payload(notification)
+    # Use event_type for routing_key so that a future switch to amqp 1.0 that does not
+    # have routing keys is easier
+    routing_key = generate_event_type(notification)
 
     if pcm.publish(
         exchange,
-- 
GitLab