Gil prejudicando: cpu-bound com threads
Quando o código é intensivo em CPU, threads não ajudam. O GIL força as threads a executarem uma por vez. Portanto, o ganho de performance é zero ou negativo. Por exemplo, calcular números primos em várias threads não acelera. A voz passiva é aplicada: “o tempo total é similar ao sequencial”. Veja um exemplo prático dessa limitação:|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
import threading import time def tarefa_cpu_pesada(): """Função que consome muita CPU.""" total = 0 for i in range(50_000_000): total += i * i return total def executar_sequencial(): """Executa a tarefa várias vezes em sequência.""" inicio = time.time() for _ in range(4): tarefa_cpu_pesada() return time.time() - inicio def executar_com_threads(): """Executa a tarefa em threads paralelas.""" threads = [] inicio = time.time() for _ in range(4): t = threading.Thread(target=tarefa_cpu_pesada) threads.append(t) t.start() for t in threads: t.join() return time.time() - inicio if __name__ == "__main__": print("Executando sequencialmente...") tempo_seq = executar_sequencial() print(f"Sequencial: {tempo_seq:.2f} segundos") print("\nExecutando com 4 threads...") tempo_thread = executar_com_threads() print(f"Com threads: {tempo_thread:.2f} segundos") print(f"\nAceleração: {tempo_seq / tempo_thread:.2f}x") print("(Esperado próximo de 1.0, não 4.0)") |
Gil ajudando: i/o-bound com threads
Para operações de I/O, o GIL é liberado durante a espera. Enquanto uma thread aguarda dados da rede, outra executa. Portanto, threads trazem ganhos enormes para I/O-bound. Por exemplo, baixar 10 arquivos simultaneamente é muito mais rápido. A voz passiva é aplicada: “as operações de I/O são realizadas fora do GIL”. Exemplo demonstrando a eficiência para I/O:|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
import threading import time import requests def tarefa_io_pesada(url, nome): """Simula uma operação de I/O (requisição de rede).""" inicio = time.time() resposta = requests.get(url) tempo = time.time() - inicio print(f"[{nome}] {url} -> {resposta.status_code} em {tempo:.2f}s") return tempo def executar_io_sequencial(): """Executa requisições em sequência.""" urls = ['https://httpbin.org/delay/1'] * 5 inicio = time.time() for i, url in enumerate(urls): tarefa_io_pesada(url, f"Seq-{i}") return time.time() - inicio def executar_io_com_threads(): """Executa requisições em threads concorrentes.""" urls = ['https://httpbin.org/delay/1'] * 5 threads = [] inicio = time.time() for i, url in enumerate(urls): t = threading.Thread(target=tarefa_io_pesada, args=(url, f"Thread-{i}")) threads.append(t) t.start() for t in threads: t.join() return time.time() - inicio if __name__ == "__main__": print("=== Teste com I/O (requisições de 1 segundo cada) ===\n") tempo_seq = executar_io_sequencial() print(f"\nTotal sequencial: {tempo_seq:.2f}s") tempo_thread = executar_io_com_threads() print(f"Total com threads: {tempo_thread:.2f}s") print(f"\nAceleração: {tempo_seq / tempo_thread:.2f}x") print("(Esperado próximo de 5.0 - ganho real!)") |
Alternativas para contornar o gil
Para CPU-bound, use multiprocessing em vez de threading. Cada processo tem seu próprio GIL e memória separada. Assim, você aproveita múltiplos núcleos de verdade. Outra alternativa é usar bibliotecas em C como NumPy. Elas liberam o GIL durante operações pesadas. Para I/O-bound, threading e asyncio são excelentes. A escolha certa depende do seu problema específico. A voz passiva é aplicada: “decisões informadas são tomadas após medição”. Exemplo de multiprocessing para contornar o GIL:|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
import multiprocessing import time def trabalho_cpu_pesado(n): """Função CPU-bound que libera o GIL via multiprocessing.""" total = 0 for i in range(50_000_000): total += i * i return total def com_multiprocessing(): """Usa múltiplos processos para contornar o GIL.""" with multiprocessing.Pool(processes=4) as pool: resultados = pool.map(trabalho_cpu_pesado, range(4)) return resultados def com_threads_falho(): """Tenta usar threads (falha devido ao GIL).""" import threading resultados = [0] * 4 def wrapper(idx): resultados[idx] = trabalho_cpu_pesado(idx) threads = [] for i in range(4): t = threading.Thread(target=wrapper, args=(i,)) threads.append(t) t.start() for t in threads: t.join() return resultados if __name__ == "__main__": print("Multiprocessing (contorna o GIL):") inicio = time.time() com_multiprocessing() print(f"Tempo: {time.time() - inicio:.2f}s") print("\nThreads (limitado pelo GIL):") inicio = time.time() com_threads_falho() print(f"Tempo: {time.time() - inicio:.2f}s") |