Building a Distributed Event-Driven Architecture with NestJS, Redis Streams, and TypeScript
I recently faced a challenge in our e-commerce platform where tightly coupled services caused cascading failures during peak sales. Orders would fail because inventory checks timed out, which then prevented notifications from being sent. This pain point led me to explore event-driven architecture using Redis Streams. Let’s build this together.
Why Redis Streams? Unlike traditional pub/sub, Redis Streams provide persistent storage, consumer groups for parallel processing, and message acknowledgments. Ever wondered how systems handle thousands of events without losing data? This is how.
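Before wiring anything into NestJS, it helps to see the three raw operations this post leans on: append, group-read, acknowledge. A minimal sketch with ioredis (the stream and group names are placeholders, and redis is an ioredis client created elsewhere, typed loosely so the sketch stands alone):

```typescript
// A round trip through the core stream primitives.
export async function streamRoundTrip(redis: any): Promise<string | null> {
  // XADD appends an entry; '*' asks Redis to generate the entry ID
  const id = await redis.xadd('demo_stream', '*', 'event', '{"type":"demo"}');

  // A consumer group is created once per stream; MKSTREAM creates the
  // stream if it doesn't exist yet
  await redis
    .xgroup('CREATE', 'demo_stream', 'demo-group', '0', 'MKSTREAM')
    .catch(() => { /* BUSYGROUP: group already exists */ });

  // XREADGROUP delivers each entry to exactly one consumer in the group
  await redis.xreadgroup(
    'GROUP', 'demo-group', 'consumer-1',
    'COUNT', 10, 'STREAMS', 'demo_stream', '>'
  );

  // XACK marks the entry processed so it leaves the group's pending list
  if (id) await redis.xack('demo_stream', 'demo-group', id);
  return id;
}
```

Unacknowledged entries stay pending, which is what makes crash recovery possible later.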
First, we structure our monorepo:
event-driven-ecommerce/
├── packages/
│ ├── shared/ # Reusable events and utilities
│ ├── order-service/
│ ├── inventory-service/
│ └── notification-service/
Our core event definition ensures consistency:
// shared/events/order-events.ts
import { BaseEvent } from './base-event'; // path assumes a shared base class

export class OrderCreatedEvent extends BaseEvent {
  constructor(
    public readonly orderId: string,
    public readonly items: { productId: string; quantity: number }[],
    correlationId?: string
  ) {
    super(correlationId);
  }

  getEventType() { return 'order.created'; }

  getData() {
    return {
      orderId: this.orderId,
      items: this.items
    };
  }
}
Notice the correlationId? That's our trace through distributed systems. How do you track requests across services?
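OrderCreatedEvent extends a BaseEvent the post doesn't show. A minimal version consistent with the fields used later (eventId, timestamp, correlationId) could look like this; generating IDs with randomUUID is my assumption:

```typescript
import { randomUUID } from 'crypto';

export abstract class BaseEvent {
  public readonly eventId: string;
  public readonly timestamp: string;
  public readonly correlationId: string;

  constructor(correlationId?: string) {
    this.eventId = randomUUID();
    this.timestamp = new Date().toISOString();
    // Propagate the caller's correlation ID, or start a new trace
    this.correlationId = correlationId ?? randomUUID();
  }

  abstract getEventType(): string;
  abstract getData(): Record<string, unknown>;
}
```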
Redis Configuration is critical for reliability. We use ioredis with retry logic:
// shared/services/redis-stream.service.ts
import { Injectable } from '@nestjs/common';
import Redis from 'ioredis';
import { BaseEvent } from '../events/base-event';

@Injectable()
export class RedisStreamService {
  private readonly redis = new Redis({
    host: process.env.REDIS_HOST,
    retryStrategy: (times) => Math.min(times * 500, 5000)
  });

  async publishEvent(stream: string, event: BaseEvent) {
    await this.redis.xadd(stream, '*',
      'event', JSON.stringify({
        type: event.getEventType(),
        data: event.getData(),
        metadata: {
          eventId: event.eventId,
          timestamp: event.timestamp,
          correlationId: event.correlationId,
          version: '1.0' // schema version, checked by consumers
        }
      })
    );
  }
}
Retry strategies keep transient network blips from killing the client connection. What's your fallback when cloud services hiccup?
Producers in our order service create events:
// order-service/src/orders.controller.ts
@Post()
async createOrder(@Body() orderDto: CreateOrderDto, @Req() request) {
  const order = await this.ordersService.create(orderDto);
  const event = new OrderCreatedEvent(
    order.id,
    orderDto.items,
    request.correlationId // Attached by middleware
  );
  await this.redisStream.publishEvent('orders_stream', event);
  return { id: order.id, status: 'processing' };
}
The key here? Return immediately after event publishing. No waiting for downstream systems!
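That request.correlationId has to come from somewhere. One way to attach it, sketched as a plain Express-style middleware (the x-correlation-id header name is a common convention, not something the post specifies):

```typescript
import { randomUUID } from 'crypto';

// Minimal request/response shapes; in the real app these are Express types
type Req = { headers: Record<string, string | string[] | undefined>; correlationId?: string };
type Res = { setHeader(name: string, value: string): void };

// Reuse an incoming x-correlation-id header, or mint a fresh one, and echo
// it back so clients can correlate their own logs too
export function correlationIdMiddleware(req: Req, res: Res, next: () => void) {
  const incoming = req.headers['x-correlation-id'];
  req.correlationId = typeof incoming === 'string' ? incoming : randomUUID();
  res.setHeader('x-correlation-id', req.correlationId);
  next();
}
```

Registered once with app.use(correlationIdMiddleware) in main.ts, every controller can then read request.correlationId.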
Consumers need robust patterns. Here’s how inventory handles events:
// inventory-service/src/consumers/order-consumer.ts
@Injectable()
export class OrderConsumer {
  constructor(
    private readonly redisStream: RedisStreamService,
    private readonly inventoryService: InventoryService
  ) {}

  async start(consumerName: string) {
    await this.redisStream.createConsumerGroup('orders_stream', 'inventory-group');
    this.redisStream.consumeEvents(
      'orders_stream',
      'inventory-group',
      consumerName,
      async (events) => {
        for (const event of events) {
          try {
            await this.inventoryService.reserveItems(event.data.items);
          } catch (error) {
            await this.handleFailure(event);
          }
        }
      }
    );
  }

  private async handleFailure(event: StreamEvent) {
    // Dead letter queue pattern
    await this.redisStream.publishEvent('orders_dlq', event);
  }
}
See the consumeEvents loop? That's where Redis Streams shine - processing batches efficiently. What happens when inventory reservation fails? We route the event to a dead letter queue.
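createConsumerGroup and consumeEvents aren't shown in the post; here's one way they could be implemented with ioredis (a sketch - the StreamEvent shape, batch size, and block timeout are my assumptions, and redis is typed loosely so the sketch stands alone):

```typescript
export interface StreamEvent {
  id: string;
  type: string;
  data: any;
  metadata: { eventId: string; timestamp: string };
}

export class RedisStreamConsumer {
  // `redis` is an ioredis client
  constructor(private readonly redis: any) {}

  async createConsumerGroup(stream: string, group: string) {
    try {
      // '$' = deliver only new entries; MKSTREAM creates the stream if absent
      await this.redis.xgroup('CREATE', stream, group, '$', 'MKSTREAM');
    } catch (err: any) {
      // BUSYGROUP just means the group already exists
      if (!String(err.message).includes('BUSYGROUP')) throw err;
    }
  }

  async consumeEvents(
    stream: string, group: string, consumer: string,
    handler: (events: StreamEvent[]) => Promise<void>
  ) {
    for (;;) {
      // BLOCK 5000: wait up to 5s for new entries instead of busy-polling
      const reply = await this.redis.xreadgroup(
        'GROUP', group, consumer, 'COUNT', 10, 'BLOCK', 5000,
        'STREAMS', stream, '>'
      );
      if (!reply) continue;

      for (const [, entries] of reply) {
        // Each entry is [id, ['event', '<json>']]
        const events: StreamEvent[] = entries.map(
          ([id, fields]: [string, string[]]) => ({ id, ...JSON.parse(fields[1]) })
        );
        await handler(events);
        // Acknowledge only after the handler succeeds
        await this.redis.xack(stream, group, ...entries.map(([id]: [string, string[]]) => id));
      }
    }
  }
}
```

Acknowledging after the handler (not before) is what gives at-least-once delivery.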
Consumer Groups enable horizontal scaling. Run multiple instances with unique consumer names:
// In inventory-service main.ts
// OrderConsumer must be transient-scoped (@Injectable({ scope: Scope.TRANSIENT })),
// otherwise app.get() would hand back the same singleton twice
const consumer1 = await app.resolve(OrderConsumer);
const consumer2 = await app.resolve(OrderConsumer);
await consumer1.start('inventory-consumer-1');
await consumer2.start('inventory-consumer-2');
Redis automatically load-balances events across consumers. No external orchestrator needed!
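One failure mode load-balancing introduces: if a consumer crashes after reading but before acknowledging, its entries sit in the group's pending list indefinitely. Redis 6.2+ offers XAUTOCLAIM so a healthy consumer can adopt them. A sketch (the 60-second idle threshold is an arbitrary choice, and redis is an ioredis client typed loosely):

```typescript
// Adopt entries that have been pending (delivered but unacked) for > 60s
async function reclaimStale(redis: any, stream: string, group: string, consumer: string) {
  const [nextCursor, entries] = await redis.xautoclaim(
    stream, group, consumer,
    60_000, // min idle time in ms before an entry may be claimed
    '0-0'   // start scanning from the beginning of the pending list
  );
  return { nextCursor, claimed: entries.length };
}
```

Run it on an interval alongside the normal consume loop.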
Schema Evolution is handled with versioning:
// In consumer logic
if (event.metadata.version === '1.0') {
  // Process legacy format
} else if (event.metadata.version === '1.1') {
  // Handle new fields
}
Always add new fields instead of modifying existing ones. How do you handle breaking changes?
Monitoring requires tracking event flows:
// Decorator for event logging
export function LogEventDuration() {
  return function (target: any, propertyKey: string, descriptor: PropertyDescriptor) {
    const originalMethod = descriptor.value;
    descriptor.value = async function (...args: any[]) {
      const start = Date.now();
      const result = await originalMethod.apply(this, args);
      const duration = Date.now() - start;
      // `metrics` is your StatsD/Datadog-style client
      metrics.timing('event.processing.time', duration, {
        event_type: args[0].type
      });
      return result;
    };
    return descriptor;
  };
}

// Usage in consumer
@LogEventDuration()
async processEvent(event: StreamEvent) {
  // Business logic
}
This captures processing times per event type. Critical for spotting bottlenecks.
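Processing time is only half the picture; the other half is backlog. XINFO GROUPS reports per-group pending counts (and, on Redis 7+, a lag field for not-yet-delivered entries). A small helper to parse ioredis's flat-array reply (the helper name is mine; redis is typed loosely so the sketch stands alone):

```typescript
// Returns backlog figures per consumer group on a stream
async function groupBacklog(redis: any, stream: string) {
  const groups: any[][] = await redis.xinfo('GROUPS', stream);
  return groups.map((flat) => {
    // XINFO GROUPS replies with flat [key, value, key, value, ...] arrays
    const info: Record<string, any> = {};
    for (let i = 0; i < flat.length; i += 2) info[flat[i]] = flat[i + 1];
    // `pending` = delivered but unacked; `lag` (Redis 7+) = not yet delivered
    return { name: info.name, pending: info.pending, lag: info.lag };
  });
}
```

Poll this on a timer and alert when pending or lag keeps climbing.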
Testing producers and consumers separately:
// Test producer
it('should publish order.created event', async () => {
  const redisSpy = jest.spyOn(redisStream, 'publishEvent');
  await ordersController.createOrder(testOrder);
  expect(redisSpy).toHaveBeenCalledWith(
    'orders_stream',
    expect.any(OrderCreatedEvent)
  );
});

// Test consumer with mock stream
const testEvents = [{
  id: '1678123456789-0',
  event: JSON.stringify({
    type: 'order.created',
    data: { orderId: 'test-123', items: [{ productId: 'sku-1', quantity: 2 }] }
  })
}];
await orderConsumer.processEvents(testEvents);
expect(inventoryService.reserveItems).toHaveBeenCalled();
Mock the stream for isolated consumer tests. How do you validate complex workflows?
Performance Tip: Keep an eye on Redis memory. The stream node settings control how entries are packed into the stream's radix-tree nodes (the values below are Redis's defaults, a sensible starting point):
# Redis configuration for streams (defaults shown)
stream-node-max-bytes 4096
stream-node-max-entries 100
On their own these won't prevent out-of-memory errors during traffic spikes - for that, cap the stream's length with trimming.
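The usual way to bound a stream is approximate MAXLEN trimming at write time; a sketch (the 100 000-entry cap is an assumption - size it to how far back your consumers may need to replay; redis is an ioredis client typed loosely):

```typescript
// Publish while keeping the stream at roughly 100k entries
async function publishCapped(redis: any, stream: string, payload: string) {
  // '~' trims lazily at radix-node boundaries, which is far cheaper than
  // exact trimming and accurate enough for memory control
  return redis.xadd(stream, 'MAXLEN', '~', 100_000, '*', 'event', payload);
}
```

Alternatively, run XTRIM periodically from a maintenance job.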
Common pitfalls I've encountered:
- Not setting maxRetriesPerRequest in the Redis client
- Forgetting correlation IDs for tracing
- Blocking the event loop with synchronous processing
- Not monitoring consumer lag
This architecture now handles 5,000+ events/second in our production environment. Orders, inventory, and notifications operate independently - a failure in one doesn’t cascade. The loose coupling lets us deploy services multiple times daily without downtime.
What challenges have you faced with distributed systems? Share your experiences below! If this approach resonates with you, like this post and share it with your network. Questions? Drop them in comments - I respond to every one.