
Complete Node.js Event Sourcing Guide: TypeScript, PostgreSQL, and Real-World Implementation

Learn to implement Event Sourcing with Node.js, TypeScript & PostgreSQL. Build event stores, handle versioning, create projections & optimize performance for scalable systems.



Lately, I’ve been thinking about how we track changes in complex systems. Traditional approaches often lose valuable historical context. That’s when event sourcing caught my attention - a method where we capture every state change as immutable events. Why settle for current state alone when you can reconstruct any moment in time? Let me show you how to implement this powerful pattern.

First, we need a solid foundation. Start by setting up your project:

mkdir event-sourcing-app
cd event-sourcing-app
npm init -y
npm install express pg uuid @types/uuid @types/pg
npm install -D typescript @types/node ts-node

Configure TypeScript with this tsconfig.json:

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "rootDir": "./src",
    "outDir": "./dist",
    "strict": true,
    "esModuleInterop": true
  }
}

Our event store needs a proper PostgreSQL schema. Notice how we’re capturing both the event data and metadata:

-- uuid_generate_v4() comes from the uuid-ossp extension
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    version INTEGER NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (aggregate_id, version)
);

CREATE INDEX idx_aggregate_id ON events (aggregate_id);

Now, let’s define our core interfaces. This TypeScript foundation ensures type safety throughout our system:

// src/types/events.ts
export interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

export interface EventStore {
  saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
}
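Before wiring up PostgreSQL, it helps to have an in-memory implementation of these interfaces for unit tests. Here's a minimal sketch (the `InMemoryEventStore` class is my own addition for testing, not part of the production infrastructure; the interfaces are repeated inline so the snippet stands alone):

```typescript
// In-memory EventStore sketch for tests -- mirrors src/types/events.ts.
interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

interface EventStore {
  saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
}

class InMemoryEventStore implements EventStore {
  private streams = new Map<string, DomainEvent[]>();

  async saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void> {
    const stream = this.streams.get(aggregateId) ?? [];
    // Same optimistic-concurrency rule the PostgreSQL store enforces
    if (stream.length !== expectedVersion) {
      throw new Error(`Version conflict: Expected ${expectedVersion}, found ${stream.length}`);
    }
    this.streams.set(aggregateId, [...stream, ...events]);
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    return this.streams.get(aggregateId) ?? [];
  }
}
```

Swapping this in behind the `EventStore` interface keeps domain tests fast and database-free.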

The real magic happens in the event store implementation. Notice how we handle concurrency conflicts:

// src/infrastructure/PostgresEventStore.ts
import { Pool } from 'pg';
import { DomainEvent, EventStore } from '../types/events';

export class PostgresEventStore implements EventStore {
  constructor(private pool: Pool) {}

  async saveEvents(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      
      const res = await client.query(
        'SELECT MAX(version) as current FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );
      const currentVersion = res.rows[0]?.current || 0;
      
      if (currentVersion !== expectedVersion) {
        throw new Error(`Version conflict: Expected ${expectedVersion}, found ${currentVersion}`);
      }
      
      for (const [index, event] of events.entries()) {
        await client.query(
          `INSERT INTO events (aggregate_id, event_type, event_data, version)
           VALUES ($1, $2, $3, $4)`,
          [aggregateId, event.eventType, JSON.stringify(event.eventData), expectedVersion + index + 1]
        );
      }
      
      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const res = await this.pool.query(
      `SELECT event_type, event_data, version 
       FROM events 
       WHERE aggregate_id = $1 
       ORDER BY version ASC`,
      [aggregateId]
    );
    
    return res.rows.map(row => ({
      aggregateId,
      eventType: row.event_type,
      // node-postgres returns JSONB columns already parsed into objects
      eventData: row.event_data,
      version: row.version
    }));
  }
}
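Version conflicts are a normal occurrence under concurrency, so callers typically retry the whole load-decide-save cycle rather than failing outright. Here's a hedged sketch of a generic retry helper (the helper and its names are my own, not part of the store above):

```typescript
// Retry an optimistic-concurrency operation a few times before giving up.
// The operation should reload the aggregate each attempt so expectedVersion is fresh.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // Only retry version conflicts; rethrow anything else immediately
      if (!(error instanceof Error) || !error.message.startsWith('Version conflict')) {
        throw error;
      }
    }
  }
  throw lastError;
}
```

The key design point: the retry must wrap the *entire* cycle, because the competing writer's events change both the rebuilt state and the expected version.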

Aggregates are where business rules live. They process commands and produce events. How might we handle inventory updates?

// src/domain/InventoryItem.ts
import { DomainEvent } from '../types/events';

export class InventoryItem {
  private _pendingEvents: DomainEvent[] = [];

  constructor(
    public readonly id: string,
    private _count: number,
    private version: number = 0
  ) {}

  get count(): number {
    return this._count;
  }

  get pendingEvents(): DomainEvent[] {
    return this._pendingEvents;
  }

  // Call after the event store commits, so the same events are never saved twice
  clearPendingEvents(): void {
    this.version += this._pendingEvents.length;
    this._pendingEvents = [];
  }

  adjustCount(change: number): void {
    if (this._count + change < 0) {
      throw new Error('Insufficient inventory');
    }

    this._count += change;
    this._pendingEvents.push({
      aggregateId: this.id,
      eventType: 'InventoryAdjusted',
      eventData: { change, newCount: this._count },
      version: this.version + this._pendingEvents.length + 1
    });
  }

  static fromEvents(id: string, events: DomainEvent[]): InventoryItem {
    let count = 0;
    events.forEach(event => {
      if (event.eventType === 'InventoryAdjusted') {
        count += (event.eventData as any).change;
      }
    });
    return new InventoryItem(id, count, events.length);
  }
}
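The full write path glues the store and the aggregate together: load the history, fold it into current state, run the command, then append the new event with the loaded version as `expectedVersion`. Here's a condensed, runnable sketch of that fold-then-decide loop (it inlines a minimal event shape so it stands alone; in the real app you'd use `InventoryItem` and the event store above):

```typescript
// Condensed write-path sketch: rebuild from history, run a command, emit a new event.
type Evt = { eventType: string; eventData: { change: number }; version: number };

function replayCount(events: Evt[]): number {
  // Fold the event stream into current state, as InventoryItem.fromEvents does
  return events.reduce((count, e) =>
    e.eventType === 'InventoryAdjusted' ? count + e.eventData.change : count, 0);
}

function adjust(history: Evt[], change: number): Evt {
  const count = replayCount(history);
  // The business rule is checked against the *replayed* state, never a cached one
  if (count + change < 0) throw new Error('Insufficient inventory');
  // The new event's version continues the stream sequentially
  return {
    eventType: 'InventoryAdjusted',
    eventData: { change },
    version: history.length + 1
  };
}
```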

Projections transform events into read-optimized views. Here’s a simple example:

// src/projections/inventoryProjection.ts
import { DomainEvent } from '../types/events';

export class InventoryProjection {
  private inventory: Map<string, number> = new Map();

  applyEvent(event: DomainEvent): void {
    if (event.eventType === 'InventoryAdjusted') {
      const current = this.inventory.get(event.aggregateId) || 0;
      this.inventory.set(event.aggregateId, current + (event.eventData as any).change);
    }
  }

  getCount(itemId: string): number {
    return this.inventory.get(itemId) || 0;
  }
}
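Because the event log is the source of truth, a projection can be rebuilt from scratch at any time by replaying every event in order. A small sketch of that rebuild (minimal types are inlined so it runs standalone; in the real app you'd stream rows from the events table ordered by version or timestamp):

```typescript
// Rebuild a read model by replaying the entire event log in order.
type InvEvent = { aggregateId: string; eventType: string; eventData: { change: number } };

function rebuildInventory(allEvents: InvEvent[]): Map<string, number> {
  const inventory = new Map<string, number>();
  for (const event of allEvents) {
    if (event.eventType !== 'InventoryAdjusted') continue;
    const current = inventory.get(event.aggregateId) ?? 0;
    inventory.set(event.aggregateId, current + event.eventData.change);
  }
  return inventory;
}
```

This replayability is what makes adding a brand-new read model to a running system cheap: write the projection, replay history, start tailing new events.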

For performance, we implement snapshots. When would you trigger a snapshot? Typically after every 100 events or so:

CREATE TABLE snapshots (
    aggregate_id UUID PRIMARY KEY,
    data JSONB NOT NULL,
    version INTEGER NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

// src/infrastructure/SnapshotRepository.ts
import { Pool } from 'pg';

export async function saveSnapshot(
  pool: Pool, 
  aggregateId: string, 
  data: any, 
  version: number
): Promise<void> {
  await pool.query(
    `INSERT INTO snapshots (aggregate_id, data, version)
     VALUES ($1, $2, $3)
     ON CONFLICT (aggregate_id) 
     DO UPDATE SET data = $2, version = $3`,
    [aggregateId, JSON.stringify(data), version]
  );
}

export async function loadSnapshot(
  pool: Pool, 
  aggregateId: string
): Promise<{ data: any; version: number } | null> {
  const res = await pool.query(
    'SELECT data, version FROM snapshots WHERE aggregate_id = $1',
    [aggregateId]
  );
  return res.rows[0] 
    ? { data: res.rows[0].data, version: res.rows[0].version } // JSONB already parsed by pg
    : null;
}
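Loading an aggregate then becomes: start from the snapshot if one exists, and replay only the events recorded after its version. Here's a hedged sketch of that merge as a pure function (the names are my own; in practice `getEvents` would take a version filter, e.g. `WHERE version > $2`):

```typescript
// Rebuild state from an optional snapshot plus the events recorded after it.
type Snapshot = { count: number; version: number } | null;
type TailEvent = { eventData: { change: number }; version: number };

function restoreCount(snapshot: Snapshot, events: TailEvent[]): { count: number; version: number } {
  let count = snapshot?.count ?? 0;
  let version = snapshot?.version ?? 0;
  for (const event of events) {
    if (event.version <= version) continue; // already folded into the snapshot
    count += event.eventData.change;
    version = event.version;
  }
  return { count, version };
}
```

With snapshots every ~100 events, a load never replays more than ~100 events regardless of how long the aggregate has lived.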

Testing event-sourced systems requires special attention. How do you verify temporal behavior? I recommend these patterns:

// tests/inventory.test.ts
import { InventoryItem } from '../src/domain/InventoryItem';
import { DomainEvent } from '../src/types/events';

describe('InventoryItem', () => {
  it('should reject negative inventory', () => {
    const item = new InventoryItem('item1', 10);
    expect(() => item.adjustCount(-11)).toThrow('Insufficient inventory');
  });

  it('should rebuild state from events', () => {
    const events: DomainEvent[] = [
      { aggregateId: 'item1', eventType: 'InventoryAdjusted', eventData: { change: 5 }, version: 1 },
      { aggregateId: 'item1', eventType: 'InventoryAdjusted', eventData: { change: -3 }, version: 2 }
    ];
    
    const item = InventoryItem.fromEvents('item1', events);
    expect(item.count).toEqual(2);
  });
});

For production, consider these optimizations:

  • Partition events by aggregate ID
  • Use MATERIALIZED VIEWS for frequent queries
  • Compress older events
  • Separate read/write databases

Common pitfalls I’ve encountered:

  • Forgetting to reset pending events after saving
  • Version mismatch during concurrent updates
  • Overlooking event schema evolution
  • Projection latency in read models
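Event schema evolution deserves a concrete pattern: upcasting, where old event payloads are translated into the current shape at read time, so replay logic only ever sees the latest schema. A hedged sketch (the v1/v2 shapes here are invented for illustration):

```typescript
// Upcaster sketch: translate old event payloads to the current schema at read time.
// Hypothetical scenario: v1 events stored only `change`; v2 also records a `reason`.
type StoredEvent = { eventType: string; eventData: any; schemaVersion?: number };

function upcast(event: StoredEvent): StoredEvent {
  if (event.eventType === 'InventoryAdjusted' && (event.schemaVersion ?? 1) < 2) {
    return {
      ...event,
      schemaVersion: 2,
      eventData: { ...event.eventData, reason: 'unknown' } // default: v1 never captured this
    };
  }
  return event;
}
```

The stored events stay immutable; only the in-memory representation is upgraded, which keeps the audit trail honest.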

Event sourcing isn’t just a technical pattern - it changes how you think about system state. By capturing every change as immutable facts, we gain audit trails, temporal querying, and robust failure recovery. The initial effort pays off in maintainability and insight.

Found this useful? Implement it in your next project and share your experience! Like this guide if it helped you, share it with your team, and comment below with your event sourcing challenges.


This implementation provides:

  • Full event storage with concurrency control
  • Aggregate root pattern enforcement
  • Read model projections
  • Snapshot optimization
  • Transactional error handling with rollback
  • Testing strategies
  • Production-ready optimizations

The code examples show actual implementation patterns you can extend for real-world scenarios while maintaining the integrity of the event sourcing pattern.



