Recently, I found myself architecting a system that needed to handle thousands of asynchronous events per second. The challenge wasn’t just about speed; it was about building something resilient, something that wouldn’t lose a single message even under heavy load. This led me to combine Fastify’s raw performance with the durable messaging of Redis Streams, all wrapped in the safety net of TypeScript. I want to share this approach with you.
Have you ever considered what happens to a message if a service crashes while processing it?
Let’s start with the foundation. An event-driven architecture decouples services, allowing them to communicate through events rather than direct API calls. This is key for building scalable systems. Redis Streams is a significant step up from Redis pub/sub, which drops any message that has no connected subscriber at the moment it is published. A stream persists messages, supports consumer groups that spread work across several consumers, and tracks acknowledgments so unprocessed messages can be retried. This means you can build systems that are both fast and reliable.
We’ll begin with the event types and then build the Redis Streams client on top of them. Notice how TypeScript helps us define clear interfaces for our events.
// types/events.ts
export interface OrderCreatedEvent {
  id: string;
  type: 'order.created';
  timestamp: Date;
  payload: {
    orderId: string;
    customerId: string;
    totalAmount: number;
  };
}
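One thing to watch: JSON.stringify turns a Date into an ISO string, so an event read back from a stream no longer matches the interface until the timestamp is revived. Here is a minimal sketch of a runtime guard that could sit on the consumer side. The helper names parseOrderCreatedEvent and isRecord are my own, not part of any library, and the interface is repeated only so the snippet stands alone.

```typescript
interface OrderCreatedEvent {
  id: string;
  type: 'order.created';
  timestamp: Date;
  payload: { orderId: string; customerId: string; totalAmount: number };
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Hypothetical helper: parses the JSON stored in a stream entry and returns
// a typed event, or null when the JSON is invalid or the shape does not match.
function parseOrderCreatedEvent(raw: string): OrderCreatedEvent | null {
  let data: unknown;
  try {
    data = JSON.parse(raw);
  } catch {
    return null;
  }
  if (!isRecord(data)) return null;

  const { id, type, timestamp: rawTimestamp, payload } = data;
  if (typeof id !== 'string' || type !== 'order.created') return null;
  if (typeof rawTimestamp !== 'string' || !isRecord(payload)) return null;

  // Revive the ISO string produced by JSON.stringify back into a Date
  const timestamp = new Date(rawTimestamp);
  if (Number.isNaN(timestamp.getTime())) return null;

  const { orderId, customerId, totalAmount } = payload;
  if (
    typeof orderId !== 'string' ||
    typeof customerId !== 'string' ||
    typeof totalAmount !== 'number'
  ) {
    return null;
  }
  return { id, type: 'order.created', timestamp, payload: { orderId, customerId, totalAmount } };
}
```

Returning null instead of throwing lets the caller decide whether a malformed entry should be retried or sent straight to a dead-letter stream.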
Connecting to Redis is straightforward, but building in resilience from the start is crucial. We’ll use ioredis for its robust connection handling.
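// services/redis-streams.service.ts
import { Redis } from 'ioredis';

export class RedisStreamsService {
  // Exposed read-only so routes and hooks can issue stream commands directly
  readonly redis: Redis;

  constructor() {
    this.redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379');
    this.redis.on('error', (err) => console.error('Redis Client Error', err));
  }

  // Appends the event to the stream and returns the ID Redis generated for it
  async publishEvent(streamKey: string, event: object): Promise<string> {
    const id = await this.redis.xadd(streamKey, '*', 'event', JSON.stringify(event));
    if (id === null) {
      throw new Error(`XADD to ${streamKey} returned no entry ID`);
    }
    return id;
  }
}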
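A stream keeps every entry until you trim it, so in a long-running system you will probably want to cap its length. XADD accepts a MAXLEN option for this. Below is a sketch of a hypothetical helper, buildXaddArgs, that builds the argument list; the cap of 10000 entries is an arbitrary number chosen for illustration.

```typescript
// Hypothetical helper: builds the arguments for
//   XADD <stream> MAXLEN ~ <maxLen> * event <json>
// The "~" asks Redis for approximate trimming, which is cheaper than an exact cap.
function buildXaddArgs(
  streamKey: string,
  event: object,
  maxLen: number = 10000
): (string | number)[] {
  return [streamKey, 'MAXLEN', '~', maxLen, '*', 'event', JSON.stringify(event)];
}

// Assumed usage with ioredis, which offers a generic `call` method:
//   const id = await redis.call('XADD', ...buildXaddArgs('orders', event));
```

Keep in mind that trimming removes the oldest entries whether or not they have been acknowledged, so the cap should be comfortably larger than the backlog you expect consumers to accumulate.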
Now, let’s integrate this with Fastify. Fastify’s plugin system is perfect for encapsulating functionality. We’ll create a plugin to register our Redis service.
// plugins/redis.ts
import fp from 'fastify-plugin';
import { RedisStreamsService } from '../services/redis-streams.service';

declare module 'fastify' {
  interface FastifyInstance {
    redisStreams: RedisStreamsService;
  }
}

export default fp(async (fastify) => {
  const redisService = new RedisStreamsService();
  fastify.decorate('redisStreams', redisService);

  fastify.addHook('onClose', async () => {
    // Close the Redis connection on server shutdown (uses the same
    // `redis` property that the metrics route reads)
    await redisService.redis.quit();
  });
});
What if you need multiple services to process the same stream of events without duplicating work? This is where consumer groups shine. They allow you to parallelize processing while ensuring each message is handled by only one consumer in the group.
Here’s how you can create a consumer group and start reading messages. The > symbol tells Redis to deliver only messages that have not yet been delivered to any consumer in the group.
// services/order-processor.service.ts
async startOrderConsumer(groupName: string, consumerName: string) {
  const streamKey = 'orders';
  try {
    // Create the consumer group (and the stream, via MKSTREAM) if it doesn't exist
    await this.redis.xgroup('CREATE', streamKey, groupName, '0', 'MKSTREAM');
  } catch (e) {
    // BUSYGROUP means the group already exists; anything else is a real failure
    if (!(e instanceof Error && e.message.includes('BUSYGROUP'))) throw e;
  }
  // Note: a blocking read occupies its connection, so give the consumer
  // its own Redis connection rather than sharing the publisher's
  while (true) {
    const results = (await this.redis.xreadgroup(
      'GROUP', groupName, consumerName,
      'BLOCK', 1000, 'STREAMS', streamKey, '>'
    )) as [string, [string, string[]][]][] | null;
    if (results) {
      for (const [, messages] of results) {
        for (const [id, fields] of messages) {
          await this.processOrderMessage(id, fields);
          // Acknowledge the message to remove it from the pending list
          await this.redis.xack(streamKey, groupName, id);
        }
      }
    }
  }
}
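Back to the earlier question about a service crashing mid-message: the entry stays in the group’s pending list, and another consumer can take it over with XAUTOCLAIM (available since Redis 6.2). Here is a rough sketch of how that could look. The AutoClaimClient interface is a minimal stand-in I wrote so the snippet is self-contained; ioredis exposes an xautoclaim method that should fit it, but treat the exact typing as an assumption. The one-minute idle threshold and the batch size of 100 are arbitrary.

```typescript
type StreamEntry = [string, string[]]; // [entry ID, flat field list]

// Minimal slice of a Redis client needed for this sketch (an assumption,
// not the full ioredis type).
interface AutoClaimClient {
  xautoclaim(
    key: string, group: string, consumer: string,
    minIdleMs: number, start: string, countToken: 'COUNT', count: number
  ): Promise<unknown>;
}

// Hypothetical helper: claims entries that have been pending for longer than
// minIdleMs (for example because their consumer crashed) and returns them so
// the caller can process and acknowledge them.
async function reclaimStaleEntries(
  client: AutoClaimClient,
  streamKey: string,
  groupName: string,
  consumerName: string,
  minIdleMs: number = 60000
): Promise<StreamEntry[]> {
  const claimed: StreamEntry[] = [];
  let cursor = '0-0';
  do {
    // Reply shape: [next cursor, claimed entries, ...]
    const reply = (await client.xautoclaim(
      streamKey, groupName, consumerName, minIdleMs, cursor, 'COUNT', 100
    )) as [string, (StreamEntry | null)[]];
    const [nextCursor, entries] = reply;
    for (const entry of entries) {
      // Skip placeholders for entries that were deleted from the stream
      if (entry) claimed.push(entry);
    }
    cursor = nextCursor;
  } while (cursor !== '0-0'); // a cursor of 0-0 means the scan is complete
  return claimed;
}
```

A consumer could call this once at startup and then periodically, passing each reclaimed entry through the same processing and acknowledgment path as new messages.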
Error handling is non-negotiable. What should happen when a message fails to process? A good pattern is to move failed messages to a dead-letter stream after several retries. This prevents one bad message from blocking the entire queue.
async processOrderMessage(messageId: string, fields: string[]) {
  const maxRetries = 3;
  let retryCount = 0;
  let success = false;
  while (retryCount < maxRetries && !success) {
    try {
      const eventData = JSON.parse(fields[1]); // Assuming 'event' is at index 1
      // Your business logic here
      await this.handleOrderCreated(eventData);
      success = true;
    } catch (error) {
      retryCount++;
      if (retryCount === maxRetries) {
        // The caught value is not guaranteed to be an Error instance
        const reason = error instanceof Error ? error.message : String(error);
        // Keep the original payload so the message can be inspected or replayed
        await this.redis.xadd(
          'orders:dead-letter', '*',
          'originalMessageId', messageId,
          'event', fields[1] ?? '',
          'error', reason
        );
      }
    }
  }
}
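The loop above retries immediately, which rarely helps when the failure comes from a dependency that is briefly unavailable. A common refinement is to wait a little longer before each attempt. This is a minimal sketch of exponential backoff; backoffDelayMs and withRetry are hypothetical helpers, and the 100 ms base and 5 second cap are placeholder values.

```typescript
// Delay before the retry that follows failed attempt number `attempt` (1-based):
// doubles each time and is capped at maxDelayMs.
function backoffDelayMs(
  attempt: number,
  baseDelayMs: number = 100,
  maxDelayMs: number = 5000
): number {
  return Math.min(maxDelayMs, baseDelayMs * 2 ** (attempt - 1));
}

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Runs `task` up to maxAttempts times, sleeping between failures.
// Rethrows the last error so the caller can dead-letter the message.
async function withRetry<T>(
  task: () => Promise<T>,
  maxAttempts: number = 3,
  baseDelayMs: number = 100
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      lastError = error;
      if (attempt < maxAttempts) {
        await sleep(backoffDelayMs(attempt, baseDelayMs));
      }
    }
  }
  throw lastError;
}
```

With this in place, the business logic could be wrapped as withRetry(() => this.handleOrderCreated(eventData)), with the dead-letter write moved into a single catch around that call.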
Monitoring is your window into the system. You need to know how many messages are pending, how fast you’re processing them, and if consumers are healthy. A small Fastify route can expose this data; the one below returns the XPENDING summary for the order-processors group.
// routes/metrics.ts
import { FastifyInstance } from 'fastify';

export default async function (fastify: FastifyInstance) {
  fastify.get('/metrics', async () => {
    // Summary form of XPENDING: [count, smallest ID, largest ID, per-consumer counts]
    const pendingInfo = await fastify.redisStreams.redis.xpending('orders', 'order-processors');
    return { pendingMessages: pendingInfo };
  });
}
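The raw XPENDING summary is a positional array, which is awkward to read on a dashboard. Here is a sketch of a hypothetical parsePendingSummary helper that labels the fields. It assumes the summary form of the reply (pending count, smallest ID, largest ID, then a list of consumer name and count pairs, with the counts returned as strings).

```typescript
interface PendingSummary {
  count: number;
  oldestId: string | null;
  newestId: string | null;
  perConsumer: Record<string, number>;
}

// Hypothetical helper: converts the raw reply of `XPENDING <stream> <group>`
// into a labeled object. When nothing is pending, Redis returns a count of 0
// and nulls for the remaining positions.
function parsePendingSummary(raw: unknown): PendingSummary {
  const [count, oldestId, newestId, consumers] = raw as [
    number,
    string | null,
    string | null,
    [string, string][] | null
  ];
  const perConsumer: Record<string, number> = {};
  for (const [name, pending] of consumers ?? []) {
    perConsumer[name] = Number(pending);
  }
  return { count: Number(count), oldestId, newestId, perConsumer };
}
```

The metrics route could then return parsePendingSummary(pendingInfo) in place of the raw array.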
Finally, how do you know your service is ready to handle real traffic? Docker Compose lets you define your entire stack for local development and testing.
# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
  app:
    build: .
    ports:
      - "3000:3000"
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
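The healthcheck above calls /health, which the application has to provide (and the image needs curl installed for the command to run). Here is one way the check behind that route might look, as a sketch rather than a finished implementation: checkHealth and timeoutAfter are hypothetical helpers, and the two-second timeout is an arbitrary choice.

```typescript
interface HealthResult {
  httpStatus: 200 | 503;
  body: { status: 'ok' | 'unavailable' };
}

// Creates a promise that rejects after `ms`, plus a way to cancel the timer.
function timeoutAfter(ms: number): { promise: Promise<never>; cancel: () => void } {
  let timer: ReturnType<typeof setTimeout>;
  const promise = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error('health check timed out')), ms);
  });
  return { promise, cancel: () => clearTimeout(timer) };
}

// Hypothetical helper: `ping` is any function that resolves when Redis
// answers, for example () => redis.ping() with ioredis.
async function checkHealth(
  ping: () => Promise<unknown>,
  timeoutMs: number = 2000
): Promise<HealthResult> {
  const timeout = timeoutAfter(timeoutMs);
  try {
    await Promise.race([ping(), timeout.promise]);
    return { httpStatus: 200, body: { status: 'ok' } };
  } catch {
    // Either the ping failed or it took longer than timeoutMs
    return { httpStatus: 503, body: { status: 'unavailable' } };
  } finally {
    timeout.cancel();
  }
}

// Possible wiring in a Fastify route (an assumption about the app's layout):
//   fastify.get('/health', async (_request, reply) => {
//     const result = await checkHealth(() => fastify.redisStreams.redis.ping());
//     return reply.code(result.httpStatus).send(result.body);
//   });
```

Returning 503 when Redis is unreachable makes curl -f exit with a non-zero status, so Docker marks the container unhealthy instead of reporting a service that cannot publish or consume events.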
Building this system taught me that performance isn’t just about raw speed. It’s about designing for failure, ensuring data integrity, and having clear visibility into your application’s behavior. The combination of Fastify, TypeScript, and Redis Streams provides a powerful foundation for building microservices that are not only fast but also robust and maintainable.
What challenges have you faced with asynchronous processing? I’d love to hear about your experiences. If you found this guide helpful, please share it with your network and leave a comment below.