Skip to content

API Reference

Factory

Create a version-typed MQTT client.

Returns MQTTClientV311 when version="3.1.1", MQTTClientV5 when version="5.0". The concrete type is always MQTTClient; the return type is a Protocol view.

Client protocols

Bases: Protocol

Bases: Protocol

Type-safe view of MQTTClient for MQTT 5.0 connections.

Core classes

Asyncio MQTT client with automatic reconnection.

Use as an async context manager. subscribe() returns a Subscription that is itself an async context manager.

__aenter__() async

Connect to the broker and start the background run loop.

__aexit__(*exc) async

Disconnect cleanly and cancel the run loop.

__init__(host, port=1883, *, client_id='', keepalive=60, clean_session=True, username=None, password=None, tls=False, reconnect=None, transport_factory=None, version='3.1.1', session_expiry_interval=0)

Create an MQTT client.

The client must be used as an async context manager to establish the connection::

async with MQTTClient("broker.example.com") as client:
    await client.publish("sensors/temp", "22.5")

Prefer :func:create_client for version-typed access.

Parameters:

Name Type Description Default
host str

Broker hostname or IP address.

required
port int

TCP port. Defaults to 1883 (8883 is conventional for TLS).

1883
client_id str

Client identifier sent in CONNECT. An empty string lets the broker assign one.

''
keepalive int

Keepalive interval in seconds. 0 disables keepalive.

60
clean_session bool

Start with a clean session (MQTT 3.1.1) or discard any existing session state on connect.

True
username str | None

Optional username for broker authentication.

None
password str | None

Optional plain-text password for broker authentication.

None
tls SSLContext | bool

TLS configuration. Pass True for default TLS, an :class:ssl.SSLContext for custom settings, or False (default) for a plain TCP connection.

False
reconnect ReconnectConfig | None

Reconnection policy. Defaults to :class:ReconnectConfig with exponential back-off enabled.

None
transport_factory TransportFactory | None

Override the low-level transport. Useful for testing.

None
version Literal['3.1.1', '5.0']

MQTT protocol version to use. Either "3.1.1" (default) or "5.0".

'3.1.1'
session_expiry_interval int

MQTT 5.0 session expiry interval in seconds. 0 means the session expires on disconnect.

0

auth(method, data=None) async

Send an AUTH packet for enhanced authentication (MQTT 5.0 only).

Parameters:

Name Type Description Default
method str

Authentication method name negotiated with the broker.

required
data bytes | None

Optional authentication data to include in the packet.

None

Raises:

Type Description
RuntimeError

If the client is not using MQTT 5.0.

MQTTDisconnectedError

If the client is not currently connected.

connect() async

Connect to the broker and start the background run loop.

Equivalent to entering the async context manager. Must be paired with a corresponding :meth:disconnect call to send DISCONNECT, close the socket, and stop the background run loop.

Example::

client = create_client("broker.example.com")
await client.connect()
# ... use the client
await client.disconnect()

Raises:

Type Description
MQTTConnectError

If the broker refuses the connection.

disconnect() async

Disconnect cleanly and stop the background run loop.

Equivalent to exiting the async context manager. Sends DISCONNECT, closes the socket, and cancels the internal run loop task. Safe to call even if the connection has already been lost.

ping(timeout=10.0) async

Send a PINGREQ and return the round-trip time in seconds.

Parameters:

Name Type Description Default
timeout float

Seconds to wait for PINGRESP before raising :exc:MQTTTimeoutError.

10.0

Returns:

Type Description
float

RTT in seconds.

Raises:

Type Description
MQTTDisconnectedError

If the client is not currently connected.

MQTTTimeoutError

If no PINGRESP is received within timeout seconds.

publish(topic, payload, *, qos=QoS.AT_MOST_ONCE, retain=False, properties=None) async

Publish a message to topic.

Parameters:

Name Type Description Default
topic str

Topic string. Must not contain wildcards.

required
payload bytes | str

Message body. str values are UTF-8 encoded automatically.

required
qos QoS

Delivery guarantee level. Defaults to AT_MOST_ONCE.

AT_MOST_ONCE
retain bool

Ask the broker to retain the message for future subscribers.

False
properties PublishProperties | None

