Implementing CDC from AWS RDS PostgreSQL to another Database using Kafka Connect

Apollo Software Labs
Mar 24, 2022

Kafka Connect can be used to move data from one database to another: the Debezium Source Connector moves data from the source database into a Kafka Topic, and the JDBC Sink Connector moves the data from the Kafka Topic into the destination database.

Motivation

When you are using a microservices architecture, as a microservices developer you are commonly faced with executing a database transaction and also communicating the success of that transaction to another system for downstream processing.
There are a couple of mechanisms to ensure that the downstream processing happens reliably.

Option 1 — Outbox Pattern. As part of the same database transaction, an entry is made into an outbox table, and a REST API call or asynchronous message is subsequently relayed.

Option 2 — The database changes resulting from the transaction are propagated using Change Data Capture(CDC).

There are a few DMS-style tools that can move data reliably from point to point. But often there is a need to notify multiple systems. Kafka Connect is a mechanism that enables moving data into and out of Kafka.

In this article, we will look into Option 2 mentioned above — moving data from PostgreSQL into a Kafka Topic using the Kafka Connect Debezium Connector, so that the data is available to multiple consumers, and then sinking that data from the Kafka Topic into the destination database.

Change Data Capture (CDC)

Let us first understand how data changes occur within a PostgreSQL database server and how these changes are replicated to a Kafka Topic using the Debezium Kafka Connector.

PostgreSQL captures all changes across all databases in WAL (Write Ahead Logging) files. Data file changes happen only after they are captured in the WAL files. This allows PostgreSQL to execute a lot of transactions efficiently and sequentially without having to update every data file modified by each transaction.

To see the current LSN (Log Sequence Number). Note: This tracks the latest LSN over the entire database server, i.e., changes across all of the databases on the server.
select pg_current_wal_lsn();

For the tables whose changes you want to track, you create a Publication. Publications are captured in the catalog tables below.
select * from pg_catalog.pg_publication;
select * from pg_catalog.pg_publication_tables;

A Replication Slot tracks the changes for a client (here, the Kafka Connect Debezium Connector). It is a PostgreSQL feature used by the database server to retain the WAL logs that a client still needs, even while the client is disconnected.

select * from pg_catalog.pg_replication_slots;

When a Debezium Source Connector client is active, the PostgreSQL database server creates a backend WAL Sender process for that client. The WAL Sender runs only while the client is connected and exits when the client disconnects. It compares the current LSN against the LSNs the client has already processed (tracked in pg_catalog.pg_replication_slots) and sends the WAL records the client needs. While a client is connected, this is reflected in the active indicator column of pg_catalog.pg_replication_slots.
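To see this in practice, you can inspect the WAL Sender and the slot status directly. A quick sketch, assuming the Debezium client is currently connected:

-- WAL Sender processes currently serving connected replication clients
select pid, application_name, state, sent_lsn, replay_lsn
from pg_stat_replication;

-- Whether the Debezium client is connected to its slot, and how far it has confirmed
select slot_name, active, active_pid, confirmed_flush_lsn
from pg_catalog.pg_replication_slots;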

Postgres Db Configuration

By enabling the AWS RDS parameter rds.logical_replication, you enable logical decoding, so that changes written to the PostgreSQL WAL (Write Ahead Log) are made available through the Replication Slots consumed by the Kafka connectors.

SHOW wal_level;
This should return logical. If it returns replica, then CDC is not enabled on the database server.
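If you prefer to enable the parameter via the AWS CLI rather than the console, a minimal sketch is shown below. The parameter group name sample-postgres-params is a placeholder for whatever custom parameter group is attached to your instance; rds.logical_replication is a static parameter, so a reboot is required.

aws rds modify-db-parameter-group \
  --db-parameter-group-name sample-postgres-params \
  --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"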

Create Source Database

CREATE DATABASE sample_service
WITH
OWNER = postgres
ENCODING = 'UTF8'
CONNECTION LIMIT = -1;

Create the schema and other database objects after connecting to the source database created above.

CREATE SCHEMA sample_schema AUTHORIZATION postgres;

CREATE role debezium_user WITH PASSWORD 'xxx' login;

GRANT rds_replication TO debezium_user;

GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA sample_schema TO debezium_user;

If the above role and privilege grants are not sufficient and you are still getting permission errors in Kafka Connect for debezium_user, you may need to:
GRANT rds_superuser TO debezium_user;
I would not recommend doing this in any higher environment; instead, find the required privileges and roles and grant them explicitly, as in the sketch below.
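A more targeted set of grants, as a sketch; adjust to your schema and security requirements. The CREATE grant on the database is only needed if the connector should create the publication itself (filtered publications additionally require table ownership).

GRANT USAGE ON SCHEMA sample_schema TO debezium_user;
GRANT SELECT ON ALL TABLES IN SCHEMA sample_schema TO debezium_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA sample_schema GRANT SELECT ON TABLES TO debezium_user;
GRANT CREATE ON DATABASE sample_service TO debezium_user;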

-- Source table that we want to replicate to another database
CREATE TABLE sample_schema.employeeaccount (
user_id serial PRIMARY KEY,
username VARCHAR ( 50 ) UNIQUE NOT NULL,
password VARCHAR ( 50 ) NOT NULL,
email VARCHAR ( 255 ) UNIQUE NOT NULL,
created_on TIMESTAMP NOT NULL,
last_login TIMESTAMP
);

-- Create Publication
CREATE PUBLICATION sample_debezium_pub FOR TABLE sample_schema.employeeaccount;

Note: Publications are created in the source database, i.e., they are not a database-server-level object like a Replication Slot.

-- Create replication slot on the database server
SELECT pg_create_logical_replication_slot('sample_debezium_slot', 'pgoutput');

The Publication and Replication Slot may be created manually in the source database beforehand and referenced in the Kafka Connect source task configuration, or the task can create these database objects automatically if they do not already exist and it has the required privileges to create them.
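In the source connector config, the relevant Debezium PostgreSQL connector properties look roughly like the fragment below, assuming the slot and publication created above are reused rather than auto-created:

"plugin.name": "pgoutput",
"slot.name": "sample_debezium_slot",
"publication.name": "sample_debezium_pub",
"publication.autocreate.mode": "disabled"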

Dockerfile Configuration

See article Run Kafka Connect on Kubernetes for details on standing up Kafka Connect Worker on Kubernetes.

Configuration properties set in the Dockerfile for the Kafka Connect Worker apply to all connectors, but they can be overridden when submitting new connector tasks.

The internal.* properties refer to how Kafka Connect serializes its own internal topics for config, offsets, and status. These properties are deprecated and should not be specified.
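For reference, the worker-level properties for a distributed Kafka Connect deployment typically look like the sketch below. Broker, Schema Registry, and topic names are placeholders, chosen here to match the Dev.ConnectWorkerGrp naming used later in this article.

# connect-distributed.properties (sketch)
bootstrap.servers=broker1:9092
group.id=Dev.ConnectWorkerGrp
config.storage.topic=Dev.ConnectWorkerGrp.config
offset.storage.topic=Dev.ConnectWorkerGrp.offset
status.storage.topic=Dev.ConnectWorkerGrp.status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081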

Kafka Connect Source and Sink Connector Tasks

Debezium Source Connector Task Configuration
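A sketch of the source connector config, submitted to the Connect REST API. The connector name matches the monitoring examples later in this article; the RDS endpoint and credentials are placeholders, and the property names are from the Debezium PostgreSQL connector (1.x).

curl -sS -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "sample-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "<rds-endpoint>",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "xxx",
    "database.dbname": "sample_service",
    "database.server.name": "sample_service",
    "schema.include.list": "sample_schema",
    "table.include.list": "sample_schema.employeeaccount",
    "plugin.name": "pgoutput",
    "slot.name": "sample_debezium_slot",
    "publication.name": "sample_debezium_pub"
  }
}'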
JDBC Sink Connector Task Configuration
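A sketch of the sink connector config, using the Confluent JDBC Sink Connector. The destination host, database, and credentials are placeholders; the topic name follows Debezium's default <server.name>.<schema>.<table> convention from the source config above, and the value converter (Avro or JSON with schemas) is assumed to be set at the worker level.

curl -sS -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "sample-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "sample_service.sample_schema.employeeaccount",
    "connection.url": "jdbc:postgresql://<destination-host>:5432/<destination-db>",
    "connection.user": "sink_user",
    "connection.password": "xxx",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "user_id",
    "auto.create": "true",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}'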

Note: The JDBC Sink Connector requires a schema to be present for the data, whether you use a Schema Registry or embed the schema along with every message.

Monitoring Database

Replication Slots can get too large and you can run out of disk space very quickly.

-- List replication slots and their size
select slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(),restart_lsn)) as replicationSlotLag,
active
from pg_replication_slots;

select * from pg_replication_slots;

There are several reasons a Replication Slot can get too big. Most common ones are noted below.

Connector Client is inactive
This could be due to an error in the connector task; check the task logs for the root cause. In this scenario, the current LSN on the database server keeps advancing, but the database server cannot release the WAL files because the slot has not confirmed them. Hence, the Replication Slot lag keeps growing.

The remedy is either to fix/restart the Connector Client and have it catch up, or to drop the Replication Slot if the changes are no longer of interest.

-- Drop replication slot
select pg_drop_replication_slot('sample_debezium_slot');

Connector Client is active but changes are happening in another database
If you are only interested in changes to a few tables in database A on the server, and most of the changes on the database server are happening in database B, then again the current LSN will keep advancing, but your Connector Client will not be sent any of these changes since they are not part of your subscription/publication. PostgreSQL will see that your active client is behind and will not release the WAL files. This shows up as a growing Replication Slot size even though there are no new changes to the tables you are interested in.
This issue is solved using a heartbeat table.

HeartBeat

-- Create HeartBeat Table
create table if not exists public.debezium_heartbeat(id serial primary key, ts timestamp);

The Source Connector Task is then configured to fire a heartbeat DML statement every few seconds. Also, make sure you are watching the heartbeat table, i.e., it is included in your publication. With these changes now happening in your database, the WAL Sender will stream the changes up to this (heartbeat update) LSN to your Connector Client. The Replication Slot associated with your Connector Client will never fall too far behind, and the WAL files can be released by the database server.
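In the Debezium source connector config, the heartbeat is wired up with two properties, roughly as below; the interval is an illustrative value and the insert statement targets the heartbeat table created above.

"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "insert into public.debezium_heartbeat (ts) values (now())"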

Please ensure CloudWatch alerting is set up on RDS disk space utilization. In non-prod, you can also limit the maximum replication slot size (on PostgreSQL 13+ via max_slot_wal_keep_size; note that you will lose changes made to the database if the limit is reached). The best practice is to never let this happen, by monitoring and ensuring you stop and clean up unused connector tasks, subscriptions, and replication slots.

Monitoring Kafka Connect

# List Connector Tasks
curl -sS http://localhost:8083/connectors

# Check connector task status
curl -sS http://localhost:8083/connectors/sample-source-connector/status

# Stop connector jobs
curl -i -X DELETE http://localhost:8083/connectors/sample-source-connector
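If a task has failed, the Connect REST API also lets you restart the connector or an individual task without deleting it; a sketch, using the connector name from the examples above:

# Restart a connector
curl -i -X POST http://localhost:8083/connectors/sample-source-connector/restart

# Restart an individual failed task (task 0 here)
curl -i -X POST http://localhost:8083/connectors/sample-source-connector/tasks/0/restart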

In rare situations, if you need to purge or update the Kafka Connect config, offset, and status topics, purging can be done by setting retention.ms = 1000 and then setting it back to the default. The other option is to delete and re-create these topics.

# To purge a topic. Set Retention to 1sec.
${KAFKA_HOME}/bin/kafka-configs.sh \
--bootstrap-server ${BootstrapBrokerString} \
--entity-type topics \
--alter --entity-name Dev.ConnectWorkerGrp.config \
--add-config retention.ms=1000

# To reset retention back to default on the topic
${KAFKA_HOME}/bin/kafka-configs.sh \
--bootstrap-server ${BootstrapBrokerString} \
--entity-type topics \
--alter --entity-name Dev.ConnectWorkerGrp.config \
--delete-config retention.ms

Kafka Connect Concepts

Serialization / DeSerialization

This is done using Converters. Use Avro for serializing data stored in Kafka; since it is a binary format and supports schemas, you gain a compact message size. You can still use JSON when deserializing the data from Kafka.
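In connector (or worker) config, this typically looks like the fragment below, assuming a Schema Registry is reachable at the URL shown:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"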

Predicates, Transforms and Filtering

SMTs (Single Message Transformations) can be applied to source and sink connectors as needed. Transformations can be applied selectively based on a Predicate (since Kafka 2.6). One such transformation is to skip/filter a record based on the predicate.

You can also chain multiple transformations.

io.debezium.transforms.ExtractNewRecordState
This SMT flattens the Debezium change event into a simpler Kafka event that is easier to consume.

org.apache.kafka.connect.transforms.RegexRouter
Another common SMT, used to rewrite topic names with regular expressions.
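A sketch of chaining these SMTs together with a predicate-guarded filter on a sink connector; the aliases, regex, and replacement are illustrative:

"transforms": "unwrap,route,dropTombstones",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "sample_service\\.sample_schema\\.(.*)",
"transforms.route.replacement": "$1",
"transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
"transforms.dropTombstones.predicate": "isTombstone",
"predicates": "isTombstone",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"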

Tombstone Records

Messages with a null value are called tombstone records. For example, a source connector can emit one to indicate that a record was deleted.
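How tombstones are handled is configurable. For example, the ExtractNewRecordState SMT can keep or drop them, and the Confluent JDBC sink can translate them into deletes; a sketch of the relevant properties, assuming the unwrap alias from the earlier examples:

# In the Debezium unwrap SMT: keep tombstones after flattening (default is to drop them)
"transforms.unwrap.drop.tombstones": "false",

# In the JDBC sink: apply tombstones as DELETEs (requires pk.mode=record_key)
"delete.enabled": "true"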

References

https://wolfman.dev/posts/pg-logical-heartbeats/
