Dask arrays: numpy que escala
Dask arrays imitam NumPy, mas trabalham com blocos (chunks). Eles dividem grandes arrays em pedaços menores. Cada bloco processa separadamente e em paralelo. Quando usar Dask arrays? Em operações matemáticas em dados grandes. Por exemplo, multiplicação de matrizes com bilhões de elementos. Além disso, você controla o tamanho dos chunks manualmente. Exemplo de Dask array:|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
import dask.array as da import numpy as np import time # Criando um array gigante (10.000 x 10.000 = 100 milhões de elementos) # Isso não caberia na memória se fosse NumPy puro tamanho = 10000 print(f"Criando array de {tamanho}x{tamanho} ({tamanho*tamanho/1e6:.0f}M elementos)") # Com NumPy (carrega tudo na memória) inicio_np = time.time() arr_np = np.random.random((tamanho, tamanho)) soma_np = arr_np.sum() tempo_np = time.time() - inicio_np print(f"NumPy: soma={soma_np:.2f}, tempo={tempo_np:.2f}s, memória={arr_np.nbytes/1e6:.0f}MB") # Com Dask (processa em blocos) inicio_dask = time.time() # Blocos de 1000x1000 (100 blocos no total) arr_dask = da.random.random((tamanho, tamanho), chunks=(1000, 1000)) soma_dask = arr_dask.sum().compute() # compute() executa o grafo tempo_dask = time.time() - inicio_dask print(f"Dask: soma={soma_dask:.2f}, tempo={tempo_dask:.2f}s") # Operações matemáticas com Dask arr1 = da.ones((10000, 10000), chunks=(1000, 1000)) arr2 = da.ones((10000, 10000), chunks=(1000, 1000)) # Multiplicação elemento a elemento (lazy) resultado = (arr1 + arr2) * 3 - 1 print(f"\nOperação lazy criada, shape: {resultado.shape}") # Calcula a média (executa o grafo) media = resultado.mean().compute() print(f"Média do resultado: {media:.2f}") # Demonstração do grafo print(f"Gráfico de tarefas: {resultado}") |
Dask dataframes: pandas que escala
Dask DataFrames imitam pandas, mas particionados em pedaços. Cada partição contém um DataFrame pandas comum. Operações como groupby, join e merge são distribuídas. Quando usar Dask DataFrames? Em dados tabulares gigantes. Por exemplo, logs de servidor com bilhões de linhas. Além disso, a leitura de múltiplos arquivos ocorre em paralelo. Exemplo de Dask DataFrame:|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
import dask.dataframe as dd import pandas as pd import numpy as np import time # Criando um dataset grande simulado print("=== Dask DataFrame vs Pandas ===\n") n_linhas = 10_000_000 # 10 milhões de linhas print(f"Simulando {n_linhas:,} linhas de dados") # Versão Pandas (carrega tudo na memória) try: inicio_pd = time.time() df_pd = pd.DataFrame({ 'id': range(n_linhas), 'valor': np.random.random(n_linhas), 'categoria': np.random.choice(['A', 'B', 'C', 'D'], n_linhas) }) media_pd = df_pd.groupby('categoria')['valor'].mean() tempo_pd = time.time() - inicio_pd print(f"Pandas: média por categoria em {tempo_pd:.2f}s") print(f" Resultado: {media_pd.to_dict()}") except MemoryError: print("Pandas: Memória insuficiente para 10M linhas!") # Versão Dask (processa em partições) inicio_dd = time.time() # Cria DataFrame Dask a partir de um gerador df_dd = dd.from_pandas( pd.DataFrame({ 'id': range(100_000), 'valor': np.random.random(100_000), 'categoria': np.random.choice(['A', 'B', 'C', 'D'], 100_000) }), npartitions=4 ) # Simula 100 repetições (total 10M linhas) for _ in range(99): df_temp = dd.from_pandas( pd.DataFrame({ 'id': range(100_000), 'valor': np.random.random(100_000), 'categoria': np.random.choice(['A', 'B', 'C', 'D'], 100_000) }), npartitions=4 ) df_dd = dd.concat([df_dd, df_temp]) # Operação groupby lazy media_dd = df_dd.groupby('categoria')['valor'].mean() # Executa o cálculo resultado_dd = media_dd.compute() tempo_dd = time.time() - inicio_dd print(f"Dask: média por categoria em {tempo_dd:.2f}s") print(f" Resultado: {resultado_dd.to_dict()}") # Exemplo com leitura de CSV grande (simulado) print("\n=== Leitura de CSV Particionado ===") # Criando arquivos CSV simulados import tempfile import os with tempfile.TemporaryDirectory() as tmpdir: for i in range(5): df_part = pd.DataFrame({ 'id': range(100_000), 'valor': np.random.random(100_000), 'categoria': np.random.choice(['X', 'Y', 'Z'], 100_000) }) df_part.to_csv(f"{tmpdir}/part_{i}.csv", index=False) # Lê todos os CSVs de uma vez com Dask df_dask = dd.read_csv(f"{tmpdir}/part_*.csv") print(f"Partições: {df_dask.npartitions}") print(f"Colunas: {df_dask.columns.tolist()}") # Calcula estatísticas stats = df_dask.groupby('categoria')['valor'].agg(['mean', 'std', 'count']) resultado = stats.compute() print("\nEstatísticas por categoria:") print(resultado) |
Dask delayed: paralelização customizada
Dask delayed decora funções para execução lazy e paralela. Você controla exatamente o grafo de dependências. Quando usar delayed? Em fluxos de trabalho complexos. Por exemplo, pipelines de ETL com etapas interdependentes. Além disso, delayed permite paralelismo granular e fino. Exemplo de Dask delayed:|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import dask from dask.distributed import Client import time import random # Inicia cliente Dask local client = Client(n_workers=4, threads_per_worker=1) print(f"Dashboard Dask: {client.dashboard_link}") # Funções de exemplo (simulam trabalho) @dask.delayed def baixar_dados(url): """Simula download de dados.""" time.sleep(random.uniform(0.5, 1.5)) return f"Dados de {url}" @dask.delayed def processar_dados(dados): """Processa os dados baixados.""" time.sleep(0.5) return f"Processado: {dados}" @dask.delayed def analisar_dados(dados_processados): """Analisa os dados processados.""" time.sleep(0.3) return f"Análise: {len(dados_processados)} caracteres" @dask.delayed def salvar_resultado(analise): """Salva o resultado final.""" time.sleep(0.2) return f"Salvo: {analise}" # Pipeline completo com Dask delayed print("=== Pipeline de Dados com Dask Delayed ===\n") urls = [f"http://api.com/dados/{i}" for i in range(10)] # Cria o grafo lazy (nenhuma execução ainda) inicio_pipeline = time.time() # Passo 1: Baixar todos os dados em paralelo downloads = [baixar_dados(url) for url in urls] # Passo 2: Processar cada download processados = [processar_dados(d) for d in downloads] # Passo 3: Analisar cada resultado analises = [analisar_dados(p) for p in processados] # Passo 4: Salvar todos os resultados salvos = [salvar_resultado(a) for a in analises] # Executa o grafo (agora sim) resultados = dask.compute(*salvos) tempo_pipeline = time.time() - inicio_pipeline print(f"Pipeline concluído em {tempo_pipeline:.2f}s") for i, r in enumerate(resultados[:5]): print(f" Resultado {i}: {r}") # Exemplo com dependência entre tarefas print("\n=== Dependências entre Tarefas ===") @dask.delayed def etapa1(): time.sleep(0.5) return 10 @dask.delayed def etapa2(x): time.sleep(0.5) return x * 2 @dask.delayed def etapa3(x, y): time.sleep(0.5) return x + y # Grafo com dependências (etapa2 depende de etapa1) a = etapa1() b = etapa2(a) # depende de a c = etapa2(5) # independente d = etapa3(b, c) # depende de b e c # Executa resultado_final = dask.compute(d) print(f"Resultado final: {resultado_final[0]}") # Exemplo de visualização do grafo (opcional) print("\n=== Visualização (opcional) ===") print("Para ver o grafo: dask.visualize(d, filename='grafo.png')") # Fecha o cliente client.close() |