Message AMGI Message Format

The Message AMGI sub-specification outlines how messages are sent, and received within AMGI.

It is deliberately designed to be agnostic where possible. Terminology is taken from AsyncAPI so as to follow their agnosticism.

A simple implementation would be:

async def app(scope, receive, send):
    if scope["type"] == "message":
        more_messages = True
        while more_messages:
            message = await receive()
            message_id = message["id"]
            try:
                headers = message["headers"]
                payload = message.get("payload")
                bindings = message.get("bindings", {})
                ...  # Do some message handling here!
                await send(
                    {
                        "type": "message.ack",
                        "id": message_id,
                    }
                )
            except Exception as e:
                await send(
                    {
                        "type": "message.nack",
                        "id": message_id,
                        "message": str(e),
                    }
                )
            more_messages = message.get("more_messages")
    else:
        pass  # Handle other types

Message

A message batch has a single message scope. Your application will be called once per batch. For protocols that do not support batched consumption a batch of one message should be sent to the application.

The message scope information passed in scope contains:

  • scope["type"] (Literal['message'])

  • scope["amgi"]["spec_version"] (str) – Version of the AMGI message spec this server understands-

  • scope["amgi"]["version"] (Literal['1.0']) – Version of the AMGI spec

  • scope["address"] (str) – The address of the batch of messages, for example, in Kafka this would be the topic

  • scope["state"] (NotRequired[dict[str, Any]]) – A copy of the namespace passed into the lifespan corresponding to this batch. Optional; if missing the server does not support this feature.

  • scope["extensions"] (NotRequired[dict[str, dict[str, Any]]]) – Extensions allow AMGI servers to advertise optional capabilities to applications. Extensions are provided via scope and are opt-in: applications MUST assume an extension is unsupported unless it is explicitly present.

Receive message - receive() event

Sent to the application to indicate an incoming message in the batch.

Keys:

  • message["type"] (Literal['message.receive'])

  • message["id"] (str) – A unique id for the message, used to ack, or nack the message

  • message["headers"] (Iterable[tuple[bytes, bytes]]) – Includes the headers of the message

  • message["payload"] (NotRequired[Optional[bytes]]) – Payload of the message, which can be None or bytes. If missing, it defaults to None

  • message["bindings"] (NotRequired[dict[str, dict[str, Any]]]) – Protocol specific bindings, for example, when receiving a Kafka message the bindings could include the key: {"kafka": {"key": b"key"}}

  • message["more_messages"] (NotRequired[bool]) – Indicates there are more messages to process in the batch. The application should keep receiving until it receives False. If missing it defaults to False

Response message ack - send() event

Sent by the application to signify that it has successfully acknowledged a message.

Keys:

  • message["type"] (Literal['message.ack'])

  • message["id"] (str) – The unique id of the message

Response message nack - send() event

Sent by the application to signify that it could not process a message.

Keys:

  • message["type"] (Literal['message.nack'])

  • message["id"] (str) – The unique id of the message

  • message["message"] (str) – A message indicating why the message could not be processed

Response message send - send() event

Sent by the application to send a message. If the server fails to send the message, the server should raise a server-specific subclass of OSError.

Keys:

  • message["type"] (Literal['message.send'])

  • message["address"] (str) – Address to send the message to

  • message["headers"] (Iterable[tuple[bytes, bytes]]) – Headers of the message

  • message["payload"] (NotRequired[Optional[bytes]]) – Payload of the message, which can be None, or bytes. If missing, it defaults to None.

  • message["bindings"] (NotRequired[dict[str, dict[str, Any]]]) – Protocol specific bindings to send. This can be bindings for multiple protocols, allowing the server to decide to handle them, or ignore them.

Bindings Object

Both "message.receive", and "message.send" can contain a bindings object. These are defined as per protocol specifications.