API Reference
Factory
Create a version-typed MQTT client.
Returns MQTTClientV311 when version="3.1.1", MQTTClientV5 when version="5.0". The concrete type is always MQTTClient; the return type is a Protocol view.
Client protocols
Core classes
Asyncio MQTT client with automatic reconnection.
Use as an async context manager. subscribe() returns a Subscription that is itself an async context manager.
__aenter__()
async
Connect to the broker and start the background run loop.
__aexit__(*exc)
async
Disconnect cleanly and cancel the run loop.
__init__(host, port=1883, *, client_id='', keepalive=60, clean_session=True, username=None, password=None, tls=False, reconnect=None, transport_factory=None, version='3.1.1', session_expiry_interval=0)
Create an MQTT client.
The client must be used as an async context manager to establish the connection::
async with MQTTClient("broker.example.com") as client:
await client.publish("sensors/temp", "22.5")
Prefer :func:create_client for version-typed access.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
host
|
str
|
Broker hostname or IP address. |
required |
port
|
int
|
TCP port. Defaults to |
1883
|
client_id
|
str
|
Client identifier sent in CONNECT. An empty string lets the broker assign one. |
''
|
keepalive
|
int
|
Keepalive interval in seconds. |
60
|
clean_session
|
bool
|
Start with a clean session (MQTT 3.1.1) or discard any existing session state on connect. |
True
|
username
|
str | None
|
Optional username for broker authentication. |
None
|
password
|
str | None
|
Optional plain-text password for broker authentication. |
None
|
tls
|
SSLContext | bool
|
TLS configuration. Pass |
False
|
reconnect
|
ReconnectConfig | None
|
Reconnection policy. Defaults to
:class: |
None
|
transport_factory
|
TransportFactory | None
|
Override the low-level transport. Useful for testing. |
None
|
version
|
Literal['3.1.1', '5.0']
|
MQTT protocol version to use. Either |
'3.1.1'
|
session_expiry_interval
|
int
|
MQTT 5.0 session expiry interval in seconds.
|
0
|
auth(method, data=None)
async
Send an AUTH packet for enhanced authentication (MQTT 5.0 only).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
method
|
str
|
Authentication method name negotiated with the broker. |
required |
data
|
bytes | None
|
Optional authentication data to include in the packet. |
None
|
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the client is not using MQTT 5.0. |
MQTTDisconnectedError
|
If the client is not currently connected. |
connect()
async
Connect to the broker and start the background run loop.
Equivalent to entering the async context manager.
Must be paired with a corresponding :meth:disconnect call to send
DISCONNECT, close the socket, and stop the background run loop.
Example::
client = create_client("broker.example.com")
await client.connect()
# ... use the client
await client.disconnect()
Raises:
| Type | Description |
|---|---|
MQTTConnectError
|
If the broker refuses the connection. |
disconnect()
async
Disconnect cleanly and stop the background run loop.
Equivalent to exiting the async context manager. Sends DISCONNECT, closes the socket, and cancels the internal run loop task. Safe to call even if the connection has already been lost.
ping(timeout=10.0)
async
Send a PINGREQ and return the round-trip time in seconds.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Seconds to wait for PINGRESP before raising
:exc: |
10.0
|
Returns:
| Type | Description |
|---|---|
float
|
RTT in seconds. |
Raises:
| Type | Description |
|---|---|
MQTTDisconnectedError
|
If the client is not currently connected. |
MQTTTimeoutError
|
If no PINGRESP is received within timeout seconds. |
publish(topic, payload, *, qos=QoS.AT_MOST_ONCE, retain=False, properties=None)
async
Publish a message to topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
Topic string. Must not contain wildcards. |
required |
payload
|
bytes | str
|
Message body. |
required |
qos
|
QoS
|
Delivery guarantee level. Defaults to |
AT_MOST_ONCE
|
retain
|
bool
|
Ask the broker to retain the message for future subscribers. |
False
|
properties
|
PublishProperties | None
|
MQTT 5.0 publish properties. Raises if used with MQTT 3.1.1. |
None
|
Raises:
| Type | Description |
|---|---|
MQTTInvalidTopicError
|
If topic is empty, contains wildcards, or has
|
MQTTDisconnectedError
|
If the client is not currently connected. |
RuntimeError
|
If properties is supplied on an MQTT 3.1.1 connection. |
request(topic, payload, *, qos=QoS.AT_MOST_ONCE, timeout=30.0, properties=None)
async
Send a request and wait for exactly one reply (MQTT 5.0 only).
Publishes payload to topic with a response_topic property, then
waits for the first message that arrives on that topic and returns it.
Both response_topic and correlation_data are taken from
properties when set; otherwise they are generated automatically
(a unique _zmqtt/reply/<32 hex chars> topic and 16 random bytes
respectively).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
Request topic. Must not contain wildcards. |
required |
payload
|
bytes | str
|
Request body. |
required |
qos
|
QoS
|
QoS for the outgoing request publish. |
AT_MOST_ONCE
|
timeout
|
float
|
Seconds to wait for the reply before raising
|
30.0
|
properties
|
PublishProperties | None
|
Publish properties for the request. |
None
|
Returns:
| Type | Description |
|---|---|
Message
|
The first |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If the client is not using MQTT 5.0. |
MQTTInvalidTopicError
|
If |
MQTTDisconnectedError
|
If the connection is lost while waiting. |
TimeoutError
|
If no reply arrives within timeout seconds. |
subscribe(*filters, qos=QoS.AT_MOST_ONCE, auto_ack=True, receive_buffer_size=1000, no_local=False, retain_as_published=False)
Create a :class:Subscription for one or more topic filters.
The returned object must be used as an async context manager to activate the subscription and unsubscribe on exit::
async with client.subscribe("sensors/#", qos=QoS.AT_LEAST_ONCE) as sub:
async for msg in sub:
print(msg.topic, msg.payload)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
*filters
|
str
|
One or more MQTT topic filters. Wildcards |
()
|
qos
|
QoS
|
Maximum QoS level requested from the broker. |
AT_MOST_ONCE
|
auto_ack
|
bool
|
Automatically send PUBACK/PUBREC upon receipt. Set to
|
True
|
receive_buffer_size
|
int
|
Maximum messages buffered in the internal queue. Older messages are dropped when the queue is full. |
1000
|
no_local
|
bool
|
Do not receive messages published by this client (MQTT 5.0 only). |
False
|
retain_as_published
|
bool
|
Preserve the retain flag on forwarded messages (MQTT 5.0 only). |
False
|
Raises:
| Type | Description |
|---|---|
MQTTInvalidTopicError
|
If any filter is empty, has |
RuntimeError
|
If no_local or retain_as_published are used on an MQTT 3.1.1 connection. |
Async context manager for an active topic subscription.
Registers filters on enter, unsubscribes on exit. Messages are available via get_message() or async iteration. Survives reconnection transparently — the queue keeps buffering and delivery resumes when the connection restores.
start()
async
Register the subscription filters with the broker.
Equivalent to entering the async context manager.
Must be paired with a corresponding :meth:stop call to send
UNSUBSCRIBE and release internal resources.
Example::
sub = client.subscribe("sensors/#", qos=QoS.AT_LEAST_ONCE)
await sub.start()
# ... later
await sub.stop()
Raises:
| Type | Description |
|---|---|
MQTTDisconnectedError
|
If the client is not currently connected. |
stop()
async
Unsubscribe from all filters and stop message delivery.
Equivalent to exiting the async context manager. Sends UNSUBSCRIBE to the broker and cancels internal relay tasks. Safe to call even if the connection has already been lost — the UNSUBSCRIBE is silently skipped in that case.
Example::
await sub.stop()
get_message()
async
Wait for and return the next message from the subscription queue.
__aenter__()
async
Register the subscription filters with the broker.
Raises:
| Type | Description |
|---|---|
MQTTDisconnectedError
|
If the client is not currently connected. |
__aexit__(*exc)
async
Unsubscribe from all filters and stop message delivery.
__aiter__()
Return self as the async iterator.
__anext__()
async
Return the next message, suspending until one is available.
Incoming MQTT message as delivered to application code.
Attributes:
| Name | Type | Description |
|---|---|---|
topic |
str
|
The topic on which the message was published. |
payload |
bytes
|
Raw message body as bytes. |
qos |
QoS
|
QoS level at which the message was delivered. |
retain |
bool
|
|
properties |
PublishProperties | None
|
MQTT 5.0 publish properties, or |
topic
instance-attribute
payload
instance-attribute
qos
instance-attribute
retain
instance-attribute
properties = None
class-attribute
instance-attribute
ack()
async
Send the protocol-level ack for this message. Idempotent; no-op when auto_ack=True.
Configuration
Configuration for automatic reconnection on connection loss.
Attributes:
| Name | Type | Description |
|---|---|---|
enabled |
bool
|
Whether to reconnect automatically. Set to |
initial_delay |
float
|
Seconds to wait before the first reconnection attempt. |
max_delay |
float
|
Upper bound (seconds) for the exponential back-off delay. |
backoff_factor |
float
|
Multiplier applied to the delay after each failed attempt. |
max_attempts |
int | None
|
Maximum total number of connection attempts before
giving up. |
Enumerations
Bases: IntEnum
MQTT Quality of Service delivery guarantee levels.
Attributes:
| Name | Type | Description |
|---|---|---|
AT_MOST_ONCE |
Fire-and-forget. No acknowledgement, no retries (QoS 0). |
|
AT_LEAST_ONCE |
Acknowledged delivery. The message may arrive more than once (QoS 1). |
|
EXACTLY_ONCE |
Four-way handshake guarantees exactly-once delivery (QoS 2). |
Properties (MQTT 5.0)
MQTT 5.0 properties attached to a PUBLISH packet.
Attributes:
| Name | Type | Description |
|---|---|---|
payload_format_indicator |
int | None
|
|
message_expiry_interval |
int | None
|
Seconds the broker retains the message. Omit to keep indefinitely. |
topic_alias |
int | None
|
Topic alias integer used instead of the full topic string on the wire. |
response_topic |
str | None
|
Topic the receiver should use when replying (request/response pattern). |
correlation_data |
bytes | None
|
Opaque bytes the sender uses to match a response to a request. |
subscription_identifier |
int | None
|
Identifier of the subscription that caused this message to be delivered. |
content_type |
str | None
|
MIME type describing the payload content. |
user_properties |
tuple[tuple[str, str], ...]
|
Arbitrary key-value pairs forwarded with the message. |
MQTT 5.0 properties sent in the CONNECT packet.
Attributes:
| Name | Type | Description |
|---|---|---|
session_expiry_interval |
int | None
|
Seconds before the broker discards the session
after disconnect. |
receive_maximum |
int | None
|
Maximum number of QoS 1/2 messages the client will process concurrently. |
maximum_packet_size |
int | None
|
Largest packet size (bytes) the client will accept. |
topic_alias_maximum |
int | None
|
Number of topic aliases the client supports. |
request_response_information |
bool | None
|
Ask the broker to include response information in CONNACK. |
request_problem_information |
bool | None
|
Ask the broker to include reason strings and user properties on errors. |
authentication_method |
str | None
|
Method name for enhanced authentication. |
authentication_data |
bytes | None
|
Initial data for enhanced authentication. |
user_properties |
tuple[tuple[str, str], ...]
|
Arbitrary key-value pairs forwarded to the broker. |
MQTT 5.0 properties sent in an AUTH packet.
Attributes:
| Name | Type | Description |
|---|---|---|
authentication_method |
str | None
|
Method name for the enhanced authentication exchange. |
authentication_data |
bytes | None
|
Method-specific data for this step of the authentication exchange. |
reason_string |
str | None
|
Human-readable reason included for diagnostic purposes. |
user_properties |
tuple[tuple[str, str], ...]
|
Arbitrary key-value pairs forwarded with the packet. |
Exceptions
Bases: MQTTError
Unexpected or malformed packet received.
Bases: MQTTError
Connection lost unexpectedly.
Bases: MQTTError
An MQTT operation did not complete within the allotted time.
Bases: MQTTError
Topic string or topic filter failed MQTT validation.