Why Microservices Need Type-Safe Eventing
After wrestling with distributed system failures at scale, I’ve become obsessed with type safety in event-driven architectures. Loose contracts between services lead to production fires. Let me show you how NestJS, RabbitMQ, and Prisma create bulletproof microservices that scale.
We’ll build three coordinated services:
- User service handles registration
- Order service processes purchases
- Notification service dispatches alerts
Each service owns its data but communicates through strongly typed events. Why gamble with JSON blobs when TypeScript and Zod can validate payloads at runtime?
Laying the Foundation
Our monorepo uses pnpm workspaces with shared code packages. Here’s the core event schema:
```typescript
import { randomUUID } from 'node:crypto';

// Shared event base class
export abstract class Event {
  public readonly id: string;
  public readonly type: string;
  public readonly timestamp: Date;

  constructor(type: string) {
    this.id = randomUUID();
    this.type = type;
    this.timestamp = new Date();
  }
}

// Domain-specific event
export class UserCreatedEvent extends Event {
  constructor(
    public readonly userId: string,
    public readonly email: string
  ) {
    super('user.created');
  }
}
```
Zod validation ensures events follow contracts:
```typescript
import { z } from 'zod';

// Validation schema
const OrderCreatedSchema = z.object({
  orderId: z.string().uuid(),
  items: z.array(z.object({
    productId: z.string().uuid(),
    quantity: z.number().positive()
  })),
  status: z.enum(['pending', 'confirmed'])
});

// Runtime validation
const parseResult = OrderCreatedSchema.safeParse(eventData);
if (!parseResult.success) {
  throw new InvalidEventError(parseResult.error);
}
```
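What `safeParse` hands back is a discriminated union, which is exactly why the pattern composes so well with TypeScript. Stripped of zod, the shape can be sketched in plain TypeScript (the `validateOrder` helper and its checks are illustrative, not part of any library):

```typescript
// Sketch of a safeParse-style discriminated union result, without zod.
// validateOrder is a hypothetical helper for illustration only.
type ParseResult<T> =
  | { success: true; data: T }
  | { success: false; error: string };

interface OrderCreated {
  orderId: string;
  status: 'pending' | 'confirmed';
}

function validateOrder(input: unknown): ParseResult<OrderCreated> {
  const o = input as Partial<OrderCreated> | null;
  if (typeof o?.orderId !== 'string') {
    return { success: false, error: 'orderId must be a string' };
  }
  if (o.status !== 'pending' && o.status !== 'confirmed') {
    return { success: false, error: 'invalid status' };
  }
  // Inside this branch the compiler has narrowed every field,
  // so data is fully typed without a cast
  return { success: true, data: { orderId: o.orderId, status: o.status } };
}
```

The payoff is that a consumer can only reach `data` after checking `success`, so "forgot to validate" becomes a compile error rather than a 3 a.m. page.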
RabbitMQ with Docker
Our docker-compose.yml defines RabbitMQ with management plugin:
```yaml
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # management UI
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "status"]
```
In NestJS, we configure connections:
```typescript
// app.module.ts
@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [{ name: 'user_events', type: 'topic' }],
      uri: process.env.RABBITMQ_URI,
      connectionInitOptions: { wait: false }
    })
  ]
})
export class AppModule {}
```
Event Publishing in User Service
When a user registers, we publish an event:
```typescript
// user.controller.ts
@Post()
async createUser(@Body() createUserDto: CreateUserDto) {
  const user = await this.usersService.create(createUserDto);
  const event = new UserCreatedEvent(user.id, user.email);

  // Publish to RabbitMQ
  await this.amqpConnection.publish('user_events', 'user.created', event);
  return user;
}
```
What happens if the message broker fails mid-publish? We’ll solve that soon.
Consuming Events in Notification Service
Other services subscribe to relevant events:
```typescript
// notification.service.ts
@RabbitSubscribe({
  exchange: 'user_events',
  routingKey: 'user.created',
  queue: 'notifications_queue'
})
async handleUserCreated(event: UserCreatedEvent) {
  await this.mailService.sendWelcomeEmail(event.email);
}
```
Notice we’re typing the handler with the same UserCreatedEvent class from our shared library. The payload arrives as plain JSON on the wire, so the class is a compile-time contract; the Zod schemas above are what enforce it at runtime. Type safety from publisher to consumer!
Database Operations with Prisma
Prisma ensures type-safe database access. Each service has its own schema:
```prisma
// notification_service/prisma/schema.prisma
model Notification {
  id        String   @id @default(uuid())
  userId    String
  type      String
  content   String
  createdAt DateTime @default(now())
}
```
The transactional outbox pattern prevents the classic inconsistency: a database write that succeeds while the corresponding event publish fails (or vice versa).

```typescript
// With a Prisma transaction, the user row and the outbox row
// commit or roll back together
await prisma.$transaction([
  prisma.user.create({ data: user }),
  prisma.outbox.create({
    data: {
      eventType: 'user.created',
      payload: JSON.stringify(event)
    }
  })
]);
// A separate process relays outbox rows to RabbitMQ
```
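The relay itself is a simple poll-publish-mark loop. Here’s a minimal in-memory sketch of that loop; the store and broker are stubs, where a real relay would use Prisma queries and `amqpConnection.publish`:

```typescript
// Minimal outbox relay sketch: poll unsent rows, publish, mark sent.
// The row store and publish function are stand-ins for Prisma and RabbitMQ.
interface OutboxRow {
  id: number;
  eventType: string;
  payload: string;
  sentAt: Date | null;
}

async function relayOutbox(
  store: OutboxRow[],
  publish: (routingKey: string, payload: string) => Promise<void>
): Promise<number> {
  const pending = store.filter(row => row.sentAt === null);
  for (const row of pending) {
    await publish(row.eventType, row.payload); // may throw: row stays pending
    row.sentAt = new Date();                   // mark only after the broker accepts
  }
  return pending.length;
}
```

Because rows are only marked after a successful publish, a crash mid-loop means at-least-once delivery, which is exactly why consumers need idempotency (more on that below).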
Error Handling That Doesn’t Fail
RabbitMQ dead letter exchanges handle poison messages:
```typescript
@RabbitSubscribe({
  exchange: 'user_events',
  routingKey: 'user.created',
  queue: 'notifications_queue',
  queueOptions: {
    deadLetterExchange: 'dead_letters',
    deadLetterRoutingKey: 'failed_notifications'
  },
  errorHandler: (channel, msg, error) => {
    channel.nack(msg, false, false); // Reject without requeue: routes to the DLX
  }
})
async handleUserCreated(event: UserCreatedEvent) {
  await this.mailService.sendWelcomeEmail(event.email);
}
```
Exponential backoff for retries:
```typescript
async sendWithRetry(event: Event, attempts = 0): Promise<void> {
  const MAX_ATTEMPTS = 5;
  try {
    await publishToRabbitMQ(event);
  } catch (err) {
    if (attempts >= MAX_ATTEMPTS) throw err; // give up after the cap
    const delay = 2 ** attempts * 1000;      // 1s, 2s, 4s, 8s, ...
    await new Promise(res => setTimeout(res, delay));
    await this.sendWithRetry(event, attempts + 1);
  }
}
```
Observability Essentials
Distributed tracing with OpenTelemetry:
```typescript
// Tracing the publisher
const tracer = trace.getTracer('event-producer');
tracer.startActiveSpan('publish.event', span => {
  try {
    span.setAttribute('event.type', event.type);
    this.amqpConnection.publish(exchange, routingKey, event);
  } finally {
    span.end(); // end the span even if publish throws
  }
});
```
Log correlation IDs through all services:
```typescript
// Global interceptor
import { randomUUID } from 'node:crypto';

@Injectable()
export class CorrelationIdInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler) {
    const request = context.switchToHttp().getRequest();
    const correlationId =
      request.headers['x-correlation-id'] ?? randomUUID();

    // Stash the id on the request so handlers and loggers can read it
    request.correlationId = correlationId;
    return next.handle();
  }
}
```
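HTTP headers only carry the id as far as the first service; to follow a request across the broker, the id has to ride in the AMQP message headers too. A small helper can merge it into the publish options before every send (`withCorrelation` is a hypothetical name, not a library API):

```typescript
// Merge a correlation id into outgoing AMQP publish options (sketch).
// withCorrelation is a hypothetical helper for illustration only.
interface PublishOptions {
  headers?: Record<string, string>;
}

function withCorrelation(
  options: PublishOptions,
  correlationId: string
): PublishOptions {
  return {
    ...options,
    // Preserve any existing headers; add (or overwrite) the correlation id
    headers: { ...options.headers, 'x-correlation-id': correlationId },
  };
}
```

Consumers read the header back out of the message properties and attach it to their own log context, so one grep over the id reconstructs the whole cross-service flow.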
Deployment with Docker Compose
Our production-grade docker-compose.yml:
```yaml
services:
  user_service:
    build: ./packages/user-service
    depends_on:
      rabbitmq:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]

  rabbitmq:
    image: rabbitmq:3-management-alpine
    healthcheck:
      test: rabbitmq-diagnostics check_port_connectivity
      interval: 5s

  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
```
Lessons from Production
- Schema evolution: Always add new fields as optional
- Consumer idempotency: Handle duplicate events gracefully
- Versioned events: Include schema version in all payloads
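Consumer idempotency usually reduces to one question: have I processed this event id before? A minimal dedup wrapper makes the idea concrete (the seen-set is in-memory here; in production you’d persist ids in something like a `processed_events` table, which is an assumption of this sketch):

```typescript
// Skip events whose id has already been processed (in-memory sketch;
// a real consumer would persist seen ids in a processed_events table).
function makeIdempotent<E extends { id: string }>(
  handler: (event: E) => void
): (event: E) => boolean {
  const seen = new Set<string>();
  return (event: E) => {
    if (seen.has(event.id)) return false; // duplicate: silently skip
    seen.add(event.id);
    handler(event);
    return true; // first delivery: handled
  };
}
```

This matters because RabbitMQ plus the outbox relay gives at-least-once delivery; duplicates are normal, not exceptional.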
What happens when you need to change an event structure? We use schema registries with compatibility checks.
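Before reaching for a full registry, version handling can start small: every payload carries its schema version, and the consumer upcasts old shapes to the current one. Combined with the "new fields are optional" rule above, the upcast is often trivial (the v1/v2 shapes below are illustrative):

```typescript
// Upcast older payload versions to the current shape (sketch).
// The version field doubles as a discriminant for TypeScript narrowing.
interface UserCreatedV1 { version: 1; userId: string; email: string }
interface UserCreatedV2 { version: 2; userId: string; email: string; locale?: string }

function upcast(event: UserCreatedV1 | UserCreatedV2): UserCreatedV2 {
  if (event.version === 1) {
    // v1 -> v2: locale was added as optional, so upcasting just bumps the version
    return { ...event, version: 2 };
  }
  return event;
}
```

Consumers then only ever handle the latest shape, and adding v3 means adding one more upcast branch rather than touching every handler.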
Your Next Steps
I’ve shared battle-tested patterns for robust event-driven systems. Now I’d love to hear your experiences!
👉 Like this approach? Share it with your team!
👉 Have questions? Comments? Let’s discuss below!
👉 Try the complete example repo: github.com/your-repo
What challenges have you faced with microservices? What patterns saved you? Join the conversation!