Dask é uma biblioteca para computação paralela em Python.
Ele escala de um laptop para um cluster de milhares de máquinas.
Primeiramente, Dask integra-se perfeitamente com NumPy e Pandas.
Por exemplo, você processa DataFrames maiores que a memória RAM.
Além disso, Dask usa execução lazy (preguiçosa) para otimização.
Assim, o sistema constrói um grafo antes de qualquer cálculo.
Consequentemente, operações desnecessárias são evitadas.
Quando utilizar Dask? Em dados que não cabem na memória.
Também em computação paralela com APIs familiares.
Por outro lado, para dados pequenos, Dask adiciona overhead.
Dask oferece arrays, dataframes, bags e delayed.
Então, vamos explorar cada um com exemplos práticos.
Três subtítulos guiarão você pelo universo Dask.
Portanto, ao final, você processará terabytes de dados.
Dask arrays: numpy que escala
Dask arrays imitam NumPy, mas trabalham com blocos (chunks).
Eles dividem grandes arrays em pedaços menores.
Cada bloco processa separadamente e em paralelo.
Quando usar Dask arrays? Em operações matemáticas em dados grandes.
Por exemplo, multiplicação de matrizes com bilhões de elementos.
Além disso, você controla o tamanho dos chunks manualmente.
Exemplo de Dask array:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
import dask.array as da import numpy as np import time # Criando um array gigante (10.000 x 10.000 = 100 milhões de elementos) # Isso não caberia na memória se fosse NumPy puro tamanho = 10000 print(f"Criando array de {tamanho}x{tamanho} ({tamanho*tamanho/1e6:.0f}M elementos)") # Com NumPy (carrega tudo na memória) inicio_np = time.time() arr_np = np.random.random((tamanho, tamanho)) soma_np = arr_np.sum() tempo_np = time.time() - inicio_np print(f"NumPy: soma={soma_np:.2f}, tempo={tempo_np:.2f}s, memória={arr_np.nbytes/1e6:.0f}MB") # Com Dask (processa em blocos) inicio_dask = time.time() # Blocos de 1000x1000 (100 blocos no total) arr_dask = da.random.random((tamanho, tamanho), chunks=(1000, 1000)) soma_dask = arr_dask.sum().compute() # compute() executa o grafo tempo_dask = time.time() - inicio_dask print(f"Dask: soma={soma_dask:.2f}, tempo={tempo_dask:.2f}s") # Operações matemáticas com Dask arr1 = da.ones((10000, 10000), chunks=(1000, 1000)) arr2 = da.ones((10000, 10000), chunks=(1000, 1000)) # Multiplicação elemento a elemento (lazy) resultado = (arr1 + arr2) * 3 - 1 print(f"\nOperação lazy criada, shape: {resultado.shape}") # Calcula a média (executa o grafo) media = resultado.mean().compute() print(f"Média do resultado: {media:.2f}") # Demonstração do grafo print(f"Gráfico de tarefas: {resultado}") |
Dask processa arrays maiores que a memória RAM.
Ele usa disco ou cluster conforme a necessidade.
Portanto, você nunca enfrenta MemoryError novamente.
Dask dataframes: pandas que escala
Dask DataFrames imitam pandas, mas particionados em pedaços.
Cada partição contém um DataFrame pandas comum.
Operações como groupby, join e merge são distribuídas.
Quando usar Dask DataFrames? Em dados tabulares gigantes.
Por exemplo, logs de servidor com bilhões de linhas.
Além disso, a leitura de múltiplos arquivos ocorre em paralelo.
Exemplo de Dask DataFrame:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
import dask.dataframe as dd import pandas as pd import numpy as np import time # Criando um dataset grande simulado print("=== Dask DataFrame vs Pandas ===\n") n_linhas = 10_000_000 # 10 milhões de linhas print(f"Simulando {n_linhas:,} linhas de dados") # Versão Pandas (carrega tudo na memória) try: inicio_pd = time.time() df_pd = pd.DataFrame({ 'id': range(n_linhas), 'valor': np.random.random(n_linhas), 'categoria': np.random.choice(['A', 'B', 'C', 'D'], n_linhas) }) media_pd = df_pd.groupby('categoria')['valor'].mean() tempo_pd = time.time() - inicio_pd print(f"Pandas: média por categoria em {tempo_pd:.2f}s") print(f" Resultado: {media_pd.to_dict()}") except MemoryError: print("Pandas: Memória insuficiente para 10M linhas!") # Versão Dask (processa em partições) inicio_dd = time.time() # Cria DataFrame Dask a partir de um gerador df_dd = dd.from_pandas( pd.DataFrame({ 'id': range(100_000), 'valor': np.random.random(100_000), 'categoria': np.random.choice(['A', 'B', 'C', 'D'], 100_000) }), npartitions=4 ) # Simula 100 repetições (total 10M linhas) for _ in range(99): df_temp = dd.from_pandas( pd.DataFrame({ 'id': range(100_000), 'valor': np.random.random(100_000), 'categoria': np.random.choice(['A', 'B', 'C', 'D'], 100_000) }), npartitions=4 ) df_dd = dd.concat([df_dd, df_temp]) # Operação groupby lazy media_dd = df_dd.groupby('categoria')['valor'].mean() # Executa o cálculo resultado_dd = media_dd.compute() tempo_dd = time.time() - inicio_dd print(f"Dask: média por categoria em {tempo_dd:.2f}s") print(f" Resultado: {resultado_dd.to_dict()}") # Exemplo com leitura de CSV grande (simulado) print("\n=== Leitura de CSV Particionado ===") # Criando arquivos CSV simulados import tempfile import os with tempfile.TemporaryDirectory() as tmpdir: for i in range(5): df_part = pd.DataFrame({ 'id': range(100_000), 'valor': np.random.random(100_000), 'categoria': np.random.choice(['X', 'Y', 'Z'], 100_000) }) df_part.to_csv(f"{tmpdir}/part_{i}.csv", index=False) # Lê todos os CSVs de uma vez com Dask df_dask = dd.read_csv(f"{tmpdir}/part_*.csv") print(f"Partições: {df_dask.npartitions}") print(f"Colunas: {df_dask.columns.tolist()}") # Calcula estatísticas stats = df_dask.groupby('categoria')['valor'].agg(['mean', 'std', 'count']) resultado = stats.compute() print("\nEstatísticas por categoria:") print(resultado) |
Dask DataFrames processam dados maiores que a memória.
Eles também leem múltiplos arquivos em paralelo.
Assim, você trabalha com terabytes como se fossem gigabytes.
Dask delayed: paralelização customizada
Dask delayed decora funções para execução lazy e paralela.
Você controla exatamente o grafo de dependências.
Quando usar delayed? Em fluxos de trabalho complexos.
Por exemplo, pipelines de ETL com etapas interdependentes.
Além disso, delayed permite paralelismo granular e fino.
Exemplo de Dask delayed:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import dask from dask.distributed import Client import time import random # Inicia cliente Dask local client = Client(n_workers=4, threads_per_worker=1) print(f"Dashboard Dask: {client.dashboard_link}") # Funções de exemplo (simulam trabalho) @dask.delayed def baixar_dados(url): """Simula download de dados.""" time.sleep(random.uniform(0.5, 1.5)) return f"Dados de {url}" @dask.delayed def processar_dados(dados): """Processa os dados baixados.""" time.sleep(0.5) return f"Processado: {dados}" @dask.delayed def analisar_dados(dados_processados): """Analisa os dados processados.""" time.sleep(0.3) return f"Análise: {len(dados_processados)} caracteres" @dask.delayed def salvar_resultado(analise): """Salva o resultado final.""" time.sleep(0.2) return f"Salvo: {analise}" # Pipeline completo com Dask delayed print("=== Pipeline de Dados com Dask Delayed ===\n") urls = [f"http://api.com/dados/{i}" for i in range(10)] # Cria o grafo lazy (nenhuma execução ainda) inicio_pipeline = time.time() # Passo 1: Baixar todos os dados em paralelo downloads = [baixar_dados(url) for url in urls] # Passo 2: Processar cada download processados = [processar_dados(d) for d in downloads] # Passo 3: Analisar cada resultado analises = [analisar_dados(p) for p in processados] # Passo 4: Salvar todos os resultados salvos = [salvar_resultado(a) for a in analises] # Executa o grafo (agora sim) resultados = dask.compute(*salvos) tempo_pipeline = time.time() - inicio_pipeline print(f"Pipeline concluído em {tempo_pipeline:.2f}s") for i, r in enumerate(resultados[:5]): print(f" Resultado {i}: {r}") # Exemplo com dependência entre tarefas print("\n=== Dependências entre Tarefas ===") @dask.delayed def etapa1(): time.sleep(0.5) return 10 @dask.delayed def etapa2(x): time.sleep(0.5) return x * 2 @dask.delayed def etapa3(x, y): time.sleep(0.5) return x + y # Grafo com dependências (etapa2 depende de etapa1) a = etapa1() b = etapa2(a) # depende de a c = etapa2(5) # independente d = etapa3(b, c) # depende de b e c # Executa resultado_final = dask.compute(d) print(f"Resultado final: {resultado_final[0]}") # Exemplo de visualização do grafo (opcional) print("\n=== Visualização (opcional) ===") print("Para ver o grafo: dask.visualize(d, filename='grafo.png')") # Fecha o cliente client.close() |
Dask delayed é perfeito para pipelines personalizados.
A fórmula da aceleração teórica funciona bem:
\(S = \frac{T_{\text{seq}}}{T_{\text{dask}}} \approx N_{\text{workers}} \times (1 – O)\)
Dask é a ferramenta ideal para Big Data em Python.
Comece com Dask arrays para dados numéricos.
Use Dask DataFrames para dados tabulares.
Para fluxos complexos, Dask delayed é a escolha certa.
Portanto, escalabilidade sem complicação é o poder do Dask.