MQTT 5.0 publish properties. Raises if used with MQTT 3.1.1.

None

Raises:

Type Description
MQTTInvalidTopicError

If topic is empty, contains wildcards, or has $ in a non-leading position.

MQTTDisconnectedError

If the client is not currently connected.

RuntimeError

If properties is supplied on an MQTT 3.1.1 connection.

request(topic, payload, *, qos=QoS.AT_MOST_ONCE, timeout=30.0, properties=None) async

Send a request and wait for exactly one reply (MQTT 5.0 only).

Publishes payload to topic with a response_topic property, then waits for the first message that arrives on that topic and returns it.

Both response_topic and correlation_data are taken from properties when set; otherwise they are generated automatically (a unique _zmqtt/reply/<32 hex chars> topic and 16 random bytes respectively).

Parameters:

Name Type Description Default
topic str

Request topic. Must not contain wildcards.

required
payload bytes | str

Request body. str values are UTF-8 encoded automatically.

required
qos QoS

QoS for the outgoing request publish.

AT_MOST_ONCE
timeout float

Seconds to wait for the reply before raising asyncio.TimeoutError.

30.0
properties PublishProperties | None

Publish properties for the request. response_topic selects the reply topic (must not contain wildcards [MQTT-3.3.2-14]). correlation_data is forwarded as-is to the responder.

None

Returns:

Type Description
Message

The first Message received on the reply topic.

Raises:

Type Description
RuntimeError

If the client is not using MQTT 5.0.

MQTTInvalidTopicError

If properties.response_topic contains wildcards.

MQTTDisconnectedError

If the connection is lost while waiting.

TimeoutError

If no reply arrives within timeout seconds.

subscribe(*filters, qos=QoS.AT_MOST_ONCE, auto_ack=True, receive_buffer_size=1000, no_local=False, retain_as_published=False)

Create a :class:Subscription for one or more topic filters.

The returned object must be used as an async context manager to activate the subscription and unsubscribe on exit::

async with client.subscribe("sensors/#", qos=QoS.AT_LEAST_ONCE) as sub:
    async for msg in sub:
        print(msg.topic, msg.payload)

Parameters:

Name Type Description Default
*filters str

One or more MQTT topic filters. Wildcards + (single level) and # (multi-level) are supported.

()
qos QoS

Maximum QoS level requested from the broker.

AT_MOST_ONCE
auto_ack bool

Automatically send PUBACK/PUBREC upon receipt. Set to False to acknowledge manually via :meth:Message.ack.

True
receive_buffer_size int

Maximum messages buffered in the internal queue. Older messages are dropped when the queue is full.

1000
no_local bool

Do not receive messages published by this client (MQTT 5.0 only).

False
retain_as_published bool

Preserve the retain flag on forwarded messages (MQTT 5.0 only).

False

Raises:

Type Description
MQTTInvalidTopicError

