> rabbitmq
Build messaging systems with RabbitMQ — queues, exchanges, routing, dead-letter queues, delayed messages, priority queues, and RPC patterns. Use when tasks involve task queues, background job processing, inter-service communication, request-reply patterns, or reliable message delivery with acknowledgments.
curl "https://skillshub.wtf/TerminalSkills/skills/rabbitmq?format=md"RabbitMQ
Build reliable messaging and task queue systems for microservice architectures and background processing.
Setup
# docker-compose.yml — RabbitMQ with management UI
services:
rabbitmq:
image: rabbitmq:3.13-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: secret
volumes:
- rabbitmq-data:/var/lib/rabbitmq
volumes:
rabbitmq-data:
Management UI at http://localhost:15672 (admin/secret).
Core Concepts
RabbitMQ routes messages through exchanges to queues. Producers send to exchanges, consumers read from queues. The exchange type determines routing logic:
- Direct — routes by exact routing key match
- Fanout — broadcasts to all bound queues (pub/sub)
- Topic — routes by pattern matching on routing key (
order.*,#.error) - Headers — routes by message header values
Producer and Consumer
Node.js (amqplib)
// connection.ts — Shared connection with auto-reconnect
import amqp, { Channel, Connection } from 'amqplib';
let connection: Connection;
let channel: Channel;
async function getChannel(): Promise<Channel> {
if (channel) return channel;
connection = await amqp.connect('amqp://admin:secret@localhost:5672');
channel = await connection.createChannel();
// Prefetch 10 — each consumer processes up to 10 unacked messages
// Prevents one slow consumer from starving others
await channel.prefetch(10);
connection.on('close', () => {
console.error('RabbitMQ connection closed, reconnecting...');
channel = null!;
setTimeout(getChannel, 5000);
});
return channel;
}
export { getChannel };
// task-producer.ts — Send tasks to a work queue
import { getChannel } from './connection';
/** Send a background task to the processing queue.
*
* @param queue - Target queue name.
* @param task - Task payload.
* @param priority - 0 (normal) to 9 (highest). Requires priority queue.
*/
async function sendTask(queue: string, task: object, priority = 0) {
const ch = await getChannel();
// Assert queue exists with desired properties (idempotent)
await ch.assertQueue(queue, {
durable: true, // Survives broker restart
arguments: {
'x-max-priority': 10, // Enable priority queue
'x-dead-letter-exchange': 'dlx', // Failed messages go here
'x-message-ttl': 86400000, // 24h TTL
},
});
ch.sendToQueue(queue, Buffer.from(JSON.stringify(task)), {
persistent: true, // Write to disk (survives restart)
priority,
contentType: 'application/json',
timestamp: Date.now(),
messageId: crypto.randomUUID(),
});
}
// Example: send an email task
await sendTask('email-queue', {
to: 'user@example.com',
template: 'welcome',
data: { name: 'Alex', activationLink: 'https://app.example.com/activate/abc123' },
});
// High-priority: password reset (skip ahead of marketing emails)
await sendTask('email-queue', {
to: 'user@example.com',
template: 'password-reset',
data: { resetLink: 'https://app.example.com/reset/xyz789' },
}, 9);
// task-consumer.ts — Process tasks with acknowledgment
import { getChannel } from './connection';
/** Start consuming tasks from a queue.
*
* @param queue - Queue to consume from.
* @param handler - Async function that processes each task.
*/
async function startWorker(queue: string, handler: (task: any) => Promise<void>) {
const ch = await getChannel();
await ch.assertQueue(queue, { durable: true });
ch.consume(queue, async (msg) => {
if (!msg) return;
const task = JSON.parse(msg.content.toString());
const msgId = msg.properties.messageId;
try {
await handler(task);
ch.ack(msg); // Success — remove from queue
} catch (err) {
console.error(`Task ${msgId} failed:`, err);
// Check if this has been retried too many times
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0);
if (retryCount >= 3) {
ch.nack(msg, false, false); // Send to dead-letter queue after 3 retries
} else {
// Requeue with incremented retry count
ch.nack(msg, false, false);
ch.sendToQueue(queue, msg.content, {
...msg.properties,
headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
});
}
}
});
console.log(`Worker started on queue: ${queue}`);
}
// Start processing emails
startWorker('email-queue', async (task) => {
await sendEmail(task.to, task.template, task.data);
});
For Python, use pika with BlockingConnection, basic_qos(prefetch_count=N), and basic_consume with basic_ack/basic_nack for message handling.
Exchange Patterns
Fanout (broadcast)
Every bound queue gets every message — useful for notifications, cache invalidation, audit logging:
// fanout.ts — Broadcast events to multiple consumers
const ch = await getChannel();
await ch.assertExchange('user-events', 'fanout', { durable: true });
// Each service creates its own queue bound to the exchange
// Email service queue
await ch.assertQueue('user-events-email', { durable: true });
await ch.bindQueue('user-events-email', 'user-events', '');
// Analytics service queue
await ch.assertQueue('user-events-analytics', { durable: true });
await ch.bindQueue('user-events-analytics', 'user-events', '');
// Publish — all bound queues receive the message
ch.publish('user-events', '', Buffer.from(JSON.stringify({
type: 'user.registered',
userId: 'usr-456',
email: 'new@example.com',
})), { persistent: true });
Topic (pattern routing)
Route messages to queues based on pattern matching:
// topic.ts — Route by pattern (e.g., order.created goes to fulfillment, order.* goes to audit)
const ch = await getChannel();
await ch.assertExchange('events', 'topic', { durable: true });
// Fulfillment only cares about new orders
await ch.assertQueue('fulfillment', { durable: true });
await ch.bindQueue('fulfillment', 'events', 'order.created');
// Audit log captures everything
await ch.assertQueue('audit-log', { durable: true });
await ch.bindQueue('audit-log', 'events', '#'); // # matches all routing keys
// Payment service handles payment events
await ch.assertQueue('payment-processing', { durable: true });
await ch.bindQueue('payment-processing', 'events', 'payment.*');
// Publish with routing key
ch.publish('events', 'order.created', Buffer.from(JSON.stringify({ orderId: 'ord-123' })));
ch.publish('events', 'payment.received', Buffer.from(JSON.stringify({ amount: 89.97 })));
Dead-Letter Queue
Failed messages need a place to go for inspection and replay:
// dlq.ts — Dead-letter exchange and queue setup
const ch = await getChannel();
// Dead-letter exchange and queue
await ch.assertExchange('dlx', 'direct', { durable: true });
await ch.assertQueue('dead-letters', { durable: true });
await ch.bindQueue('dead-letters', 'dlx', '');
// Main queue — failed messages automatically route to DLX
await ch.assertQueue('orders', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': '',
},
});
// Monitor dead letters
ch.consume('dead-letters', (msg) => {
if (!msg) return;
const reason = msg.properties.headers?.['x-death']?.[0]?.reason;
console.error(`Dead letter (${reason}):`, msg.content.toString());
// Alert, log to monitoring, or attempt manual replay
ch.ack(msg);
});
Delayed Messages
Use the rabbitmq_delayed_message_exchange plugin. Assert an exchange with type x-delayed-message, then publish messages with headers: { 'x-delay': delayMs } to schedule future delivery.
RPC (Request-Reply)
For request-reply, create an exclusive auto-delete reply queue, send with correlationId and replyTo, and consume the reply queue filtering by correlationId.
Guidelines
- Always acknowledge messages — unacked messages block the queue and prevent other consumers from getting them
- Set prefetch count — without it, RabbitMQ sends all messages to one consumer, starving others
- Use durable queues and persistent messages for anything that matters — in-memory queues vanish on restart
- Dead-letter queues are not optional — without them, failed messages disappear silently
- One queue per consumer type — don't have email and SMS services reading from the same queue
- Idempotent consumers — messages can be delivered more than once (network issues, rebalances). Design handlers to be safe to re-run.
- Monitor queue depth — growing queue length means consumers can't keep up. Alert on it.
- Connection pooling — create one connection with multiple channels, not one connection per operation
> related_skills --same-repo
> zustand
You are an expert in Zustand, the small, fast, and scalable state management library for React. You help developers manage global state without boilerplate using Zustand's hook-based stores, selectors for performance, middleware (persist, devtools, immer), computed values, and async actions — replacing Redux complexity with a simple, un-opinionated API in under 1KB.
> zoho
Integrate and automate Zoho products. Use when a user asks to work with Zoho CRM, Zoho Books, Zoho Desk, Zoho Projects, Zoho Mail, or Zoho Creator, build custom integrations via Zoho APIs, automate workflows with Deluge scripting, sync data between Zoho apps and external systems, manage leads and deals, automate invoicing, build custom Zoho Creator apps, set up webhooks, or manage Zoho organization settings. Covers Zoho CRM, Books, Desk, Projects, Creator, and cross-product integrations.
> zod
You are an expert in Zod, the TypeScript-first schema declaration and validation library. You help developers define schemas that validate data at runtime AND infer TypeScript types at compile time — eliminating the need to write types and validators separately. Used for API input validation, form validation, environment variables, config files, and any data boundary.
> zipkin
Deploy and configure Zipkin for distributed tracing and request flow visualization. Use when a user needs to set up trace collection, instrument Java/Spring or other services with Zipkin, analyze service dependencies, or configure storage backends for trace data.