Recently, I faced a complex problem in our inventory management system. We needed to track every state change across multiple warehouses while maintaining real-time stock visibility. Traditional CRUD databases couldn't provide the audit trail and temporal querying we required, which led me to event sourcing: a pattern that captures every state change as an immutable event. EventStoreDB, Node.js, and TypeScript turned out to be a strong combination for building this distributed system.
Event sourcing fundamentally changes how we manage state. Instead of updating records, we store every change as an event. This creates a complete historical record of our system’s evolution. EventStoreDB is purpose-built for this pattern, offering features like persistent subscriptions and stream processing. But how do we ensure our events are properly structured?
TypeScript gives us a significant advantage here. We can define strict event schemas that validate data at runtime. Consider this user registration event definition:
// src/domain/events/user.events.ts
import { z } from 'zod';

// Runtime schema for the event payload; parsed before any append.
export const UserRegisteredSchema = z.object({
  userId: z.string().uuid(),
  email: z.string().email(),
  username: z.string().min(3).max(50),
  hashedPassword: z.string().length(60), // bcrypt hashes are always 60 characters
  registrationDate: z.string().datetime()
});

// The stored event couples the validated payload with event metadata.
export type UserRegisteredEvent = z.infer<typeof UserRegisteredSchema> & {
  eventType: 'UserRegistered';
  aggregateId: string;
  version: number;
};
This schema ensures every event meets our business requirements before storage. Why is this critical? Because invalid events could corrupt our entire system state.
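To make that guarantee concrete, here's a minimal sketch of validating an incoming payload before it is ever appended; the validateRegistrationPayload helper is illustrative, not part of the codebase above:

// Sketch: reject invalid payloads before they reach the event store.
import { UserRegisteredSchema } from './user.events';

function validateRegistrationPayload(payload: unknown) {
  const result = UserRegisteredSchema.safeParse(payload);
  if (!result.success) {
    // safeParse collects field-level errors instead of throwing mid-parse.
    throw new Error(`Invalid UserRegistered payload: ${result.error.message}`);
  }
  return result.data; // Typed as the schema's inferred payload
}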
Setting up EventStore is straightforward with Docker. Once running, we connect from our Node.js application:
// src/infrastructure/eventstore/connection.ts
import { EventStoreDBClient } from '@eventstore/db-client';

// Assumes a local dev instance, e.g.:
// docker run -p 2113:2113 eventstore/eventstore:latest --insecure
const client = EventStoreDBClient.connectionString(
  'esdb://localhost:2113?tls=false'
);

async function testConnection() {
  try {
    // Reading a single event proves the connection is live.
    const events = client.readAll({ maxCount: 1 });
    for await (const event of events) break; // Connection successful
  } catch (error) {
    throw new Error('EventStore connection failed');
  }
}
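With the connection in place, persisting an event is a single append. Here's a minimal sketch assuming the client above; jsonEvent is the client library's helper for JSON-serialized events:

// Sketch: append a validated event to a per-user stream.
import { jsonEvent } from '@eventstore/db-client';

async function storeUserRegistered(event: UserRegisteredEvent) {
  const data = jsonEvent({
    type: event.eventType,
    data: event
  });
  // One stream per aggregate keeps reads and concurrency checks cheap.
  await client.appendToStream(`user-${event.aggregateId}`, data);
}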
Aggregates handle commands and produce events. They enforce business rules before changes occur. For example, when registering a user:
// src/domain/aggregates/user.aggregate.ts
import { UserRegisteredEvent } from '../events/user.events';

type RegisterUserCommand = {
  userId: string;
  data: Omit<UserRegisteredEvent, 'eventType' | 'aggregateId' | 'version'>;
};

export class UserAggregate {
  private state = { registered: false };
  private version = 0;

  registerUser(command: RegisterUserCommand): UserRegisteredEvent[] {
    // Business rule: a user can only be registered once.
    if (this.state.registered) {
      throw new Error('User already registered');
    }
    return [{
      eventType: 'UserRegistered',
      aggregateId: command.userId,
      version: this.version + 1,
      ...command.data
    }];
  }
}
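The flip side of producing events is replaying them: an aggregate rebuilds its state by applying each stored event in order. A minimal sketch of that rehydration step, added inside UserAggregate (the apply and fromHistory members are assumptions, not shown above):

// Sketch: rebuilding aggregate state from its event history.
apply(event: UserRegisteredEvent) {
  if (event.eventType === 'UserRegistered') {
    this.state.registered = true;
  }
  this.version = event.version; // Track stream position for concurrency checks
}

static fromHistory(events: UserRegisteredEvent[]): UserAggregate {
  const aggregate = new UserAggregate();
  for (const event of events) aggregate.apply(event);
  return aggregate;
}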
Projections transform events into read models. They enable efficient queries without impacting write performance. Here’s a simple projection building a user list:
// src/infrastructure/projections/user.projections.ts
import { EventEmitter } from 'events';
import { UserRegisteredEvent } from '../../domain/events/user.events';

interface UserProfile { id: string; email: string; username: string; }

// An in-memory read model; a production projection would write to a database.
const eventBus = new EventEmitter();
const userList = new Map<string, UserProfile>();

eventBus.on('UserRegistered', (event: UserRegisteredEvent) => {
  userList.set(event.aggregateId, {
    id: event.aggregateId,
    email: event.email,
    username: event.username
  });
});
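For this projection to stay current, stored events have to reach the bus. One option is a catch-up subscription that replays history and then follows new writes; a sketch assuming the client and eventBus above:

// Sketch: a catch-up subscription forwarding stored events onto the bus.
import { START } from '@eventstore/db-client';

async function startProjections() {
  const subscription = client.subscribeToAll({ fromPosition: START });
  for await (const resolvedEvent of subscription) {
    const event = resolvedEvent.event;
    if (event && !event.type.startsWith('$')) { // Skip system events
      eventBus.emit(event.type, event.data);
    }
  }
}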
CQRS separates reads from writes. Commands change state, while queries retrieve data. This separation allows independent scaling. But how do we handle eventual consistency? We design our UI to reflect that updates might take milliseconds to propagate.
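On the query side, a handler touches only the read model; a minimal sketch against the userList projection above:

// Sketch: queries read from the projection, never from the event streams.
function getUserProfile(userId: string): UserProfile | undefined {
  return userList.get(userId); // O(1) lookup, independent of write load
}

Because the projection lags writes by a few milliseconds, a just-registered user may briefly come back undefined; the UI treats that as "still processing" rather than an error.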
Concurrency control is crucial. EventStore uses optimistic concurrency checks through stream versions:
import { EventData, WrongExpectedVersionError } from '@eventstore/db-client';

async function appendEvents(
  stream: string,
  events: EventData[],
  expectedVersion: number
) {
  try {
    await client.appendToStream(stream, events, {
      // The append is rejected if the stream has advanced past this revision.
      expectedRevision: BigInt(expectedVersion)
    });
  } catch (error) {
    if (error instanceof WrongExpectedVersionError) {
      // Concurrency conflict: another writer appended first.
    }
    throw error;
  }
}
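A common resolution is to reload the aggregate, re-run the command, and retry a bounded number of times. A sketch under the assumptions above (loadUserAggregate and toEventData are hypothetical helpers):

// Sketch: optimistic retry loop around a concurrency conflict.
async function registerWithRetry(command: RegisterUserCommand, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const user = await loadUserAggregate(command.userId); // Hypothetical loader
    const events = user.registerUser(command);
    try {
      // The expected revision is the version just before these new events.
      await appendEvents(`user-${command.userId}`, events.map(toEventData), events[0].version - 1);
      return;
    } catch (error) {
      if (!(error instanceof WrongExpectedVersionError) || attempt === maxAttempts) {
        throw error;
      }
      // Conflict: loop back, reload fresh state, and re-apply the command.
    }
  }
}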
Testing requires a different approach. We verify that commands produce correct events and that projections build proper read models:
// Tests for user registration (Jest-style; any runner with test/expect works)
import { test, expect } from '@jest/globals';

test('registering user emits UserRegistered event', () => {
  const user = new UserAggregate();
  const events = user.registerUser(validCommand); // validCommand: a known-good fixture
  expect(events[0].eventType).toBe('UserRegistered');
});

test('projection updates user list on UserRegistered', () => {
  emitTestEvent(validUserRegistered); // Helper that emits onto the projection's bus
  expect(userList.size).toBe(1);
});
For monitoring, we instrument event handlers and track processing times. Prometheus metrics help identify bottlenecks:
// src/infrastructure/monitoring/metrics.ts
import * as prometheus from 'prom-client';

const eventProcessingTime = new prometheus.Histogram({
  name: 'event_processing_seconds',
  help: 'Time taken to process events',
  labelNames: ['event_type']
});

async function trackEventProcessing(eventType: string, fn: () => Promise<void>) {
  // Label keys must match labelNames exactly ('event_type', not 'eventType').
  const end = eventProcessingTime.startTimer({ event_type: eventType });
  try {
    await fn();
  } finally {
    end(); // Record the duration even if the handler throws
  }
}
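Wiring this into a handler is then a one-line wrap; handleUserRegistered below is a hypothetical async projection handler:

// Usage sketch: record each handler's duration under its event type.
eventBus.on('UserRegistered', (event) =>
  trackEventProcessing('UserRegistered', () => handleUserRegistered(event))
);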
Performance optimization focuses on batch processing and parallel projections. EventStoreDB's persistent subscriptions also help with backpressure: the server delivers only a bounded buffer of unacknowledged messages per consumer group, so a slow consumer is throttled rather than overwhelmed.
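Here's a minimal persistent-subscription sketch, assuming the client from earlier; the stream and group names are illustrative:

// Sketch: a server-managed consumer group with explicit acknowledgment.
import { persistentSubscriptionToStreamSettingsFromDefaults } from '@eventstore/db-client';

async function consumeUserEvents() {
  // Creating a group that already exists throws; real code would catch that.
  await client.createPersistentSubscriptionToStream(
    'user-events',
    'projection-group',
    persistentSubscriptionToStreamSettingsFromDefaults()
  );
  const subscription = client.subscribeToPersistentSubscriptionToStream(
    'user-events',
    'projection-group'
  );
  for await (const resolvedEvent of subscription) {
    // Process, then ack; unacked messages are retried or parked.
    await subscription.ack(resolvedEvent);
  }
}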
Compared to message brokers like Kafka, EventStoreDB is purpose-built for event sourcing: fine-grained streams (one per aggregate), per-stream optimistic concurrency, and append-only storage with full replay. Modeling those guarantees on Kafka's coarse topic partitions is awkward, so for systems requiring audit trails and temporal queries, EventStoreDB is an excellent choice.
Building this system taught me valuable lessons. Event sourcing requires different thinking patterns but pays dividends in traceability and flexibility. The combination of TypeScript’s type safety with EventStore’s robustness creates systems that can evolve without losing historical context.
If you're facing similar challenges with audit requirements or complex state transitions, I recommend exploring this architecture. Have you encountered situations where traditional databases limited your auditing capabilities? Share your experiences in the comments.