Manual Acknowledgement
By default, zmqtt acknowledges incoming messages automatically as soon as they are delivered to your queue. With auto_ack=False you take control: the broker-level ack is withheld until you explicitly call msg.ack().
Why use manual ack
Use auto_ack=False when you need a process-before-ack guarantee: the message should not be considered delivered until your processing is complete. If your process crashes before calling ack(), the broker (for QoS 1) or the library (for QoS 2) will redeliver.
Note
Manual ack only has effect at QoS 1 (AT_LEAST_ONCE) or QoS 2 (EXACTLY_ONCE). At QoS 0 (AT_MOST_ONCE) there is no acknowledgement protocol, so auto_ack=False is a no-op. See QoS levels for the delivery guarantees.
Enabling manual ack
async with client.subscribe("jobs/#", qos=QoS.AT_LEAST_ONCE, auto_ack=False) as sub:
async for msg in sub:
await process(msg) # do your work first
await msg.ack() # then acknowledge
msg.ack() is idempotent — calling it multiple times is safe, subsequent calls are no-ops.
QoS 1 semantics
For QoS 1 messages (AT_LEAST_ONCE), ack() sends PUBACK to the broker. Until PUBACK is sent, the broker may retransmit the message (e.g. on reconnect). Each retransmission appears as a new Message in your queue.
QoS 2 semantics
For QoS 2 messages (EXACTLY_ONCE), ack() sends PUBREC. The library then handles PUBREL and sends PUBCOMP automatically.
QoS 2 deduplication during the manual-ack window
Between receiving the initial PUBLISH and calling ack(), PUBREC has not been sent, so the broker may retransmit the PUBLISH (e.g. after a reconnect). The library silently drops the duplicate — your application sees the message exactly once regardless of how many retransmissions occur.
What happens on reconnect before ack()
- QoS 1: the broker retransmits the message after reconnection. Your queue receives it again as a new message with the DUP flag set.
- QoS 2: the library's deduplication logic ensures only one delivery regardless of reconnects.
Example: durable job processing
import asyncio
from zmqtt import create_client, QoS
async def main():
async with create_client("localhost") as client:
async with client.subscribe(
"jobs/process",
qos=QoS.AT_LEAST_ONCE,
auto_ack=False,
) as sub:
async for msg in sub:
try:
await process_job(msg.payload)
await msg.ack()
except Exception:
# Do NOT ack — broker will redeliver
raise
asyncio.run(main())
See also: Publishing — QoS levels · Reconnection · Error Handling