Processos paralelos executam código simultaneamente em múltiplos núcleos.
Isso é chamado de true parallelism ou paralelismo verdadeiro.
Primeiramente, cada processo tem seu próprio interpretador e memória.
Por exemplo, 8 processos podem rodar em 8 núcleos de CPU ao mesmo tempo.
Além disso, processos não compartilham o GIL (Global Interpreter Lock).
A voz passiva é usada aqui: “o sistema operacional gerencia a distribuição entre núcleos”.
Quando utilizar processos paralelos? Em tarefas com CPU intensiva.
Por exemplo, processamento de imagens, simulações científicas ou machine learning.
Também em qualquer tarefa que exija todo o poder da máquina.
Python oferece o módulo multiprocessing e o concurrent.futures.
Vamos explorar características, ganhos e limitações.
Três subtítulos guiarão você pelo verdadeiro paralelismo.
Ao final, você dominará a execução paralela em Python.
Multiprocessing vs. threading: a diferença crucial
Threads são leves, mas limitadas pelo GIL em CPU-bound.
Processos são mais pesados, mas executam em paralelo verdadeiro.
Quando escolher processos? Quando o gargalo é CPU, não I/O.
A criação de um processo é mais cara que uma thread.
No entanto, o ganho em paralelismo compensa para tarefas longas.
A voz passiva é aplicada: “a memória é duplicada para cada processo”.
Exemplo comparativo entre threads e processos para CPU-bound:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
import multiprocessing import threading import time import os def trabalho_cpu_pesado(segundos): """Função que consome muita CPU.""" pid = os.getpid() inicio = time.time() # Loop pesado para simular trabalho total = 0 for i in range(30_000_000): total += i * i fim = time.time() print(f"Processo/Thread PID {pid} concluiu em {fim - inicio:.2f}s") return total def executar_com_processos(n_tarefas): """Executa com processos paralelos.""" with multiprocessing.Pool(processes=n_tarefas) as pool: resultados = pool.map(trabalho_cpu_pesado, [1] * n_tarefas) return resultados def executar_com_threads(n_tarefas): """Executa com threads (limitado pelo GIL).""" threads = [] resultados = [0] * n_tarefas def wrapper(idx): resultados[idx] = trabalho_cpu_pesado(1) for i in range(n_tarefas): t = threading.Thread(target=wrapper, args=(i,)) threads.append(t) t.start() for t in threads: t.join() return resultados if __name__ == "__main__": n = 4 # Número de tarefas (igual aos núcleos) print(f"Executando {n} tarefas CPU-bound com PROCESSOS:") inicio = time.time() executar_com_processos(n) tempo_processos = time.time() - inicio print(f"Tempo total com processos: {tempo_processos:.2f}s") print(f"\nExecutando {n} tarefas CPU-bound com THREADS:") inicio = time.time() executar_com_threads(n) tempo_threads = time.time() - inicio print(f"Tempo total com threads: {tempo_threads:.2f}s") print(f"\nProcessos foram {tempo_threads / tempo_processos:.1f}x mais rápidos!") print("(Processos executam em paralelo verdadeiro, threads não)") |
Processos mostram aceleração próxima ao número de núcleos.
Threads mostram aceleração próxima de 1.0 devido ao GIL.
Essa é a diferença fundamental entre concorrência e paralelismo.
Comunicação e sincronização entre processos
Processos paralelos precisam se comunicar para trocar resultados.
Diferente de threads, eles não compartilham memória automaticamente.
Portanto, usamos filas (Queue), pipes ou memória compartilhada.
A serialização (pickle) é necessária para enviar dados entre processos.
Isso adiciona overhead, mas é inevitável para isolamento.
Quando usar memória compartilhada? Para grandes arrays numéricos.
A voz passiva é usada aqui: “os dados são copiados por valor entre processos”.
Exemplo de comunicação com Queue entre processos paralelos:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
import multiprocessing import time import random def produtor(queue, id_produtor, n_itens): """Produz itens e coloca na fila.""" for i in range(n_itens): item = f"Produtor-{id_produtor}-Item-{i}" queue.put(item) print(f"[Produtor {id_produtor}] Produziu: {item}") time.sleep(random.uniform(0.1, 0.3)) queue.put(None) # Sinal de fim para este produtor def consumidor(queue, id_consumidor): """Consome itens da fila até receber Nones suficientes.""" n_finalizados = 0 while True: item = queue.get() if item is None: n_finalizados += 1 if n_finalizados == 2: # 2 produtores break continue print(f"[Consumidor {id_consumidor}] Consumiu: {item}") # Simula processamento do item time.sleep(random.uniform(0.2, 0.5)) if __name__ == "__main__": queue = multiprocessing.Queue() # Criando produtores paralelos produtores = [] for i in range(2): p = multiprocessing.Process(target=produtor, args=(queue, i, 5)) produtores.append(p) p.start() # Criando consumidores paralelos consumidores = [] for i in range(2): c = multiprocessing.Process(target=consumidor, args=(queue, i)) consumidores.append(c) c.start() # Aguardar todos terminarem for p in produtores: p.join() for c in consumidores: c.join() print("Todos os processos finalizaram") # Exemplo com memória compartilhada (Array) print("\n=== Memória Compartilhada com Array ===") from multiprocessing import Array, Lock arr = Array('i', [0] * 10) # Array de 10 inteiros lock = Lock() def atualizar_array(indice, valor): with lock: arr[indice] = valor processos = [] for i in range(10): p = multiprocessing.Process(target=atualizar_array, args=(i, i * 10)) processos.append(p) p.start() for p in processos: p.join() print(f"Array final: {list(arr)}") |
Queues são seguras e ideais para padrões produtor-consumidor.
Arrays compartilhados são eficientes para dados numéricos.
Sempre use locks ao modificar memória compartilhada.
Escalabilidade e lei de amdahl
O ganho com processos paralelos não é linear infinito.
A lei de Amdahl descreve o limite teórico de aceleração.
A fórmula é: \(S = \frac{1}{(1 – P) + \frac{P}{N}}\)
Onde P é a fração paralelizável e N é o número de núcleos.
Por exemplo, com 90% paralelizável e 8 núcleos, o ganho máximo é 4.7x.
Portanto, nem todo código pode ser perfeitamente paralelizado.
A voz passiva é aplicada: “partes sequenciais são executadas em apenas um núcleo”.
Exemplo demonstrando a lei de Amdahl na prática:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
import multiprocessing import time def trabalho_paralelizavel(n): """Trabalho que pode ser dividido entre processos.""" total = 0 for i in range(n): total += i * i return total def trabalho_sequencial(n): """Trabalho que precisa rodar em um único núcleo.""" total = 0 for i in range(n): total += i ** 0.5 return total def executar_com_paralelismo(n_tarefas, tamanho_par, tamanho_seq): """Executa parte paralela + parte sequencial.""" # Parte sequencial (um núcleo) resultado_seq = trabalho_sequencial(tamanho_seq) # Parte paralela (múltiplos núcleos) with multiprocessing.Pool(processes=n_tarefas) as pool: resultados_par = pool.map(trabalho_paralelizavel, [tamanho_par] * n_tarefas) return resultado_seq, sum(resultados_par) if __name__ == "__main__": n_nucleos = multiprocessing.cpu_count() print(f"Máquina com {n_nucleos} núcleos\n") # Experimento 1: 90% paralelizável tamanho_par = 40_000_000 tamanho_seq = int(tamanho_par * 0.111) # ~10% sequencial inicio = time.time() executar_com_paralelismo(1, tamanho_par, tamanho_seq) tempo_1_nucleo = time.time() - inicio inicio = time.time() executar_com_paralelismo(n_nucleos, tamanho_par, tamanho_seq) tempo_n_nucleos = time.time() - inicio aceleracao = tempo_1_nucleo / tempo_n_nucleos print(f"90% paralelizável:") print(f" 1 núcleo: {tempo_1_nucleo:.2f}s") print(f" {n_nucleos} núcleos: {tempo_n_nucleos:.2f}s") print(f" Aceleração real: {aceleracao:.2f}x") # Experimento 2: 50% paralelizável tamanho_par = 20_000_000 tamanho_seq = tamanho_par # 50% cada inicio = time.time() executar_com_paralelismo(1, tamanho_par, tamanho_seq) tempo_1_nucleo = time.time() - inicio inicio = time.time() executar_com_paralelismo(n_nucleos, tamanho_par, tamanho_seq) tempo_n_nucleos = time.time() - inicio aceleracao = tempo_1_nucleo / tempo_n_nucleos print(f"\n50% paralelizável:") print(f" 1 núcleo: {tempo_1_nucleo:.2f}s") print(f" {n_nucleos} núcleos: {tempo_n_nucleos:.2f}s") print(f" Aceleração real: {aceleracao:.2f}x") |
Observe que quanto maior a fração sequencial, menor o ganho.
Portanto, identifique e otimize os gargalos sequenciais primeiro.
Processos paralelos são ferramentas poderosas, mas não mágicas.
Use-os com sabedoria e meça sempre o ganho real.
Para muitos problemas, o paralelismo verdadeiro transforma horas em minutos.
Experimente e veja seu código voar em múltiplos núcleos.