Servidores tradicionais usam um thread por conexão.
Isso não escala bem para milhares de clientes simultâneos.
Uma abordagem mais eficiente é o modelo event-driven.
Python oferece o módulo asyncio para isso.
Ele permite que um único thread gerencie muitas conexões.
Primeiramente, entenda que asyncio é baseado em corrotinas.
Essas funções podem pausar e retomar voluntariamente.
Enquanto uma espera dados, outra corrotina executa.
Isso é chamado de concorrência cooperativa.
Por essa razão, o código é muito mais leve que threads.
Além disso, evita problemas clássicos de concorrência.
Portanto, asyncio é ideal para servidores de rede.
Como funciona o modelo event-driven com asyncio
No centro do asyncio existe um event loop. Ele gerencia todas as tarefas e operações de I/O. Quando uma corrotina aguarda I/O, ela cede o controle. O event loop então executa outra corrotina pronta. Assim que o I/O completa, a primeira é retomada. Isso é semelhante a sistemas como Node.js. No entanto, Python tem sintaxe mais clara com async/await. Uma fórmula simples representa esse comportamento:
time.sleep() ou código CPU intensivo.
Para essas tarefas, use run_in_executor() como alternativa.
Assim, o loop principal continua responsivo.
Outra característica importante é o suporte a protocolos.
asyncio fornece classes como Protocol e Transport.
Elas abstraem detalhes de sockets e buffers de dados.
Isso foi projetado para facilitar a criação de servidores.
Você implementa métodos como data_received().
O event loop chama esses métodos automaticamente.
Portanto, você foca na lógica de negócio, não no gerenciamento.
Quando utilizar servidores assíncronos
Use servidores asyncio para aplicações com muitas conexões. Chats em tempo real são exemplos perfeitos. Servidores de jogos multiplayer também se beneficiam. APIs REST com alto tráfego podem usar asyncio. WebSockets e sistemas de notificação são casos clássicos. Por outro lado, evite asyncio para tarefas CPU-bound. Processamento de imagens ou cálculos pesados não funcionam bem. Para esses casos, prefira multiprocessamento. Além disso, asyncio não acelera operações sequenciais simples. Se você tem poucos clientes, um servidor síncrono basta. Primeiramente, avalie o perfil de I/O da sua aplicação. Se há muita espera por rede ou disco, asyncio é ideal.
Outro bom uso é para proxies e gateways de API. Eles precisam encaminhar requisições rapidamente. Servidores de arquivos com muitos downloads simultâneos também. Frameworks populares como FastAPI usam asyncio por baixo. Isso foi adotado por empresas como Uber e Netflix. Portanto, dominar asyncio é um diferencial profissional. Então, comece com projetos pequenos e evolua gradualmente.
Exemplo prático: servidor echo assíncrono
O código abaixo implementa um servidor echo simples.
Ele recebe mensagens de clientes e as devolve de volta.
Este é o “Hello World” dos servidores de rede.
Usamos o módulo asyncio.streams para facilitar.
As funções asyncio.start_server cria o servidor.
Cada conexão é tratada por uma corrotina separada.
Observe que não há threads explícitas em lugar algum.
A concorrência é gerenciada pelo event loop.
Isso permite milhares de conexões simultâneas.
Vamos ao código comentado para entendimento.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
import asyncio import time from datetime import datetime # ============================================ # SERVIDOR ECHO ASSÍNCRONO # ============================================ async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): """ Corrotina que atende um cliente individual. É chamada automaticamente para cada nova conexão. """ client_address = writer.get_extra_info('peername') print(f"[{datetime.now().time()}] Cliente conectado: {client_address}") try: while True: # Lê dados do cliente (até 1024 bytes) data = await reader.read(1024) if not data: # Cliente fechou a conexão break message = data.decode() print(f"Recebido de {client_address}: {message.strip()}") # Ecoa a mensagem de volta writer.write(f"Echo: {message}".encode()) await writer.drain() # Garante que os dados foram enviados # Simula processamento (operação não-bloqueante) await asyncio.sleep(0.01) # Pequena pausa controlada except ConnectionResetError: print(f"Conexão resetada por {client_address}") except Exception as e: print(f"Erro com {client_address}: {e}") finally: print(f"Cliente desconectado: {client_address}") writer.close() await writer.wait_closed() async def run_server(host='127.0.0.1', port=8888): """ Inicia o servidor assíncrono. """ server = await asyncio.start_server( handle_client, host, port, start_serving=True ) print(f"Servidor rodando em {host}:{port}") print(f"Conecte com 'telnet {host} {port}' ou um cliente TCP") async with server: await server.serve_forever() # ============================================ # CLIENTE DE TESTE (opcional) # ============================================ async def test_client(messages: list, host='127.0.0.1', port=8888): """ Cliente simples para testar o servidor. """ reader, writer = await asyncio.open_connection(host, port) print(f"Conectado ao servidor {host}:{port}") for msg in messages: writer.write(msg.encode()) await writer.drain() print(f"Enviado: {msg.strip()}") data = await reader.read(1024) print(f"Resposta: {data.decode().strip()}") await asyncio.sleep(1) writer.close() await writer.wait_closed() print("Cliente finalizado") # ============================================ # PONTO DE ENTRADA # ============================================ async def main(): # Opção 1: rodar apenas o servidor await run_server() # Opção 2: rodar servidor e cliente em paralelo (descomente abaixo) # server_task = asyncio.create_task(run_server()) # await asyncio.sleep(1) # Aguarda servidor iniciar # await test_client(["Olá servidor!\n", "Como você está?\n", "Saindo...\n"]) # server_task.cancel() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\nServidor encerrado manualmente.") |
No código, o servidor escuta na porta 8888 local.
Cada cliente rodará na mesma thread sem bloqueios.
A função handle_client é uma corrotina independente.
Quando await reader.read() é chamado, o controle é cedido.
Outros clientes são atendidos enquanto aguardamos dados.
Isso é o coração do modelo event-driven.
Para testar, use telnet localhost 8888 no terminal.
Digite qualquer mensagem e veja o eco retornar.
Você pode abrir múltiplos terminais simultaneamente.
Observe como todos são atendidos concorrentemente.
Além disso, o servidor lida bem com desconexões inesperadas.
Portanto, é um exemplo robusto para começar.
Outro ponto importante é o tratamento de erros.
Cada cliente é isolado; uma falha não afeta os outros.
Isso foi demonstrado com try/except no handle_client.
O servidor continua funcionando mesmo com erros pontuais.
Use essa estrutura para criar servidores reais.
Primeiramente, substitua a lógica de echo pelo seu negócio.
Adicione timeouts para evitar clientes lentos.
Use asyncio.wait_for() para isso.
Além disso, monitore o loop principal periodicamente.
Assim, você garante alta disponibilidade do serviço.
Escala para milhares de conexões
Baixo overhead de memória
Código mais simples que threads
Ideal para I/O-bound
Não acelera CPU-bound
Curva de aprendizado inicial
Biblioteca padrão menor que frameworks
Debugging pode ser desafiador
Para produção, use bibliotecas como asyncio com cuidado.
Frameworks como aiohttp ou FastAPI abstraem complexidades.
Eles oferecem routers, middlewares e validação nativa.
No entanto, entender as bases é fundamental.
Primeiramente, domine o exemplo acima.
Depois, evolua para soluções mais completas.
Finalmente, lembre-se de testar sob carga real.
Ferramentas como locust ajudam na simulação.
Assim, você garante que seu servidor aguenta o tráfego.
Portanto, asyncio é uma ferramenta poderosa.
Use-a com sabedoria nos projetos certos.