Message AMGI Message Format
The Message AMGI sub-specification outlines how messages are sent and received within AMGI.
It is deliberately designed to be protocol-agnostic where possible. Terminology is taken from AsyncAPI, to follow its protocol-agnostic approach.
A simple implementation would be:
async def app(scope, receive, send):
    if scope["type"] == "message":
        more_messages = True
        while more_messages:
            message = await receive()
            message_id = message["id"]
            try:
                headers = message["headers"]
                payload = message.get("payload")
                bindings = message.get("bindings", {})
                ...  # Do some message handling here!
                await send(
                    {
                        "type": "message.ack",
                        "id": message_id,
                    }
                )
            except Exception as e:
                await send(
                    {
                        "type": "message.nack",
                        "id": message_id,
                        "message": str(e),
                    }
                )
            more_messages = message.get("more_messages")
    else:
        pass  # Handle other types
Message
A message batch has a single message scope. Your application will be called once per batch. For protocols that do not support batched consumption, a batch of one message should be sent to the application.
The message scope information passed in scope contains:
- scope["type"] (Literal['message'])
- scope["amgi"]["spec_version"] (str) – Version of the AMGI message spec this server understands
- scope["amgi"]["version"] (Literal['1.0']) – Version of the AMGI spec
- scope["address"] (str) – The address of the batch of messages; for example, in Kafka this would be the topic
- scope["state"] (NotRequired[dict[str, Any]]) – A copy of the namespace passed into the lifespan corresponding to this batch. Optional; if missing, the server does not support this feature.
- scope["extensions"] (NotRequired[dict[str, dict[str, Any]]]) – Extensions allow AMGI servers to advertise optional capabilities to applications. Extensions are provided via scope and are opt-in: applications MUST assume an extension is unsupported unless it is explicitly present.
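As a rough illustration of the scope keys above, the following sketch shows how a server might build a scope and call the application once for a batch. The helper `run_batch`, the application `ack_all`, the address "orders", the "msg-N" id format, and the `spec_version` value are all assumptions made for this example; none of them come from the specification.

```python
import asyncio


async def run_batch(app, address, payloads):
    """Call `app` once for a batch and return the events it sent.

    Hypothetical server-side helper, for illustration only.
    """
    scope = {
        "type": "message",
        # "version" is Literal['1.0'] in the spec; the spec_version value is assumed.
        "amgi": {"version": "1.0", "spec_version": "1.0"},
        "address": address,
    }
    # A protocol without batched consumption would pass a single payload here.
    events = [
        {
            "type": "message.receive",
            "id": f"msg-{index}",  # assumed id format
            "headers": [],
            "payload": payload,
            "more_messages": index < len(payloads) - 1,
        }
        for index, payload in enumerate(payloads)
    ]
    pending = iter(events)
    sent = []

    async def receive():
        return next(pending)

    async def send(event):
        sent.append(event)

    await app(scope, receive, send)
    return sent


async def ack_all(scope, receive, send):
    """Tiny application that acknowledges every message in the batch."""
    more_messages = True
    while more_messages:
        message = await receive()
        await send({"type": "message.ack", "id": message["id"]})
        more_messages = message.get("more_messages", False)


if __name__ == "__main__":
    print(asyncio.run(run_batch(ack_all, "orders", [b"first", b"second"])))
```

A real server would obtain its messages from the underlying protocol rather than from a list; the list only stands in for a batch here.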
Receive message - receive() event
Sent to the application to indicate an incoming message in the batch.
Keys:
- message["type"] (Literal['message.receive'])
- message["id"] (str) – A unique id for the message, used to ack or nack the message
- message["headers"] (Iterable[tuple[bytes, bytes]]) – Includes the headers of the message
- message["payload"] (NotRequired[Optional[bytes]]) – Payload of the message, which can be None or bytes. If missing, it defaults to None
- message["bindings"] (NotRequired[dict[str, dict[str, Any]]]) – Protocol specific bindings, for example, when receiving a Kafka message the bindings could include the key: {"kafka": {"key": b"key"}}
- message["more_messages"] (NotRequired[bool]) – Indicates there are more messages to process in the batch. The application should keep receiving until it receives False. If missing, it defaults to False
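The defaults listed above can be applied in one place when unpacking an event. The sketch below is one possible way to do that; `parse_receive_event` is a hypothetical helper, not part of AMGI. Note that turning the headers into a dict keeps only the last value for a repeated header name, which may not suit every protocol.

```python
def parse_receive_event(event):
    """Return (message_id, headers, payload, more_messages) with defaults applied."""
    if event["type"] != "message.receive":
        raise ValueError(f"unexpected event type: {event['type']!r}")
    # Headers arrive as (name, value) byte pairs; a dict keeps the last value per name.
    headers = dict(event["headers"])
    payload = event.get("payload")  # a missing payload defaults to None
    more_messages = event.get("more_messages", False)  # a missing flag defaults to False
    return event["id"], headers, payload, more_messages


if __name__ == "__main__":
    example = {
        "type": "message.receive",
        "id": "msg-0",  # assumed id, for illustration
        "headers": [(b"content-type", b"text/plain")],
    }
    print(parse_receive_event(example))
```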
Response message ack - send() event
Sent by the application to acknowledge a message that it has processed successfully.
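Keys:
- message["type"] (Literal['message.ack'])
- message["id"] (str) – The id of the message being acknowledged, as received in message["id"]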
Response message nack - send() event
Sent by the application to signify that it could not process a message.
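Keys:
- message["type"] (Literal['message.nack'])
- message["id"] (str) – The id of the message that could not be processed, as received in message["id"]
- message["message"] (str) – A description of why the message could not be processed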
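Following the shape of the ack and nack events in the simple implementation at the top of this page, the sketch below wraps the ack-or-nack decision in a small helper. The names `ack_event`, `nack_event`, and `settle` are hypothetical, and the handler is assumed to be a plain synchronous callable.

```python
import asyncio


def ack_event(message_id):
    """Build a message.ack event for the given message id."""
    return {"type": "message.ack", "id": message_id}


def nack_event(message_id, reason):
    """Build a message.nack event carrying a human-readable reason."""
    return {"type": "message.nack", "id": message_id, "message": reason}


async def settle(send, message_id, handler, payload):
    """Run `handler(payload)`, then ack on success or nack on any exception."""
    try:
        handler(payload)
    except Exception as exc:
        await send(nack_event(message_id, str(exc)))
        return False
    await send(ack_event(message_id))
    return True


if __name__ == "__main__":
    recorded = []

    async def record(event):
        recorded.append(event)

    asyncio.run(settle(record, "msg-0", len, b"payload"))
    print(recorded)
```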
Response message send - send() event
Sent by the application to send a message. If the server fails to send the message, the server should raise a
server-specific subclass of OSError.
Keys:
- message["type"] (Literal['message.send'])
- message["address"] (str) – Address to send the message to
- message["headers"] (Iterable[tuple[bytes, bytes]]) – Headers of the message
- message["payload"] (NotRequired[Optional[bytes]]) – Payload of the message, which can be None or bytes. If missing, it defaults to None.
- message["bindings"] (NotRequired[dict[str, dict[str, Any]]]) – Protocol specific bindings to send. This can be bindings for multiple protocols, allowing the server to decide to handle them, or ignore them.
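Because a failed send surfaces as an OSError subclass, an application can catch OSError around send(). The sketch below shows one way this might look; `publish` is a hypothetical helper, and returning a boolean on failure is an assumption of this example, not something the specification prescribes.

```python
import asyncio


async def publish(send, address, payload, headers=()):
    """Send one message; return True on success, False if the server raised OSError."""
    event = {
        "type": "message.send",
        "address": address,
        "headers": list(headers),
        "payload": payload,
    }
    try:
        await send(event)
    except OSError:
        # The server is expected to raise a server-specific OSError subclass on failure.
        return False
    return True


if __name__ == "__main__":
    async def failing_send(event):
        # ConnectionError is a built-in OSError subclass, used here as a stand-in.
        raise ConnectionError("broker unavailable")

    print(asyncio.run(publish(failing_send, "orders", b"hello")))
```

Whether to retry, nack the originating message, or let the exception propagate is left to the application.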
Bindings Object
Both "message.receive" and "message.send" events can contain a bindings object. Its contents are defined
per protocol, according to that protocol's specification.
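As a small illustration of working with bindings, the sketch below reads the Kafka key from a received event and attaches bindings for more than one protocol to an outgoing event. The helpers `kafka_key` and `with_bindings` are hypothetical, and the "other" protocol with its "option" field is invented purely for the example; only the {"kafka": {"key": ...}} shape comes from this page.

```python
def kafka_key(event):
    """Return the Kafka key from a message.receive event, or None if absent."""
    return event.get("bindings", {}).get("kafka", {}).get("key")


def with_bindings(event, **protocol_bindings):
    """Return a copy of a message.send event with bindings added per protocol."""
    merged = {**event.get("bindings", {}), **protocol_bindings}
    return {**event, "bindings": merged}


if __name__ == "__main__":
    received = {
        "type": "message.receive",
        "id": "msg-0",  # assumed id, for illustration
        "headers": [],
        "bindings": {"kafka": {"key": b"key"}},
    }
    outgoing = with_bindings(
        {"type": "message.send", "address": "orders", "headers": []},
        kafka={"key": kafka_key(received)},
        other={"option": 1},  # hypothetical protocol; a server may ignore it
    )
    print(outgoing["bindings"])
```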