As a developer working on complex distributed systems, I’ve repeatedly faced challenges with service communication. How do we ensure reliability when components fail? How can we maintain data consistency across microservices? These questions led me to explore event-driven architecture. Today, I’ll share how we built a robust, type-safe system using TypeScript, NestJS, and Redis Streams.
Setting up our environment begins with core dependencies. We install NestJS for our framework foundation and Redis for stream handling. Here’s our dependency setup:
npm install @nestjs/common @nestjs/core redis class-validator uuid
Our project structure organizes events logically:
src/
├── events/
│   ├── schemas/
│   ├── handlers/
│   └── store/
├── modules/
└── main.ts
Type safety starts with event schemas. We define base event properties and extend them for domain-specific needs. Notice how we use class-validator decorators:
// base-event.ts
import { IsUUID, IsDate } from 'class-validator';
import { v4 as uuidv4 } from 'uuid';

export abstract class BaseEvent {
  @IsUUID()
  eventId: string;

  @IsDate()
  timestamp: Date;

  constructor() {
    this.eventId = uuidv4();
    this.timestamp = new Date();
  }
}
// order-created.event.ts
import { IsArray, IsString } from 'class-validator';
import { BaseEvent } from './base-event';
import { OrderItem } from './order-item'; // project-specific item shape

export class OrderCreatedEvent extends BaseEvent {
  @IsString()
  readonly eventType = 'OrderCreated';

  @IsArray()
  items: OrderItem[];
}
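Before events ever touch Redis, it's worth confirming the validators actually fire. A quick sketch (the item fields here are illustrative, not a fixed contract):

import { validateSync } from 'class-validator';

const event = new OrderCreatedEvent();
event.items = [{ productId: 'sku-42', quantity: 2 }]; // illustrative item shape

// validateSync returns an array of ValidationError objects; empty means valid
const errors = validateSync(event);
console.log(errors.length === 0 ? 'valid event' : errors);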
Redis Streams became our event backbone. Why choose streams over traditional queues? Streams provide persistence, consumer groups, and message history - crucial for event replay. Here’s our connection setup:
// redis.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { createClient, RedisClientType } from 'redis';
import { BaseEvent } from '../events/schemas/base-event';

@Injectable()
export class RedisService implements OnModuleInit {
  private client: RedisClientType;

  constructor() {
    this.client = createClient({ url: 'redis://localhost:6379' });
  }

  async onModuleInit() {
    await this.client.connect(); // connect once, during module bootstrap
  }

  async addToStream(stream: string, event: BaseEvent) {
    // Stream field values must be strings, so we serialize the event payload
    return this.client.xAdd(stream, '*', { payload: JSON.stringify(event) });
  }
}
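One payoff of streams over queues is that history stays inspectable after consumption. A small sketch of a history read, written as a standalone helper here (it could just as well be another RedisService method):

import { RedisClientType } from 'redis';

// Read a stream's full history back; pairs with addToStream's JSON payload
export async function readAll(client: RedisClientType, stream: string) {
  const entries = await client.xRange(stream, '-', '+');
  return entries.map(({ id, message }) => ({
    id,
    event: JSON.parse(message.payload),
  }));
}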
For publishing events, we created a decorator-driven approach. This ensures every event passes validation before publishing:
// event-publisher.decorator.ts
import { validateSync, ValidationError } from 'class-validator';

export class EventValidationError extends Error {
  constructor(public readonly errors: ValidationError[]) {
    super('Event failed schema validation');
  }
}

export function PublishEvent(stream: string) {
  return (target: any, key: string, descriptor: PropertyDescriptor) => {
    const originalMethod = descriptor.value;
    descriptor.value = async function (...args: any[]) {
      const result = await originalMethod.apply(this, args);
      const event = result.event;
      // Validate against the event's class-validator schema
      const errors = validateSync(event);
      if (errors.length > 0) throw new EventValidationError(errors);
      // Assumes the host class has RedisService injected as `redisService`
      await (this as any).redisService.addToStream(stream, event);
      return result;
    };
  };
}
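In use, the decorator wraps any method that returns its event alongside the business result. A sketch of that contract (paths and names follow our earlier snippets):

// order.service.ts (illustrative)
import { Injectable } from '@nestjs/common';
import { PublishEvent } from './event-publisher.decorator';
import { OrderCreatedEvent } from '../events/schemas/order-created.event';
import { OrderItem } from '../events/schemas/order-item';
import { RedisService } from './redis.service';

@Injectable()
export class OrderService {
  constructor(private readonly redisService: RedisService) {}

  @PublishEvent('order-events')
  async createOrder(items: OrderItem[]) {
    const event = new OrderCreatedEvent();
    event.items = items;
    // The decorator picks up `event`, validates it, then publishes it
    return { orderId: event.eventId, event };
  }
}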
Handling events efficiently requires careful design. We implemented consumer groups with dead-letter handling. What happens when an event repeatedly fails? Our system moves it to a dedicated stream for inspection:
// event-consumer.service.ts
import { Injectable } from '@nestjs/common';
import { RedisClientType } from 'redis';

@Injectable()
export class EventConsumerService {
  private client: RedisClientType; // connected client, provided at bootstrap

  async processStream(stream: string, group: string) {
    const response = await this.client.xReadGroup(
      group, 'consumer1', { key: stream, id: '>' }, { COUNT: 10 }
    );
    if (!response) return; // no new entries for this consumer group

    for (const { id, message } of response[0].messages) {
      try {
        await this.handleEvent(message);
        await this.client.xAck(stream, group, id);
      } catch (error) {
        // Move failing events to a dead-letter stream for inspection
        await this.client.xAdd(`${stream}:DLQ`, '*', message);
      }
    }
  }

  private async handleEvent(message: Record<string, string>) {
    // Deserialize the payload and dispatch to the matching handler
  }
}
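One detail worth calling out: consumer groups must exist before XREADGROUP will succeed. A sketch of the one-time setup (the stream and group names are ours):

import { RedisClientType } from 'redis';

// Run once during bootstrap; MKSTREAM creates the stream if it's missing
export async function ensureGroup(client: RedisClientType, stream: string, group: string) {
  try {
    await client.xGroupCreate(stream, group, '0', { MKSTREAM: true });
  } catch (err) {
    // BUSYGROUP just means the group already exists (e.g. after a restart)
    if (!String(err).includes('BUSYGROUP')) throw err;
  }
}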
Versioning events presents unique challenges. We implemented a versioning strategy using semantic versioning in our event schemas. When encountering older event formats, we transform them to current specifications:
// event-versioning.util.ts
import { BaseEvent } from './schemas/base-event';
import { OrderCreatedEvent } from './schemas/order-created.event';
import { OrderCreatedV1Adapter } from './order-created-v1.adapter';

export class EventVersionError extends Error {
  constructor(version: string) {
    super(`Unsupported event version: ${version}`);
  }
}

export function migrateEvent(event: any): BaseEvent {
  switch (event.eventVersion) {
    case '1.0.0':
      return new OrderCreatedV1Adapter(event).convert();
    case '1.1.0':
      // Already the current shape; rehydrate onto a fresh instance
      return Object.assign(new OrderCreatedEvent(), event);
    default:
      throw new EventVersionError(event.eventVersion);
  }
}
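The adapters themselves stay small. A hypothetical V1 adapter, assuming v1.0.0 carried a single item field that v1.1.0 widened into an items array:

// order-created-v1.adapter.ts (the v1.0.0 shape here is hypothetical)
import { OrderCreatedEvent } from './schemas/order-created.event';
import { OrderItem } from './schemas/order-item';

export class OrderCreatedV1Adapter {
  constructor(private readonly legacy: { item: OrderItem }) {}

  convert(): OrderCreatedEvent {
    const event = new OrderCreatedEvent();
    event.items = [this.legacy.item]; // lift the single v1 item into the array
    return event;
  }
}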
Monitoring event flows is essential in production. We integrated OpenTelemetry to trace events across services:
// event-tracing.interceptor.ts
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { trace } from '@opentelemetry/api';
import { finalize, tap } from 'rxjs';

@Injectable()
export class EventTracingInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler) {
    const tracer = trace.getTracer('event-tracer');
    const span = tracer.startSpan('process_event');
    const eventType = context.getArgByIndex(0)?.eventType;
    if (eventType) span.setAttribute('event.type', eventType);
    // next.handle() returns an Observable, so close the span when it settles
    return next.handle().pipe(
      tap({ error: (err) => span.recordException(err) }),
      finalize(() => span.end()),
    );
  }
}
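Registering the interceptor globally means every handler gets a span without per-module wiring. A sketch using Nest's APP_INTERCEPTOR token:

// app.module.ts (illustrative registration)
import { Module } from '@nestjs/common';
import { APP_INTERCEPTOR } from '@nestjs/core';
import { EventTracingInterceptor } from './event-tracing.interceptor';

@Module({
  providers: [
    { provide: APP_INTERCEPTOR, useClass: EventTracingInterceptor },
  ],
})
export class AppModule {}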
Testing event-driven systems requires simulating real-world conditions. We created a test harness that replays events from specific points in time:
// event-replay.test.ts
describe('Order Workflow', () => {
  it('should process abandoned carts', async () => {
    const replayDate = new Date(Date.now() - 24 * 3600 * 1000); // 24 hours ago
    await eventReplayer.replayFrom('OrderCreated', replayDate);
    const abandoned = await orderService.findAbandonedCarts();
    expect(abandoned.length).toBeGreaterThan(0);
  });
});
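Time-based replay is cheap because Redis stream IDs embed a millisecond timestamp. A simplified, stream-level sketch of what a replayer can build on XRANGE (the helper and its signature are ours, not a library API):

import { RedisClientType } from 'redis';

// Replays every entry recorded at or after `since` through a handler
export async function replayFrom(
  client: RedisClientType,
  stream: string,
  since: Date,
  handle: (event: unknown) => Promise<void>,
) {
  const start = `${since.getTime()}-0`; // stream IDs are `<ms>-<seq>`
  for (const { message } of await client.xRange(stream, start, '+')) {
    await handle(JSON.parse(message.payload)); // matches our addToStream format
  }
}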
In production deployments, we prioritize scalability and resilience. Our Kubernetes configuration includes:
# deployment.yaml
containers:
  - name: order-service
    env:
      - name: REDIS_STREAMS
        value: "order-events,payment-events"
    resources:
      limits:
        memory: 512Mi
      requests:
        cpu: 100m
Common pitfalls we encountered include unordered event processing and schema drift. We addressed these through:
- Partition keys for ordered event sequences (sketched after this list)
- Schema registry checks during development
- Automated contract testing in CI/CD pipelines
- Monitoring consumer lag metrics
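For partition keys, the idea is to route every event for a given aggregate to the same stream, so its events keep their relative order while unrelated aggregates scale out across streams. A minimal sketch, with the hash choice and partition count as our assumptions:

// partitioning.ts (illustrative)
import { createHash } from 'crypto';

const PARTITIONS = 8; // assumed partition count

// Events sharing an orderId always land on the same stream, preserving order
export function streamFor(orderId: string): string {
  const digest = createHash('sha1').update(orderId).digest();
  return `order-events:${digest.readUInt32BE(0) % PARTITIONS}`;
}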
The journey to robust event-driven systems requires careful planning. How might your current architecture benefit from events? What reliability challenges could this approach solve?
This implementation has transformed how our services communicate. We’ve reduced direct dependencies between microservices by 70% while improving system resilience. Events give us audit trails for compliance and enable powerful time-travel debugging.
If you found this practical guide helpful, share it with your team. Have questions or insights about event-driven systems? Leave a comment below - let’s learn together!