diff --git a/greg/management/commands/notification_publisher.py b/greg/management/commands/notification_publisher.py index f775b339bb5fb71bd759e90f091c3ef618802c8d..a683800f49d249959e345ff78b46f495e24a584c 100644 --- a/greg/management/commands/notification_publisher.py +++ b/greg/management/commands/notification_publisher.py @@ -21,14 +21,23 @@ def get_notifications(limit): def generate_event_type(n: Notification) -> str: """ Generate CloudEvent compliant type field. - - This can be used as routing key as well so that a future change to amqp where - routing key is removed results in minor changes to the code. """ object_type = camel_to_snake(n.object_type) return f"{object_type}.{n.operation}" +def generate_routing_key(n: Notification) -> str: + """ + Generate an AMQP routing key. + + By default this equals the payload type field, but is prefixed with the + value in settings.NOTIFICATION_ROUTING_KEY_PREFIX + """ + event_type = generate_event_type(n) + prefix = settings.NOTIFICATION_ROUTING_KEY_PREFIX + return f"{prefix}{event_type}" + + def create_cloud_event_payload(n: Notification) -> str: """ Produce a notification payload from a Notification object that matches CloudEvents @@ -66,9 +75,7 @@ def handle_one_notification(notification: Notification, pcm: PCM, exchange: str) and delete the Notification afterwards. """ payload = create_cloud_event_payload(notification) - # Use event_type for routing_key so that a future switch to amqp 1.0 that does not - # have routing keys is easier - routing_key = generate_event_type(notification) + routing_key = generate_routing_key(notification) if pcm.publish( exchange, diff --git a/greg/tests/management/test_notification_publisher.py b/greg/tests/management/test_notification_publisher.py index c428d7bc32d9c29896be2c5478a20c28aeb31c08..8adc7ba6396ec36b916c7921650d78fd4005f391 100644 --- a/greg/tests/management/test_notification_publisher.py +++ b/greg/tests/management/test_notification_publisher.py @@ -4,6 +4,7 @@ import pytest from greg.management.commands.notification_publisher import ( handle_one_notification, create_cloud_event_payload, + generate_routing_key, ) from greg.models import Notification @@ -41,3 +42,10 @@ def test_create_cloud_event_payload(role_type_notification): "type": "role_type.update", "data": {"integer": 123, "string": "foo"}, } + + +@pytest.mark.django_db +def test_generate_routing_key(role_type_notification, settings): + assert generate_routing_key(role_type_notification) == "role_type.update" + settings.NOTIFICATION_ROUTING_KEY_PREFIX = "foo." + assert generate_routing_key(role_type_notification) == "foo.role_type.update" diff --git a/gregsite/settings/base.py b/gregsite/settings/base.py index 42400e6aeca647eb5f11d84a108e84552b648f80..90ae275a9abf6a651418cfe9c84a1f1e4efd4cc0 100644 --- a/gregsite/settings/base.py +++ b/gregsite/settings/base.py @@ -266,6 +266,9 @@ structlog.configure( ) +# By default, the routing key format is "object_type.operation" +# Setting the prefix to "foo." will result in "foo.object_type.operation" +NOTIFICATION_ROUTING_KEY_PREFIX = "" NOTIFICATION_PUBLISHER = { "mq": { "connection": {