1.2. Extraction from Databases

This article explains how to extract data from various databases into a centralized data lake using Debezium and Apache Kafka. We will cover the architecture, setup, and best practices for implementing a robust data extraction pipeline.

Overview

Many organizations rely on databases for their operational data, but extracting that data for analytics can be challenging. Debezium is an open-source tool that simplifies change data capture (CDC) from databases, allowing you to stream changes in real time to a data lake or other downstream systems. This article focuses on using Debezium with Apache Kafka to extract data from multiple databases and load it into a centralized data lake.

Architecture

The architecture for extracting data from databases with Debezium typically involves the following components (a minimal local sketch follows the list):

  1. Debezium Connectors: These connectors capture changes from the source databases (e.g., MySQL, PostgreSQL) and publish them to Kafka topics.

  2. Apache Kafka: Kafka serves as the central messaging system, allowing you to decouple the data extraction process from the data loading process.

  3. Kafka Connect: This component is used to move data between Kafka and other systems, such as your data lake (e.g., AWS S3, Google Cloud Storage).

  4. Data Lake: The final destination for the extracted data, where it can be stored, processed, and analyzed.
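
For local experimentation, the first three components can be stood up with Docker. The sketch below is illustrative only: it uses the Debezium container images, but the image tags, network name, ports, and credentials are assumptions to adapt to your environment.

    #!/usr/bin/env bash
    # Minimal local sketch of the architecture: Postgres (source), Kafka, and
    # Kafka Connect with the Debezium connectors pre-installed.
    set -euo pipefail

    docker network create cdc-demo || true

    # 1. Source database with logical replication enabled
    docker run -d --name postgres --network cdc-demo -p 5432:5432 \
      -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres \
      postgres:16 -c wal_level=logical

    # 2. Kafka broker (Debezium's ZooKeeper and Kafka images)
    docker run -d --name zookeeper --network cdc-demo quay.io/debezium/zookeeper:2.7
    docker run -d --name kafka --network cdc-demo -p 9092:9092 \
      -e ZOOKEEPER_CONNECT=zookeeper:2181 \
      quay.io/debezium/kafka:2.7

    # 3. Kafka Connect worker with the Debezium source connectors on the plugin path
    docker run -d --name connect --network cdc-demo -p 8083:8083 \
      -e BOOTSTRAP_SERVERS=kafka:9092 \
      -e GROUP_ID=1 \
      -e CONFIG_STORAGE_TOPIC=connect_configs \
      -e OFFSET_STORAGE_TOPIC=connect_offsets \
      -e STATUS_STORAGE_TOPIC=connect_statuses \
      quay.io/debezium/connect:2.7

The data lake destination (component 4) is covered later via the S3 sink connector; in production, managed Kafka and Connect services can replace the containers above.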

Setup (Debezium + PostgreSQL on AWS)

This section shows a practical setup for CDC from PostgreSQL (Amazon RDS or self-managed on EC2) into Kafka via Debezium, and then to S3 via a sink.

Prerequisites

  • A running Kafka cluster and Kafka Connect (can be on EC2, EKS, or a managed service). Ensure network access from Connect to Postgres and S3.

  • PostgreSQL with logical replication enabled (RDS parameter group or postgresql.conf):

    wal_level = logical
    max_replication_slots = 10     # adjust as needed
    max_wal_senders = 10           # adjust as needed
    
  • A replication user in Postgres with the REPLICATION attribute and SELECT privileges on the tables to capture (a minimal sketch follows this list).

  • Security groups/NACLs allowing Connect to reach Postgres (port 5432) and S3 egress.
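
A minimal sketch of these checks for self-managed Postgres, assuming an admin role is available; the host, debezium_user role, password, and public schema are placeholders, and the exact privileges needed depend on your Debezium version and publication setup (on RDS, grant the rds_replication role instead of the REPLICATION attribute, as covered in the next subsection):

    #!/usr/bin/env bash
    # Verify logical replication settings and create a Debezium user (self-managed Postgres).
    set -euo pipefail

    PGHOST="<PG_HOST>"
    PGADMIN="postgres"   # an administrative role

    # Confirm the settings listed above
    psql -h "$PGHOST" -U "$PGADMIN" -c "SHOW wal_level;"
    psql -h "$PGHOST" -U "$PGADMIN" -c "SHOW max_replication_slots;"
    psql -h "$PGHOST" -U "$PGADMIN" -c "SHOW max_wal_senders;"

    # Replication user with read access to the schema Debezium will snapshot and capture
    psql -h "$PGHOST" -U "$PGADMIN" -c "CREATE ROLE debezium_user WITH LOGIN REPLICATION PASSWORD 'change-me';"
    psql -h "$PGHOST" -U "$PGADMIN" -c "GRANT USAGE ON SCHEMA public TO debezium_user;"
    psql -h "$PGHOST" -U "$PGADMIN" -c "GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_user;"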

RDS specifics

  • Create or modify a custom DB parameter group, associate it with your RDS instance, and reboot if required (a CLI sketch follows this list).

  • Set rds.logical_replication = 1 in that parameter group; on RDS for PostgreSQL this is what enables wal_level = logical (check the AWS docs for your engine and version).

  • Grant the required permissions to your Debezium user (on RDS, grant the rds_replication role) and choose a unique slot.name per connector; Debezium creates the replication slot itself.
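
A hedged AWS CLI sketch of these steps; the parameter group, instance identifier, endpoint, database, and user names are placeholders:

    #!/usr/bin/env bash
    # Enable logical replication on RDS for PostgreSQL via a custom parameter group.
    set -euo pipefail

    # rds.logical_replication=1 is what switches wal_level to logical on RDS
    # (static parameter, so the instance must be rebooted afterwards).
    aws rds modify-db-parameter-group \
      --db-parameter-group-name "<CUSTOM_PARAMETER_GROUP>" \
      --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"

    aws rds reboot-db-instance --db-instance-identifier "<DB_INSTANCE_ID>"

    # On RDS, grant the replication role instead of ALTER ROLE ... REPLICATION
    psql -h "<RDS_ENDPOINT>" -U "<MASTER_USER>" -d "<DATABASE_NAME>" \
      -c "GRANT rds_replication TO debezium_user;"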

Debezium Postgres connector (generic config)

Note: Debezium 2.x and later use topic.prefix (which determines topic names of the form <TOPIC_PREFIX>.<schema>.<table>) in place of the older database.server.name setting, so only topic.prefix is set here.

    {
      "name": "postgres-source-generic",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",

        "database.hostname": "<RDS_OR_SELF_HOSTED_PG_HOST>",
        "database.port": "5432",
        "database.user": "<REPLICATION_USER>",
        "database.password": "<REPLICATION_PASSWORD>",
        "database.dbname": "<DATABASE_NAME>",

        "topic.prefix": "<TOPIC_PREFIX>",
        "schema.include.list": "<OPTIONAL_SCHEMA_INCLUDE_LIST>",
        "table.include.list": "<OPTIONAL_TABLE_INCLUDE_LIST>",

        "plugin.name": "pgoutput",
        "slot.name": "debezium_slot_generic",

        "tombstones.on.delete": "false",
        "include.schema.changes": "true",

        "decimal.handling.mode": "string",
        "time.precision.mode": "adaptive_time_microseconds",

        "snapshot.mode": "initial",
        "snapshot.fetch.size": "10240",

        "topic.creation.enable": "true",

        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true"
      }
    }

Posting the connector to Kafka Connect

    #!/usr/bin/env bash
    set -euo pipefail

    # Usage: ./create_connector.sh http://<connect-host>:8083 connector_postgres_generic.json

    CONNECT_URL=${1:-http://localhost:8083}
    CONFIG_FILE=${2:-connector_postgres_generic.json}

    curl -s -X POST \
      -H "Content-Type: application/json" \
      --data @"${CONFIG_FILE}" \
      "${CONNECT_URL}/connectors" | jq .
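
Assuming the Connect REST API is reachable at the URL passed to the script and jq is installed, the source connector can be registered and verified like this (the connector name matches the config above):

    # Register the source connector, then confirm it is running
    ./create_connector.sh http://<connect-host>:8083 connector_postgres_generic.json

    # List connectors and inspect connector/task status via the Connect REST API
    curl -s "http://<connect-host>:8083/connectors" | jq .
    curl -s "http://<connect-host>:8083/connectors/postgres-source-generic/status" | jq .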

S3 sink connector (generic config)

    {
      "name": "s3-sink-generic",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "2",

        "topics.regex": "<TOPIC_PREFIX>\\..*",

        "s3.bucket.name": "<S3_BUCKET>",
        "s3.region": "<AWS_REGION>",
        "s3.part.size": "5242880",
        "flush.size": "1000",
        "rotate.interval.ms": "600000",

        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
        "locale": "en_US",
        "timezone": "UTC",
        "partition.duration.ms": "86400000",
        "timestamp.extractor": "Record",

        "schema.compatibility": "NONE",

        "aws.access.key.id": "${file:/opt/kafka/external-credentials/aws.properties:AWS_ACCESS_KEY_ID}",
        "aws.secret.access.key": "${file:/opt/kafka/external-credentials/aws.properties:AWS_SECRET_ACCESS_KEY}"
      }
    }
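
The same create_connector.sh script can register the sink. The sketch below assumes the JSON above is saved as s3_sink_generic.json (a file name chosen here) and that the sink uses its default "topics/" prefix (topics.dir); adjust if you override it:

    # Register the S3 sink, then spot-check objects landing in the bucket
    ./create_connector.sh http://<connect-host>:8083 s3_sink_generic.json

    aws s3 ls "s3://<S3_BUCKET>/topics/" --recursive | head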

Monitoring and operations

  • Use the Kafka Connect REST API to check connector/task status, offsets, and errors.

  • Observe Postgres replication slots (pg_replication_slots) and replication lag; alert if retained WAL grows (see the query sketch after this list).

  • Scale tasks.max and tune flush/rotate parameters in sink based on throughput and latency requirements.
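
A sketch of the replication-slot check mentioned above (works on PostgreSQL 10+; the host, role, and database names are placeholders), plus a task restart through the Connect REST API using the connector name from the earlier config:

    # Retained WAL per replication slot; alert if this keeps growing
    psql -h "<PG_HOST>" -U "<ADMIN_USER>" -d "<DATABASE_NAME>" -c "
      SELECT slot_name,
             active,
             pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
      FROM pg_replication_slots;"

    # Restart a failed task without recreating the connector
    curl -s -X POST "http://<connect-host>:8083/connectors/postgres-source-generic/tasks/0/restart"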

Limitations and pitfalls

  • Initial snapshot: Snapshotting large tables can take a long time; consider running it during off-hours, or use snapshot.mode=never if the data lake is pre-seeded by other means.

  • Replication lag: Heavy write load or network issues increase lag; monitor WAL size to avoid storage pressure.

  • DDL changes: Debezium emits schema change events; ensure downstream can adapt or route to a schema registry.

  • Permissions: RDS Postgres requires proper roles for logical replication; ensure slot names are unique per connector.

  • Topic naming: Use a topic prefix and include lists to control scope; avoid accidentally capturing entire databases.

  • S3 sink dependencies: Confluent S3 sink requires appropriate licensing/dependencies; alternatively, consider Kafka Connect S3 sink alternatives (open-source) or write a custom consumer.

Best Practices

  • Schema Management: Keep track of schema changes in your source databases and update your Kafka topics accordingly. Debezium can help with this by capturing schema changes as part of the change events.

  • Data Quality: Implement data validation and cleansing processes in your data lake to ensure the extracted data is accurate and consistent.

  • Performance Tuning: Monitor the performance of your Debezium connectors and Kafka cluster, and make adjustments as needed to optimize throughput and reduce latency.

Conclusion

Debezium provides a powerful solution for extracting data from databases and streaming it to a data lake. By leveraging Kafka and a well-designed architecture, you can build a robust data extraction pipeline that meets your organization’s analytics needs.