Change Data Capture
TigerBeetle can stream changes (transfers and balance updates) to message queues using the AMQP 0.9.1 protocol, which is compatible with RabbitMQ and various other message brokers.
See Installing for instructions on how to deploy the TigerBeetle binary.
Here’s how to start the CDC job:
./tigerbeetle amqp --addresses=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002 --cluster=0 \
--host=127.0.0.1 \
--vhost=/ \
--user=guest --password=guest \
--publish-exchange=tigerbeetle
Here what the arguments mean:
--addressesspecify IP addresses of all the replicas in the cluster. The order of addresses must correspond to the order of replicas.--clusterspecifies a globally unique 128 bit cluster ID.--hostthe AMQP host address in the formatip:port.
Both IPv4 and IPv6 addresses are supported. Ifportis omitted, the AMQP default5672is used.
Multiple addresses (for clustered environments) and DNS names are not supported.
The operator must resolve the IP address of the preferred/reachable server.
The CDC job will exit with a non-zero code in case of any connectivity or configuration issue with the AMQP server.--vhostthe AMQP virtual host name.--userthe AMQP username.--passwordthe AMQP password.
Only PLAIN authentication is supported.--publish-exchangethe exchange name.
Must be a pre-existing exchange provided by the operator.
Optional. May be omitted if--publish-routing-keyis present.--publish-routing-keythe routing key used in combination with the exchange.
Optional. May be omitted ifpublish-exchangeis present.--event-count-maxthe maximum number of events fetched from TigerBeetle and published to the AMQP server per batch.
Optional. Defaults to2730if omitted.--idle-interval-msthe time interval in milliseconds to wait before querying again when the last query returned no events.
Optional. Defaults to1000ms if omitted.--requests-per-second-limitthrottles the maximum number of requests per second made to TigerBeetle.
Must be greater than zero.
Optional. No limit if omitted.--timestamp-lastoverrides the last published timestamp, resuming from this point.
This is a TigerBeetle timestamp with nanosecond precision.
Optional. If omitted, the last acknowledged timestamp is used.
Message content:
Messages are published with custom headers, allowing users to implement routing and filtering rules.
Message headers:
| Key | AMQP data type | Description |
|---|---|---|
event_type |
string |
The event type. |
ledger |
long_long_int |
The ledger of the transfer and accounts. |
transfer_code |
long_int |
The transfer code. |
debit_account_code |
long_int |
The debit account code. |
credit_account_code |
long_int |
The credit account code. |
app_id |
string |
Constant tigerbeetle. |
content_type |
string |
Constant application/json |
delivery_mode |
short_short_uint |
Constant 2 which means persistent. |
timestamp |
timestamp |
The event timestamp.¹ |
¹ AMQP timestamps are represented in seconds, so TigerBeetle timestamps are truncated.
Use thetimestampfield in the message body for full nanosecond precision.
Message body:
Each event published contains information about the transfer and the accounts involved.
type: The type of event.
One ofsingle_phase,two_phase_pending,two_phase_posted,two_phase_voidedortwo_phase_expired.
See the Two-Phase Transfers for more details.timestamp: The event timestamp.
Usually, it’s the same as the transfer’s timestamp, except whenevent_type == 'two_phase_expired'when it’s the expiry timestamp.ledger: The ledger code.transfer: Full details of the transfer.
Fortwo_phase_expiredevents, it’s the pending transfer that was reverted.debit_account: Full details of the debit account, with the balance as of the time of the event.credit_account: Full details of the credit account, with the balance as of the time of the event.
The message body is encoded as a UTF-8 JSON without line breaks or
spaces. Long integers such as u128 and u64 are
encoded as JSON strings to improve interoperability.
Here is a formatted example (with indentation and line breaks) for readability.
{
"timestamp": "1745328372758695656",
"type": "single_phase",
"ledger": 2,
"transfer": {
"id": 9082709,
"amount": 3794,
"pending_id": 0,
"user_data_128": "79248595801719937611592367840129079151",
"user_data_64": "13615171707598273871",
"user_data_32": 3229992513,
"timeout": 0,
"code": 20295,
"flags": 0,
"timestamp": "1745328372758695656"
},
"debit_account": {
"id": 3750,
"debits_pending": 0,
"debits_posted": 8463768,
"credits_pending": 0,
"credits_posted": 8861179,
"user_data_128": "118966247877720884212341541320399553321",
"user_data_64": "526432537153007844",
"user_data_32": 4157247332,
"code": 1,
"flags": 0,
"timestamp": "1745328270103398016"
},
"credit_account": {
"id": 6765,
"debits_pending": 0,
"debits_posted": 8669204,
"credits_pending": 0,
"credits_posted": 8637251,
"user_data_128": "43670023860556310170878798978091998141",
"user_data_64": "12485093662256535374",
"user_data_32": 1924162092,
"code": 1,
"flags": 0,
"timestamp": "1745328270103401031"
}
}Guarantees
TigerBeetle guarantees at-least-once semantics when publishing to message brokers, and makes a best effort to prevent duplicate messages. However, during crash recovery, the CDC job may replay unacknowledged messages that could have been already delivered to consumers.
It is the consumer’s responsibility to perform idempotency checks when processing messages.
Upgrading
The CDC job requires TigerBeetle cluster version 0.16.43
or greater.
The same upgrade planning recommended for clients applies to the CDC job. The CDC job version must not be newer than the cluster version, as it will fail with an error message if so.
Any transactions originally created by TigerBeetle versions
before 0.16.29 have the following limitations for CDC
processing:
- Events of type
two_phase_expiredare not supported. - Only transfers where both the debit and credit accounts have the
flags.historyenabled are visible to CDC.
Transactions committed after version 0.16.29 are fully
compatible with CDC and do not require the history
flag.
CDC to RabbitMQ (AMQP 0.9.1) in production
High Availability
The CDC job is single instance. Starting a second
tigerbeetle amqp with the same cluster_id will
exit with a non-zero exit code. For high availability, the CDC job could
be monitored for crashes and restarted in case a failure.
The CDC job itself is stateless, and will resume from the last event acknowledged by RabbitMQ, however it may replay events that weren’t acknowledged but received by the exchange.
TLS Support
For secure AMQPS connections, we recommend using a TLS
Tunnel to wrap the connection between TigerBeetle and RabbitMQ.
Event Replay
By default, when the CDC job starts, it resumes from the timestamp of
the last acknowledged event in RabbitMQ. This can be overridden to using
--timestamp-last. For example,
--timestamp-last=0 will replay all events.