Asyncio é uma biblioteca para escrever código concorrente com async/await. Ela usa um event loop para gerenciar múltiplas tarefas em uma única thread. Primeiramente, isso difere de threads e processos tradicionais. Por exemplo, uma tarefa pode esperar dados da rede sem bloquear as outras. Além disso, asyncio é extremamente leve e escala para milhares de conexões. Consequentemente, a memória consumida é muito menor que em threads. Assim, você consegue alta concorrência sem custo excessivo. Então, quando utilizar asyncio? Em aplicações com muito I/O. Por exemplo, servidores web, chats, proxies ou crawlers. Da mesma forma, é ótimo para APIs que chamam outras APIs externas. No entanto, asyncio não é indicado para tarefas com CPU intensiva. Portanto, identifique primeiro o tipo do seu gargalo. Vamos explorar conceitos, padrões e boas práticas. Três subtítulos guiarão você pelo mundo assíncrono em Python. Finalmente, você construirá aplicações eficientes e escaláveis.
Conceitos fundamentais: corrotinas, tasks e event loop
Corrotinas são funções definidas com async def.
Elas podem pausar sua execução com await sem bloquear a thread.
Tasks são corrotinas agendadas no event loop para execução.
O event loop gerencia todas as tasks e retoma cada uma no momento certo.
Assim, você consegue concorrência cooperativa muito eficiente.
Por outro lado, programadores iniciantes podem achar o conceito complexo.
Então, quando usar corrotinas? Sempre que você tiver operações de I/O.
Por exemplo, chamadas de rede, acesso a banco ou leitura de arquivos.
Exemplo básico mostrando corrotinas e event loop:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
import asyncio import time async def dizer_ola(nome, atraso): """Corrotina que espera e depois imprime.""" print(f"Iniciando saudação para {nome}") await asyncio.sleep(atraso) # Simula I/O, não bloqueia print(f"Olá, {nome}! (após {atraso}s)") return f"Resultado de {nome}" async def main(): print(f"Início: {time.strftime('%H:%M:%S')}") # Executando corrotinas sequencialmente resultado1 = await dizer_ola("Ana", 2) resultado2 = await dizer_ola("Bob", 1) print(f"Sequencial: {resultado1}, {resultado2}") print(f"\nExecutando concorrentemente:") inicio_conc = time.time() # Criando tasks para execução concorrente task1 = asyncio.create_task(dizer_ola("Carlos", 2)) task2 = asyncio.create_task(dizer_ola("Diana", 1)) task3 = asyncio.create_task(dizer_ola("Eva", 1.5)) # Aguardando todas completarem resultados = await asyncio.gather(task1, task2, task3) tempo_conc = time.time() - inicio_conc print(f"Resultados concorrentes: {resultados}") print(f"Tempo concorrente: {tempo_conc:.1f}s (equivalente à tarefa mais longa)") # Usando as_completed para processar conforme terminam print("\nProcessando conforme terminam:") tarefas = [ dizer_ola("Fábio", 2), dizer_ola("Gina", 1), dizer_ola("Hugo", 1.8) ] for corrotina in asyncio.as_completed(tarefas): resultado = await corrotina print(f"Resultado recebido: {resultado}") if __name__ == "__main__": asyncio.run(main()) |
Tasks concorrentes executam em paralelo lógico, não físico. O tempo total é o da tarefa mais longa, não a soma de todas. Portanto, esse é o grande poder do asyncio para I/O.
Padrões práticos: produtor-consumidor e timeouts
Asyncio oferece filas (asyncio.Queue) para comunicação entre tasks.
Assim, você implementa facilmente o padrão produtor-consumidor assíncrono.
Timeouts são essenciais para evitar bloqueios eternos.
Use asyncio.wait_for() para limitar o tempo de espera.
Quando usar filas? Em pipelines de processamento de dados.
Por exemplo, crawlers web com múltiplas etapas.
Exemplo de produtor-consumidor com asyncio:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
import asyncio import random async def produtor(queue, id_produtor, n_itens): """Produz itens e coloca na fila.""" for i in range(n_itens): item = f"P{id_produtor}-Item{i}" await queue.put(item) print(f"[Produtor {id_produtor}] Produziu: {item}") await asyncio.sleep(random.uniform(0.1, 0.3)) print(f"[Produtor {id_produtor}] Finalizou") async def consumidor(queue, id_consumidor, nome): """Consome itens da fila.""" while True: try: # Timeout para evitar espera infinita item = await asyncio.wait_for(queue.get(), timeout=1.0) print(f"[Consumidor {id_consumidor} ({nome})] Consumiu: {item}") # Simula processamento do item await asyncio.sleep(random.uniform(0.2, 0.5)) queue.task_done() except asyncio.TimeoutError: print(f"[Consumidor {id_consumidor}] Timeout, verificando se terminou...") if queue.empty() and all(produtor_task.done() for produtor_task in produtor_tasks): break continue async def main(): queue = asyncio.Queue(maxsize=5) # Fila com tamanho máximo global produtor_tasks # Produtores produtor_tasks = [] for i in range(2): p = asyncio.create_task(produtor(queue, i, 5)) produtor_tasks.append(p) # Consumidores consumidor_tasks = [] for i in range(3): c = asyncio.create_task(consumidor(queue, i, f"Cons-{i}")) consumidor_tasks.append(c) # Aguardar produtores await asyncio.gather(*produtor_tasks) print("\nProdutores finalizaram. Aguardando consumidores...") # Aguardar consumidores await asyncio.gather(*consumidor_tasks) print("Todos os consumidores finalizaram") # Exemplo de timeout em operação print("\n=== Exemplo de Timeout ===") try: resultado = await asyncio.wait_for(asyncio.sleep(3), timeout=1) except asyncio.TimeoutError: print("Timeout! Operação levou mais de 1 segundo") if __name__ == "__main__": asyncio.run(main()) |
Filas assíncronas são thread-safe e perfeitas para produtor-consumidor. Timeouts previnem que tarefas travem o sistema indefinidamente. Assim, esses padrões são essenciais para sistemas robustos.
Asyncio vs. threading vs. multiprocessing
Asyncio é leve e escala para milhares de conexões simultâneas. Threading tem overhead maior e sofre com GIL para CPU. Multiprocessing é mais pesado, mas permite paralelismo verdadeiro. Quando escolher asyncio? Em servidores de rede com alta concorrência. Também em clientes que chamam muitas APIs externas. Por outro lado, para CPU intensivo, prefira multiprocessing. A fórmula de eficiência para I/O-bound é clara: \(E_{\text{asyncio}} \approx \frac{M_{\text{thread}}}{M_{\text{asyncio}}} \approx 10\) Portanto, asyncio pode usar 10x menos memória que threads. Exemplo comparativo de desempenho para I/O:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
import asyncio import threading import time import aiohttp import requests # Asyncio version async def baixar_asyncio(urls): async with aiohttp.ClientSession() as session: tarefas = [] for url in urls: tarefas.append(session.get(url)) respostas = await asyncio.gather(*tarefas) return [await r.text() for r in respostas] # Threading version def baixar_thread(urls): def fetch(url, idx, resultados): resultados[idx] = requests.get(url).text resultados = [None] * len(urls) threads = [] for i, url in enumerate(urls): t = threading.Thread(target=fetch, args=(url, i, resultados)) threads.append(t) t.start() for t in threads: t.join() return resultados # Simulando muitas requisições async def main(): urls = ['https://httpbin.org/delay/0.1'] * 20 print("Asyncio (muitas conexões leves):") inicio = time.time() await baixar_asyncio(urls) print(f"Tempo: {time.time() - inicio:.2f}s") print("\nThreading (mesmas conexões):") inicio = time.time() baixar_thread(urls) print(f"Tempo: {time.time() - inicio:.2f}s") print("\n=== Servidor Echo Simples ===") # Servidor echo (apenas para demonstração) if __name__ == "__main__": asyncio.run(main()) |
Asyncio é a escolha moderna para aplicações I/O intensivas.
Ele combina performance com simplicidade de código.
Use asyncio.run() como ponto de entrada principal.
Nunca misture código síncrono bloqueante dentro de corrotinas.
Para tarefas CPU-bound, use asyncio.to_thread() para delegar.
Portanto, domine asyncio e suas aplicações serão extremamente eficientes.
Comece com exemplos pequenos e escale para sistemas reais.
Finalmente, o futuro da concorrência em Python é assíncrono.