Banco de Dados
Redis: Muito Além do Cache
15 de maio de 2026·Paulo Pereira
Redis é um Banco de Dados
A maioria usa Redis apenas como cache de chave-valor. Mas ele é um banco de dados em memória com estruturas ricas: strings, listas, sets, sorted sets, hashes, streams, bitmaps e HyperLogLog.
Setup com Docker
# docker-compose.yml
services:
redis:
image: redis:7-alpine
command: redis-server --save 60 1 --loglevel warning
ports: ["6379:6379"]
volumes: ["redis_data:/data"]Rate Limiting com INCR + EXPIRE
import { Redis } from 'ioredis';
const redis = new Redis();
async function rateLimiter(
userId: string,
limite: number,
janela: number // segundos
): Promise<{ permitido: boolean; restante: number }> {
const chave = `rate:${userId}:${Math.floor(Date.now() / (janela * 1000))}`;
const contagem = await redis.incr(chave);
if (contagem === 1) {
await redis.expire(chave, janela);
}
return {
permitido: contagem <= limite,
restante: Math.max(0, limite - contagem),
};
}
// Middleware Express
app.use(async (req, res, next) => {
const { permitido, restante } = await rateLimiter(req.ip, 100, 60);
res.setHeader('X-RateLimit-Remaining', restante);
if (!permitido) return res.status(429).json({ erro: 'Muitas requisições' });
next();
});Sessões Distribuídas
import redis
import json
import secrets
from datetime import timedelta
r = redis.Redis(decode_responses=True)
def criar_sessao(usuario_id: int, dados: dict) -> str:
session_id = secrets.token_urlsafe(32)
chave = f"sessao:{session_id}"
r.setex(
chave,
timedelta(hours=24),
json.dumps({"usuario_id": usuario_id, **dados})
)
return session_id
def obter_sessao(session_id: str) -> dict | None:
dados = r.get(f"sessao:{session_id}")
return json.loads(dados) if dados else None
def renovar_sessao(session_id: str):
r.expire(f"sessao:{session_id}", timedelta(hours=24))
def invalidar_sessao(session_id: str):
r.delete(f"sessao:{session_id}")Pub/Sub para Eventos em Tempo Real
import threading
import redis
def publicador():
r = redis.Redis()
r.publish('notificacoes', json.dumps({
"tipo": "novo_pedido",
"pedido_id": 12345,
"usuario": "Paulo"
}))
def ouvinte():
r = redis.Redis()
pubsub = r.pubsub()
pubsub.subscribe('notificacoes')
for mensagem in pubsub.listen():
if mensagem['type'] == 'message':
dados = json.loads(mensagem['data'])
processar_evento(dados)
thread = threading.Thread(target=ouvinte, daemon=True)
thread.start()Sorted Sets: Leaderboard em Tempo Real
LEADERBOARD = "ranking:jogo"
def atualizar_pontuacao(usuario: str, pontos: float):
r.zadd(LEADERBOARD, {usuario: pontos})
def top_jogadores(n: int = 10) -> list[dict]:
jogadores = r.zrevrange(LEADERBOARD, 0, n - 1, withscores=True)
return [
{"posicao": i + 1, "usuario": u.decode(), "pontos": int(p)}
for i, (u, p) in enumerate(jogadores)
]
def posicao_usuario(usuario: str) -> int | None:
pos = r.zrevrank(LEADERBOARD, usuario)
return pos + 1 if pos is not None else None
# Top 3 em tempo real
top = top_jogadores(3)
# [{"posicao": 1, "usuario": "ana", "pontos": 9850}, ...]Redis Streams: Event Sourcing Leve
STREAM = "eventos:pedidos"
def publicar_evento(tipo: str, dados: dict):
r.xadd(STREAM, {"tipo": tipo, "dados": json.dumps(dados)})
def consumir_eventos(grupo: str, consumidor: str, count: int = 10):
try:
r.xgroup_create(STREAM, grupo, id='0', mkstream=True)
except redis.ResponseError:
pass # grupo já existe
mensagens = r.xreadgroup(
grupo, consumidor, {STREAM: '>'}, count=count, block=1000
)
for _, eventos in (mensagens or []):
for msg_id, campos in eventos:
yield msg_id, {
"tipo": campos[b"tipo"].decode(),
"dados": json.loads(campos[b"dados"])
}
def confirmar_processamento(msg_id: bytes):
r.xack(STREAM, grupo, msg_id)HyperLogLog: Contagem Aproximada de Únicos
# Conta visitantes únicos com apenas ~12KB de memória
# Independente do número de visitantes
def registrar_visita(pagina: str, usuario_id: str):
chave = f"visitantes:{pagina}:{date.today()}"
r.pfadd(chave, usuario_id)
r.expire(chave, timedelta(days=30))
def visitantes_unicos(pagina: str) -> int:
chave = f"visitantes:{pagina}:{date.today()}"
return r.pfcount(chave) # erro < 1%Distributed Lock (Redlock)
import uuid
def adquirir_lock(recurso: str, ttl_ms: int = 5000) -> str | None:
lock_id = str(uuid.uuid4())
chave = f"lock:{recurso}"
adquirido = r.set(chave, lock_id, px=ttl_ms, nx=True)
return lock_id if adquirido else None
def liberar_lock(recurso: str, lock_id: str):
# Script Lua garante atomicidade
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
r.eval(script, 1, f"lock:{recurso}", lock_id)Conclusão
Redis é um canivete suíço para problemas de infraestrutura: cache, sessões, rate limiting, filas, leaderboards, pub/sub, distributed locks. Cada estrutura de dados foi projetada para um caso de uso específico — conhecê-las evita reinventar a roda.