Let me share why event-driven systems have been on my mind lately. In my recent work with distributed systems, I noticed how quickly untyped events lead to debugging nightmares. That’s when I turned to TypeScript and Redis Streams to build something better. If you’ve struggled with event chaos in Node.js applications, you’ll find this practical approach valuable. Stick with me, and I’ll show you how to implement a robust solution that scales. Don’t forget to share your thoughts in the comments!
Setting up the foundation is straightforward. We begin with a new Node.js project and essential dependencies:
npm init -y
npm install ioredis zod uuid winston
npm install -D @types/node @types/uuid typescript ts-node
Our TypeScript configuration (tsconfig.json) enables strict type checking and modern features. The project structure organizes events, infrastructure, and services logically. Why does this matter? A clean setup prevents complexity creep as your system grows.
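As a starting point, a minimal tsconfig.json might look like the sketch below; the exact targets and module settings will vary with your runtime:
// tsconfig.json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "commonjs",
    "strict": true,
    "esModuleInterop": true,
    "outDir": "dist"
  },
  "include": ["src/**/*"]
}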
For event schemas, Zod provides validation superpowers. Consider this base event structure:
// BaseEvent.ts
import { z } from 'zod';

export const BaseEventSchema = z.object({
  id: z.string().uuid(),
  type: z.string(),
  aggregateId: z.string(),
  // coerce accepts ISO strings, so events survive a JSON round trip
  timestamp: z.coerce.date(),
  version: z.number().positive()
});

export type BaseEvent = z.infer<typeof BaseEventSchema>;
Specific events extend this foundation. Here’s a user registration event:
// UserEvents.ts
import { z } from 'zod';
import { v4 as uuidv4 } from 'uuid';
import { BaseEventSchema, BaseEvent } from './BaseEvent';

export const UserRegisteredSchema = BaseEventSchema.extend({
  type: z.literal('UserRegistered'),
  data: z.object({
    email: z.string().email(),
    name: z.string().min(1)
  })
});

// The class fills in the envelope fields so instances satisfy BaseEvent
export class UserRegisteredEvent implements BaseEvent {
  public readonly id = uuidv4();
  public readonly type = 'UserRegistered';
  public readonly timestamp = new Date();
  public readonly version = 1;

  constructor(
    public readonly aggregateId: string,
    public readonly data: { email: string; name: string }
  ) {}
}
Notice how we enforce email formats and name requirements? This prevents invalid data from entering our system. Have you ever traced a bug to malformed event data? This approach eliminates that.
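To see the guard in action, here's a minimal sketch of validating an untrusted payload at the boundary (incomingPayload is a hypothetical stand-in for whatever your handler receives):
// safeParse returns a result object instead of throwing
const result = UserRegisteredSchema.safeParse(incomingPayload);
if (result.success) {
  // result.data is fully typed, including the nested data object
  console.log('accepted', result.data.data.email);
} else {
  // result.error.issues lists every field that failed validation
  console.warn('rejected', result.error.issues);
}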
Redis Streams power our event bus. We initialize the client with retry logic for resilience:
// RedisClient.ts
import Redis from 'ioredis';

export class RedisClient {
  private static instance: Redis;

  static getInstance(): Redis {
    if (!this.instance) {
      // Fall back to a local instance when REDIS_URL is unset
      this.instance = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379', {
        // Back off 500ms per attempt, capped at 5 seconds
        retryStrategy: (times) => Math.min(times * 500, 5000)
      });
      this.instance.on('error', (err) =>
        console.error('Redis error:', err)
      );
    }
    return this.instance;
  }
}
Publishing events becomes type-safe and straightforward:
// EventPublisher.ts
import { RedisClient } from './RedisClient';
import { BaseEvent } from './BaseEvent';

const redis = RedisClient.getInstance();

export async function publishEvent(stream: string, event: BaseEvent) {
  // '*' lets Redis assign the entry ID; the payload is a single JSON field
  await redis.xadd(stream, '*', 'event', JSON.stringify(event));
}

// Usage (inside an async context)
const newUserEvent = new UserRegisteredEvent(
  'user-123',
  { email: 'alex@example.com', name: 'Alex' }
);
await publishEvent('users', newUserEvent);
What happens if a consumer fails? We implement consumer groups with dead letter queues:
// EventConsumer.ts
import { RedisClient } from './RedisClient';

const redis = RedisClient.getInstance();

async function createConsumerGroup(stream: string, group: string) {
  try {
    // MKSTREAM creates the stream if it doesn't exist yet
    await redis.xgroup('CREATE', stream, group, '0', 'MKSTREAM');
  } catch (err) {
    // Ignore "group already exists"; rethrow anything else
    if (!(err instanceof Error) || !err.message.includes('BUSYGROUP')) throw err;
  }
}

async function processEvents(stream: string, group: string, consumer: string) {
  while (true) {
    // BLOCK keeps this from busy-looping while the stream is idle
    const events = await redis.xreadgroup(
      'GROUP', group, consumer,
      'COUNT', '10', 'BLOCK', '5000',
      'STREAMS', stream, '>'
    ) as [string, [string, string[]][]][] | null;
    if (!events) continue;
    for (const [id, fields] of events[0][1]) {
      try {
        // fields is a flat [key, value, ...] array; the JSON payload sits at index 1
        const parsed = JSON.parse(fields[1]);
        // Processing logic here
        await redis.xack(stream, group, id);
      } catch (err) {
        // Park the failure in a dead letter stream, then ack it so it
        // doesn't stay pending and block redelivery of healthy events
        await redis.xadd(`${stream}:dlq`, '*',
          'original', JSON.stringify([id, fields]),
          'error', err instanceof Error ? err.message : String(err)
        );
        await redis.xack(stream, group, id);
      }
    }
  }
}
Error handling shines here. Failed events move to a dead letter queue for analysis without blocking the main stream. How often have you seen one bad event halt an entire system? This pattern prevents that.
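When something does land in the DLQ, inspecting it is a few lines; here's a minimal sketch, with 'users:dlq' following the naming convention from the consumer above:
// Read everything parked in the dead letter stream
const deadLetters = await redis.xrange('users:dlq', '-', '+');
for (const [id, fields] of deadLetters) {
  console.log(id, fields); // 'original' payload and 'error' message pairs
}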
For event sourcing, we reconstruct state by replaying events:
// UserAggregate.ts
import { BaseEvent } from './BaseEvent';
import { RedisClient } from './RedisClient';

const redis = RedisClient.getInstance();

export class UserAggregate {
  constructor(public id: string, private events: BaseEvent[] = []) {}

  applyEvent(event: BaseEvent): this {
    switch (event.type) {
      case 'UserRegistered':
        // State update logic
        break;
    }
    this.events.push(event);
    return this; // returning the aggregate lets reduce chain the calls
  }

  static async loadFromHistory(id: string) {
    const events = await redis.xrange(`user:${id}`, '-', '+');
    return events.reduce(
      (agg, [, fields]) => agg.applyEvent(JSON.parse(fields[1])),
      new UserAggregate(id)
    );
  }
}
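Rebuilding a user's current state is then a single call (inside an async context):
const user = await UserAggregate.loadFromHistory('user-123');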
Monitoring ties everything together. Winston logs key actions:
// logger.ts
import winston from 'winston';

export const logger = winston.createLogger({
  transports: [
    new winston.transports.Console({
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      )
    })
  ]
});

// In consumer
logger.info('Processing event', { eventId: parsed.id });
Testing strategies include integration tests with a local Redis instance. We verify event publishing, consumption, and error scenarios. What’s your approach to testing event flows? Share your experiences below!
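As one concrete example, here's a minimal sketch using Node's built-in test runner; it assumes the publisher and event classes from above and a Redis instance running locally on the default port:
// publish.test.ts
import { test } from 'node:test';
import assert from 'node:assert';
import { RedisClient } from './RedisClient';
import { publishEvent } from './EventPublisher';
import { UserRegisteredEvent } from './UserEvents';

test('published events land on the stream', async () => {
  const redis = RedisClient.getInstance();
  const stream = 'users:test';
  await redis.del(stream); // start from a clean stream

  await publishEvent(stream, new UserRegisteredEvent(
    'user-1',
    { email: 'test@example.com', name: 'Test' }
  ));

  const entries = await redis.xrange(stream, '-', '+');
  assert.strictEqual(entries.length, 1);
  assert.strictEqual(JSON.parse(entries[0][1][1]).type, 'UserRegistered');
  await redis.quit();
});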
Performance matters. We batch event publishing (see the sketch below) and tune the Redis configuration:
# redis.conf
stream-node-max-entries 100000
maxmemory-policy volatile-lru
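On the batching side, a minimal sketch using an ioredis pipeline, written as a hypothetical addition to EventPublisher.ts:
// EventPublisher.ts (addition)
export async function publishBatch(stream: string, events: BaseEvent[]) {
  const pipeline = redis.pipeline();
  for (const event of events) {
    pipeline.xadd(stream, '*', 'event', JSON.stringify(event));
  }
  await pipeline.exec(); // one network round trip for the whole batch
}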
Common pitfalls? Unbounded streams eventually exhaust memory, so always cap entry counts. Acknowledge messages only after successful processing, and use consumer groups correctly so pending events from a crashed worker can be claimed instead of lost. Type safety isn't optional; it's your first defense against runtime errors.
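A minimal sketch of capping a stream at write time (the '~' requests approximate trimming, which is cheaper than an exact cap):
// Keep roughly the most recent 10,000 entries
await redis.xadd('users', 'MAXLEN', '~', '10000', '*', 'event', JSON.stringify(event));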
This approach transformed how I build resilient systems. The combination of TypeScript’s types, Zod’s validation, and Redis Streams’ reliability creates a foundation you can trust. If you implement this, start small and expand as needed.
Found this useful? Help others discover it—like and share this article. Questions or improvements? Let’s discuss in the comments! Your feedback shapes future content.