Saltar para o conteúdo
Redis: Muito Além do Cache
Banco de Dados

Redis: Muito Além do Cache

15 de maio de 2026·Paulo Pereira

Redis é um Banco de Dados

A maioria usa Redis apenas como cache de chave-valor. Mas ele é um banco de dados em memória com estruturas ricas: strings, listas, sets, sorted sets, hashes, streams, bitmaps e HyperLogLog.

Setup com Docker

# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    command: redis-server --save 60 1 --loglevel warning
    ports: ["6379:6379"]
    volumes: ["redis_data:/data"]

Rate Limiting com INCR + EXPIRE

import { Redis } from 'ioredis';

const redis = new Redis();

async function rateLimiter(
  userId: string,
  limite: number,
  janela: number  // segundos
): Promise<{ permitido: boolean; restante: number }> {
  const chave = `rate:${userId}:${Math.floor(Date.now() / (janela * 1000))}`;

  const contagem = await redis.incr(chave);
  if (contagem === 1) {
    await redis.expire(chave, janela);
  }

  return {
    permitido: contagem <= limite,
    restante: Math.max(0, limite - contagem),
  };
}

// Middleware Express
app.use(async (req, res, next) => {
  const { permitido, restante } = await rateLimiter(req.ip, 100, 60);
  res.setHeader('X-RateLimit-Remaining', restante);
  if (!permitido) return res.status(429).json({ erro: 'Muitas requisições' });
  next();
});

Sessões Distribuídas

import redis
import json
import secrets
from datetime import timedelta

r = redis.Redis(decode_responses=True)

def criar_sessao(usuario_id: int, dados: dict) -> str:
    session_id = secrets.token_urlsafe(32)
    chave = f"sessao:{session_id}"
    r.setex(
        chave,
        timedelta(hours=24),
        json.dumps({"usuario_id": usuario_id, **dados})
    )
    return session_id

def obter_sessao(session_id: str) -> dict | None:
    dados = r.get(f"sessao:{session_id}")
    return json.loads(dados) if dados else None

def renovar_sessao(session_id: str):
    r.expire(f"sessao:{session_id}", timedelta(hours=24))

def invalidar_sessao(session_id: str):
    r.delete(f"sessao:{session_id}")

Pub/Sub para Eventos em Tempo Real

import threading
import redis

def publicador():
    r = redis.Redis()
    r.publish('notificacoes', json.dumps({
        "tipo": "novo_pedido",
        "pedido_id": 12345,
        "usuario": "Paulo"
    }))

def ouvinte():
    r = redis.Redis()
    pubsub = r.pubsub()
    pubsub.subscribe('notificacoes')

    for mensagem in pubsub.listen():
        if mensagem['type'] == 'message':
            dados = json.loads(mensagem['data'])
            processar_evento(dados)

thread = threading.Thread(target=ouvinte, daemon=True)
thread.start()

Sorted Sets: Leaderboard em Tempo Real

LEADERBOARD = "ranking:jogo"

def atualizar_pontuacao(usuario: str, pontos: float):
    r.zadd(LEADERBOARD, {usuario: pontos})

def top_jogadores(n: int = 10) -> list[dict]:
    jogadores = r.zrevrange(LEADERBOARD, 0, n - 1, withscores=True)
    return [
        {"posicao": i + 1, "usuario": u.decode(), "pontos": int(p)}
        for i, (u, p) in enumerate(jogadores)
    ]

def posicao_usuario(usuario: str) -> int | None:
    pos = r.zrevrank(LEADERBOARD, usuario)
    return pos + 1 if pos is not None else None

# Top 3 em tempo real
top = top_jogadores(3)
# [{"posicao": 1, "usuario": "ana", "pontos": 9850}, ...]

Redis Streams: Event Sourcing Leve

STREAM = "eventos:pedidos"

def publicar_evento(tipo: str, dados: dict):
    r.xadd(STREAM, {"tipo": tipo, "dados": json.dumps(dados)})

def consumir_eventos(grupo: str, consumidor: str, count: int = 10):
    try:
        r.xgroup_create(STREAM, grupo, id='0', mkstream=True)
    except redis.ResponseError:
        pass  # grupo já existe

    mensagens = r.xreadgroup(
        grupo, consumidor, {STREAM: '>'}, count=count, block=1000
    )

    for _, eventos in (mensagens or []):
        for msg_id, campos in eventos:
            yield msg_id, {
                "tipo": campos[b"tipo"].decode(),
                "dados": json.loads(campos[b"dados"])
            }

def confirmar_processamento(msg_id: bytes):
    r.xack(STREAM, grupo, msg_id)

HyperLogLog: Contagem Aproximada de Únicos

# Conta visitantes únicos com apenas ~12KB de memória
# Independente do número de visitantes

def registrar_visita(pagina: str, usuario_id: str):
    chave = f"visitantes:{pagina}:{date.today()}"
    r.pfadd(chave, usuario_id)
    r.expire(chave, timedelta(days=30))

def visitantes_unicos(pagina: str) -> int:
    chave = f"visitantes:{pagina}:{date.today()}"
    return r.pfcount(chave)  # erro < 1%

Distributed Lock (Redlock)

import uuid

def adquirir_lock(recurso: str, ttl_ms: int = 5000) -> str | None:
    lock_id = str(uuid.uuid4())
    chave = f"lock:{recurso}"
    adquirido = r.set(chave, lock_id, px=ttl_ms, nx=True)
    return lock_id if adquirido else None

def liberar_lock(recurso: str, lock_id: str):
    # Script Lua garante atomicidade
    script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    r.eval(script, 1, f"lock:{recurso}", lock_id)

Conclusão

Redis é um canivete suíço para problemas de infraestrutura: cache, sessões, rate limiting, filas, leaderboards, pub/sub, distributed locks. Cada estrutura de dados foi projetada para um caso de uso específico — conhecê-las evita reinventar a roda.