1.2. Extraction from Databases
This article explains how to extract data from various databases into a centralized data lake using Debizium and Apache Kafka. We will cover the architecture, setup, and best practices for implementing a robust data extraction pipeline.
Overview
Many organizations rely on databases for their operational data, but extracting that data for analytics can be challenging. Debizium is an open-source tool that simplifies change data capture (CDC) from databases, allowing you to stream changes in real-time to a data lake or other downstream systems. This article focuses on using Debizium with Apache Kafka to extract data from multiple databases and load it into a centralized data lake.
Architecture
The architecture for extracting data from databases with Debizium typically involves the following components:
Debizium Connectors: These connectors are responsible for capturing changes from the source databases (e.g., MySQL, PostgreSQL) and publishing them to Kafka topics.
Apache Kafka: Kafka serves as the central messaging system, allowing you to decouple the data extraction process from the data loading process.
Kafka Connect: This component is used to move data between Kafka and other systems, such as your data lake (e.g., AWS S3, Google Cloud Storage).
Data Lake: The final destination for the extracted data, where it can be stored, processed, and analyzed.
Setup (Debezium + PostgreSQL on AWS)
This section shows a practical setup for CDC from PostgreSQL (Amazon RDS or self-managed on EC2) into Kafka via Debezium, and then to S3 via a sink.
Prerequisites
A running Kafka cluster and Kafka Connect (can be on EC2, EKS, or a managed service). Ensure network access from Connect to Postgres and S3.
PostgreSQL with logical replication enabled (RDS parameter group or postgresql.conf):
wal_level = logical max_replication_slots = 10 # adjust as needed max_wal_senders = 10 # adjust as needed
A replication user in Postgres with required privileges and REPLICATION role.
Security groups/NACLs allowing Connect to reach Postgres (port 5432) and S3 egress.
RDS specifics
Modify the DB parameter group to set
wal_level=logical
and associate with your RDS instance; apply and reboot if required.Ensure
rds.logical_replication
is enabled for certain engines/versions if applicable (check AWS docs).Grant appropriate permissions to your Debezium user and create a replication slot name that Debezium will use.
Debezium Postgres connector (generic config)
1{
2 "name": "postgres-source-generic",
3 "config": {
4 "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
5 "tasks.max": "1",
6
7 "database.hostname": "<RDS_OR_SELF_HOSTED_PG_HOST>",
8 "database.port": "5432",
9 "database.user": "<REPLICATION_USER>",
10 "database.password": "<REPLICATION_PASSWORD>",
11 "database.dbname": "<DATABASE_NAME>",
12
13 "database.server.name": "pg",
14 "schema.include.list": "<OPTIONAL_SCHEMA_INCLUDE_LIST>",
15 "table.include.list": "<OPTIONAL_TABLE_INCLUDE_LIST>",
16
17 "plugin.name": "pgoutput",
18 "slot.name": "debezium_slot_generic",
19
20 "tombstones.on.delete": "false",
21 "include.schema.changes": "true",
22
23 "decimal.handling.mode": "string",
24 "time.precision.mode": "adaptive_time_microseconds",
25
26 "snapshot.mode": "initial",
27 "snapshot.fetch.size": "10240",
28
29 "topic.creation.enable": "true",
30 "topic.prefix": "<TOPIC_PREFIX>",
31
32 "key.converter": "org.apache.kafka.connect.json.JsonConverter",
33 "key.converter.schemas.enable": "true",
34 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
35 "value.converter.schemas.enable": "true"
36 }
37}
Posting the connector to Kafka Connect
1#!/usr/bin/env bash
2set -euo pipefail
3
4# Usage: ./create_connector.sh http://<connect-host>:8083 connector_postgres_generic.json
5
6CONNECT_URL=${1:-http://localhost:8083}
7CONFIG_FILE=${2:-connector_postgres_generic.json}
8
9curl -s -X POST \
10 -H "Content-Type: application/json" \
11 --data @"${CONFIG_FILE}" \
12 "${CONNECT_URL}/connectors" | jq .
S3 sink connector (generic config)
1{
2 "name": "s3-sink-generic",
3 "config": {
4 "connector.class": "io.confluent.connect.s3.S3SinkConnector",
5 "tasks.max": "2",
6
7 "topics.regex": "<TOPIC_PREFIX>\\..*",
8
9 "s3.bucket.name": "<S3_BUCKET>",
10 "s3.region": "<AWS_REGION>",
11 "s3.part.size": "5242880",
12 "flush.size": "1000",
13 "rotate.interval.ms": "600000",
14
15 "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
16 "storage.class": "io.confluent.connect.s3.storage.S3Storage",
17 "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
18 "path.format": "year=YYYY/month=MM/day=dd",
19 "locale": "en_US",
20 "timezone": "UTC",
21 "partition.duration.ms": "86400000",
22 "timestamp.extractor": "Record",
23
24 "schema.compatibility": "NONE",
25
26 "aws.access.key.id": "${file:/opt/kafka/external-credentials/aws.properties:AWS_ACCESS_KEY_ID}",
27 "aws.secret.access.key": "${file:/opt/kafka/external-credentials/aws.properties:AWS_SECRET_ACCESS_KEY}"
28 }
29}
Monitoring and operations
Use Kafka Connect REST API to check connector/task status, offsets, and errors.
Observe Postgres replication slots (
pg_replication_slots
) and lag; alert if WAL grows.Scale
tasks.max
and tune flush/rotate parameters in sink based on throughput and latency requirements.
Limitations and pitfalls
Initial snapshot: Large tables can take time; consider off-hours or
snapshot.mode=never
if you pre-seed data.Replication lag: Heavy write load or network issues increase lag; monitor WAL size to avoid storage pressure.
DDL changes: Debezium emits schema change events; ensure downstream can adapt or route to a schema registry.
Permissions: RDS Postgres requires proper roles for logical replication; ensure slot names are unique per connector.
Topic naming: Use a topic prefix and include lists to control scope; avoid accidentally capturing entire databases.
S3 sink dependencies: Confluent S3 sink requires appropriate licensing/dependencies; alternatively, consider Kafka Connect S3 sink alternatives (open-source) or write a custom consumer.
Best Practices
Schema Management: Keep track of schema changes in your source databases and update your Kafka topics accordingly. Debizium can help with this by capturing schema changes as part of the change events.
Data Quality: Implement data validation and cleansing processes in your data lake to ensure the extracted data is accurate and consistent.
Performance Tuning: Monitor the performance of your Debizium connectors and Kafka cluster, and make adjustments as needed to optimize throughput and reduce latency.
Conclusion
Debizium provides a powerful solution for extracting data from databases and streaming it to a data lake. By leveraging Kafka and a well-designed architecture, you can build a robust data extraction pipeline that meets your organization’s analytics needs.