Python
Python Assíncrono: asyncio e aiohttp na Prática
15 de maio de 2026·Paulo Pereira
O Problema do I/O Bloqueante
Imagine buscar dados de 100 APIs externas de forma síncrona: cada chamada espera a anterior terminar. Com async, você dispara todas simultaneamente e espera pelo mais lento — algo como 100x mais rápido.
Fundamentos do asyncio
import asyncio
async def tarefa(nome: str, delay: float):
print(f"{nome}: iniciando")
await asyncio.sleep(delay) # simula I/O
print(f"{nome}: concluído")
return f"resultado de {nome}"
async def main():
# Executa em paralelo — total ~2s, não ~5s
resultados = await asyncio.gather(
tarefa("A", 2.0),
tarefa("B", 1.0),
tarefa("C", 1.5),
)
print(resultados)
asyncio.run(main())aiohttp: HTTP Assíncrono
import aiohttp
import asyncio
async def buscar_url(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as response:
return await response.json()
async def buscar_varios(urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
tarefas = [buscar_url(session, url) for url in urls]
return await asyncio.gather(*tarefas)Semáforo para Limitar Concorrência
Quando temos muitos requests, limitar a concorrência evita sobrecarregar o servidor de destino:
async def buscar_com_limite(urls: list[str], max_concurrent: int = 10):
semaforo = asyncio.Semaphore(max_concurrent)
async def buscar_uma(session, url):
async with semaforo:
async with session.get(url) as resp:
return await resp.json()
async with aiohttp.ClientSession() as session:
tarefas = [buscar_uma(session, url) for url in urls]
return await asyncio.gather(*tarefas, return_exceptions=True)Timeout e Tratamento de Erros
from asyncio import TimeoutError
async def buscar_seguro(session, url: str, timeout: float = 5.0):
try:
async with asyncio.timeout(timeout):
async with session.get(url) as resp:
resp.raise_for_status()
return await resp.json()
except TimeoutError:
return {"erro": "timeout", "url": url}
except aiohttp.ClientError as e:
return {"erro": str(e), "url": url}asyncio.Queue: Produtor-Consumidor
async def produtor(queue: asyncio.Queue, itens: list):
for item in itens:
await queue.put(item)
await queue.put(None) # sinal de fim
async def consumidor(queue: asyncio.Queue, worker_id: int):
while True:
item = await queue.get()
if item is None:
await queue.put(None) # repassa o sinal para outros workers
break
await processar(item)
queue.task_done()Dicas de Performance
- Use
asyncio.gatherpara paralelismo de I/O, não de CPU - Para CPU-bound, prefira
ProcessPoolExecutorcomloop.run_in_executor - Evite chamadas síncronas bloqueantes dentro de coroutines (ex:
requests.get) asyncio.create_taské preferível agatherquando você quer iniciar sem aguardar
Conclusão
Async em Python deixou de ser nicho e se tornou essencial para qualquer código que faça I/O em escala. Com asyncio e aiohttp, você ganha throughput significativo sem a complexidade de threads.