Programação distribuída executa código em vários computadores ao mesmo tempo. Ela conecta máquinas através de uma rede para resolver problemas grandes. Primeiramente, isso permite escalar horizontalmente além de um único servidor. Por exemplo, processar terabytes de dados com 100 máquinas em paralelo. Além disso, sistemas distribuídos são mais tolerantes a falhas. Assim, uma máquina cair não derruba todo o sistema. Consequentemente, a disponibilidade aumenta significativamente. Quando utilizar programação distribuída? Em problemas massivamente paralelos. Por exemplo, processamento de dados, machine learning ou busca em larga escala. Da mesma forma, qualquer tarefa que não caiba em uma máquina. Python oferece ferramentas como Celery, Dask, Ray e PySpark. Então, vamos explorar conceitos, padrões e exemplos práticos. Três subtítulos guiarão você pelo mundo da computação distribuída. Portanto, ao final, você saberá quando e como distribuir seu código.
Conceitos fundamentais: nós, comunicação e coordenação
Um nó é uma máquina individual no sistema distribuído. Nós se comunicam via rede usando mensagens ou chamadas RPC. A coordenação garante que todos trabalhem em direção ao mesmo objetivo. Quando usar arquitetura distribuída? Quando uma máquina é insuficiente. Também quando você precisa de alta disponibilidade. Por exemplo, serviços como Google ou Netflix usam milhares de nós. Exemplo básico de comunicação entre processos via socket:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# servidor_distribuido.py import socket import json import threading import time def processar_tarefa(tarefa): """Processa uma tarefa recebida.""" tipo = tarefa.get('tipo') dados = tarefa.get('dados', {}) if tipo == 'soma': return sum(dados.get('numeros', [])) elif tipo == 'multiplicacao': resultado = 1 for n in dados.get('numeros', []): resultado *= n return resultado elif tipo == 'echo': return dados.get('mensagem', '') else: return f"Tipo desconhecido: {tipo}" def tratar_cliente(conn, addr): """Trata um cliente conectado.""" print(f"Cliente conectado: {addr}") try: while True: dados = conn.recv(4096) if not dados: break tarefa = json.loads(dados.decode()) print(f"Recebida tarefa: {tarefa['tipo']} de {addr}") resultado = processar_tarefa(tarefa) resposta = json.dumps({'status': 'ok', 'resultado': resultado}) conn.send(resposta.encode()) except Exception as e: print(f"Erro com {addr}: {e}") finally: conn.close() print(f"Cliente {addr} desconectado") def iniciar_servidor(host='localhost', porta=8888): """Inicia o servidor distribuído.""" servidor = socket.socket(socket.AF_INET, socket.SOCK_STREAM) servidor.bind((host, porta)) servidor.listen(5) print(f"Servidor distribuído rodando em {host}:{porta}") try: while True: conn, addr = servidor.accept() thread = threading.Thread(target=tratar_cliente, args=(conn, addr)) thread.start() except KeyboardInterrupt: print("Servidor encerrado") finally: servidor.close() if __name__ == "__main__": iniciar_servidor() # cliente_distribuido.py import socket import json def enviar_tarefa(host, porta, tarefa): """Envia uma tarefa para o servidor.""" cliente = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: cliente.connect((host, porta)) cliente.send(json.dumps(tarefa).encode()) resposta = cliente.recv(4096) return json.loads(resposta.decode()) finally: cliente.close() if __name__ == "__main__": # Testando o servidor tarefas = [ {'tipo': 'soma', 'dados': {'numeros': [1, 2, 3, 4, 5]}}, {'tipo': 'multiplicacao', 'dados': {'numeros': [2, 3, 4]}}, {'tipo': 'echo', 'dados': {'mensagem': 'Olá servidor!'}}, ] for tarefa in tarefas: resultado = enviar_tarefa('localhost', 8888, tarefa) print(f"Tarefa {tarefa['tipo']}: {resultado}") |
Esse exemplo mostra um nó servidor processando requisições. Vários clientes podem enviar tarefas simultaneamente. Na prática, usamos bibliotecas mais robustas que sockets puros.
Padrões de programação distribuída
Existem padrões comuns em sistemas distribuídos. O padrão mestre-escravo divide trabalho entre trabalhadores. O padrão pipeline processa dados em etapas sequenciais. O padrão publish-subscribe notifica múltiplos interessados. Quando usar cada padrão? Mestre-escravo para tarefas independentes. Pipeline para processamento de fluxo de dados. Pub-sub para eventos e notificações em tempo real. Além disso, cada padrão resolve problemas específicos. Exemplo simplificado do padrão mestre-escravo:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
import multiprocessing import time import random class Escravo: """Trabalhador que processa uma parte da tarefa.""" def __init__(self, id_): self.id = id_ def processar(self, dados): """Processa um lote de dados.""" print(f"Escravo {self.id}: processando {len(dados)} itens") time.sleep(random.uniform(0.5, 1.5)) # Simula trabalho resultado = [x * 2 for x in dados] # Exemplo: dobra os valores print(f"Escravo {self.id}: concluído") return resultado class Mestre: """Coordenador que distribui trabalho entre escravos.""" def __init__(self, num_escravos): self.escravos = [Escravo(i) for i in range(num_escravos)] def distribuir(self, dados, num_particoes=None): """Divide os dados e distribui para os escravos.""" if num_particoes is None: num_particoes = len(self.escravos) # Divide os dados em partes tamanho_parte = len(dados) // num_particoes partes = [] for i in range(num_particoes): inicio = i * tamanho_parte fim = inicio + tamanho_parte if i < num_particoes - 1 else len(dados) partes.append(dados[inicio:fim]) # Distribui para os escravos (usando processos) with multiprocessing.Pool(processes=len(self.escravos)) as pool: resultados = pool.map(self._processar_escravo, [(i, partes[i]) for i in range(len(partes))]) # Combina os resultados resultado_final = [] for r in resultados: resultado_final.extend(r) return resultado_final def _processar_escravo(self, args): idx, dados = args return self.escravos[idx].processar(dados) # Exemplo de uso if __name__ == "__main__": dados = list(range(100)) # 100 números print(f"Dados originais: {dados[:10]}...") mestre = Mestre(num_escravos=4) inicio = time.time() resultado = mestre.distribuir(dados) tempo = time.time() - inicio print(f"Resultado: {resultado[:10]}...") print(f"Tempo de processamento: {tempo:.2f}s") print(f"Verificação: {resultado == [x*2 for x in dados]}") |
O mestre divide o trabalho, e os escravos processam em paralelo. Esse padrão escala adicionando mais escravos (máquinas). Portanto, é um dos mais úteis na prática.
Ferramentas práticas: celery, dask e ray
Celery é o framework mais popular para filas de tarefas distribuídas. Ele usa um broker (Redis ou RabbitMQ) para comunicação. Dask é especializado em computação paralela para dados grandes. Ray é moderno e focado em machine learning distribuído. Quando usar cada ferramenta? Celery para tarefas assíncronas web. Dask para processamento de arrays e dataframes. Ray para treinamento de modelos e reinforcement learning. Além disso, cada uma tem sua comunidade e documentação. Exemplo conceitual com Celery (código parcial):
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# Exemplo conceitual usando Celery (requer instalação) # Arquivo: tasks.py """ from celery import Celery app = Celery('tasks', broker='redis://localhost:6379') @app.task def processar_dados(n): # Simula processamento pesado resultado = sum(i * i for i in range(n)) return resultado @app.task def baixar_url(url): import requests resposta = requests.get(url) return resposta.status_code """ # Exemplo prático com Dask (mais fácil de testar) import dask from dask.distributed import Client import time def trabalho_pesado(x): """Função que será distribuída.""" time.sleep(0.1) # Simula trabalho return x * x def demonstrar_dask(): """Demonstra computação distribuída com Dask.""" print("=== Dask - Computação Distribuída ===") # Inicia um cluster local (simula múltiplas máquinas) client = Client(n_workers=4, threads_per_worker=1) print(f"Dashboard disponível em: {client.dashboard_link}") # Cria tarefas atrasadas (lazy evaluation) tarefas = [dask.delayed(trabalho_pesado)(i) for i in range(20)] # Executa em paralelo inicio = time.time() resultados = dask.compute(*tarefas) tempo = time.time() - inicio print(f"Resultados: {resultados[:5]}...") print(f"Tempo com 4 workers: {tempo:.2f}s") # Comparação sequencial inicio_seq = time.time() resultados_seq = [trabalho_pesado(i) for i in range(20)] tempo_seq = time.time() - inicio_seq print(f"Tempo sequencial: {tempo_seq:.2f}s") print(f"Aceleração: {tempo_seq / tempo:.2f}x") client.close() # Exemplo com Ray (moderno para ML) def demonstrar_ray(): """Demonstra Ray para computação distribuída.""" try: import ray ray.init(ignore_reinit_error=True) @ray.remote def tarefa_ray(x): time.sleep(0.1) return x * x # Executa tarefas em paralelo inicio = time.time() futures = [tarefa_ray.remote(i) for i in range(20)] resultados = ray.get(futures) tempo = time.time() - inicio print(f"\n=== Ray - Computação Distribuída ===") print(f"Resultados: {resultados[:5]}...") print(f"Tempo: {tempo:.2f}s") ray.shutdown() except ImportError: print("\nRay não está instalado. Instale com: pip install ray") if __name__ == "__main__": demonstrar_dask() demonstrar_ray() print("\n=== Dica: Para usar Celery ===") print("pip install celery redis") print("celery -A tasks worker --loglevel=info") |
Essas ferramentas abstraem a complexidade da rede. Você escreve código como se fosse local, mas ele roda distribuído. A fórmula da aceleração em sistemas distribuídos: \(S = \frac{T_{\text{seq}}}{T_{\text{dist}}} \approx N_{\text{máquinas}} \times E\) Onde E é a eficiência (geralmente 0.7 a 0.9). Portanto, programação distribuída é o próximo nível após multiprocessamento. Comece com Dask para dados e Ray para ML. Celery é ótimo para tarefas web assíncronas. Finalmente, distribua seu código e conquiste a escalabilidade horizontal.