Saltar para o conteúdo
Filas de Jobs com BullMQ e Redis: Processamento Assíncrono
Node.js

Filas de Jobs com BullMQ e Redis: Processamento Assíncrono

5 de maio de 2024·Paulo de Paula

Todo sistema de produção tem operações que não devem travar a resposta HTTP: envio de email, geração de relatório, processamento de imagem, notificações push. Filas de jobs desacoplam o “disparar” do “executar”.

Quando usar filas

Bom caso de uso:

  • Resposta HTTP precisa ser rápida, mas o trabalho demora
  • Operação pode falhar e precisa de retry automático
  • Múltiplas tarefas do mesmo tipo precisam ser processadas em paralelo controlado
  • Rate limiting (enviar max 100 emails/minuto para não ser marcado como spam)

Mau caso de uso:

  • Resultado precisa ser devolvido na mesma requisição HTTP
  • Lógica simples que completa em menos de 200ms

Instalação e configuração

npm install bullmq ioredis
// lib/redis.ts
import Redis from 'ioredis';

export const redisConnection = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null, // obrigatório para BullMQ
  enableReadyCheck: false,
});

Criando filas tipadas

// queues/emailQueue.ts
import { Queue } from 'bullmq';
import { redisConnection } from '../lib/redis';

export interface EmailJobData {
  to: string;
  template: 'boas-vindas' | 'redefinir-senha' | 'compra-confirmada';
  payload: Record<string, unknown>;
}

export const emailQueue = new Queue<EmailJobData>('emails', {
  connection: redisConnection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000, // 1s, 2s, 4s
    },
    removeOnComplete: { count: 1000 }, // mantém últimos 1000 jobs completados
    removeOnFail: { count: 5000 },
  },
});

Adicionando jobs com opções

// Envio imediato
await emailQueue.add('enviar', {
  to: 'usuario@email.com',
  template: 'boas-vindas',
  payload: { nome: 'Maria' },
});

// Com delay (enviar em 10 minutos)
await emailQueue.add('enviar', {
  to: 'usuario@email.com',
  template: 'compra-confirmada',
  payload: { pedidoId: '123' },
}, {
  delay: 10 * 60 * 1000,
});

// Com prioridade (menor número = maior prioridade)
await emailQueue.add('enviar-urgente', dados, { priority: 1 });
await emailQueue.add('enviar-marketing', dados, { priority: 10 });

// Job único (não duplica se já está na fila)
await emailQueue.add('lembrete', dados, {
  jobId: `lembrete:${usuarioId}`, // ID fixo previne duplicatas
});

Worker com tratamento de erros

// workers/emailWorker.ts
import { Worker, Job } from 'bullmq';
import { redisConnection } from '../lib/redis';
import { EmailJobData } from '../queues/emailQueue';
import { emailService } from '../services/emailService';

export const emailWorker = new Worker<EmailJobData>(
  'emails',
  async (job: Job<EmailJobData>) => {
    const { to, template, payload } = job.data;

    // Atualiza progresso (visível no Bull Board)
    await job.updateProgress(10);

    const html = await renderTemplate(template, payload);
    await job.updateProgress(50);

    await emailService.enviar({ to, html, subject: getSubject(template) });
    await job.updateProgress(100);

    return { enviado: true, timestamp: new Date().toISOString() };
  },
  {
    connection: redisConnection,
    concurrency: 5, // processa até 5 emails em paralelo
    limiter: {
      max: 100,      // máximo de 100 jobs
      duration: 60000, // por minuto (rate limiting)
    },
  }
);

// Eventos do worker
emailWorker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completado:`, result);
});

emailWorker.on('failed', (job, error) => {
  console.error(`Job ${job?.id} falhou (tentativa ${job?.attemptsMade}):`, error.message);
  
  if (job?.attemptsMade === job?.opts.attempts) {
    // Última tentativa — alertar equipe
    alertarEquipe(`Email falhou definitivamente: ${job?.data.to}`);
  }
});

emailWorker.on('stalled', (jobId) => {
  console.warn(`Job ${jobId} travado — worker pode ter caído`);
});

Jobs repetíveis (cron)

// Relatório diário às 8h
await emailQueue.add(
  'relatorio-diario',
  { template: 'relatorio', payload: {} },
  {
    repeat: { cron: '0 8 * * *', tz: 'America/Sao_Paulo' },
    jobId: 'relatorio-diario', // ID fixo previne duplicatas no restart
  }
);

// Limpeza a cada hora
await limpezaQueue.add(
  'limpar-sessoes',
  {},
  {
    repeat: { every: 60 * 60 * 1000 }, // a cada 1 hora em ms
  }
);

Bull Board: interface de monitoramento

// app.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue } from './queues/emailQueue';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/filas');

createBullBoard({
  queues: [new BullMQAdapter(emailQueue)],
  serverAdapter,
});

// Proteja com autenticação!
app.use('/admin/filas', autenticarAdmin, serverAdapter.getRouter());

O Bull Board mostra jobs ativos, aguardando, completados, falhos, e permite retry manual de jobs falhos — essencial para debugging em produção.

Graceful shutdown

// Garante que workers terminam o job atual antes de fechar
process.on('SIGTERM', async () => {
  console.log('SIGTERM recebido — encerrando workers...');
  
  await emailWorker.close();
  // Espera jobs ativos terminarem (timeout 30s)
  
  await redisConnection.quit();
  process.exit(0);
});

Padrão: processamento em pipeline

// Encadeamento: ao completar um job, dispara o próximo
emailWorker.on('completed', async (job) => {
  if (job.data.template === 'boas-vindas') {
    // Após email de boas-vindas, agenda onboarding em 3 dias
    await onboardingQueue.add('step-1', { usuarioId: job.data.payload.usuarioId }, {
      delay: 3 * 24 * 60 * 60 * 1000,
    });
  }
});

BullMQ com Redis é a combinação padrão para filas em Node.js. Para workloads de alto volume (milhões de jobs/dia), considere o Redis Cluster ou Upstash Redis com suporte nativo a BullMQ.