Change Data Capture
TigerBeetle can stream changes (transfers and balance updates) to message queues using the AMQP 0.9.1 protocol, which is compatible with RabbitMQ and various other message brokers.
See Installing for instructions on how to deploy the TigerBeetle binary.
Here’s how to start the CDC job:
./tigerbeetle amqp --addresses=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002 --cluster=0 \
--host=127.0.0.1 \
--vhost=/ \
--user=guest --password=guest \
--publish-exchange=tigerbeetle
Here what the arguments mean:
--addresses
specify IP addresses of all the replicas in the cluster. The order of addresses must correspond to the order of replicas.--cluster
specifies a globally unique 128 bit cluster ID.--host
the AMQP host address in the formatip:port
.
Both IPv4 and IPv6 addresses are supported. Ifport
is omitted, the AMQP default5672
is used.
Multiple addresses (for clustered environments) and DNS names are not supported.
The operator must resolve the IP address of the preferred/reachable server.
The CDC job will exit with a non-zero code in case of any connectivity or configuration issue with the AMQP server.--vhost
the AMQP virtual host name.--user
the AMQP username.--password
the AMQP password.
Only PLAIN authentication is supported.--publish-exchange
the exchange name.
Must be a pre-existing exchange provided by the operator.
Optional. May be omitted if--publish-routing-key
is present.--publish-routing-key
the routing key used in combination with the exchange.
Optional. May be omitted ifpublish-exchange
is present.--event-count-max
the maximum number of events fetched from TigerBeetle and published to the AMQP server per batch.
Optional. Defaults to2730
if omitted.--idle-interval-ms
the time interval in milliseconds to wait before querying again when the last query returned no events.
Optional. Defaults to1000
ms if omitted.--timestamp-last
overrides the last published timestamp, resuming from this point.
This is a TigerBeetle timestamp with nanosecond precision.
Optional. If omitted, the last acknowledged timestamp is used.
Message content:
Messages are published with custom headers, allowing users to implement routing and filtering rules.
Message headers:
Key | AMQP data type | Description |
---|---|---|
event_type |
string |
The event type. |
ledger |
long_uint |
The ledger of the transfer and accounts. |
transfer_code |
short_uint |
The transfer code. |
debit_account_code |
short_uint |
The debit account code. |
credit_account_code |
short_uint |
The credit account code. |
app_id |
string |
Constant tigerbeetle . |
content_type |
string |
Constant application/json |
delivery_mode |
short_short_uint |
Constant 2 which means persistent. |
timestamp |
timestamp |
The event timestamp.¹ |
¹ AMQP timestamps are represented in seconds, so TigerBeetle timestamps are truncated.
Use thetimestamp
field in the message body for full nanosecond precision.
Message body:
Each event published contains information about the transfer and the accounts involved.
type
: The type of event.
One ofsingle_phase
,two_phase_pending
,two_phase_posted
,two_phase_voided
ortwo_phase_expired
.
See the Two-Phase Transfers for more details.timestamp
: The event timestamp.
Usually, it’s the same as the transfer’s timestamp, except whenevent_type == 'two_phase_expired'
when it’s the expiry timestamp.ledger
: The ledger code.transfer
: Full details of the transfer.
Fortwo_phase_expired
events, it’s the pending transfer that was reverted.debit_account
: Full details of the debit account, with the balance as of the time of the event.credit_account
: Full details of the credit account, with the balance as of the time of the event.
The message body is encoded as a UTF-8 JSON without line breaks or
spaces. Long integers such as u128
and u64
are
encoded as JSON strings to improve interoperability.
Here is a formatted example (with indentation and line breaks) for readability.
{
"timestamp": "1745328372758695656",
"type": "single_phase",
"ledger": 2,
"transfer": {
"id": 9082709,
"amount": 3794,
"pending_id": 0,
"user_data_128": "79248595801719937611592367840129079151",
"user_data_64": "13615171707598273871",
"user_data_32": 3229992513,
"timeout": 0,
"code": 20295,
"flags": 0,
"timestamp": "1745328372758695656"
},
"debit_account": {
"id": 3750,
"debits_pending": 0,
"debits_posted": 8463768,
"credits_pending": 0,
"credits_posted": 8861179,
"user_data_128": "118966247877720884212341541320399553321",
"user_data_64": "526432537153007844",
"user_data_32": 4157247332,
"code": 1,
"flags": 0,
"timestamp": "1745328270103398016"
},
"credit_account": {
"id": 6765,
"debits_pending": 0,
"debits_posted": 8669204,
"credits_pending": 0,
"credits_posted": 8637251,
"user_data_128": "43670023860556310170878798978091998141",
"user_data_64": "12485093662256535374",
"user_data_32": 1924162092,
"code": 1,
"flags": 0,
"timestamp": "1745328270103401031"
}
}
Guarantees
TigerBeetle guarantees at-least-once semantics when publishing to message brokers, and makes a best effort to prevent duplicate messages. However, during crash recovery, the CDC job may replay unacknowledged messages that could have been already delivered to consumers.
It is the consumer’s responsibility to perform idempotency checks when processing messages.
Edit this page