Node.js
Filas de Jobs com BullMQ e Redis: Processamento Assíncrono
5 de maio de 2024·Paulo de Paula
Todo sistema de produção tem operações que não devem travar a resposta HTTP: envio de email, geração de relatório, processamento de imagem, notificações push. Filas de jobs desacoplam o “disparar” do “executar”.
Quando usar filas
Bom caso de uso:
- Resposta HTTP precisa ser rápida, mas o trabalho demora
- Operação pode falhar e precisa de retry automático
- Múltiplas tarefas do mesmo tipo precisam ser processadas em paralelo controlado
- Rate limiting (enviar max 100 emails/minuto para não ser marcado como spam)
Mau caso de uso:
- Resultado precisa ser devolvido na mesma requisição HTTP
- Lógica simples que completa em menos de 200ms
Instalação e configuração
npm install bullmq ioredis// lib/redis.ts
import Redis from 'ioredis';
export const redisConnection = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null, // obrigatório para BullMQ
enableReadyCheck: false,
});Criando filas tipadas
// queues/emailQueue.ts
import { Queue } from 'bullmq';
import { redisConnection } from '../lib/redis';
export interface EmailJobData {
to: string;
template: 'boas-vindas' | 'redefinir-senha' | 'compra-confirmada';
payload: Record<string, unknown>;
}
export const emailQueue = new Queue<EmailJobData>('emails', {
connection: redisConnection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000, // 1s, 2s, 4s
},
removeOnComplete: { count: 1000 }, // mantém últimos 1000 jobs completados
removeOnFail: { count: 5000 },
},
});Adicionando jobs com opções
// Envio imediato
await emailQueue.add('enviar', {
to: 'usuario@email.com',
template: 'boas-vindas',
payload: { nome: 'Maria' },
});
// Com delay (enviar em 10 minutos)
await emailQueue.add('enviar', {
to: 'usuario@email.com',
template: 'compra-confirmada',
payload: { pedidoId: '123' },
}, {
delay: 10 * 60 * 1000,
});
// Com prioridade (menor número = maior prioridade)
await emailQueue.add('enviar-urgente', dados, { priority: 1 });
await emailQueue.add('enviar-marketing', dados, { priority: 10 });
// Job único (não duplica se já está na fila)
await emailQueue.add('lembrete', dados, {
jobId: `lembrete:${usuarioId}`, // ID fixo previne duplicatas
});Worker com tratamento de erros
// workers/emailWorker.ts
import { Worker, Job } from 'bullmq';
import { redisConnection } from '../lib/redis';
import { EmailJobData } from '../queues/emailQueue';
import { emailService } from '../services/emailService';
export const emailWorker = new Worker<EmailJobData>(
'emails',
async (job: Job<EmailJobData>) => {
const { to, template, payload } = job.data;
// Atualiza progresso (visível no Bull Board)
await job.updateProgress(10);
const html = await renderTemplate(template, payload);
await job.updateProgress(50);
await emailService.enviar({ to, html, subject: getSubject(template) });
await job.updateProgress(100);
return { enviado: true, timestamp: new Date().toISOString() };
},
{
connection: redisConnection,
concurrency: 5, // processa até 5 emails em paralelo
limiter: {
max: 100, // máximo de 100 jobs
duration: 60000, // por minuto (rate limiting)
},
}
);
// Eventos do worker
emailWorker.on('completed', (job, result) => {
console.log(`Job ${job.id} completado:`, result);
});
emailWorker.on('failed', (job, error) => {
console.error(`Job ${job?.id} falhou (tentativa ${job?.attemptsMade}):`, error.message);
if (job?.attemptsMade === job?.opts.attempts) {
// Última tentativa — alertar equipe
alertarEquipe(`Email falhou definitivamente: ${job?.data.to}`);
}
});
emailWorker.on('stalled', (jobId) => {
console.warn(`Job ${jobId} travado — worker pode ter caído`);
});Jobs repetíveis (cron)
// Relatório diário às 8h
await emailQueue.add(
'relatorio-diario',
{ template: 'relatorio', payload: {} },
{
repeat: { cron: '0 8 * * *', tz: 'America/Sao_Paulo' },
jobId: 'relatorio-diario', // ID fixo previne duplicatas no restart
}
);
// Limpeza a cada hora
await limpezaQueue.add(
'limpar-sessoes',
{},
{
repeat: { every: 60 * 60 * 1000 }, // a cada 1 hora em ms
}
);Bull Board: interface de monitoramento
// app.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue } from './queues/emailQueue';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/filas');
createBullBoard({
queues: [new BullMQAdapter(emailQueue)],
serverAdapter,
});
// Proteja com autenticação!
app.use('/admin/filas', autenticarAdmin, serverAdapter.getRouter());O Bull Board mostra jobs ativos, aguardando, completados, falhos, e permite retry manual de jobs falhos — essencial para debugging em produção.
Graceful shutdown
// Garante que workers terminam o job atual antes de fechar
process.on('SIGTERM', async () => {
console.log('SIGTERM recebido — encerrando workers...');
await emailWorker.close();
// Espera jobs ativos terminarem (timeout 30s)
await redisConnection.quit();
process.exit(0);
});Padrão: processamento em pipeline
// Encadeamento: ao completar um job, dispara o próximo
emailWorker.on('completed', async (job) => {
if (job.data.template === 'boas-vindas') {
// Após email de boas-vindas, agenda onboarding em 3 dias
await onboardingQueue.add('step-1', { usuarioId: job.data.payload.usuarioId }, {
delay: 3 * 24 * 60 * 60 * 1000,
});
}
});BullMQ com Redis é a combinação padrão para filas em Node.js. Para workloads de alto volume (milhões de jobs/dia), considere o Redis Cluster ou Upstash Redis com suporte nativo a BullMQ.