How to Build a Real-Time Data Pipeline with Change Data Capture and Kafka

Learn how to use Debezium, Kafka, and TypeScript to stream database changes in real time using Change Data Capture.

I was building a microservices system last month when I hit a familiar wall. My services needed to know about changes in the database instantly, but constant polling was slowing everything down. I needed a better way. That’s when I decided to build a real-time data pipeline using Change Data Capture. Let me show you how you can do it too.

Change Data Capture, or CDC, is a method for tracking changes in a database. Instead of asking the database what’s new every few seconds, CDC tells you immediately when something changes. Think of it like getting a notification on your phone instead of checking an app manually.

Why does this matter? In modern applications, data changes need to be reflected everywhere at once. A user updates their profile, and that change should immediately update their session, refresh a cache, and sync to a search index. Doing this with traditional methods creates lag and puts unnecessary load on your database.

So, how does it work in practice? We use a tool called Debezium. It acts as a connector, watching your PostgreSQL database. When a row is inserted, updated, or deleted, Debezium captures that event and sends it to a message broker like Apache Kafka. From there, any number of services written in TypeScript can consume those events and act on them.

Have you ever wondered how large platforms keep data consistent across so many services? This pattern is often the answer.

Let’s start by setting up our environment. We’ll use Docker Compose to run everything locally. This setup includes PostgreSQL, Kafka, and the Debezium connector service.

# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: dbuser
      POSTGRES_PASSWORD: dbpass
      POSTGRES_DB: app_db
    command: ["postgres", "-c", "wal_level=logical"]
    ports: ["5432:5432"]
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  debezium:
    image: debezium/connect:2.4
    depends_on: [kafka, postgres]
    ports: ["8083:8083"]
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Notice the wal_level=logical flag passed to the Postgres command. This setting is crucial: it enables logical decoding, so Postgres writes a stream of row-level changes to its write-ahead log, and that stream is what Debezium reads. Without it, CDC won’t work.
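To confirm the setting took effect, you can query it from inside the running container (a quick check; the service name and credentials match the compose file above):

docker compose exec postgres psql -U dbuser -d app_db -c "SHOW wal_level;"

It should print logical.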

Next, we need to register the Debezium connector. This tells Debezium which database to watch and how to connect to it. We do this by sending a configuration to its API.

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  -d @config.json

The configuration file defines the source. Note the plugin.name setting: pgoutput is the logical decoding plugin built into Postgres itself, so it works with a vanilla Postgres image. Here’s a basic example:

{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "dbuser",
    "database.password": "dbpass",
    "database.dbname": "app_db",
    "topic.prefix": "cdc",
    "table.include.list": "public.orders"
  }
}
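After posting the configuration, it’s worth confirming that the connector actually started. Kafka Connect exposes a status endpoint for every registered connector (the connector name below matches the config above):

curl -s http://localhost:8083/connectors/orders-connector/status

A healthy connector reports a RUNNING state for both the connector and its task.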

Once this is running, any change to the orders table becomes an event on the Kafka topic cdc.public.orders (the topic prefix, then the schema, then the table name). But what does that event look like?

This is where TypeScript comes in. We need to write a service that listens to these events. First, let’s install the necessary packages.

npm install kafkajs zod
npm install -D typescript @types/node ts-node

KafkaJS is a robust Kafka client for Node.js. Zod is a library for schema validation, which is perfect for ensuring our events have the correct shape. Now, let’s look at the structure of a CDC event.

Debezium events have a specific format. With the default JSON converter, each Kafka message value wraps the event in a schema/payload envelope; the payload carries metadata about the operation (create, update, delete, or snapshot read) plus the row data both before and after the change.
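For reference, the payload of a single update event looks roughly like this (the field values are illustrative, and several metadata fields are omitted):

{
  "before": { "id": 42, "status": "pending", "total": 100 },
  "after": { "id": 42, "status": "paid", "total": 100 },
  "source": { "db": "app_db", "table": "orders" },
  "op": "u",
  "ts_ms": 1700000000000
}

We can define a TypeScript type for this payload.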

// types/cdc-event.ts
export type DBOperation = 'c' | 'u' | 'd' | 'r'; // create, update, delete, read

export interface DebeziumEvent {
  before: Record<string, any> | null;
  after: Record<string, any> | null;
  source: {
    table: string;
    db: string;
  };
  op: DBOperation;
  ts_ms: number;
}

But how can we be sure the data from Kafka matches this type? We use Zod to create a validation schema at runtime.

// processors/event-validator.ts
import { z } from 'zod';

const DebeziumEventSchema = z.object({
  before: z.record(z.any()).nullable(),
  after: z.record(z.any()).nullable(),
  source: z.object({
    table: z.string(),
    db: z.string(),
  }),
  op: z.enum(['c', 'u', 'd', 'r']),
  ts_ms: z.number(),
});

export function validateEvent(rawMessage: unknown) {
  const result = DebeziumEventSchema.safeParse(rawMessage);
  if (!result.success) {
    console.error('Invalid event shape:', result.error);
    return null;
  }
  return result.data;
}

This validation is a safety net. It prevents malformed data from crashing our service. Now, let’s write the main Kafka consumer.

What does a robust consumer need? It needs to connect to Kafka, subscribe to a topic, and process each message reliably. It also needs to handle errors and manage its connection to the broker.

// consumers/materialized-view.consumer.ts
import { Kafka, Consumer, KafkaMessage } from 'kafkajs';
import { validateEvent } from '../processors/event-validator';

export class MaterializedViewConsumer {
  private consumer: Consumer;

  constructor() {
    const kafka = new Kafka({
      clientId: 'materialized-view-service',
      brokers: ['localhost:9092'],
    });

    this.consumer = kafka.consumer({ groupId: 'view-group' });
  }

  async start() {
    await this.consumer.connect();
    await this.consumer.subscribe({ 
      topic: 'cdc.public.orders', 
      fromBeginning: false 
    });

    await this.consumer.run({
      eachMessage: async ({ message }) => {
        await this.processMessage(message);
      },
    });
  }

  private async processMessage(message: KafkaMessage) {
    try {
      if (!message.value) {
        // Tombstone or empty message; nothing to process
        return;
      }

      const parsed = JSON.parse(message.value.toString());
      // Debezium's default JSON converter wraps the event in a schema/payload envelope
      const event = validateEvent(parsed.payload ?? parsed);

      if (!event) {
        // The validator already logged the invalid shape; skip this message
        return;
      }

      console.log(`New event from table ${event.source.table}: ${event.op}`);
      await this.updateMaterializedView(event);
      
    } catch (error) {
      console.error('Failed to process message:', error);
      // Implement retry logic here
    }
  }

  private async updateMaterializedView(event: any) {
    // This is where you update your read-optimized view.
    // For example, update a summary table in another database.
    if (event.op === 'c') {
      console.log('Inserting new record:', event.after);
    } else if (event.op === 'u') {
      console.log('Updating record:', event.after);
    } else if (event.op === 'd') {
      console.log('Deleting record:', event.before);
    }
  }
}

The key part is the eachMessage handler. This function is called for every event. We parse the message, validate its structure, and then decide what to do based on the operation type. This is the heart of the event-driven logic.
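To actually run it, you only need a small entry point that creates the consumer and starts it (a minimal sketch; the file name and paths are just one way to lay things out):

// index.ts
import { MaterializedViewConsumer } from './consumers/materialized-view.consumer';

async function main() {
  const consumer = new MaterializedViewConsumer();
  await consumer.start();
  console.log('Materialized view consumer is running');
}

main().catch((error) => {
  console.error('Consumer failed to start:', error);
  process.exit(1);
});

During development you can run it with npx ts-node index.ts.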

But what about more complex scenarios? Let’s say an order has related order_items. When an order is updated, you might need to recalculate a total. Your consumer can handle this by fetching related data.

// processors/order-aggregator.ts
// dbClient is assumed to be a connected node-postgres (pg) Pool or Client.
export async function processOrderEvent(event: any, dbClient: any) {
  if (event.source.table !== 'orders') return;

  if (event.op === 'c' || event.op === 'u') {
    // Fetch the latest items for this order
    const items = await dbClient.query(
      'SELECT SUM(price * quantity) AS total FROM order_items WHERE order_id = $1',
      [event.after.id]
    );

    // SUM() returns NULL when the order has no items yet, so fall back to 0
    const total = items.rows[0].total ?? 0;

    // Upsert a row in a summary table
    await dbClient.query(
      `INSERT INTO order_summaries (order_id, total, updated_at)
       VALUES ($1, $2, NOW())
       ON CONFLICT (order_id) DO UPDATE
       SET total = $2, updated_at = NOW()`,
      [event.after.id, total]
    );
  }
}

This pattern is powerful. It keeps derived data, like a summary total, in sync without the main application code needing to manage it. The database change triggers the entire update chain.

Of course, things can go wrong. A service might crash, or an event might be malformed. How do we build resilience? One way is through Kafka’s built-in offset management. The consumer group tracks which messages have been processed. If your service restarts, it picks up where it left off.
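If you want tighter control, KafkaJS lets you turn off auto-commit and commit offsets only after your own processing succeeds. Here is a sketch of how the run call inside start() could look with manual commits:

// In MaterializedViewConsumer.start(): commit manually, only after processing succeeds
await this.consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    await this.processMessage(message);
    // Commit the *next* offset so a restart resumes after this message
    await this.consumer.commitOffsets([
      { topic, partition, offset: (Number(message.offset) + 1).toString() },
    ]);
  },
});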

We can add more robustness with a dead-letter queue. If processing fails after several retries, we send the event to a separate topic for manual inspection.

// utils/error-handler.ts
import { Producer } from 'kafkajs';

export class ErrorHandler {
  constructor(private dlqProducer: Producer) {}

  async handleProcessingError(event: any, error: Error, retryCount: number) {
    if (retryCount > 3) {
      console.error('Max retries exceeded. Sending to DLQ.');
      await this.sendToDeadLetterQueue(event, error);
    } else {
      // Wait and retry
      await new Promise(resolve => setTimeout(resolve, 1000 * retryCount));
      throw error; // Re-throw to trigger retry
    }
  }

  private async sendToDeadLetterQueue(event: any, error: Error) {
    const dlqMessage = {
      originalEvent: event,
      error: error.message,
      timestamp: new Date().toISOString(),
    };
    await this.dlqProducer.send({
      topic: 'cdc.dlq',
      messages: [{ value: JSON.stringify(dlqMessage) }],
    });
  }
}
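To use it, create a producer for the DLQ topic once at startup and hand it to the handler; the consumer’s catch block can then delegate to it. A sketch, with illustrative names (parsedEvent, retryCount) that you would wire up in your own consumer:

// utils/create-error-handler.ts
import { Kafka } from 'kafkajs';
import { ErrorHandler } from './error-handler';

export async function createErrorHandler(): Promise<ErrorHandler> {
  const kafka = new Kafka({ clientId: 'dlq-writer', brokers: ['localhost:9092'] });
  const producer = kafka.producer();
  await producer.connect();
  return new ErrorHandler(producer);
}

// In processMessage's catch block, something like:
// await errorHandler.handleProcessingError(parsedEvent, error as Error, retryCount);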

Monitoring is also essential. How do you know if your pipeline is healthy? You can export metrics like the number of events processed, average processing time, and error rates. These can be sent to a monitoring tool like Prometheus.

// utils/metrics.ts
export class PipelineMetrics {
  private eventsProcessed = 0;
  private processingErrors = 0;

  recordEvent() {
    this.eventsProcessed++;
  }

  recordError() {
    this.processingErrors++;
  }

  getStatus() {
    return {
      eventsProcessed: this.eventsProcessed,
      processingErrors: this.processingErrors,
      errorRate: this.processingErrors / Math.max(this.eventsProcessed, 1),
    };
  }
}
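To make these numbers visible from outside the process, you can expose them over HTTP with Node’s built-in http module and point your monitoring at the endpoint (a minimal sketch; a production setup would more likely export Prometheus-format metrics):

// utils/health-server.ts
import { createServer } from 'http';
import { PipelineMetrics } from './metrics';

export function startHealthServer(metrics: PipelineMetrics, port = 3000) {
  const server = createServer((req, res) => {
    if (req.url === '/health') {
      // Return the current counters as JSON
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify(metrics.getStatus()));
    } else {
      res.writeHead(404);
      res.end();
    }
  });

  server.listen(port, () => {
    console.log(`Health endpoint listening on http://localhost:${port}/health`);
  });

  return server;
}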

Imagine deploying this. Your database changes become a real-time stream. One service updates a materialized view for fast queries. Another service invalidates a Redis cache. A third service sends a notification to a user. All of these actions happen automatically, driven by the change event.

The beauty of this setup is its separation of concerns. The application writing to the database doesn’t need to know about all the downstream systems. It just makes a change. Debezium captures it, Kafka distributes it, and your TypeScript services react. It’s a clean, scalable architecture.

Start with a single table and a simple consumer. See how it feels to have real-time data flow. You might be surprised at how many manual batch jobs it can replace.

I hope this guide helps you build more reactive and robust systems. If you found this walk-through useful, please share it with a colleague who might be battling with database polling. Have you tried implementing CDC before? What challenges did you face? Let me know in the comments.