If any filter is empty, has $ in a non-leading position, or contains a malformed wildcard (e.g. sensors# or a/b#/c).

RuntimeError

If no_local or retain_as_published are used on an MQTT 3.1.1 connection.

Async context manager for an active topic subscription.

Registers filters on enter, unsubscribes on exit. Messages are available via get_message() or async iteration. Survives reconnection transparently — the queue keeps buffering and delivery resumes when the connection restores.

start() async

Register the subscription filters with the broker. Equivalent to entering the async context manager. Must be paired with a corresponding :meth:stop call to send UNSUBSCRIBE and release internal resources.

Example::

sub = client.subscribe("sensors/#", qos=QoS.AT_LEAST_ONCE)
await sub.start()
# ... later
await sub.stop()

Raises:

Type Description
MQTTDisconnectedError

If the client is not currently connected.

stop() async

Unsubscribe from all filters and stop message delivery.

Equivalent to exiting the async context manager. Sends UNSUBSCRIBE to the broker and cancels internal relay tasks. Safe to call even if the connection has already been lost — the UNSUBSCRIBE is silently skipped in that case.

Example::

await sub.stop()

get_message() async

Wait for and return the next message from the subscription queue.

__aenter__() async

Register the subscription filters with the broker.

Raises:

Type Description
MQTTDisconnectedError

If the client is not currently connected.

__aexit__(*exc) async

Unsubscribe from all filters and stop message delivery.

__aiter__()

Return self as the async iterator.

__anext__() async

Return the next message, suspending until one is available.

Incoming MQTT message as delivered to application code.

Attributes:

Name Type Description
topic str

The topic on which the message was published.

payload bytes

Raw message body as bytes.

qos QoS

QoS level at which the message was delivered.

retain bool

True if the broker sent this as a retained message.

properties PublishProperties | None

MQTT 5.0 publish properties, or None for MQTT 3.1.1 connections.

topic instance-attribute

payload instance-attribute

qos instance-attribute

retain instance-attribute

properties = None class-attribute instance-attribute

ack() async

Send the protocol-level ack for this message. Idempotent; no-op when auto_ack=True.

Configuration

Configuration for automatic reconnection on connection loss.

Attributes:

Name Type Description
enabled bool

Whether to reconnect automatically. Set to False to let exceptions propagate immediately on disconnection.

initial_delay float

Seconds to wait before the first reconnection attempt.

max_delay float

Upper bound (seconds) for the exponential back-off delay.

backoff_factor float

Multiplier applied to the delay after each failed attempt.

max_attempts int | None

Maximum total number of connection attempts before giving up. None retries indefinitely.

Enumerations

Bases: IntEnum

MQTT Quality of Service delivery guarantee levels.

Attributes:

Name Type Description
AT_MOST_ONCE

Fire-and-forget. No acknowledgement, no retries (QoS 0).

AT_LEAST_ONCE

Acknowledged delivery. The message may arrive more than once (QoS 1).

EXACTLY_ONCE

Four-way handshake guarantees exactly-once delivery (QoS 2).

Bases: IntEnum

MQTT 5.0 subscription option controlling which retained messages are delivered.

Properties (MQTT 5.0)

MQTT 5.0 properties attached to a PUBLISH packet.

Attributes:

Name Type Description
payload_format_indicator int | None

0 for unspecified binary, 1 for UTF-8 encoded character data.

message_expiry_interval int | None

Seconds the broker retains the message. Omit to keep indefinitely.

topic_alias int | None

Topic alias integer used instead of the full topic string on the wire.

response_topic str | None

Topic the receiver should use when replying (request/response pattern).

correlation_data bytes | None

Opaque bytes the sender uses to match a response to a request.

subscription_identifier int | None

Identifier of the subscription that caused this message to be delivered.

content_type str | None

MIME type describing the payload content.

user_properties tuple[tuple[str, str], ...]

Arbitrary key-value pairs forwarded with the message.

MQTT 5.0 properties sent in the CONNECT packet.

Attributes:

Name Type Description
session_expiry_interval int | None

Seconds before the broker discards the session after disconnect. 0 means discard immediately; 0xFFFFFFFF means never expire.

receive_maximum int | None

Maximum number of QoS 1/2 messages the client will process concurrently.

maximum_packet_size int | None

Largest packet size (bytes) the client will accept.

topic_alias_maximum int | None

Number of topic aliases the client supports. 0 disables topic aliases.

request_response_information bool | None

Ask the broker to include response information in CONNACK.

request_problem_information bool | None

Ask the broker to include reason strings and user properties on errors.

authentication_method str | None

Method name for enhanced authentication.

authentication_data bytes | None

Initial data for enhanced authentication.

user_properties tuple[tuple[str, str], ...]

Arbitrary key-value pairs forwarded to the broker.

MQTT 5.0 properties sent in an AUTH packet.

Attributes:

Name Type Description
authentication_method str | None

Method name for the enhanced authentication exchange.

authentication_data bytes | None

Method-specific data for this step of the authentication exchange.

reason_string str | None

Human-readable reason included for diagnostic purposes.

user_properties tuple[tuple[str, str], ...]

Arbitrary key-value pairs forwarded with the packet.

Exceptions

Bases: Exception

Base class for all zmqtt exceptions.

Bases: MQTTError

CONNACK returned a non-zero return code.

Bases: MQTTError

Unexpected or malformed packet received.

Bases: MQTTError

Connection lost unexpectedly.

Bases: MQTTError

An MQTT operation did not complete within the allotted time.

Bases: MQTTError

Topic string or topic filter failed MQTT validation.