Here’s a comprehensive guide to implementing event sourcing with Node.js, TypeScript, and PostgreSQL:
Lately, I’ve been thinking about how we track changes in complex systems. Traditional approaches often lose valuable historical context. That’s when event sourcing caught my attention - a method where we capture every state change as immutable events. Why settle for current state alone when you can reconstruct any moment in time? Let me show you how to implement this powerful pattern.
First, we need a solid foundation. Start by setting up your project:
mkdir event-sourcing-app
cd event-sourcing-app
npm init -y
npm install express pg uuid @types/uuid @types/pg
npm install -D typescript @types/node ts-node
Configure TypeScript with this tsconfig.json
:
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"rootDir": "./src",
"outDir": "./dist",
"strict": true,
"esModuleInterop": true
}
}
Our event store needs a proper PostgreSQL schema. Notice how we’re capturing both the event data and metadata:
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
aggregate_id UUID NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
version INTEGER NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE (aggregate_id, version)
);
CREATE INDEX idx_aggregate_id ON events (aggregate_id);
Now, let’s define our core interfaces. This TypeScript foundation ensures type safety throughout our system:
// src/types/events.ts
export interface DomainEvent {
aggregateId: string;
eventType: string;
eventData: unknown;
version: number;
}
export interface EventStore {
saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
getEvents(aggregateId: string): Promise<DomainEvent[]>;
}
The real magic happens in the event store implementation. Notice how we handle concurrency conflicts:
// src/infrastructure/PostgresEventStore.ts
import { Pool } from 'pg';
import { DomainEvent, EventStore } from '../types/events';
export class PostgresEventStore implements EventStore {
constructor(private pool: Pool) {}
async saveEvents(
aggregateId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const res = await client.query(
'SELECT MAX(version) as current FROM events WHERE aggregate_id = $1',
[aggregateId]
);
const currentVersion = res.rows[0]?.current || 0;
if (currentVersion !== expectedVersion) {
throw new Error(`Version conflict: Expected ${expectedVersion}, found ${currentVersion}`);
}
for (const [index, event] of events.entries()) {
await client.query(
`INSERT INTO events (aggregate_id, event_type, event_data, version)
VALUES ($1, $2, $3, $4)`,
[aggregateId, event.eventType, JSON.stringify(event.eventData), expectedVersion + index + 1]
);
}
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
const res = await this.pool.query(
`SELECT event_type, event_data, version
FROM events
WHERE aggregate_id = $1
ORDER BY version ASC`,
[aggregateId]
);
return res.rows.map(row => ({
aggregateId,
eventType: row.event_type,
eventData: JSON.parse(row.event_data),
version: row.version
}));
}
}
Aggregates are where business rules live. They process commands and produce events. How might we handle inventory updates?
// src/domain/InventoryItem.ts
export class InventoryItem {
private _pendingEvents: DomainEvent[] = [];
constructor(
public readonly id: string,
private count: number,
private version: number = 0
) {}
get pendingEvents(): DomainEvent[] {
return this._pendingEvents;
}
adjustCount(change: number): void {
if (this.count + change < 0) {
throw new Error('Insufficient inventory');
}
this.count += change;
this._pendingEvents.push({
aggregateId: this.id,
eventType: 'InventoryAdjusted',
eventData: { change, newCount: this.count },
version: this.version + this._pendingEvents.length + 1
});
}
static fromEvents(id: string, events: DomainEvent[]): InventoryItem {
let count = 0;
events.forEach(event => {
if (event.eventType === 'InventoryAdjusted') {
count += (event.eventData as any).change;
}
});
return new InventoryItem(id, count, events.length);
}
}
Projections transform events into read-optimized views. Here’s a simple example:
// src/projections/inventoryProjection.ts
export class InventoryProjection {
private inventory: Map<string, number> = new Map();
applyEvent(event: DomainEvent): void {
if (event.eventType === 'InventoryAdjusted') {
const current = this.inventory.get(event.aggregateId) || 0;
this.inventory.set(event.aggregateId, current + (event.eventData as any).change);
}
}
getCount(itemId: string): number {
return this.inventory.get(itemId) || 0;
}
}
For performance, we implement snapshots. When would you trigger a snapshot? Typically after every 100 events or so:
CREATE TABLE snapshots (
aggregate_id UUID PRIMARY KEY,
data JSONB NOT NULL,
version INTEGER NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
// src/infrastructure/SnapshotRepository.ts
export async function saveSnapshot(
pool: Pool,
aggregateId: string,
data: any,
version: number
): Promise<void> {
await pool.query(
`INSERT INTO snapshots (aggregate_id, data, version)
VALUES ($1, $2, $3)
ON CONFLICT (aggregate_id)
DO UPDATE SET data = $2, version = $3`,
[aggregateId, JSON.stringify(data), version]
);
}
export async function loadSnapshot(
pool: Pool,
aggregateId: string
): Promise<{ data: any; version: number } | null> {
const res = await pool.query(
'SELECT data, version FROM snapshots WHERE aggregate_id = $1',
[aggregateId]
);
return res.rows[0]
? { data: JSON.parse(res.rows[0].data), version: res.rows[0].version }
: null;
}
Testing event-sourced systems requires special attention. How do you verify temporal behavior? I recommend these patterns:
// tests/inventory.test.ts
describe('InventoryItem', () => {
it('should reject negative inventory', () => {
const item = new InventoryItem('item1', 10);
expect(() => item.adjustCount(-11)).toThrow('Insufficient inventory');
});
it('should rebuild state from events', () => {
const events = [
{ eventType: 'InventoryAdjusted', eventData: { change: 5 } },
{ eventType: 'InventoryAdjusted', eventData: { change: -3 } }
] as DomainEvent[];
const item = InventoryItem.fromEvents('item1', events);
expect(item.count).toEqual(2);
});
});
For production, consider these optimizations:
- Partition events by aggregate ID
- Use MATERIALIZED VIEWS for frequent queries
- Compress older events
- Separate read/write databases
Common pitfalls I’ve encountered:
- Forgetting to reset pending events after saving
- Version mismatch during concurrent updates
- Overlooking event schema evolution
- Projection latency in read models
Event sourcing isn’t just a technical pattern - it changes how you think about system state. By capturing every change as immutable facts, we gain audit trails, temporal querying, and robust failure recovery. The initial effort pays off in maintainability and insight.
Found this useful? Implement it in your next project and share your experience! Like this guide if it helped you, share it with your team, and comment below with your event sourcing challenges.
This implementation provides:
- Full event storage with concurrency control
- Aggregate root pattern enforcement
- Read model projections
- Snapshot optimization
- Comprehensive error handling
- Testing strategies
- Production-ready optimizations
The code examples show actual implementation patterns you can extend for real-world scenarios while maintaining the integrity of the event sourcing pattern.