
Complete Node.js Event Sourcing Guide: TypeScript, PostgreSQL, and Real-World Implementation

Learn to implement Event Sourcing with Node.js, TypeScript & PostgreSQL. Build event stores, handle versioning, create projections & optimize performance for scalable systems.


Lately, I’ve been thinking about how we track changes in complex systems. Traditional approaches often lose valuable historical context. That’s when event sourcing caught my attention - a method where we capture every state change as immutable events. Why settle for current state alone when you can reconstruct any moment in time? Let me show you how to implement this powerful pattern.

First, we need a solid foundation. Start by setting up your project:

mkdir event-sourcing-app
cd event-sourcing-app
npm init -y
npm install express pg uuid
npm install -D typescript ts-node @types/node @types/express @types/pg @types/uuid

Configure TypeScript with this tsconfig.json:

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "rootDir": "./src",
    "outDir": "./dist",
    "strict": true,
    "esModuleInterop": true
  }
}

Our event store needs a proper PostgreSQL schema. Notice how we’re capturing both the event data and metadata:

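-- uuid_generate_v4() is provided by the uuid-ossp extension
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    version INTEGER NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (aggregate_id, version)
);

-- The UNIQUE constraint already creates an index on (aggregate_id, version),
-- which also serves lookups by aggregate_id alone, so no separate index is needed.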

Now, let’s define our core interfaces. This TypeScript foundation ensures type safety throughout our system:

// src/types/events.ts
export interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

export interface EventStore {
  saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
}
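Because eventData is typed as unknown, every consumer has to narrow it before use. One way to avoid `as any` casts is a small type guard per event type. The sketch below is illustrative only: `InventoryAdjustedData`, `InventoryAdjusted`, `isInventoryAdjusted`, and `totalChange` are hypothetical names, and the payload shape is taken from the InventoryAdjusted event used later in this guide.

```typescript
// Mirrors the DomainEvent interface from src/types/events.ts
interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

// Hypothetical name for the payload of the 'InventoryAdjusted' event
interface InventoryAdjustedData {
  change: number;
  newCount: number;
}

// Narrowed event type: same envelope, known payload
type InventoryAdjusted = DomainEvent & {
  eventType: 'InventoryAdjusted';
  eventData: InventoryAdjustedData;
};

// Type guard: checks the event type and the payload shape at runtime
function isInventoryAdjusted(event: DomainEvent): event is InventoryAdjusted {
  if (event.eventType !== 'InventoryAdjusted') return false;
  const data = event.eventData as Partial<InventoryAdjustedData> | null;
  return (
    typeof data === 'object' &&
    data !== null &&
    typeof data.change === 'number' &&
    typeof data.newCount === 'number'
  );
}

// Usage: inside the guard, eventData is fully typed and needs no cast
function totalChange(events: DomainEvent[]): number {
  let total = 0;
  for (const event of events) {
    if (isInventoryAdjusted(event)) total += event.eventData.change;
  }
  return total;
}
```

Events that fail the guard are simply skipped here; a stricter system might log or reject them instead.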

The real magic happens in the event store implementation. Notice how we handle concurrency conflicts:

// src/infrastructure/PostgresEventStore.ts
import { Pool } from 'pg';
import { DomainEvent, EventStore } from '../types/events';

export class PostgresEventStore implements EventStore {
  constructor(private pool: Pool) {}

  async saveEvents(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      
      const res = await client.query(
        'SELECT MAX(version) as current FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );
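      const currentVersion = res.rows[0]?.current ?? 0;

      // Fast-fail check. If two writers pass it concurrently, the UNIQUE (aggregate_id, version)
      // constraint still rejects the second INSERT, so conflicting versions are never committed.
      if (currentVersion !== expectedVersion) {
        throw new Error(`Version conflict: Expected ${expectedVersion}, found ${currentVersion}`);
      }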
      
      for (const [index, event] of events.entries()) {
        await client.query(
          `INSERT INTO events (aggregate_id, event_type, event_data, version)
           VALUES ($1, $2, $3, $4)`,
          [aggregateId, event.eventType, JSON.stringify(event.eventData), expectedVersion + index + 1]
        );
      }
      
      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const res = await this.pool.query(
      `SELECT event_type, event_data, version 
       FROM events 
       WHERE aggregate_id = $1 
       ORDER BY version ASC`,
      [aggregateId]
    );
    
    return res.rows.map(row => ({
      aggregateId,
      eventType: row.event_type,
      eventData: row.event_data, // pg already parses JSONB columns into JavaScript values
      version: row.version
    }));
  }
}
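For unit tests it can help to have a store that applies the same expectedVersion rule without a database. The following is a minimal sketch, not a replacement for the PostgreSQL store: `InMemoryEventStore` and `ConcurrencyError` are names introduced here for illustration.

```typescript
// Mirrors the interfaces from src/types/events.ts
interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

interface EventStore {
  saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
}

// Hypothetical error type so callers can tell conflicts apart from other failures
class ConcurrencyError extends Error {
  readonly expected: number;
  readonly actual: number;

  constructor(expected: number, actual: number) {
    super(`Version conflict: Expected ${expected}, found ${actual}`);
    this.name = 'ConcurrencyError';
    this.expected = expected;
    this.actual = actual;
  }
}

// Hypothetical in-memory store with the same optimistic-concurrency rule
class InMemoryEventStore implements EventStore {
  private streams = new Map<string, DomainEvent[]>();

  async saveEvents(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const stream = this.streams.get(aggregateId) ?? [];
    // The caller must have seen the latest version, otherwise the write is rejected
    if (stream.length !== expectedVersion) {
      throw new ConcurrencyError(expectedVersion, stream.length);
    }
    // Assign versions the same way the PostgreSQL store does
    const stored = events.map((event, index) => ({
      ...event,
      aggregateId,
      version: expectedVersion + index + 1,
    }));
    this.streams.set(aggregateId, [...stream, ...stored]);
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    // Return a copy so callers cannot mutate the stored stream
    return [...(this.streams.get(aggregateId) ?? [])];
  }
}
```

A caller that catches a conflict can reload the aggregate, re-apply the command against the fresh state, and try the save again.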

Aggregates are where business rules live. They process commands and produce events. How might we handle inventory updates?

// src/domain/InventoryItem.ts
import { DomainEvent } from '../types/events';

export class InventoryItem {
  private _pendingEvents: DomainEvent[] = [];

  constructor(
    public readonly id: string,
    private count: number,
    private version: number = 0
  ) {}

  get pendingEvents(): DomainEvent[] {
    return this._pendingEvents;
  }

  adjustCount(change: number): void {
    if (this.count + change < 0) {
      throw new Error('Insufficient inventory');
    }
    
    this.count += change;
    this._pendingEvents.push({
      aggregateId: this.id,
      eventType: 'InventoryAdjusted',
      eventData: { change, newCount: this.count },
      version: this.version + this._pendingEvents.length + 1
    });
  }

  static fromEvents(id: string, events: DomainEvent[]): InventoryItem {
    let count = 0;
    events.forEach(event => {
      if (event.eventType === 'InventoryAdjusted') {
        count += (event.eventData as any).change;
      }
    });
    return new InventoryItem(id, count, events.length);
  }
}
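The aggregate records pending events, but something still has to load it, run the command, and persist the result. Here is a minimal sketch of that load, decide, save cycle. `adjustInventory`, `clearPendingEvents`, and `currentVersion` are hypothetical additions, and the trimmed-down `InventoryItem` only stands in for the class above.

```typescript
// Mirrors the interfaces from src/types/events.ts
interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

interface EventStore {
  saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
}

// Trimmed-down stand-in for the InventoryItem aggregate
class InventoryItem {
  readonly id: string;
  private count: number;
  private version: number;
  private pending: DomainEvent[] = [];

  constructor(id: string, count: number, version = 0) {
    this.id = id;
    this.count = count;
    this.version = version;
  }

  get pendingEvents(): DomainEvent[] {
    return this.pending;
  }

  // Hypothetical accessor: the version the aggregate was loaded at
  get currentVersion(): number {
    return this.version;
  }

  adjustCount(change: number): void {
    if (this.count + change < 0) throw new Error('Insufficient inventory');
    this.count += change;
    this.pending.push({
      aggregateId: this.id,
      eventType: 'InventoryAdjusted',
      eventData: { change, newCount: this.count },
      version: this.version + this.pending.length + 1,
    });
  }

  // Hypothetical helper: call after a successful save so events are not written twice
  clearPendingEvents(): void {
    this.version += this.pending.length;
    this.pending = [];
  }

  static fromEvents(id: string, events: DomainEvent[]): InventoryItem {
    let count = 0;
    for (const event of events) {
      if (event.eventType === 'InventoryAdjusted') {
        count += (event.eventData as { change: number }).change;
      }
    }
    return new InventoryItem(id, count, events.length);
  }
}

// Hypothetical command handler: load -> decide -> save -> clear
async function adjustInventory(store: EventStore, id: string, change: number): Promise<void> {
  const history = await store.getEvents(id);
  const item = InventoryItem.fromEvents(id, history);
  item.adjustCount(change); // throws if the business rule is violated
  // expectedVersion is the version we loaded, so a concurrent writer causes a conflict
  await store.saveEvents(id, item.pendingEvents, item.currentVersion);
  item.clearPendingEvents();
}
```

Passing the loaded version as expectedVersion is what ties the aggregate to the store's concurrency check: if another writer appended events in the meantime, the save fails instead of silently overwriting.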

Projections transform events into read-optimized views. Here’s a simple example:

// src/projections/inventoryProjection.ts
import { DomainEvent } from '../types/events';

export class InventoryProjection {
  private inventory: Map<string, number> = new Map();

  applyEvent(event: DomainEvent): void {
    if (event.eventType === 'InventoryAdjusted') {
      const current = this.inventory.get(event.aggregateId) || 0;
      this.inventory.set(event.aggregateId, current + (event.eventData as any).change);
    }
  }

  getCount(itemId: string): number {
    return this.inventory.get(itemId) || 0;
  }
}
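Since a projection is derived entirely from events, it can be thrown away and rebuilt by replaying the stream. The sketch below adds two things that are assumptions rather than part of the class above: a static `rebuild` helper, and per-aggregate `lastVersion` tracking so that a redelivered event is not counted twice.

```typescript
// Mirrors the DomainEvent interface from src/types/events.ts
interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

class InventoryProjection {
  private inventory = new Map<string, number>();
  // Highest version applied per aggregate (assumes versions start at 1 and increase)
  private lastVersion = new Map<string, number>();

  applyEvent(event: DomainEvent): void {
    const seen = this.lastVersion.get(event.aggregateId) ?? 0;
    if (event.version <= seen) return; // already applied, ignore the redelivery

    if (event.eventType === 'InventoryAdjusted') {
      const current = this.inventory.get(event.aggregateId) ?? 0;
      const { change } = event.eventData as { change: number };
      this.inventory.set(event.aggregateId, current + change);
    }
    this.lastVersion.set(event.aggregateId, event.version);
  }

  getCount(itemId: string): number {
    return this.inventory.get(itemId) ?? 0;
  }

  // Hypothetical helper: rebuild from scratch by replaying a full history in order
  static rebuild(events: DomainEvent[]): InventoryProjection {
    const projection = new InventoryProjection();
    for (const event of events) projection.applyEvent(event);
    return projection;
  }
}

// Usage: replay a history, then simulate the same event arriving a second time
const itemHistory: DomainEvent[] = [
  { aggregateId: 'item1', eventType: 'InventoryAdjusted', eventData: { change: 5, newCount: 5 }, version: 1 },
  { aggregateId: 'item1', eventType: 'InventoryAdjusted', eventData: { change: -3, newCount: 2 }, version: 2 },
  { aggregateId: 'item2', eventType: 'InventoryAdjusted', eventData: { change: 7, newCount: 7 }, version: 1 },
];
const projection = InventoryProjection.rebuild(itemHistory);
projection.applyEvent(itemHistory[1]); // ignored: version 2 of item1 was already applied
const item1Count = projection.getCount('item1'); // 2
```

Version tracking like this only guards against duplicates within one aggregate's stream; ordering across aggregates would need a global position, which the schema in this guide does not define.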

For performance, we implement snapshots. When would you trigger a snapshot? Typically after every 100 events or so:

CREATE TABLE snapshots (
    aggregate_id UUID PRIMARY KEY,
    data JSONB NOT NULL,
    version INTEGER NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

// src/infrastructure/SnapshotRepository.ts
import { Pool } from 'pg';

export async function saveSnapshot(
  pool: Pool, 
  aggregateId: string, 
  data: any, 
  version: number
): Promise<void> {
  await pool.query(
    `INSERT INTO snapshots (aggregate_id, data, version)
     VALUES ($1, $2, $3)
     ON CONFLICT (aggregate_id) 
     DO UPDATE SET data = $2, version = $3`,
    [aggregateId, JSON.stringify(data), version]
  );
}

export async function loadSnapshot(
  pool: Pool, 
  aggregateId: string
): Promise<{ data: any; version: number } | null> {
  const res = await pool.query(
    'SELECT data, version FROM snapshots WHERE aggregate_id = $1',
    [aggregateId]
  );
  // pg already parses JSONB columns, so `data` arrives as a JavaScript value
  return res.rows[0]
    ? { data: res.rows[0].data, version: res.rows[0].version }
    : null;
}
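Loading with a snapshot means starting from the snapshot's state and replaying only the events with a higher version. The sketch below expresses that as pure functions so it can be tested without a database. `InventoryState`, `Snapshot`, `rehydrate`, `shouldSnapshot`, and the default interval of 100 are assumptions; only the `{ data, version }` shape comes from loadSnapshot.

```typescript
// Mirrors the DomainEvent interface from src/types/events.ts
interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

// Hypothetical state and snapshot shapes for the inventory aggregate
interface InventoryState {
  count: number;
  version: number;
}

interface Snapshot {
  data: { count: number };
  version: number;
}

// Start from the snapshot (if any) and apply only events newer than it
function rehydrate(snapshot: Snapshot | null, events: DomainEvent[]): InventoryState {
  const state: InventoryState = snapshot
    ? { count: snapshot.data.count, version: snapshot.version }
    : { count: 0, version: 0 };

  for (const event of events) {
    if (event.version <= state.version) continue; // already reflected in the snapshot
    if (event.eventType === 'InventoryAdjusted') {
      state.count += (event.eventData as { change: number }).change;
    }
    state.version = event.version;
  }
  return state;
}

// Take a snapshot whenever a save crosses a multiple of `interval`
function shouldSnapshot(previousVersion: number, newVersion: number, interval = 100): boolean {
  return Math.floor(newVersion / interval) > Math.floor(previousVersion / interval);
}
```

In practice you would pair this with a query that fetches only events after the snapshot version (for example, adding `AND version > $2` to the getEvents query), so the skipped events are never read at all.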

Testing event-sourced systems requires special attention. How do you verify temporal behavior? I recommend these patterns:

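// tests/inventory.test.ts (assumes Jest, e.g. npm install -D jest ts-jest @types/jest)
import { InventoryItem } from '../src/domain/InventoryItem';
import { DomainEvent } from '../src/types/events';

describe('InventoryItem', () => {
  it('should reject negative inventory', () => {
    const item = new InventoryItem('item1', 10);
    expect(() => item.adjustCount(-11)).toThrow('Insufficient inventory');
  });

  it('should rebuild state from events', () => {
    const events: DomainEvent[] = [
      { aggregateId: 'item1', eventType: 'InventoryAdjusted', eventData: { change: 5, newCount: 5 }, version: 1 },
      { aggregateId: 'item1', eventType: 'InventoryAdjusted', eventData: { change: -3, newCount: 2 }, version: 2 }
    ];

    const item = InventoryItem.fromEvents('item1', events);

    // `count` is private, so assert through behavior: exactly 2 units remain
    expect(() => item.adjustCount(-3)).toThrow('Insufficient inventory');
    item.adjustCount(-2);
    expect(item.pendingEvents[0].eventData).toEqual({ change: -2, newCount: 0 });
  });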
});

For production, consider these optimizations:

  • Partition events by aggregate ID
  • Use MATERIALIZED VIEWS for frequent queries
  • Compress older events
  • Separate read/write databases

Common pitfalls I’ve encountered:

  • Forgetting to reset pending events after saving
  • Version mismatch during concurrent updates
  • Overlooking event schema evolution
  • Projection latency in read models
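For the schema evolution pitfall, one common approach is upcasting: converting older event payloads to the current shape while loading, so aggregates and projections only ever see one version. The sketch below is built on assumptions that do not appear elsewhere in this guide: a `schemaVersion` field inside the event data, and a hypothetical second version of InventoryAdjusted that adds a `reason` field. Events without `schemaVersion` are treated as version 1.

```typescript
// Mirrors the DomainEvent interface from src/types/events.ts
interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

// An upcaster converts a payload from schema version N to N + 1
type Upcaster = (data: Record<string, unknown>) => Record<string, unknown>;

// Hypothetical registry: event type -> source schema version -> upcaster
const upcasters: Record<string, Record<number, Upcaster>> = {
  InventoryAdjusted: {
    // Hypothetical change: v2 adds a `reason`, defaulted for old events
    1: (data) => ({ ...data, reason: 'unspecified' }),
  },
};

function upcast(event: DomainEvent): DomainEvent {
  let data: Record<string, unknown> = { ...(event.eventData as Record<string, unknown>) };
  // Events written before versioning was introduced count as schema version 1
  let schemaVersion = typeof data.schemaVersion === 'number' ? data.schemaVersion : 1;

  // Apply upcasters one step at a time until none is registered for the current version
  for (;;) {
    const step: Upcaster | undefined = upcasters[event.eventType]?.[schemaVersion];
    if (step === undefined) break;
    data = step(data);
    schemaVersion += 1;
  }
  return { ...event, eventData: { ...data, schemaVersion } };
}

// Usage: an event stored in the original shape gains the new field on load
const storedV1: DomainEvent = {
  aggregateId: 'item1',
  eventType: 'InventoryAdjusted',
  eventData: { change: 5, newCount: 5 },
  version: 1,
};
const upgradedEvent = upcast(storedV1);
```

The stored rows are never rewritten; the conversion happens on read, which keeps the event log immutable.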

Event sourcing isn’t just a technical pattern - it changes how you think about system state. By capturing every change as immutable facts, we gain audit trails, temporal querying, and robust failure recovery. The initial effort pays off in maintainability and insight.

Found this useful? Implement it in your next project and share your experience! Like this guide if it helped you, share it with your team, and comment below with your event sourcing challenges.


This guide covered:

  • Event storage with optimistic concurrency control
  • An aggregate that enforces business rules
  • Read model projections
  • Snapshots for faster loading
  • Basic testing strategies
  • Pointers toward production optimizations

The code examples are starting points you can extend for real-world scenarios while keeping the event log as the source of truth.


