MQTT 5.0
Enabling MQTT 5.0
Pass version="5.0" to create_client() (see Version selection):
from zmqtt import create_client
async with create_client("localhost", version="5.0") as client:
...
The return type is MQTTClientV5, which exposes 5.0-specific methods and accepts 5.0-specific parameters.
Session expiry interval
Controls how long the broker preserves your session after disconnect. 0 (default) means the session ends immediately on disconnect; 0xFFFFFFFF means the session never expires.
async with create_client("localhost", version="5.0", session_expiry_interval=3600) as client:
# Session survives for 1 hour after disconnect
...
Publish properties
PublishProperties can be attached to any publish() call on a 5.0 connection:
from zmqtt import PublishProperties
props = PublishProperties(
message_expiry_interval=300, # broker discards after 300 s
content_type="application/json",
response_topic="replies/my-app",
correlation_data=b"request-id-42",
user_properties=(("x-source", "sensor-01"), ("x-region", "eu-west")),
)
await client.publish("data/readings", b'{"temp": 23.4}', properties=props)
Received messages expose properties via msg.properties (a PublishProperties instance or None):
async for msg in sub:
if msg.properties and msg.properties.response_topic:
await client.publish(
msg.properties.response_topic,
b"ok",
properties=PublishProperties(
correlation_data=msg.properties.correlation_data,
),
)
PublishProperties fields
| Field | Type | Description |
|---|---|---|
payload_format_indicator |
int \| None |
0 = bytes, 1 = UTF-8 string |
message_expiry_interval |
int \| None |
Seconds until broker discards |
topic_alias |
int \| None |
Topic alias integer |
response_topic |
str \| None |
Topic for response messages |
correlation_data |
bytes \| None |
Request/response correlation token |
subscription_identifier |
int \| None |
Set by broker, not by publisher |
content_type |
str \| None |
MIME type of the payload |
user_properties |
tuple[tuple[str, str], ...] |
Arbitrary key-value pairs |
Subscribe options (5.0 only)
Additional keyword arguments are available on 5.0 connections:
async with client.subscribe(
"local/events",
no_local=True, # do not receive own publishes
retain_as_published=True, # preserve the retain flag as published
) as sub:
...
| Option | Type | Description |
|---|---|---|
no_local |
bool |
Skip messages published by this client |
retain_as_published |
bool |
Forward the original retain flag, not the delivery flag |
Warning
no_local and retain_as_published raise RuntimeError when used on a version="3.1.1" connection.
Request / response (client.request())
Send a request and await exactly one reply in a single call:
reply = await client.request("services/echo", b"hello", timeout=5.0)
print(reply.payload) # b"hello"
zmqtt manages the reply topic subscription, the response_topic /
correlation_data PUBLISH properties, and cleanup on timeout or
cancellation automatically. See Request / Response
for the full API and responder example.
Enhanced authentication (client.auth())
Send an AUTH packet for extended authentication flows (e.g. SCRAM, Kerberos):
await client.auth("SCRAM-SHA-256", data=b"client-first-message")
The method string is sent as the MQTT 5.0 authentication_method in the AUTH packet. This is an advanced feature — most applications do not need it.
CONNACK / DISCONNECT reason codes
In MQTT 5.0, CONNACK and DISCONNECT packets carry a reason code. The library surfaces MQTTConnectError with return_code set to the CONNACK reason code on failure. Common 5.0 reason codes:
| Code | Name |
|---|---|
| 0x00 | Success |
| 0x04 | Disconnect with will message |
| 0x80 | Unspecified error |
| 0x81 | Malformed packet |
| 0x87 | Not authorised |
| 0x88 | Server unavailable |
| 0x8A | Bad authentication |
See the MQTT 5.0 spec for the full list.
See also: Connecting — Version selection · Publishing · Subscribing · Error Handling