Skip to content
Snippets Groups Projects
Commit b33ace81 authored by Andreas Ellewsen's avatar Andreas Ellewsen
Browse files

Use CloudEvents format for message publishing

parent c99132cf
No related branches found
No related tags found
1 merge request!40Use CloudEvents format for message publishing
Pipeline #91885 passed
......@@ -10,8 +10,8 @@ from django.conf import settings
from django.core.management.base import BaseCommand
from pika_context_manager import PCM
from greg.api.serializers import get_serializer
from greg.models import Notification
from greg.utils import camel_to_snake
logging.config.dictConfig(settings.LOGGING)
logger = logging.getLogger()
......@@ -24,21 +24,61 @@ def exception_handler(ex_cls, ex, tb):
sys.excepthook = exception_handler
def generate_routing_key(system_name, n):
return "{system_name}.{object_type}.{operation}".format(
system_name=system_name, **n
)
def get_notifications(limit):
return Notification.objects.all()[:limit]
def generate_event_type(n: Notification) -> str:
"""
Generate CloudEvent compliant type field.
This can be used as routing key as well so that a future change to amqp where
routing key is removed results in minor changes to the code.
"""
object_type = camel_to_snake(n.object_type)
return f"{settings.INTERNAL_RK_PREFIX}.{object_type}.{n.operation}"
def create_cloud_event_payload(n: Notification) -> str:
"""
Produce a notification payload from a Notification object that matches CloudEvents
spec.
This ensures that the combination of source and id is unique for all messages
published from this instance of greg. If you mix messages from multiple instances of
greg this will fail.
Using the object_type and the identifier gives us which api endpoint to use and
which id to look up.
specversion specifies version of CloudEvents spec, in our case 1.0. Note we're using
only major and minor versions. This function was written at the time of specversion
1.0.1.
The type describes which sort of event has happened to the object identifier by id
and source, i.e no.local.greg.person.add
"""
object_type = camel_to_snake(n.object_type)
return json.dumps(
{
"id": str(n.id),
"source": f"urn:greg:{settings.INSTANCE_NAME}:{object_type}:{n.identifier}",
"specversion": "1.0",
"type": generate_event_type(n),
}
)
def handle_one_notification(notification: Notification, pcm: PCM, exchange: str):
serializer = get_serializer(notification)
representation = serializer().to_representation(notification)
payload = json.dumps(representation)
routing_key = generate_routing_key("greg", representation)
"""
Publish a message to an exchange with the information contained in a Notification
and delete the Notification afterwards.
"""
payload = create_cloud_event_payload(notification)
# Use event_type for routing_key so that a future switch to amqp 1.0 that does not
# have routing keys is easier
routing_key = generate_event_type(notification)
if pcm.publish(
exchange,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment