I’ve been working with microservices for several years, and recently found myself needing a more resilient communication system between services. After evaluating several options, Redis Streams stood out as a powerful fit for event-driven architectures: the combination of persistence, consumer groups, and low latency made it ideal for our needs at scale. What if I told you that you could build a production-ready event bus in just a few hours?
Let me show you how to implement this with NestJS and TypeScript. First, we’ll set up our project structure and core dependencies:
mkdir event-driven-microservices
cd event-driven-microservices
npm init -y
npm install @nestjs/{core,common,microservices} redis class-validator
For Redis configuration, we’ll use Docker Compose:
# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
Now, let’s create our Redis connection module:
// shared/redis.module.ts
import { Module, Global } from '@nestjs/common';
import { createClient } from 'redis';

@Global()
@Module({
  providers: [
    {
      provide: 'REDIS_CLIENT',
      useFactory: async () => {
        const client = createClient({ url: 'redis://localhost:6379' });
        await client.connect();
        return client;
      },
    },
  ],
  exports: ['REDIS_CLIENT'],
})
export class RedisModule {}
Our event bus needs a standardized event structure. Here’s our base event class:
// shared/events/base-event.ts
import { v4 as uuidv4 } from 'uuid';

export abstract class BaseEvent {
  id: string;
  aggregateId: string;
  timestamp: string;

  constructor(partial: Partial<BaseEvent>) {
    this.id = partial.id || uuidv4();
    this.aggregateId = partial.aggregateId || '';
    this.timestamp = partial.timestamp || new Date().toISOString();
  }
}
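To make this concrete, here is a minimal sketch of a domain event built on that base class. `OrderCreatedEvent` and its fields are illustrative, not from a real codebase; the base class is inlined (using Node’s built-in `crypto.randomUUID` instead of the `uuid` package) so the snippet is self-contained:

```typescript
import { randomUUID } from 'crypto';

// Same shape as the BaseEvent above, inlined so this sketch runs on its own.
abstract class BaseEvent {
  id: string;
  aggregateId: string;
  timestamp: string;

  constructor(partial: Partial<BaseEvent>) {
    this.id = partial.id ?? randomUUID();
    this.aggregateId = partial.aggregateId ?? '';
    this.timestamp = partial.timestamp ?? new Date().toISOString();
  }
}

// A concrete domain event: the aggregate ID is the order it belongs to.
class OrderCreatedEvent extends BaseEvent {
  constructor(public readonly orderId: string, public readonly total: number) {
    super({ aggregateId: orderId });
  }
}

const event = new OrderCreatedEvent('order-123', 49.99);
console.log(event.aggregateId); // order-123
```

Subclasses only pass what is domain-specific; the base class fills in identity and timing.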
For publishing events, we implement this service:
// services/event-bus.service.ts
import { Inject, Injectable } from '@nestjs/common';
import type { RedisClientType } from 'redis';
import { BaseEvent } from '../shared/events/base-event';

@Injectable()
export class EventBusService {
  constructor(
    @Inject('REDIS_CLIENT') private readonly redis: RedisClientType
  ) {}

  async publish(stream: string, event: BaseEvent): Promise<string> {
    // XADD with '*' lets Redis generate the entry ID
    return this.redis.xAdd(stream, '*', { event: JSON.stringify(event) });
  }
}
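Under the hood, `publish()` stores a single `event` field holding the JSON-serialized event; a consumer reverses this with `JSON.parse`. A quick round-trip sketch with plain objects (no Redis needed, field names mirror the service above):

```typescript
// What EventBusService.publish hands to XADD: one field, `event`, holding JSON.
const original = {
  id: 'evt-1',
  aggregateId: 'order-42',
  timestamp: new Date().toISOString(),
};
const fields = { event: JSON.stringify(original) };

// Consumer side: recover the event object from the stream entry's fields.
const decoded = JSON.parse(fields.event);
console.log(decoded.aggregateId); // order-42
```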
Now, what happens when we need multiple services to process the same events? Redis Consumer Groups solve this elegantly. Here’s how we create a consumer:
// services/order-consumer.service.ts
import { Inject, Injectable } from '@nestjs/common';
import type { RedisClientType } from 'redis';

@Injectable()
export class OrderConsumer {
  private groupCreated = false;

  constructor(
    @Inject('REDIS_CLIENT') private readonly redis: RedisClientType
  ) {}

  async startConsumer(stream: string, group: string) {
    if (!this.groupCreated) {
      try {
        await this.redis.xGroupCreate(stream, group, '0', { MKSTREAM: true });
      } catch (e) {
        // BUSYGROUP: the group already exists, which is fine
      }
      this.groupCreated = true;
    }

    while (true) {
      // BLOCK keeps this loop from spinning when the stream is idle
      const replies = await this.redis.xReadGroup(
        group, 'order-service',
        { key: stream, id: '>' },
        { COUNT: 10, BLOCK: 5000 }
      );
      if (!replies) continue;

      for (const message of replies[0].messages) {
        try {
          // Process message.message.event here
          await this.redis.xAck(stream, group, message.id);
        } catch (error) {
          console.error(`Processing failed: ${message.id}`, error);
        }
      }
    }
  }
}
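It helps to know the shape node-redis v4 hands back from `XREADGROUP`: an array of streams, each with its messages, or `null` when `BLOCK` times out with nothing to read. The `extractEvents` helper below is hypothetical (not part of the article’s services) and assumes the single `event` field written by `EventBusService.publish`:

```typescript
// Reply shape from node-redis v4's xReadGroup (null on BLOCK timeout).
type StreamMessage = { id: string; message: Record<string, string> };
type StreamReply = { name: string; messages: StreamMessage[] }[] | null;

// Hypothetical helper: pull deserialized events, keeping the entry IDs needed for XACK.
function extractEvents<T>(reply: StreamReply): { id: string; event: T }[] {
  if (!reply) return []; // nothing arrived before the BLOCK timeout
  return reply.flatMap((s) =>
    s.messages.map((m) => ({ id: m.id, event: JSON.parse(m.message.event) as T }))
  );
}

const sample: StreamReply = [
  { name: 'orders', messages: [{ id: '1-0', message: { event: '{"aggregateId":"order-7"}' } }] },
];
console.log(extractEvents<{ aggregateId: string }>(sample)[0].id); // 1-0
```

Keeping the ID next to the payload makes the ack-after-success flow in the consumer loop straightforward.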
But how do we ensure we don’t lose messages during failures? Dead letter queues are essential:
// error-handling.decorator.ts
export function DeadLetterQueue(stream: string) {
  return function (target: any, key: string, descriptor: PropertyDescriptor) {
    const originalMethod = descriptor.value;
    descriptor.value = async function (...args: any[]) {
      try {
        return await originalMethod.apply(this, args);
      } catch (error) {
        const event = args[0];
        // Assumes the decorated class exposes a `redis` client property
        await this.redis.xAdd(`${stream}:DLQ`, '*', {
          originalEvent: JSON.stringify(event),
          error: error.message,
          timestamp: new Date().toISOString()
        });
        throw error;
      }
    };
    return descriptor;
  };
}
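The same pattern works without decorator syntax, which is handy when experimental decorators aren’t enabled. `withDeadLetter` below is a hypothetical helper, not part of the article’s code; `redis` is narrowed to just the `xAdd` method, so a stub suffices for testing:

```typescript
// Minimal client surface the DLQ pattern needs.
type DlqClient = {
  xAdd(stream: string, id: string, fields: Record<string, string>): Promise<string>;
};

// Hypothetical higher-order function: same behavior as the decorator above.
function withDeadLetter<T, R>(
  redis: DlqClient,
  stream: string,
  handler: (event: T) => Promise<R>,
): (event: T) => Promise<R> {
  return async (event: T) => {
    try {
      return await handler(event);
    } catch (error) {
      // Park the failing event in a side stream for inspection and replay.
      await redis.xAdd(`${stream}:DLQ`, '*', {
        originalEvent: JSON.stringify(event),
        error: error instanceof Error ? error.message : String(error),
        timestamp: new Date().toISOString(),
      });
      throw error; // still surface the failure to the caller
    }
  };
}
```

The event is parked *and* the error rethrown, so upstream retry logic still sees the failure.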
For monitoring, we can implement tracing with minimal overhead:
// tracing.interceptor.ts
import { CallHandler, ExecutionContext, Inject, Injectable, NestInterceptor } from '@nestjs/common';
import type { RedisClientType } from 'redis';
import { tap } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';

@Injectable()
export class TracingInterceptor implements NestInterceptor {
  constructor(
    @Inject('REDIS_CLIENT') private readonly redis: RedisClientType
  ) {}

  intercept(context: ExecutionContext, next: CallHandler) {
    const request = context.switchToHttp().getRequest();
    const correlationId = request.headers['x-correlation-id'] || uuidv4();
    return next.handle().pipe(
      tap(() => {
        // Sorted set keyed by timestamp makes time-range queries over traces cheap
        this.redis.zAdd('event:trace', {
          score: Date.now(),
          value: JSON.stringify({
            correlationId,
            service: context.getClass().name,
            timestamp: new Date().toISOString()
          })
        });
      })
    );
  }
}
When testing, remember to verify both happy paths and failure scenarios. How might we simulate network partitions or Redis failures? A hand-rolled Jest stub for the Redis client keeps tests fast and lets you script failures (swap in `jest.fn().mockRejectedValue(...)` to simulate an outage):

// event-bus.service.spec.ts
import { Test } from '@nestjs/testing';
import { EventBusService } from './event-bus.service';

describe('EventBusService', () => {
  let service: EventBusService;
  const redisMock = { xAdd: jest.fn().mockResolvedValue('1-0') };

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      providers: [
        EventBusService,
        { provide: 'REDIS_CLIENT', useValue: redisMock }
      ]
    }).compile();
    service = module.get(EventBusService);
  });

  it('should publish events', async () => {
    const event = new OrderCreatedEvent('order-1', 49.99);
    await service.publish('orders', event);
    expect(redisMock.xAdd).toHaveBeenCalledWith(
      'orders', '*', { event: JSON.stringify(event) }
    );
  });
});
For production deployments, consider these performance optimizations:
- Pipeline multiple commands
- Use Lua scripts for complex operations
- Monitor memory usage with `INFO memory`
- Adjust consumer group parameters based on load
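The first point can be sketched with node-redis v4’s `multi()`, which queues commands and sends them in a single round trip. `publishBatch` is a hypothetical helper (not from the code above), and the client type is narrowed to just what the sketch needs:

```typescript
// Minimal surface of a node-redis v4 client for this sketch.
type PipelineClient = {
  multi(): {
    xAdd(stream: string, id: string, fields: Record<string, string>): unknown;
    exec(): Promise<unknown[]>;
  };
};

// Hypothetical helper: append many events with one network round trip.
async function publishBatch(
  client: PipelineClient,
  stream: string,
  events: object[],
): Promise<unknown[]> {
  const multi = client.multi();
  for (const e of events) {
    multi.xAdd(stream, '*', { event: JSON.stringify(e) });
  }
  return multi.exec(); // all queued XADDs flushed together
}
```

For high-throughput publishers, batching like this trades a little latency per event for a large reduction in round trips.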
We’ve covered the essentials, but remember that every system has unique requirements. What additional safeguards would you implement for financial transactions versus social notifications? The patterns remain similar, but the rigor differs significantly.
I’d love to hear about your experiences with event-driven architectures! If you found this useful, please share it with others who might benefit. Have you implemented something similar? What challenges did you face? Let me know in the comments!