Building a Real-Time Crypto Data Pipeline with Debezium CDC


I built a crypto data pipeline that streams live Binance prices through PostgreSQL → Debezium → Kafka → Cassandra with sub-second latency. Here’s how it works and the gotchas I hit.



Architecture

┌─────────────┐
│ Binance API │
└──────┬──────┘
       │ REST (60s)
       ▼
┌──────────────┐
│ Python App   │
└──────┬───────┘
       │ INSERT
       ▼
┌──────────────┐      ┌───────────┐
│ PostgreSQL   │─────▶│ Debezium  │
│ (WAL)        │      └─────┬─────┘
└──────────────┘            │ CDC
                            ▼
                      ┌───────────┐
                      │   Kafka   │
                      └─────┬─────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │  CDC Consumer   │
                   └────────┬────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │   Cassandra     │
                   └────────┬────────┘
                            │
                            ▼
                   ┌─────────────────┐
                   │    Grafana      │
                   └─────────────────┘



Stack

Python • PostgreSQL • Debezium • Kafka • Cassandra • Grafana • Docker



Key Components



1. Binance API Client

params = {"symbols": json.dumps(symbols, separators=(',', ':'))} if symbols else {}

Gotcha #1: Binance rejects JSON with spaces. Use separators=(',', ':') or you’ll get 400 errors.
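For context, here's a minimal sketch of the polling client. The endpoint and response fields are the standard Binance REST API; the symbol list shown is illustrative, and the 60-second interval matches the architecture diagram:

import json
import time

import requests

TICKER_URL = "https://api.binance.com/api/v3/ticker/24hr"
SYMBOLS = ["BTCUSDT", "ETHUSDT"]  # the pipeline tracks 5 symbols; two shown here

def fetch_tickers(symbols):
    # Compact separators are mandatory -- Binance returns 400 on JSON containing spaces
    params = {"symbols": json.dumps(symbols, separators=(',', ':'))} if symbols else {}
    resp = requests.get(TICKER_URL, params=params, timeout=10)
    resp.raise_for_status()
    return resp.json()

while True:
    for t in fetch_tickers(SYMBOLS):
        print(t["symbol"], t["lastPrice"])  # in the real app: INSERT into PostgreSQL
    time.sleep(60)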



2. PostgreSQL Setup

Enable logical replication – this is critical:

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: crypto_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_HOST_AUTH_METHOD: trust
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # single broker, so internal topics need RF 1

  debezium:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092  # Use internal listener!
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_statuses  # the connect image expects all three storage topics
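Once the container is up, you can confirm the setting from psql:

crypto_db=# SHOW wal_level;
 wal_level
-----------
 logical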

Important: Debezium must use Kafka’s internal listener (kafka:29092), not the external one.

Registering the PostgreSQL connector:

{
  "name": "crypto-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "crypto_db",
    "database.server.name": "crypto_server",
    "table.include.list": "public.ticker_24h,public.klines",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "topic.prefix": "crypto"
  }
}
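(Note that topic.prefix replaced the older database.server.name option in Debezium 2.x, so only the former is needed.) I POST this config to Kafka Connect's REST API on port 8083; a minimal sketch, assuming it's saved as crypto-connector.json (the filename is my choice):

import requests

# Kafka Connect REST API: POST /connectors creates and starts the connector
with open("crypto-connector.json") as f:
    resp = requests.post(
        "http://localhost:8083/connectors",
        headers={"Content-Type": "application/json"},
        data=f.read(),
    )
resp.raise_for_status()  # 201 Created means the connector was registered

You can then check task state via GET /connectors/crypto-postgres-connector/status.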



3. The Tricky Part: Decoding Decimals

When I first ran the consumer, I got decimal.ConversionSyntax errors everywhere. It turns out that, in its default decimal handling mode, Debezium serializes PostgreSQL DECIMAL values as base64-encoded big-endian bytes in JSON, with the scale carried in the message schema.

Here’s how the data looks in Kafka:

{
  "payload": {
    "after": {
      "symbol": "BTCUSDT",
      "last_price": "CW/ZvA3A",  // This is base64!
      "volume": "Az8in0BI"
    }
  }
}

I had to write a decoder:

import base64
from decimal import Decimal

# Lives on the consumer class; `scale` must match the column's declared NUMERIC scale
@staticmethod
def decode_decimal(encoded_value, scale):
    """Decode a Debezium decimal: base64 -> big-endian signed int -> scaled Decimal"""
    if encoded_value is None:
        return Decimal('0')

    byte_data = base64.b64decode(encoded_value)
    value = int.from_bytes(byte_data, byteorder='big', signed=True)
    return Decimal(value) / Decimal(10 ** scale)

# Usage (inside the consumer)
last_price = self.decode_decimal(data['last_price'], 8)  # scale=8
percent = self.decode_decimal(data['price_change_percent'], 4)  # scale=4

This was the biggest gotcha. I spent two hours debugging before checking the raw Kafka messages directly.
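If you'd rather skip the decoding entirely, Debezium's decimal.handling.mode connector option changes how numerics are serialized, trading precision (double) or type safety (string) for convenience:

{
  "decimal.handling.mode": "string"
}

The default, precise, is what produces the base64 bytes above.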



4. Consuming CDC Events

The Debezium consumer subscribes to Kafka topics and replicates data to Cassandra:

import json
from datetime import datetime

from confluent_kafka import Consumer

class DebeziumConsumer:
    def __init__(self, cassandra: CassandraManager):
        self.cassandra = cassandra
        self.consumer = Consumer({
            'bootstrap.servers': 'kafka:29092',
            'group.id': 'crypto-cdc-consumer',
            'auto.offset.reset': 'latest',
            'enable.auto.commit': True
        })
        self.consumer.subscribe([
            'crypto.public.ticker_24h',
            'crypto.public.klines'
        ])

    def consume(self):
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue

            event = json.loads(msg.value().decode('utf-8'))

            if msg.topic() == 'crypto.public.ticker_24h':
                self.process_ticker_event(event)
            elif msg.topic() == 'crypto.public.klines':
                self.process_kline_event(event)
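The process_* handlers unwrap the Debezium envelope and write to Cassandra. Here's a sketch of process_ticker_event; the envelope fields (payload.after, payload.source.ts_ms) are standard Debezium, while the CassandraManager.session attribute and the assumed column scales are mine:

from datetime import datetime, timezone

def process_ticker_event(self, event):
    payload = event.get('payload', {})
    after = payload.get('after')
    if after is None:  # deletes and tombstones carry no "after" image
        return

    # source.ts_ms = commit time in PostgreSQL, in epoch millis
    ts = datetime.fromtimestamp(payload['source']['ts_ms'] / 1000, tz=timezone.utc)

    self.cassandra.session.execute(
        "INSERT INTO ticker_24h (symbol, timestamp, price_change, last_price, volume) "
        "VALUES (%s, %s, %s, %s, %s)",
        (
            after['symbol'],
            ts,
            self.decode_decimal(after['price_change'], 8),  # scales assumed to match
            self.decode_decimal(after['last_price'], 8),    # the column definitions
            self.decode_decimal(after['volume'], 8),
        ),
    )
    print(f"Replicated ticker: {after['symbol']}")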



5. Cassandra for Time-Series Data

Cassandra's clustering columns make it a natural fit for time-series data:

def _create_tables(self):
    self.session.execute("""
        CREATE TABLE IF NOT EXISTS ticker_24h (
            symbol text,
            timestamp timestamp,
            price_change decimal,
            last_price decimal,
            volume decimal,
            PRIMARY KEY (symbol, timestamp)
        ) WITH CLUSTERING ORDER BY (timestamp DESC)
    """)

This allows blazing-fast queries like “give me the last 100 prices for BTC” without full table scans.
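For example, fetching the most recent prices for one symbol hits a single partition and reads rows in the order they're already stored:

-- One partition (symbol), rows pre-sorted newest-first by the clustering order
SELECT timestamp, last_price
FROM ticker_24h
WHERE symbol = 'BTCUSDT'
LIMIT 100;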



6. Grafana Dashboards

The final piece is visualization. I provisioned Grafana with both PostgreSQL and Cassandra data sources:

# grafana/provisioning/datasources/datasources.yml
apiVersion: 1

datasources:
  - name: PostgreSQL
    type: postgres
    url: postgres:5432
    database: crypto_db
    user: postgres
    isDefault: true

  - name: Cassandra
    type: hadesarchitect-cassandra-datasource
    url: cassandra:9042
    jsonData:
      keyspace: crypto_data

Dashboard queries:

-- Top 5 performers (PostgreSQL)
SELECT
  symbol,
  ROUND(price_change_percent::numeric, 2) as "24h Change %",
  ROUND(last_price::numeric, 2) as "Last Price"
FROM ticker_24h
WHERE timestamp = (SELECT MAX(timestamp) FROM ticker_24h)
ORDER BY price_change_percent DESC
LIMIT 5

-- Candlestick data (Cassandra)
SELECT open_time, open_price, high_price, low_price, close_price
FROM klines
WHERE symbol = 'BTCUSDT'
LIMIT 100



Results

Here’s what the running system looks like:

Docker Services:

$ docker-compose ps
NAME                     STATUS          PORTS
postgres                 Up              0.0.0.0:5432->5432/tcp
cassandra                Up              0.0.0.0:9042->9042/tcp
zookeeper                Up              0.0.0.0:2181->2181/tcp
kafka                    Up              0.0.0.0:9092->9092/tcp
debezium                 Up              0.0.0.0:8083->8083/tcp
app                      Up
grafana                  Up              0.0.0.0:3000->3000/tcp

Application Logs:

Starting crypto data pipeline with Debezium CDC...
Starting Debezium CDC consumer...
[2025-11-04 14:19:37] Stored 5 ticker records
[2025-11-04 14:19:39] Stored klines for 5 symbols
[2025-11-04 14:20:39] Replicated ticker: BTCUSDT
[2025-11-04 14:20:39] Replicated ticker: ETHUSDT
[2025-11-04 14:20:40] Replicated kline: BTCUSDT

Data Verification:

PostgreSQL:

crypto_db=# SELECT COUNT(*) FROM ticker_24h;
 count
-------
  245

Cassandra:

cqlsh> SELECT symbol, timestamp, last_price
       FROM crypto_data.ticker_24h
       WHERE symbol='BTCUSDT' LIMIT 3;

 symbol   | timestamp               | last_price
----------+-------------------------+-------------
 BTCUSDT  | 2025-11-04 14:20:37+00  | 103793.97
 BTCUSDT  | 2025-11-04 14:19:37+00  | 103954.50
 BTCUSDT  | 2025-11-04 14:16:29+00  | 103888.55

Grafana Dashboard:

The dashboard auto-refreshes every 10 seconds and shows:

  • Top 5 cryptos by 24h gain
  • Real-time price trends for all tracked symbols
  • BTC candlestick chart (from Cassandra)
  • Trading volume analysis




Lessons Learned

  1. Debezium decimal encoding is brutal – I spent hours debugging before realizing the values are base64-encoded bytes. Always check the raw Kafka messages first.

  2. Kafka listener configuration matters – Debezium must use the internal listener (kafka:29092), not the external one (localhost:9092).

  3. PostgreSQL needs logical replication – Set wal_level=logical or Debezium won’t work.

  4. Cassandra startup is slow – Added retry logic with 30 attempts at 2-second intervals (see the sketch after this list). Cassandra takes 30-60 seconds to initialize.

  5. Docker networking is your friend – Use service names (postgres, kafka) instead of localhost when services communicate.
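For reference, here's roughly what the retry wrapper from lesson 4 looks like; a sketch assuming the official cassandra-driver (host name and messages are illustrative):

import time

from cassandra.cluster import Cluster

def connect_with_retry(host='cassandra', attempts=30, delay=2.0):
    """Poll until Cassandra finishes bootstrapping (30-60s from a cold start)."""
    for attempt in range(1, attempts + 1):
        try:
            return Cluster([host]).connect()
        except Exception as exc:  # the driver raises NoHostAvailable while booting
            print(f"Cassandra not ready ({attempt}/{attempts}): {exc}")
            time.sleep(delay)
    raise RuntimeError("Cassandra never became available")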



Performance

With this setup:

  • Latency: ~500ms from PostgreSQL write to Cassandra insert (see the measurement sketch after this list)
  • Throughput: Handles 100+ inserts/second easily
  • Data Loss: Zero (thanks to Kafka’s persistence)
  • Scalability: Can add more Kafka partitions and consumers horizontally
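That latency figure is easy to sanity-check inside the consumer by comparing wall-clock time against the commit timestamp Debezium stamps on every event. A minimal sketch, using the standard payload.source.ts_ms envelope field:

import time

def replication_lag_ms(event):
    # payload.source.ts_ms = PostgreSQL commit time in epoch millis (Debezium envelope)
    return time.time() * 1000 - event['payload']['source']['ts_ms']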



Running It Yourself

Clone and run:

git clone https://github.com/yourusername/crypto-cdc-pipeline
cd crypto-cdc-pipeline

# Start everything
docker-compose up -d

# Check logs
docker-compose logs -f app

# Open Grafana
open http://localhost:3000
# Login: admin/admin

The repository includes:

  • Complete source code
  • Docker Compose setup
  • Grafana dashboard JSON
  • Debezium connector config



What’s Next?

Potential improvements:

  • Add more crypto symbols (currently tracking 5)
  • Implement alerting for price thresholds
  • Add data retention policies
  • Set up Kafka Connect clustering for HA
  • Stream processing with Kafka Streams



Conclusion

Building a real-time data pipeline with CDC isn’t trivial, but Debezium makes it manageable. The key is understanding the data flow and handling the edge cases (like decimal encoding).

This architecture is production-ready and can handle millions of events per day. I’m using a similar setup at work for our financial data pipeline.

Got questions? Drop them in the comments!



If you found this useful, give it a ❤️ and bookmark for later!



Your email address will not be published. Required fields are marked *