
Alpha

faststream-sqlbroker is currently in alpha.

Tutorial

Motivation

The primary benefit of a message queue built on top of a relational database is the ability to insert messages transactionally, atomically with other database operations, thus enabling the transactional outbox pattern. Also, the relational database is usually the most readily available, already-provisioned piece of infrastructure for a given service.

Given a proper understanding of the trade-offs involved, a relational-database-based queue is an appropriate tool for many low-to-medium throughput, latency-tolerant uses, including as part of a larger messaging flow that involves a "proper" queue (e.g. as an outbox between a service and a queue).

Installation

PostgreSQL, MySQL, and SQLite are currently supported.

pip install "faststream-sqlbroker"

Database Tables

The SqlBroker requires two tables: message (active messages) and message_archive (completed/failed messages). Their names can be customized via the broker's message_table_name and message_archive_table_name parameters. You can also adapt the tables to your needs (partition them, add indices, use more specific data types such as JSONB, etc.) as long as they generally conform to the following schemas. If the broker's validate_schema_on_start is True, the schema is checked on startup.

from datetime import datetime, timezone

from sqlalchemy import (
    JSON,
    BigInteger,
    Column,
    DateTime,
    Enum,
    LargeBinary,
    MetaData,
    String,
    Table,
)

from faststream_sqlbroker.sqlbroker.message import SqlBrokerMessageState

metadata = MetaData()

message = Table(
    "message",
    metadata,
    Column("id", BigInteger, primary_key=True),
    Column("queue", String(255), nullable=False, index=True),
    Column("headers", JSON, nullable=True),
    Column("payload", LargeBinary, nullable=False),
    Column(
        "state",
        Enum(SqlBrokerMessageState),
        nullable=False,
        index=True,
        server_default=SqlBrokerMessageState.PENDING.name,
    ),
    Column("attempts_count", BigInteger, nullable=False, default=0),
    Column("deliveries_count", BigInteger, nullable=False, default=0),
    Column(
        "created_at",
        DateTime,
        nullable=False,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    ),
    Column("first_attempt_at", DateTime),
    Column(
        "next_attempt_at",
        DateTime,
        nullable=False,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
        index=True,
    ),
    Column("last_attempt_at", DateTime),
    Column("acquired_at", DateTime),
)


message_archive = Table(
    "message_archive",
    metadata,
    Column("id", BigInteger, primary_key=True),
    Column("queue", String(255), nullable=False, index=True),
    Column("headers", JSON, nullable=True),
    Column("payload", LargeBinary, nullable=False),
    Column("state", Enum(SqlBrokerMessageState), nullable=False, index=True),
    Column("attempts_count", BigInteger, nullable=False),
    Column("deliveries_count", BigInteger, nullable=False),
    Column("created_at", DateTime, nullable=False),
    Column("first_attempt_at", DateTime),
    Column("last_attempt_at", DateTime),
    Column(
        "archived_at",
        DateTime,
        nullable=False,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    ),
)

Broker

from sqlalchemy.ext.asyncio import create_async_engine

from faststream_sqlbroker.sqlbroker import SqlBroker

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker = SqlBroker(engine=engine)

Broker parameters

  • engine — SQLAlchemy AsyncEngine to use for requests to the database.
  • message_table_name — Name of the table containing active messages. Defaults to message.
  • message_archive_table_name — Name of the table containing completed/failed messages. Defaults to message_archive.
  • validate_schema_on_start — If True (default), validates that the configured tables exist and conform to the expected schema.
  • graceful_timeout — Seconds to wait for in-flight messages to finish processing during shutdown.

Publishing

from datetime import datetime, timedelta, timezone

from sqlalchemy.ext.asyncio import create_async_engine

from faststream import FastStream
from faststream_sqlbroker.sqlbroker import SqlBroker

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker = SqlBroker(engine=engine)
app = FastStream(broker)

publisher_sqlbroker = broker.publisher()

@app.after_startup
async def publish_examples():
    await publisher_sqlbroker.publish("Hello, SqlBroker!", queue="my_queue")

The .publish() methods of both the broker and the publisher (see publishing) accept:

  • message — The message body.
  • queue — The target queue name.
  • headers — Optional dict[str, str] of message headers.
  • next_attempt_at — Optional datetime (with timezone) for delayed delivery.
  • connection — Optional SQLAlchemy AsyncConnection for transactional publishing.

Delayed delivery

If next_attempt_at is provided, the message won't be fetched before that time.

    await publisher_sqlbroker.publish(
        "Process me later",
        queue="my_queue",
        next_attempt_at=datetime.now(timezone.utc) + timedelta(minutes=5),
    )
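The schema above stores naive UTC timestamps (its column defaults call datetime.now(timezone.utc).replace(tzinfo=None)), while next_attempt_at is passed as a timezone-aware datetime. The following sketch illustrates the "not fetched before next_attempt_at" rule, assuming the broker normalizes the value to naive UTC and treats a message as due once next_attempt_at is at or before the current time. The helpers to_naive_utc and is_due are hypothetical and not part of faststream-sqlbroker.

```python
from datetime import datetime, timedelta, timezone


def to_naive_utc(dt: datetime) -> datetime:
    """Hypothetical helper: convert an aware datetime to naive UTC.

    Mirrors the schema's column defaults; assumes the broker stores naive UTC.
    """
    if dt.tzinfo is None:
        raise ValueError("next_attempt_at must be timezone-aware")
    return dt.astimezone(timezone.utc).replace(tzinfo=None)


def is_due(next_attempt_at: datetime, now: datetime) -> bool:
    """Hypothetical helper: a message becomes fetchable once next_attempt_at has passed."""
    return next_attempt_at <= now


now = datetime.now(timezone.utc)
scheduled = to_naive_utc(now + timedelta(minutes=5))
print(is_due(scheduled, to_naive_utc(now)))                         # False
print(is_due(scheduled, to_naive_utc(now + timedelta(minutes=6))))  # True
```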

Transactional publishing

When connection is provided, the message insert participates in the same database transaction as your other operations, enabling the transactional outbox pattern.

    async with engine.begin() as connection:
        # ... your other database operations using `connection` ...
        await publisher_sqlbroker.publish(
            "Transactional message",
            queue="my_queue",
            connection=connection,
        )
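The atomicity guarantee behind the outbox pattern can be illustrated without SqlBroker at all. This stdlib-only sketch uses sqlite3: a business row and a message row are inserted in one transaction, so a failure after both inserts rolls both back. The orders table and the reduced message columns are hypothetical, and this is not how SqlBroker itself issues its inserts.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, item TEXT NOT NULL)")
# Hypothetical, heavily simplified stand-in for the message table.
conn.execute(
    "CREATE TABLE message (id INTEGER PRIMARY KEY, queue TEXT NOT NULL, payload BLOB NOT NULL)"
)


def place_order(item: str, fail: bool = False) -> None:
    # `with conn:` commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.execute("INSERT INTO orders (item) VALUES (?)", (item,))
        conn.execute(
            "INSERT INTO message (queue, payload) VALUES (?, ?)",
            ("my_queue", item.encode()),
        )
        if fail:
            raise RuntimeError("simulated failure after both inserts")


place_order("book")
try:
    place_order("lamp", fail=True)
except RuntimeError:
    pass

# Only the successful order and its message were persisted.
print(conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0])   # 1
print(conn.execute("SELECT COUNT(*) FROM message").fetchone()[0])  # 1
```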

Subscribing

from sqlalchemy.ext.asyncio import create_async_engine

from faststream import AckPolicy, FastStream

from faststream_sqlbroker.sqlbroker import SqlBroker
from faststream_sqlbroker.sqlbroker.retry import ConstantRetryStrategy

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker = SqlBroker(engine=engine)
app = FastStream(broker)


@broker.subscriber(
    queues=["my_queue"],
    max_workers=10,
    retry_strategy=ConstantRetryStrategy(
        delay_seconds=5,
        max_attempts=3,
        max_total_delay_seconds=None,
    ),
    min_fetch_interval=0.1,
    max_fetch_interval=1,
    fetch_batch_size=10,
    overfetch_factor=2,
    flush_interval=1,
    release_stuck_interval=60,
    release_stuck_timeout=60*5,
    max_deliveries=10,
    ack_policy=AckPolicy.NACK_ON_ERROR,
    retain_in_archive_on_ack=True,
    retain_in_archive_on_reject=True,
)
async def handler(msg: str):
    print(msg)

Subscriber parameters

  • queues — List of queue names to consume from.
  • max_workers — Number of concurrent handler coroutines.
  • retry_strategy — Called to determine if and how soon a Nack'ed message is retried. If None, AckPolicy.NACK_ON_ERROR has the same effect as AckPolicy.REJECT_ON_ERROR.
  • fetch_batch_size — Maximum number of messages to fetch in a single batch. A fetch's actual limit may be lower if the free capacity of the set of acquired-but-not-yet-processed messages is smaller.
  • overfetch_factor — Multiplier applied to fetch_batch_size to determine the maximum size of the set of acquired-but-not-yet-processed messages.
  • min_fetch_interval — Minimum interval between consecutive fetches. If the last fetch was full (returned as many messages as the fetch's limit), the next fetch happens only after both (i) the minimum fetch interval has passed and (ii) capacity equal to the fetch batch size has freed up in the set of acquired-but-not-yet-processed messages.
  • max_fetch_interval — Maximum interval between consecutive fetches.
  • flush_interval — Interval between flushes of processed message state to the database.
  • release_stuck_interval — Interval between checks for stuck PROCESSING messages.
  • release_stuck_timeout — Interval since acquired_at after which a PROCESSING message is considered stuck and is released back to PENDING.
  • max_deliveries — Maximum number of deliveries allowed for a message. If set, messages that have reached this limit are Reject'ed to FAILED without processing. Note that this might violate the at-least-once processing semantics.
  • ack_policy — AckPolicy that controls acknowledgement behavior.
  • retain_in_archive_on_ack — If True (default), COMPLETED (Ack'ed) messages, in addition to being removed from the primary table, are also persisted in the archive table.
  • retain_in_archive_on_reject — If True (default), FAILED (Reject'ed) messages, in addition to being removed from the primary table, are also persisted in the archive table, where they serve as a dead-letter queue.
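The interplay of fetch_batch_size, overfetch_factor, min_fetch_interval and release_stuck_timeout described above can be summarized in a few functions. This is a sketch of the stated rules, not the subscriber's actual code: rounding fetch_batch_size * overfetch_factor down to an integer is an assumption, and all helper names are hypothetical.

```python
import math
from datetime import datetime, timedelta


def max_buffer_size(fetch_batch_size: int, overfetch_factor: float) -> int:
    # Capacity of the acquired-but-not-yet-processed set (rounding down is an assumption).
    return math.floor(fetch_batch_size * overfetch_factor)


def fetch_limit(fetch_batch_size: int, overfetch_factor: float, buffered: int) -> int:
    # A fetch requests at most one batch, and never more than the free capacity.
    free = max_buffer_size(fetch_batch_size, overfetch_factor) - buffered
    return max(0, min(fetch_batch_size, free))


def can_fetch_after_full_fetch(
    fetch_batch_size: int,
    overfetch_factor: float,
    buffered: int,
    seconds_since_last_fetch: float,
    min_fetch_interval: float,
) -> bool:
    # After a full fetch: wait for the minimum interval AND a full batch of free capacity.
    free = max_buffer_size(fetch_batch_size, overfetch_factor) - buffered
    return seconds_since_last_fetch >= min_fetch_interval and free >= fetch_batch_size


def is_stuck(acquired_at: datetime, now: datetime, release_stuck_timeout: float) -> bool:
    # A PROCESSING message acquired longer ago than the timeout is released to PENDING.
    return now - acquired_at > timedelta(seconds=release_stuck_timeout)


# fetch_batch_size=10, overfetch_factor=2: at most 20 messages are held at once.
print(fetch_limit(10, 2, buffered=0))   # 10
print(fetch_limit(10, 2, buffered=15))  # 5
print(can_fetch_after_full_fetch(10, 2, buffered=12,
                                 seconds_since_last_fetch=0.5,
                                 min_fetch_interval=0.1))  # False
```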

Delayed retries

When a message is Nack'ed (either manually or by AckPolicy.NACK_ON_ERROR), the retry_strategy determines whether and when it is retried. All strategies accept max_attempts and max_total_delay_seconds as common parameters: if either limit is reached, the message is marked as FAILED instead of RETRYABLE. Otherwise, the message is scheduled for a retry at the next_attempt_at returned by the strategy.
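The shared limit check can be sketched as follows. The sketch assumes that max_attempts counts all attempts made so far, including the first, and that max_total_delay_seconds is measured from first_attempt_at to the proposed next attempt. Both interpretations, and the decide_retry helper itself, are illustrative assumptions rather than the library's implementation.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def decide_retry(
    attempts_count: int,
    first_attempt_at: datetime,
    now: datetime,
    delay_seconds: float,
    max_attempts: Optional[int],
    max_total_delay_seconds: Optional[float],
) -> Tuple[str, Optional[datetime]]:
    """Hypothetical helper: return ("FAILED", None) or ("RETRYABLE", next_attempt_at)."""
    # Limit 1: number of attempts already made (assumed to include the first one).
    if max_attempts is not None and attempts_count >= max_attempts:
        return "FAILED", None
    next_attempt_at = now + timedelta(seconds=delay_seconds)
    # Limit 2: total span from the first attempt to the proposed next attempt (assumed).
    if max_total_delay_seconds is not None:
        if next_attempt_at - first_attempt_at > timedelta(seconds=max_total_delay_seconds):
            return "FAILED", None
    return "RETRYABLE", next_attempt_at


start = datetime(2024, 1, 1, 12, 0)
print(decide_retry(1, start, start, 5, 3, None))
# ('RETRYABLE', datetime.datetime(2024, 1, 1, 12, 0, 5))
```

The delay itself is left as an input here because computing it is the only part that differs between the strategies listed in this section.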

ConstantRetryStrategy

Retries after a fixed delay_seconds every time.

from faststream_sqlbroker.sqlbroker.retry import ConstantRetryStrategy

constant = ConstantRetryStrategy(
    delay_seconds=5,
    max_attempts=3,
    max_total_delay_seconds=None,
)

LinearRetryStrategy

First retry after initial_delay_seconds, then the delay increases by step_seconds with each attempt.

from faststream_sqlbroker.sqlbroker.retry import LinearRetryStrategy

linear = LinearRetryStrategy(
    initial_delay_seconds=1,
    step_seconds=2,
    max_attempts=5,
    max_total_delay_seconds=60,
)
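Reading the description as delay_k = initial_delay_seconds + (k - 1) * step_seconds for the k-th retry, the delays for the example parameters can be sketched as below. The 1-based retry index is an assumption, linear_delay is a hypothetical helper, and the max_attempts and max_total_delay_seconds limits are not applied here.

```python
def linear_delay(retry_number: int, initial_delay_seconds: float, step_seconds: float) -> float:
    # retry_number is 1 for the first retry (assumed indexing).
    return initial_delay_seconds + (retry_number - 1) * step_seconds


# First four retry delays for initial_delay_seconds=1, step_seconds=2.
print([linear_delay(k, 1, 2) for k in range(1, 5)])  # [1, 3, 5, 7]
```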

ExponentialBackoffRetryStrategy

Delay starts at initial_delay_seconds and is multiplied by multiplier on each attempt. max_delay_seconds caps the delay.

from faststream_sqlbroker.sqlbroker.retry import ExponentialBackoffRetryStrategy

exponential = ExponentialBackoffRetryStrategy(
    initial_delay_seconds=1,
    multiplier=2.0,
    max_delay_seconds=60,
    max_attempts=8,
    max_total_delay_seconds=300,
)
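The capped exponential schedule works out to delay_k = min(initial_delay_seconds * multiplier^(k - 1), max_delay_seconds). A sketch for the example parameters follows; the 1-based retry index is an assumption and exponential_delay is a hypothetical helper, not the strategy's code.

```python
def exponential_delay(
    retry_number: int,
    initial_delay_seconds: float,
    multiplier: float,
    max_delay_seconds: float,
) -> float:
    # initial * multiplier^(k-1), capped at max_delay_seconds; k starts at 1 (assumed).
    return float(min(initial_delay_seconds * multiplier ** (retry_number - 1), max_delay_seconds))


# Seven retries (k = 1..7) with initial_delay_seconds=1, multiplier=2.0, max_delay_seconds=60.
delays = [exponential_delay(k, 1, 2.0, 60) for k in range(1, 8)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

The cap matters from the seventh retry onward, where the uncapped value (64 s) would exceed max_delay_seconds.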

ExponentialBackoffWithJitterRetryStrategy

Same as exponential backoff, but adds random jitter (up to delay * jitter_factor) to spread out retries and avoid thundering herds.

from faststream_sqlbroker.sqlbroker.retry import ExponentialBackoffWithJitterRetryStrategy

exponential_jitter = ExponentialBackoffWithJitterRetryStrategy(
    initial_delay_seconds=1,
    multiplier=2.0,
    max_delay_seconds=60,
    jitter_factor=0.5,
    max_attempts=8,
    max_total_delay_seconds=300,
)
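A sketch of the jittered variant: compute the capped exponential delay, then add a uniformly random amount between 0 and delay * jitter_factor. Applying the cap before adding jitter is an assumption, and exponential_jittered_delay is a hypothetical helper. The random generator is passed in so the behavior can be seeded and checked.

```python
import random


def exponential_jittered_delay(
    retry_number: int,
    initial_delay_seconds: float,
    multiplier: float,
    max_delay_seconds: float,
    jitter_factor: float,
    rng: random.Random,
) -> float:
    # Capped exponential delay; retry_number starts at 1 (assumed).
    delay = min(initial_delay_seconds * multiplier ** (retry_number - 1), max_delay_seconds)
    # Add jitter in [0, delay * jitter_factor]; cap-before-jitter is an assumption.
    return delay + rng.uniform(0, delay * jitter_factor)


rng = random.Random(42)  # seeded only to make the illustration reproducible
print([round(exponential_jittered_delay(k, 1, 2.0, 60, 0.5, rng), 2) for k in range(1, 5)])
```

Because each consumer draws its own random offset, messages that failed at the same moment are retried at slightly different times.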

ConstantWithJitterRetryStrategy

Retries after base_delay_seconds plus random jitter in the range [-jitter_seconds, +jitter_seconds].

from faststream_sqlbroker.sqlbroker.retry import ConstantWithJitterRetryStrategy

constant_jitter = ConstantWithJitterRetryStrategy(
    base_delay_seconds=5,
    jitter_seconds=2,
    max_attempts=3,
    max_total_delay_seconds=None,
)
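For the example parameters (base_delay_seconds=5, jitter_seconds=2), every retry delay falls between 3 and 7 seconds. The sketch below samples that range; constant_jittered_delay is a hypothetical helper, and clamping at zero (relevant only when jitter_seconds exceeds base_delay_seconds) is an assumption.

```python
import random


def constant_jittered_delay(base_delay_seconds: float, jitter_seconds: float, rng: random.Random) -> float:
    # base +/- a uniform offset in [-jitter_seconds, +jitter_seconds]; clamp at 0 is assumed.
    return max(0.0, base_delay_seconds + rng.uniform(-jitter_seconds, jitter_seconds))


rng = random.Random(7)  # seeded only for reproducibility
samples = [constant_jittered_delay(5, 2, rng) for _ in range(1000)]
print(min(samples) >= 3, max(samples) <= 7)  # True True
```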

NoRetryStrategy

No retries — the message is marked as FAILED on the first Nack.

from faststream_sqlbroker.sqlbroker.retry import NoRetryStrategy

no_retry = NoRetryStrategy()

Transactional outbox

Implementing the transactional outbox pattern becomes as simple as the following.

Publish messages transactionally with your other database operations.

from sqlalchemy.ext.asyncio import create_async_engine

from faststream import AckPolicy, FastStream
from faststream.kafka import KafkaBroker

from faststream_sqlbroker.sqlbroker import SqlBroker
from faststream_sqlbroker.sqlbroker.retry import ExponentialBackoffRetryStrategy

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
broker_sqlbroker = SqlBroker(engine=engine)
broker_kafka = KafkaBroker("127.0.0.1:9092")
app = FastStream(broker_sqlbroker, on_startup=[broker_kafka.connect])
publisher_sqlbroker = broker_sqlbroker.publisher()


@app.after_startup  # just an example
async def publish_examples():
    async with engine.begin() as connection:
        # ... your other database operations using `connection` ...
        await publisher_sqlbroker.publish(
            {"message": "Hello, SqlBroker!"},
            queue="sqlbroker_queue",
            connection=connection,
        )

And relay the messages from the database to another broker.

publisher_kafka = broker_kafka.publisher("kafka_topic")


@publisher_kafka
@broker_sqlbroker.subscriber(
    queues=["sqlbroker_queue"],
    max_workers=10,
    retry_strategy=ExponentialBackoffRetryStrategy(
        initial_delay_seconds=1,
        multiplier=2,
        max_delay_seconds=60 * 5,
        max_total_delay_seconds=60 * 60 * 6,
        max_attempts=None,
    ),
    max_fetch_interval=1,
    min_fetch_interval=0,
    fetch_batch_size=10,
    overfetch_factor=1.5,
    flush_interval=3,
    release_stuck_interval=5,
    release_stuck_timeout=60 * 60,
    max_deliveries=20,
    ack_policy=AckPolicy.NACK_ON_ERROR,
)
async def handle_msg(msg_body: dict) -> dict:
    return msg_body