
How to Build a Real-Time Stream Processing Pipeline with Node.js, Kafka, and ClickHouse

Learn to build a production-ready real-time data pipeline using Node.js, Kafka, and ClickHouse. Stream, process, and analyze events instantly.


I’ve been thinking about data a lot lately. Not just any data, but the kind that moves—the constant, flowing stream of information that powers our modern world. Every click, every transaction, every sensor reading is a tiny pulse in a vast, real-time system. For years, I built applications that treated data as something static, something to be processed in big, slow batches at the end of the day. But the world doesn’t work in batches. It works in streams. So, I set out to build something better: a pipeline that could not only keep up but make sense of the flow as it happens. This is how you build a production-ready stream processing system with Node.js, Kafka, and ClickHouse.

Think about the last time you saw a live dashboard update, watched a fraud alert pop up on your phone, or got a real-time recommendation. What’s happening behind the scenes? A traditional application would store that event in a database and maybe process it hours later. A stream processing system sees that event as a message on a conveyor belt, transforming and analyzing it before it even reaches permanent storage. The shift in mindset—from database-first to event-first—is everything.

Why these tools? Node.js handles asynchronous operations beautifully, making it ideal for the I/O-heavy nature of streams. Apache Kafka is the industrial-strength message bus that guarantees your data gets where it’s going, even if parts of the system fail. ClickHouse is the analytical engine that can make sense of billions of rows in milliseconds. Together, they form a complete loop: ingest, process, analyze.

Let’s start with the foundation. We need our data infrastructure running. A simple Docker setup gets Kafka and ClickHouse ready on your machine.

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # required for a single-broker dev setup
  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports: ["8123:8123", "9000:9000"]

Run docker-compose up -d, and you have a streaming platform and a database ready. It’s that simple to start.
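Kafka can auto-create topics, but it's cleaner to create the ones this article uses up front. Here's a minimal setup sketch using kafkajs's admin client; the scripts/ path and the partition counts are my own assumptions, so adjust them to your needs.

// scripts/createTopics.ts (hypothetical setup helper)
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'setup', brokers: ['localhost:9092'] });
const admin = kafka.admin();

await admin.connect();
await admin.createTopics({
  waitForLeaders: true,
  topics: [
    { topic: 'raw_user_events', numPartitions: 3, replicationFactor: 1 },    // main event stream
    { topic: 'dead_letter_events', numPartitions: 1, replicationFactor: 1 }, // invalid messages
    { topic: 'minute_user_counts', numPartitions: 1, replicationFactor: 1 }, // aggregated output
  ],
});
await admin.disconnect();
console.log('Topics created.');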

Now, let’s talk about events. What exactly are we sending? We need a clear contract. I use Zod for validation because a broken message can break the entire pipeline.

// src/schemas/event.schema.ts
import { z } from 'zod';

export const UserEventSchema = z.object({
  eventId: z.string().uuid(),
  userId: z.string(),
  eventType: z.enum(['page_view', 'click', 'purchase', 'login']),
  timestamp: z.string().datetime(),
  properties: z.record(z.any()).optional(),
});

export type UserEvent = z.infer<typeof UserEventSchema>;

This schema ensures every message has a unique ID, a user, a known type, and a proper timestamp. The properties field is a flexible bag for extra data. Can you see how this structure could represent almost any user interaction?
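It can, and when a particular interaction needs stronger guarantees, you can tighten the bag later. As a sketch (the PurchaseEventSchema name and its fields are hypothetical), Zod lets you extend the base schema so a purchase must carry a typed payload:

// src/schemas/purchase.schema.ts (hypothetical refinement)
import { z } from 'zod';
import { UserEventSchema } from './event.schema.js';

export const PurchaseEventSchema = UserEventSchema.extend({
  eventType: z.literal('purchase'),
  properties: z.object({
    item: z.string(),
    amount: z.number().positive(),
  }),
});

export type PurchaseEvent = z.infer<typeof PurchaseEventSchema>;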

With our schema defined, we need to produce events. A producer in Kafka is just a client that sends messages to a named channel called a topic.

// src/producers/eventProducer.ts
import { Kafka } from 'kafkajs';
import { UserEventSchema } from '../schemas/event.schema.js';

const kafka = new Kafka({ clientId: 'web-app', brokers: ['localhost:9092'] });
const producer = kafka.producer();

await producer.connect();

async function sendEvent(eventData: unknown) {
  const validation = UserEventSchema.safeParse(eventData);
  
  if (!validation.success) {
    console.error('Invalid event:', validation.error);
    // Send to a dead-letter topic for inspection
    await producer.send({
      topic: 'dead_letter_events',
      messages: [{ value: JSON.stringify(eventData) }],
    });
    return;
  }

  const event = validation.data;
  
  await producer.send({
    topic: 'raw_user_events',
    messages: [
      {
        key: event.userId, // Key ensures user events go to same partition
        value: JSON.stringify(event),
        headers: { 'event-type': event.eventType },
      },
    ],
  });
  console.log(`Event ${event.eventId} sent.`);
}

// Simulate an event
await sendEvent({
  eventId: '123e4567-e89b-12d3-a456-426614174000',
  userId: 'user_789',
  eventType: 'purchase',
  timestamp: new Date().toISOString(),
  properties: { item: 'Laptop', amount: 1299.99 },
});

Notice the key in the message? Setting it to the userId is a crucial trick. All events for a specific user will go to the same Kafka partition, guaranteeing they are processed in order. Why is order important for a user’s session?
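If you want to see the same-key-same-partition claim for yourself, kafkajs returns record metadata from send, including the partition each message landed on. A quick throwaway check, reusing the connected producer from above:

// Two messages with the same key should report the same partition.
// (Use a scratch topic in practice so these probes don't reach the real consumer.)
const first = await producer.send({
  topic: 'raw_user_events',
  messages: [{ key: 'user_789', value: JSON.stringify({ probe: 1 }) }],
});
const second = await producer.send({
  topic: 'raw_user_events',
  messages: [{ key: 'user_789', value: JSON.stringify({ probe: 2 }) }],
});
console.log(first[0].partition === second[0].partition); // true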

Producing is only half the story. The real magic happens in the consumer. This is where we process the stream. Let’s build a consumer that reads these raw events, enriches them, and writes them to ClickHouse.

// src/consumers/eventProcessor.ts
import { Kafka } from 'kafkajs';
import { ClickHouseClient, createClient } from '@clickhouse/client';

const kafka = new Kafka({ clientId: 'event-processor', brokers: ['localhost:9092'] });
const clickhouse: ClickHouseClient = createClient({ host: 'http://localhost:8123' });

const consumer = kafka.consumer({ groupId: 'event-processing-group' });

await consumer.connect();
await consumer.subscribe({ topic: 'raw_user_events', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ message }) => {
    try {
      const event = JSON.parse(message.value!.toString());
      
      // Simple enrichment: add processing metadata, shaped to match the ClickHouse table
      const processedEvent = {
        ...event,
        properties: JSON.stringify(event.properties ?? {}), // the table stores this as a String column
        processed_at: new Date().toISOString(),
        ingested_at: Number(message.timestamp), // Kafka provides this as a string of epoch milliseconds
      };

      // Insert into ClickHouse
      await clickhouse.insert({
        table: 'user_events',
        values: [processedEvent],
        format: 'JSONEachRow',
      });

      console.log(`Processed event ${event.eventId}`);
      
    } catch (error) {
      console.error('Failed to process message:', error);
      // In a real system, you might retry or send to a dead-letter queue here.
    }
  },
});

This consumer does three things: it reads a message, adds a timestamp, and inserts it into the database. It’s a simple but powerful pattern. But what if we need to do more than just store the data? What if we need to count, sum, or average in real-time?
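We'll get to that in a moment. First, one production note: ClickHouse performs far better with fewer, larger inserts than with one insert per message. Here is a hedged sketch of the same processor using kafkajs's eachBatch instead of eachMessage, reusing the consumer and clickhouse clients from above.

// Batch variant: parse a whole Kafka batch, then write it with a single insert.
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    const rows = batch.messages.map((message) => {
      const event = JSON.parse(message.value!.toString());
      return {
        ...event,
        properties: JSON.stringify(event.properties ?? {}),
        processed_at: new Date().toISOString(),
        ingested_at: Number(message.timestamp),
      };
    });

    if (rows.length > 0) {
      await clickhouse.insert({ table: 'user_events', values: rows, format: 'JSONEachRow' });
    }

    // Mark offsets as processed only after the insert succeeded,
    // so a failed insert causes the batch to be retried.
    for (const message of batch.messages) {
      resolveOffset(message.offset);
    }
    await heartbeat();
  },
});

With that caveat noted, back to the real-time counting question.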

This is where stateful processing and windowing come in. Let’s say we want to count page views per minute. We can’t just count all events forever; we need to define a time window.

// src/processors/windowedAggregator.ts
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'aggregator', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'aggregator-group' });
const producer = kafka.producer();

await Promise.all([consumer.connect(), producer.connect()]);
await consumer.subscribe({ topic: 'raw_user_events' });

// In-memory store for the current one-minute window
let windowCounts = new Map<string, number>(); // key: userId — the map doubles as a set of users seen this window
let currentWindowKey: string = getMinuteWindowKey();

function getMinuteWindowKey(): string {
  // e.g. "2024-01-15 10:30" (zero-padded, local time)
  const pad = (n: number) => String(n).padStart(2, '0');
  const now = new Date();
  return `${now.getFullYear()}-${pad(now.getMonth() + 1)}-${pad(now.getDate())} ${pad(now.getHours())}:${pad(now.getMinutes())}`;
}

setInterval(async () => {
  // Every minute, emit the aggregated result and reset
  console.log(`Window ${currentWindowKey}: ${windowCounts.size} unique users.`);
  
  // Send aggregation to a new topic for dashboards
  await producer.send({
    topic: 'minute_user_counts',
    messages: [{
      value: JSON.stringify({
        window: currentWindowKey,
        unique_users: windowCounts.size,
        timestamp: new Date().toISOString(),
      }),
    }],
  });
  
  // Reset for the next window
  windowCounts.clear();
  currentWindowKey = getMinuteWindowKey();
}, 60000); // 60 seconds

await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value!.toString());
    if (event.eventType === 'page_view') {
      // Use userId as the key to count unique users
      windowCounts.set(event.userId, 1);
    }
  },
});

This aggregator creates one-minute tumbling windows. Every 60 seconds, it sends the count of unique users who viewed a page to a new Kafka topic. A dashboard could subscribe to that topic and update in real-time. This is the core of real-time analytics. How would you modify this to track a running total of purchase amounts?
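Here is one possible answer, sketched as a sibling aggregator that keeps a running sum of purchase amounts per one-minute window. The minute_revenue_totals topic name and the separate consumer group are my own assumptions.

// src/processors/revenueAggregator.ts (hypothetical sketch)
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'revenue-aggregator', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'revenue-aggregator-group' });
const producer = kafka.producer();

await Promise.all([consumer.connect(), producer.connect()]);
await consumer.subscribe({ topic: 'raw_user_events' });

let windowRevenue = 0; // running total of purchase amounts in the current window

setInterval(async () => {
  // Snapshot and reset before the async send so no increments are lost in between.
  const revenue = windowRevenue;
  windowRevenue = 0;
  await producer.send({
    topic: 'minute_revenue_totals', // hypothetical output topic
    messages: [{ value: JSON.stringify({ revenue, timestamp: new Date().toISOString() }) }],
  });
}, 60000);

await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value!.toString());
    if (event.eventType === 'purchase') {
      // amount is assumed to live in properties, as in the sample purchase event earlier
      windowRevenue += Number(event.properties?.amount ?? 0);
    }
  },
});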

Of course, we need somewhere to store these final results for long-term querying. This is where ClickHouse shines. Let’s set up our table.

-- Run this in the ClickHouse client
CREATE TABLE IF NOT EXISTS user_events
(
    eventId String,
    userId String,
    eventType String,
    timestamp DateTime64(3),
    properties String,
    processed_at DateTime64(3),
    ingested_at Int64
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (userId, timestamp);

The ORDER BY clause is critical. ClickHouse stores data physically sorted by these columns. Querying for all events for a specific user is incredibly fast because the data is co-located on disk. This is the columnar database advantage.

Now, let’s query it. The power is in combining fresh stream data with historical context.

// src/analytics/queryService.ts
import { createClient } from '@clickhouse/client';

const client = createClient({ host: 'http://localhost:8123' });

async function getRealTimeFunnel(userId: string) {
  const query = `
    SELECT 
      eventType,
      count() as count,
      max(timestamp) as last_seen
    FROM user_events
    WHERE userId = {userId: String}
      AND timestamp > now() - INTERVAL 1 HOUR
    GROUP BY eventType
    ORDER BY last_seen DESC
  `;
  
  const result = await client.query({
    query,
    query_params: { userId },
    format: 'JSONEachRow',
  });
  
  const rows = await result.json();
  return rows; // e.g., [{eventType: 'page_view', count: '5', last_seen: '2024-01-15 10:30:00'}, ...]
}

// Get a user's recent activity
const userFunnel = await getRealTimeFunnel('user_789');
console.log(userFunnel);

This query gives us a live view of what a single user has done in the last hour. It’s fast because it uses the primary key userId and a time filter. But what about system health? A pipeline is useless if you don’t know it’s working.

Monitoring is non-negotiable. We need to track lag, errors, and throughput.

// src/monitoring/metrics.js
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'metrics', brokers: ['localhost:9092'] });
const admin = kafka.admin();
await admin.connect();

async function getConsumerLag(groupId, topic) {
  const committed = await admin.fetchOffsets({ groupId, topic }); // consumer group's committed offsets
  const latest = await admin.fetchTopicOffsets(topic);            // latest offsets per partition

  let totalLag = 0;
  for (const partition of latest) {
    // Match by partition number rather than array index; an offset of -1 means nothing committed yet
    const match = committed.find((p) => p.partition === partition.partition);
    const committedOffset = match ? Math.max(Number(match.offset), 0) : 0;
    totalLag += Number(partition.offset) - committedOffset;
  }
  return totalLag;
}

// Check every 30 seconds
setInterval(async () => {
  const lag = await getConsumerLag('event-processing-group', 'raw_user_events');
  console.log(`Current consumer lag: ${lag} messages`);
  
  if (lag > 10000) {
    console.error('Lag is too high! Need to scale out consumers.');
  }
}, 30000);

Lag is the number of unprocessed messages. If it grows, your consumers can’t keep up with the producers. You need to add more consumer instances. This is how you scale horizontally.
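A cheap way to watch that scaling happen: kafkajs emits instrumentation events, and as far as I can tell from its event payloads, GROUP_JOIN includes the partitions assigned to each instance. Hooking it on the consumer from eventProcessor.ts, starting a second process with the same groupId should show the partitions being split between them.

// Log this instance's partition assignment whenever a rebalance completes.
consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  console.log('Assigned partitions:', JSON.stringify(payload.memberAssignment));
});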

Building this pipeline taught me that resilience is about expecting failure. Networks drop. Databases restart. Code has bugs. The key is to handle it gracefully. Use dead-letter topics for bad messages. Use consumer groups for parallel processing. Use idempotent operations so processing the same message twice doesn’t cause problems.
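In that spirit, here is a small graceful-shutdown sketch, assuming the kafkajs and ClickHouse clients from the earlier modules are in scope.

// Disconnect cleanly on shutdown so the consumer leaves its group promptly
// and its partitions can be reassigned without waiting for a session timeout.
async function shutdown() {
  try {
    await consumer.disconnect();
    await producer.disconnect();
    await clickhouse.close();
  } finally {
    process.exit(0);
  }
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);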

So, what’s the end result? You have a system that ingests a continuous stream of events. It validates and enriches them. It can perform real-time aggregations, like counting or summing. It stores everything in a database optimized for fast queries. And it provides hooks for monitoring and scaling. You’ve moved from asking “what happened yesterday?” to “what is happening right now?”

The journey from batch to real-time is challenging but transformative. It changes how you think about data, from a passive record to an active signal. Start small. Produce one event. Consume it. Store it. Query it. Then add a window, an aggregation, a dashboard. Each step builds your confidence.

I hope this guide helps you build something amazing. What will you build with your real-time data stream? If you found this walkthrough useful, please like, share, and leave a comment below with your thoughts or questions. Let’s keep the conversation flowing.


Keywords: real-time data, stream processing, nodejs, kafka, clickhouse


