Event loop é um mecanismo que gerencia tarefas assíncronas.
Ele executa corrotinas e retoma aquelas que estão aguardando I/O.
Primeiramente, o event loop roda em uma única thread.
Por exemplo, ele pode alternar entre 1000 tarefas de rede rapidamente.
Além disso, o event loop nunca bloqueia em operações lentas.
A voz passiva é usada aqui: “as tarefas são agendadas pelo loop”.
Quando utilizar event loop? Em aplicações com alta concorrência.
Por exemplo, servidores web, chats ou crawlers de internet.
O Python fornece o asyncio com um event loop padrão.
Vamos explorar como ele funciona e como usá-lo.
Três subtítulos guiarão você pelo coração do asyncio.
Ao final, você dominará o event loop e suas aplicações.
Como o event loop funciona internamente
O event loop mantém uma fila de tarefas prontas para executar.
Cada tarefa é uma corrotina que pode ser pausada com await.
Quando uma tarefa chama await, ela retorna o controle ao loop.
O loop então executa a próxima tarefa da fila.
Assim que o I/O aguardado termina, a tarefa é reagendada.
A voz passiva é aplicada: “os descritores de arquivo são monitorados”.
Exemplo mostrando o event loop em ação:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
import asyncio import time async def tarefa_curta(nome, duracao): """Tarefa que cede o controle após cada passo.""" print(f" {nome}: iniciando") for i in range(3): print(f" {nome}: passo {i} - vai dormir {duracao}s") await asyncio.sleep(duracao) # Cede o controle ao loop print(f" {nome}: passo {i} - acordou") print(f" {nome}: finalizada") return f"Resultado de {nome}" async def demonstrar_event_loop(): """Demonstra como o event loop intercala tarefas.""" print("=== Event loop alternando entre tarefas ===") # Criando três tarefas com durações diferentes tarefas = [ asyncio.create_task(tarefa_curta("Tarefa A", 0.3)), asyncio.create_task(tarefa_curta("Tarefa B", 0.5)), asyncio.create_task(tarefa_curta("Tarefa C", 0.2)), ] # Aguarda todas completarem resultados = await asyncio.gather(*tarefas) print(f"Resultados: {resultados}") async def obter_event_loop_info(): """Obtém informações sobre o event loop atual.""" loop = asyncio.get_running_loop() print(f"Loop atual: {loop}") print(f"Loop é fechado? {loop.is_closed()}") print(f"Loop está rodando? {loop.is_running()}") async def main(): await demonstrar_event_loop() print("\n=== Informações do Event Loop ===") await obter_event_loop_info() print("\n=== Simulação de I/O com diferentes velocidades ===") async def operacao_lenta(nome): for i in range(5): await asyncio.sleep(0.1) print(f" {nome}: progresso {i+1}/5") return nome # Executa múltiplas operações concorrentemente inicio = time.time() resultados = await asyncio.gather( operacao_lenta("Op1"), operacao_lenta("Op2"), operacao_lenta("Op3") ) print(f"Tempo total: {time.time() - inicio:.1f}s") print(f"Resultados: {resultados}") if __name__ == "__main__": asyncio.run(main()) |
O event loop intercala as tarefas automaticamente.
Nenhuma tarefa bloqueia a execução das outras durante sleeps.
Isso permite alta concorrência com recursos mínimos.
Controlando o event loop manualmente
Em alguns casos, você precisa controlar o loop diretamente.
Use asyncio.get_event_loop() para obter o loop atual.
Métodos como call_soon() agendam funções imediatamente.
call_later() agenda após um atraso em segundos.
call_at() agenda em um tempo absoluto (monotônico).
Quando usar controle manual? Em integrações com código síncrono.
Também em sistemas que precisam de temporizadores precisos.
A voz passiva é aplicada: “as funções são agendadas com prioridade”.
Exemplo de controle manual do event loop:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
import asyncio import time def funcao_sincrona(nome): """Função síncrona chamada pelo event loop.""" print(f" Função síncrona {nome} executando em {time.strftime('%H:%M:%S')}") return f"Resultado de {nome}" async def tarefa_assincrona(nome, duracao): """Tarefa assíncrona normal.""" print(f" Tarefa {nome}: iniciando em {time.strftime('%H:%M:%S')}") await asyncio.sleep(duracao) print(f" Tarefa {nome}: finalizando em {time.strftime('%H:%M:%S')}") return nome async def demonstrar_controle_manual(): print("=== Controle manual do event loop ===") loop = asyncio.get_running_loop() # call_soon - executa o mais rápido possível loop.call_soon(funcao_sincrona, "call_soon") # call_later - executa após 0.5 segundos loop.call_later(0.5, funcao_sincrona, "call_later") # call_at - executa em um tempo absoluto agora = loop.time() loop.call_at(agora + 1.0, funcao_sincrona, "call_at") # Executa algumas tarefas assíncronas enquanto isso tarefas = [ asyncio.create_task(tarefa_assincrona("Async1", 0.3)), asyncio.create_task(tarefa_assincrona("Async2", 0.6)), ] # Aguarda um pouco para ver os callbacks await asyncio.sleep(1.5) await asyncio.gather(*tarefas) async def criar_event_loop_personalizado(): """Cria um event loop personalizado.""" print("\n=== Event loop personalizado ===") # Cria um novo loop (não o padrão) novo_loop = asyncio.new_event_loop() def tarefa_no_loop(): print(f" Executando no loop personalizado") return "OK" # Agenda e executa no loop personalizado novo_loop.call_soon(tarefa_no_loop) novo_loop.call_later(0.3, tarefa_no_loop) # Executa o loop por um tempo limitado novo_loop.run_until_complete(asyncio.sleep(0.5)) novo_loop.close() print("Loop personalizado fechado") async def temporizador_preciso(): """Demonstra temporizadores com alta precisão.""" print("\n=== Temporizador com precisão ===") inicio = loop.time() def marcador(): agora = loop.time() print(f" Marca em t={agora - inicio:.3f}s") loop = asyncio.get_running_loop() # Agenda marcações em intervalos precisos loop.call_at(inicio + 0.1, marcador) loop.call_at(inicio + 0.2, marcador) loop.call_at(inicio + 0.3, marcador) loop.call_at(inicio + 0.5, marcador) await asyncio.sleep(0.6) if __name__ == "__main__": asyncio.run(demonstrar_controle_manual()) asyncio.run(criar_event_loop_personalizado()) asyncio.run(temporizador_preciso()) |
Callbacks síncronos não devem executar operações lentas.
Eles bloqueiam todo o event loop se demorarem demais.
Use call_soon_threadsafe() para agendar de outras threads.
Event loop na prática: servidores e clientes
O event loop é a base de servidores assíncronos.
Ele gerencia milhares de conexões simultâneas sem threads.
Cada cliente é tratado como uma tarefa independente.
O loop alterna entre clientes enquanto eles aguardam dados.
Quando usar servidores com event loop? Em aplicações de rede escaláveis.
Por exemplo, APIs REST, WebSockets ou servidores de jogos.
A voz passiva é aplicada: “as conexões são aceitas e tratadas”.
Exemplo de servidor e cliente com event loop:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
import asyncio # Servidor echo assíncrono async def handle_client(reader, writer): """Trata um cliente conectado.""" addr = writer.get_extra_info('peername') print(f"Cliente conectado: {addr}") try: while True: # Lê dados do cliente (não bloqueante) data = await reader.read(1024) if not data: break message = data.decode() print(f"Recebido de {addr}: {message}") # Ecoa de volta writer.write(f"Echo: {message}".encode()) await writer.drain() except asyncio.CancelledError: print(f"Cliente {addr} cancelado") finally: writer.close() await writer.wait_closed() print(f"Cliente {addr} desconectado") async def run_server(): """Inicia o servidor no event loop.""" server = await asyncio.start_server( handle_client, '127.0.0.1', 8888 ) addr = server.sockets[0].getsockname() print(f"Servidor rodando em {addr}") async with server: await server.serve_forever() # Cliente assíncrono para testar async def run_client(mensagens): """Cliente que se conecta ao servidor.""" reader, writer = await asyncio.open_connection('127.0.0.1', 8888) for msg in mensagens: print(f"Enviando: {msg}") writer.write(msg.encode()) await writer.drain() resposta = await reader.read(1024) print(f"Resposta: {resposta.decode()}") await asyncio.sleep(0.5) writer.close() await writer.wait_closed() print("Cliente finalizado") async def demonstrar_servidor_cliente(): """Demonstra servidor e cliente no mesmo event loop.""" print("=== Servidor Echo com Event Loop ===\n") # Inicia o servidor em background server_task = asyncio.create_task(run_server()) # Aguarda o servidor iniciar await asyncio.sleep(0.1) # Executa alguns clientes await run_client(["Olá", "Como vai?", "Teste final"]) # Cancela o servidor (demonstração) server_task.cancel() try: await server_task except asyncio.CancelledError: print("\nServidor encerrado") print("Demonstração concluída") async def monitorar_event_loop(): """Monitora quantas tarefas estão no loop.""" loop = asyncio.get_running_loop() # Tarefa que se auto-monitora async def tarefa_trabalhadora(id_, duracao): print(f"Tarefa {id_}: iniciando por {duracao}s") await asyncio.sleep(duracao) print(f"Tarefa {id_}: finalizada") return id_ # Cria muitas tarefas tarefas = [asyncio.create_task(tarefa_trabalhadora(i, 1)) for i in range(10)] # Aguarda todas resultados = await asyncio.gather(*tarefas) print(f"Todas as {len(resultados)} tarefas completaram") # Mostra estatísticas do loop if hasattr(loop, 'get_debug'): print(f"Debug mode: {loop.get_debug()}") if __name__ == "__main__": # Executa a demonstração asyncio.run(demonstrar_servidor_cliente()) asyncio.run(monitorar_event_loop()) |
O event loop gerencia automaticamente todas as conexões.
Nunca execute código bloqueante dentro de handlers.
Use asyncio.to_thread() para funções CPU-intensivas.
O event loop é o motor que torna tudo isso possível.
Domine-o e você construirá sistemas extremamente escaláveis.