Subscribing

client.subscribe()

subscribe() returns a Subscription object. Use it as an async context manager — the SUBSCRIBE packet is sent to the broker on entry and UNSUBSCRIBE is sent on exit:

async with client.subscribe("sensors/#") as sub:
    async for msg in sub:
        print(msg.topic, msg.payload.decode())

Manual subscription lifecycle

When nesting context managers does not fit your program structure — for example when subscriptions are added or removed dynamically at runtime — use start() and stop() directly:

sub = client.subscribe("sensors/#", qos=QoS.AT_LEAST_ONCE)
await sub.start()

async for msg in sub:
    ...

await sub.stop()

stop() sends UNSUBSCRIBE and releases internal resources. It is safe to call even if the connection has already been lost.
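For subscriptions that come and go at runtime, it can help to keep the started Subscription objects in one place so that each one is eventually stopped. The following is a minimal sketch under assumptions, not part of zmqtt: `SubscriptionRegistry` is a hypothetical helper, and it relies only on `client.subscribe()` returning an object with `start()` and `stop()` as described above.

```python
from typing import Any, Dict


class SubscriptionRegistry:
    """Hypothetical helper: tracks manually started subscriptions by filter."""

    def __init__(self, client: Any) -> None:
        self._client = client
        self._subs: Dict[str, Any] = {}

    async def add(self, topic_filter: str) -> Any:
        """Subscribe to `topic_filter` unless it is already active."""
        if topic_filter in self._subs:
            return self._subs[topic_filter]
        sub = self._client.subscribe(topic_filter)
        await sub.start()  # sends SUBSCRIBE
        self._subs[topic_filter] = sub
        return sub

    async def remove(self, topic_filter: str) -> None:
        """Stop and forget the subscription for `topic_filter`, if any."""
        sub = self._subs.pop(topic_filter, None)
        if sub is not None:
            await sub.stop()  # sends UNSUBSCRIBE

    async def close(self) -> None:
        """Stop every remaining subscription, for example at shutdown."""
        while self._subs:
            _, sub = self._subs.popitem()
            await sub.stop()
```

Calling `close()` from a `finally` block at shutdown means no subscription is left started, whichever code path ends the program.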

Buffering and backpressure

Each Subscription buffers incoming messages in an internal queue. The size of this buffer is controlled by the receive_buffer_size parameter; see Backpressure.

  • receive_buffer_size > 0: the queue is limited to that number of messages.
  • receive_buffer_size == 0 (default): a safe default of 1000 messages is used.

When the buffer is full, message delivery slows down (backpressure) rather than letting memory usage grow without bound.
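The general idea of a bounded buffer can be illustrated with the standard library alone. The sketch below uses `asyncio.Queue` as a stand-in; it is an assumption made for illustration, and zmqtt's internal queue may be implemented differently. The producer suspends in `put()` whenever the queue is full, so the number of buffered items never exceeds `maxsize`.

```python
import asyncio
from typing import List, Tuple


async def run_bounded(maxsize: int, total: int) -> Tuple[List[int], int]:
    """Push `total` items through a bounded queue; return (items, peak queue size)."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    received: List[int] = []
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(total):
            await queue.put(i)  # suspends while the queue is full
            peak = max(peak, queue.qsize())

    async def consumer() -> None:
        for _ in range(total):
            received.append(await queue.get())
            await asyncio.sleep(0.001)  # simulate a slow handler

    await asyncio.gather(producer(), consumer())
    return received, peak
```

With a slow consumer, the producer is paced by the consumer instead of filling memory, which is the behaviour the buffer limit is meant to give.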

Async iterator

Iterating over a Subscription with async for runs indefinitely. Break out when you are done:

async with client.subscribe("commands/+") as sub:
    async for msg in sub:
        handle(msg)
        if msg.topic.endswith("/stop"):
            break

Explicit pull with get_message()

get_message() returns the next message, suspending until one arrives:

async with client.subscribe("telemetry/device/01") as sub:
    first  = await sub.get_message()
    second = await sub.get_message()

Use get_message() when you want pull-based control over when to read the next message. To bound memory when the consumer is slow, see Backpressure.
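Because get_message() suspends until a message arrives, a caller that must not wait forever can wrap it in a timeout. The helper below is a hypothetical sketch built on the standard `asyncio.wait_for`; `get_message_or_none` is not a zmqtt function, and the sketch assumes that cancelling a pending get_message() call is safe.

```python
import asyncio
from typing import Any, Optional


async def get_message_or_none(sub: Any, timeout: float) -> Optional[Any]:
    """Wait up to `timeout` seconds for the next message; return None on timeout."""
    try:
        return await asyncio.wait_for(sub.get_message(), timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the pending get_message() call before raising.
        return None
```

A caller can then poll with something like `msg = await get_message_or_none(sub, 5.0)` and treat `None` as "nothing arrived in time".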

Message fields

Field       Type                      Description
topic       str                       Full topic the message was published on
payload     bytes                     Raw payload bytes
qos         QoS                       QoS level of the incoming message
retain      bool                      True if this is a retained message
properties  PublishProperties | None  MQTT 5.0 properties; None on 3.1.1

Multiple topic filters

Pass multiple filters in a single call. One Subscription object covers all of them:

async with client.subscribe("sensors/#", "alerts/+", "status") as sub:
    async for msg in sub:
        ...
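Since one Subscription yields messages for all of its filters, the consuming loop usually needs to branch on msg.topic. One simple approach, sketched here under assumptions, is a lookup keyed on the first topic level. `pick_handler` and the `Handler` signature are hypothetical names for illustration, not zmqtt API.

```python
from typing import Callable, Dict

# Hypothetical handler signature: receives the topic and the raw payload.
Handler = Callable[[str, bytes], str]


def pick_handler(topic: str, handlers: Dict[str, Handler], default: Handler) -> Handler:
    """Choose a handler from the first topic level, e.g. "sensors" for "sensors/a/b"."""
    first_level = topic.split("/", 1)[0]
    return handlers.get(first_level, default)
```

Inside the loop this would be used as `pick_handler(msg.topic, handlers, default)(msg.topic, msg.payload)`.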

QoS on subscribe

async with client.subscribe("data/#", qos=QoS.AT_LEAST_ONCE) as sub:
    ...

The broker delivers messages at the lower of the publish QoS and the subscribe QoS.
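The rule can be written as a one-line function. In this sketch `QoSLevel` is a stand-in enum defined locally with the numeric levels from the MQTT specification; it is not zmqtt's own QoS type, and a broker may additionally grant a lower QoS than the one requested.

```python
from enum import IntEnum


class QoSLevel(IntEnum):
    """Stand-in for the library's QoS enum, using the MQTT numeric levels."""

    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


def delivered_qos(publish_qos: QoSLevel, subscribe_qos: QoSLevel) -> QoSLevel:
    """QoS used when the broker forwards a message: the lower of the two levels."""
    return min(publish_qos, subscribe_qos)
```

For example, a message published at EXACTLY_ONCE to a subscriber that asked for AT_LEAST_ONCE is forwarded at AT_LEAST_ONCE.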

RetainHandling (MQTT 5.0)

RetainHandling controls whether retained messages are sent when you subscribe. Available values:

Value                        Behaviour
SEND_ON_SUBSCRIBE (default)  Send retained messages on every SUBSCRIBE
SEND_IF_NOT_EXISTS           Send retained messages only if the subscription does not already exist
DO_NOT_SEND                  Never send retained messages

from zmqtt import RetainHandling

async with client.subscribe("status/#", retain_handling=RetainHandling.SEND_IF_NOT_EXISTS) as sub:
    ...

Wildcard filter priority

When multiple filters in the same Subscription match an incoming topic, zmqtt routes the message to exactly one internal queue: the one that corresponds to the most specific matching filter. Delivery is never duplicated inside zmqtt.

Specificity is compared level-by-level, left to right:

  • A literal segment beats +
  • + beats #

Example: if you subscribe to both a/b and a/#, a message on a/b is routed to a/b only:

async with client.subscribe("a/b", "a/#") as sub:
    # message published to "a/b" → delivered once, matched by "a/b"
    # message published to "a/c" → delivered once, matched by "a/#"
    ...
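To make the specificity rule concrete, here is one way it could be modelled in plain Python. This is an illustrative sketch, not zmqtt's actual routing code: `matches`, `specificity` and `most_specific` are hypothetical names, the ranking of a filter that ends exactly (such as "a") above one that continues with "#" (such as "a/#") is an assumption, and the special handling of topics beginning with "$" is omitted.

```python
from typing import Optional, Sequence, Tuple


def matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, f in enumerate(f_levels):
        if f == "#":
            return True  # matches all remaining levels, including none
        if i >= len(t_levels):
            return False
        if f != "+" and f != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


_RANK = {"#": 0, "+": 1}


def specificity(topic_filter: str) -> Tuple[int, ...]:
    """Sort key: literal levels rank 2, "+" ranks 1, "#" ranks 0.

    The trailing 3 makes a filter that ends exactly outrank one that
    continues with "#" at the same position (an assumption of this sketch).
    """
    return tuple(_RANK.get(level, 2) for level in topic_filter.split("/")) + (3,)


def most_specific(filters: Sequence[str], topic: str) -> Optional[str]:
    """Pick the matching filter with the highest left-to-right specificity."""
    candidates = [f for f in filters if matches(f, topic)]
    if not candidates:
        return None
    return max(candidates, key=specificity)
```

Comparing the rank tuples left to right reproduces the two rules above: a literal segment beats +, and + beats #.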

Broker behaviour vs zmqtt behaviour

Some brokers can deliver multiple PUBLISH packets to a client when the client has overlapping subscriptions that all match the same topic. In practice:

  • Some brokers may deliver only one message.
  • Others may deliver one message per matching subscription.

zmqtt's routing rule above applies after packets are received. If the broker sends multiple PUBLISH packets, zmqtt enqueues each one, because they are distinct network deliveries. If only one PUBLISH arrives, zmqtt delivers it to the queue of the most specific matching filter.

Tie-breaking

When two filters tie (same specificity), zmqtt logs a WARNING and routes the message to whichever filter was registered first.

Duplicate-filter guard

Warning

Subscribing to the same filter string from two separate Subscription objects logs a WARNING. The second subscription gets no queue for that filter, so it receives messages only for its other filters. The SUBSCRIBE is still forwarded to the broker.

async with client.subscribe("data/temp") as sub1:
    async with client.subscribe("data/temp") as sub2:  # WARNING logged
        # sub2 receives nothing for "data/temp"
        ...

Prefer multiple filters in a single subscribe() call rather than multiple overlapping subscriptions. See Logging for how to observe this warning at runtime.


See also: Manual Ack · Backpressure · MQTT 5.0 · Scaling