Ray é um framework para computação distribuída em Python.
Ele transforma funções e classes em tarefas remotas facilmente.
Primeiramente, Ray escala de um laptop para um cluster gigante.
Por exemplo, você pode paralelizar loops sem modificar muito código.
Além disso, Ray é excelente para machine learning e reinforcement learning.
Assim, cientistas de dados ganham produtividade imensa.
Consequentemente, projetos que levavam dias agora levam horas.
Portanto, a adoção de Ray cresce rapidamente na indústria.
Quando utilizar Ray? Em computação paralela de larga escala.
Também em treinamento de modelos, otimização de hiperparâmetros e simulações.
Da mesma forma, qualquer tarefa paralelizável se beneficia.
Por outro lado, para tarefas sequenciais simples, Ray é desnecessário.
Ray oferece atores (estado compartilhado) e tarefas sem estado.
Então, vamos explorar instalação, conceitos e exemplos práticos.
Três subtítulos guiarão você pelo universo do Ray.
Portanto, ao final, você paralelizará código como nunca antes.
Instalação e primeiros passos com ray
Instale Ray com pip: pip install ray.
Depois, importe ray e inicie com ray.init().
O decorador @ray.remote torna uma função distribuída.
Chamá-la com .remote() cria uma tarefa assíncrona.
Use ray.get() para obter os resultados.
Quando usar tarefas Ray? Em operações independentes e paralelizáveis.
Por exemplo, processar imagens ou calcular estatísticas.
Além disso, tarefas Ray são ideais para map-reduce simples.
Exemplo básico de Ray:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
import ray import time # Inicializa o Ray (local, sem cluster) ray.init(ignore_reinit_error=True) @ray.remote def tarefa_pesada(n): """Função que será executada remotamente.""" time.sleep(1) # Simula trabalho pesado return n * n # Versão sequencial print("=== Execução sequencial ===") inicio_seq = time.time() resultados_seq = [tarefa_pesada(i) for i in range(10)] tempo_seq = time.time() - inicio_seq print(f"Tempo sequencial: {tempo_seq:.2f}s") # Versão com Ray (paralela) print("\n=== Execução com Ray ===") inicio_ray = time.time() # Cria tarefas remotas (não bloqueia) futures = [tarefa_pesada.remote(i) for i in range(10)] # Aguarda os resultados resultados_ray = ray.get(futures) tempo_ray = time.time() - inicio_ray print(f"Tempo com Ray: {tempo_ray:.2f}s") print(f"Aceleração: {tempo_seq / tempo_ray:.2f}x") # Verificando resultados print(f"Resultados: {resultados_ray[:5]}...") # Fecha o Ray ray.shutdown() |
Ray executa tarefas em paralelo usando todos os núcleos.
O ganho é próximo ao número de CPUs disponíveis.
Portanto, para tarefas independentes, Ray é imbatível.
Atores: estado compartilhado entre tarefas
Atores são objetos Ray que mantêm estado entre chamadas.
Use @ray.remote em uma classe para criar um ator.
Instancie com Classe.remote() e chame métodos com .metodo.remote().
Quando usar atores? Em cenários com estado compartilhado.
Por exemplo, contadores, acumuladores ou cache distribuído.
Além disso, atores são úteis para simulações ou parâmetros compartilhados.
Assim, você mantém consistência sem locks complexos.
Atores mantêm estado consistente mesmo com chamadas concorrentes.
Então, para sistemas com estado, prefira atores a tarefas puras.
Exemplo de ator contador distribuído:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
import ray import time ray.init(ignore_reinit_error=True) @ray.remote class ContadorDistribuido: """Ator que mantém um contador compartilhado.""" def __init__(self): self.valor = 0 def incrementar(self, quantidade=1): self.valor += quantidade return self.valor def obter_valor(self): return self.valor def resetar(self): self.valor = 0 return self.valor # Criando um ator (instância remota) contador = ContadorDistribuido.remote() # Chamando métodos do ator remotamente print("=== Ator Contador Distribuído ===") futures = [] for i in range(10): # Incrementa de forma distribuída futures.append(contador.incrementar.remote(1)) # Aguarda resultados parciais resultados = ray.get(futures) print(f"Incrementos: {resultados}") # Obtém valor final valor_final = ray.get(contador.obter_valor.remote()) print(f"Valor final do contador: {valor_final}") # Exemplo com múltiplos atores independentes print("\n=== Múltiplos Atores ===") atores = [ContadorDistribuido.remote() for _ in range(5)] # Incrementa cada ator com diferentes valores for i, ator in enumerate(atores): ator.incrementar.remote(i + 1) # Coleta resultados valores = ray.get([ator.obter_valor.remote() for ator in atores]) print(f"Valores dos atores: {valores}") # Exemplo com ator que acumula dados @ray.remote class Acumulador: def __init__(self): self.dados = [] def adicionar(self, item): self.dados.append(item) return len(self.dados) def obter_todos(self): return self.dados def media(self): if not self.dados: return 0 return sum(self.dados) / len(self.dados) acumulador = Acumulador.remote() for i in range(20): acumulador.adicionar.remote(i * 2) # Obtém resultados tamanho = ray.get(acumulador.adicionar.remote(100)) media = ray.get(acumulador.media.remote()) todos = ray.get(acumulador.obter_todos.remote()) print(f"\n=== Acumulador ===") print(f"Total de itens: {tamanho}") print(f"Média: {media:.2f}") print(f"Primeiros 5 itens: {todos[:5]}...") ray.shutdown() |
Atores mantêm estado entre chamadas remotas.
Isso é fundamental para muitos algoritmos distribuídos.
Assim, você implementa sistemas complexos com facilidade.
Ray para machine learning e tunning de hiperparâmetros
Ray tem bibliotecas especializadas como Ray Tune e Ray Train.
Ray Tune otimiza hiperparâmetros de modelos de ML.
Ray Train distribui treinamento de modelos grandes.
Quando usar essas ferramentas? Em projetos de ML profissional.
Também em otimização de modelos com muitos parâmetros.
Por exemplo, redes neurais ou algoritmos de ensemble.
Além disso, Ray Serve serve modelos em produção.
Exemplo de Ray Tune para otimização:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
import ray from ray import tune from ray.tune.schedulers import ASHAScheduler import numpy as np import time # Inicializa Ray ray.init(ignore_reinit_error=True) def funcao_objetivo(config): """Função a ser otimizada (exemplo: SVM com parâmetros).""" # Simula treinamento de modelo x = config["x"] y = config["y"] z = config["z"] # Função de custo simulada custo = (x - 1)**2 + (y - 2)**2 + (z - 3)**2 time.sleep(0.1) # Simula tempo de treinamento # Reporta métricas para o Tune tune.report(loss=custo, accuracy=1.0 / (1.0 + custo)) def demonstrar_ray_tune(): """Demonstra otimização de hiperparâmetros com Ray Tune.""" print("=== Ray Tune - Otimização de Hiperparâmetros ===\n") # Espaço de busca espaco_busca = { "x": tune.uniform(0, 2), "y": tune.uniform(1, 3), "z": tune.uniform(2, 4), } # Executa otimização analise = tune.run( funcao_objetivo, config=espaco_busca, num_samples=20, # Número de combinações metric="loss", mode="min", progress_bar=True ) # Melhores parâmetros encontrados melhores_config = analise.get_best_config(metric="loss", mode="min") melhor_loss = analise.get_best_trial(metric="loss", mode="min").last_result["loss"] print(f"\nMelhores parâmetros: x={melhores_config['x']:.3f}, " f"y={melhores_config['y']:.3f}, z={melhores_config['z']:.3f}") print(f"Melhor loss: {melhor_loss:.6f}") return melhores_config # Exemplo com Ray para processamento paralelo de dados @ray.remote def processar_lote(dados_lote): """Processa um lote de dados em paralelo.""" resultado = [x * 2 for x in dados_lote] time.sleep(0.01) # Simula processamento return sum(resultado) def processamento_paralelo_dados(): """Divide dados em lotes e processa em paralelo.""" dados = list(range(1000)) num_lotes = 8 tamanho_lote = len(dados) // num_lotes lotes = [] for i in range(num_lotes): inicio = i * tamanho_lote fim = inicio + tamanho_lote if i < num_lotes - 1 else len(dados) lotes.append(dados[inicio:fim]) print("\n=== Processamento Paralelo de Dados ===") inicio = time.time() # Processa lotes em paralelo futures = [processar_lote.remote(lote) for lote in lotes] resultados = ray.get(futures) total = sum(resultados) tempo = time.time() - inicio print(f"Processados {len(dados)} itens em {tempo:.3f}s") print(f"Total: {total}") # Comparação sequencial inicio_seq = time.time() total_seq = sum(x * 2 for x in dados) tempo_seq = time.time() - inicio_seq print(f"Tempo sequencial: {tempo_seq:.3f}s") print(f"Aceleração: {tempo_seq / tempo:.2f}x") # Exemplo com Ray para simulação distribuída @ray.remote def simulacao(parametros): """Simula um sistema com parâmetros dados.""" # Simulação complexa resultado = np.sin(parametros[0]) * np.cos(parametros[1]) return resultado def simulacoes_distribuidas(): """Executa múltiplas simulações em paralelo.""" print("\n=== Simulações Distribuídas ===") parametros_lista = [(i * 0.1, j * 0.1) for i in range(10) for j in range(10)] inicio = time.time() futures = [simulacao.remote(p) for p in parametros_lista] resultados = ray.get(futures) tempo = time.time() - inicio print(f"Executadas {len(parametros_lista)} simulações em {tempo:.2f}s") print(f"Média dos resultados: {np.mean(resultados):.4f}") if __name__ == "__main__": # Demonstrações melhores = demonstrar_ray_tune() processamento_paralelo_dados() simulacoes_distribuidas() print("\n=== Resumo do Ray ===") print("Ray é ideal para:") print(" - Paralelização de loops pesados") print(" - Atores com estado compartilhado") print(" - Otimização de hiperparâmetros") print(" - Treinamento distribuído de ML") ray.shutdown() |
Ray Tune busca automaticamente os melhores parâmetros.
A fórmula do ganho em paralelismo é clara:
\(S = \frac{T_{\text{seq}}}{T_{\text{ray}}} \approx N_{\text{workers}} \times (1 – O)\)
Onde O é o overhead de comunicação (geralmente pequeno).
Portanto, Ray é a ferramenta moderna para computação distribuída.
Comece com tarefas simples e evolua para atores complexos.
Finalmente, seu código escalará de um laptop para um cluster inteiro.