Request / Response
MQTT 5.0 defines a first-class request/response pattern via two
PUBLISH properties: response_topic and correlation_data. zmqtt
implements this as a single await client.request(…) call.
Basic usage
from zmqtt import create_client
async with create_client("broker", version="5.0") as client:
reply = await client.request("services/calculator", b"2+2")
print(reply.payload)
request() handles the full flow automatically:
- Subscribes to a unique reply topic before publishing.
- Publishes the request with
response_topicandcorrelation_dataset. - Waits for the first message on the reply topic and returns it.
- Unsubscribes on return, timeout, or cancellation.
Customising via PublishProperties
Pass a PublishProperties instance to control any field of the outgoing
PUBLISH. Two fields receive special treatment:
| Field | Behaviour |
|---|---|
response_topic |
Used as the reply topic instead of the auto-generated one. Must not contain wildcards. |
correlation_data |
Forwarded to the responder as-is. Auto-generated (16 random bytes) when absent. |
All other fields (content_type, message_expiry_interval,
user_properties, …) are forwarded unchanged.
from zmqtt import PublishProperties
reply = await client.request(
"services/translate",
b"hello",
properties=PublishProperties(
content_type="text/plain",
response_topic="my-app/replies/translate",
correlation_data=b"req-001",
),
timeout=10.0,
)
Implementing a responder
The responder reads response_topic and correlation_data from the
incoming message and publishes the reply there:
async with client.subscribe("services/translate") as sub:
async for msg in sub:
assert msg.properties is not None
result = translate(msg.payload)
await client.publish(
msg.properties.response_topic,
result,
properties=PublishProperties(
correlation_data=msg.properties.correlation_data,
),
)
Timeout
request() raises asyncio.TimeoutError when no reply arrives within
timeout seconds (default 30.0). The reply subscription is always
cleaned up, even on timeout or cancellation.
import asyncio
try:
reply = await client.request("slow/service", b"ping", timeout=5.0)
except asyncio.TimeoutError:
print("Service did not respond in time")
Errors
| Exception | Raised when |
|---|---|
RuntimeError |
request() is called on an MQTT 3.1.1 connection |
MQTTInvalidTopicError |
properties.response_topic contains wildcards |
MQTTDisconnectedError |
Connection is lost while waiting for the reply |
asyncio.TimeoutError |
No reply arrives within timeout seconds |
Note
request() is only available on MQTT 5.0 connections. Use
create_client(…, version="5.0") or MQTTClient(…, version="5.0").
See also: MQTT 5.0 · Publishing · Error Handling