Celery é um sistema de filas para processamento assíncrono.
Ele executa tarefas em segundo plano fora do ciclo principal.
Primeiramente, Celery usa um broker (Redis ou RabbitMQ) para comunicação.
Por exemplo, você envia um e-mail sem bloquear a resposta HTTP.
Além disso, Celery escala horizontalmente com muitos workers.
A voz passiva é usada aqui: “as tarefas são enfileiradas e processadas depois”.
Quando utilizar Celery? Em operações demoradas e assíncronas.
Por exemplo, processamento de imagens, envio de e-mails ou web scraping.
Também em tarefas periódicas (cron jobs) e pipelines de dados.
Vamos explorar conceitos, instalação e exemplos práticos.
Três subtítulos guiarão você pelo universo Celery.
Ao final, você implementará filas robustas em seus projetos.
Conceitos fundamentais: broker, worker e tasks
O broker é o intermediário entre produtores e consumidores.
Redis e RabbitMQ são os brokers mais comuns para Celery.
Workers são processos que executam as tarefas da fila.
Tasks são funções decoradas com @app.task.
Quando usar cada componente? Broker para comunicação entre serviços.
Worker para processamento paralelo de tarefas.
A voz passiva é aplicada: “as tarefas são serializadas em JSON”.
Exemplo básico de configuração e tarefa Celery:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# tasks.py from celery import Celery import time # Configuração do Celery com Redis como broker app = Celery( 'tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0' ) @app.task def tarefa_demorada(segundos): """Tarefa que simula processamento demorado.""" print(f"Iniciando tarefa por {segundos}s") for i in range(segundos): time.sleep(1) print(f"Progresso: {i+1}/{segundos}") return f"Tarefa concluída após {segundos}s" @app.task def enviar_email(destinatario, assunto, corpo): """Simula envio de e-mail.""" print(f"Enviando e-mail para {destinatario}") time.sleep(2) # Simula envio return f"E-mail enviado para {destinatario}" @app.task def processar_imagem(caminho, operacao): """Simula processamento de imagem.""" print(f"Processando {caminho} com operação '{operacao}'") time.sleep(3) return f"Imagem {caminho} processada" # Tarefa com agendamento @app.task def tarefa_periodica(): """Tarefa executada periodicamente.""" print("Executando tarefa periódica...") return "Tarefa periódica concluída" # Como executar: # Terminal 1: celery -A tasks worker --loglevel=info # Terminal 2: python -c "from tasks import tarefa_demorada; tarefa_demorada.delay(5)" |
O worker precisa estar rodando para executar as tarefas.
O comando celery -A tasks worker inicia o worker.
Enfileirando e monitorando tarefas
Para enfileirar uma tarefa, use o método .delay().
Ele retorna um objeto AsyncResult para acompanhamento.
Use .get() para aguardar o resultado (bloqueante).
Use .ready() para verificar se a tarefa terminou.
Quando usar essas funções? Em aplicações web assíncronas.
Também em scripts que precisam monitorar progresso.
A voz passiva é aplicada: “os resultados são armazenados no backend”.
Exemplo de enfileiramento e monitoramento:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# enfileirar.py from tasks import tarefa_demorada, enviar_email from celery.result import AsyncResult import time # Enfileirando tarefas (não bloqueia) print("=== Enfileirando tarefas ===") resultado1 = tarefa_demorada.delay(3) resultado2 = enviar_email.delay("user@exemplo.com", "Olá", "Conteúdo do e-mail") resultado3 = tarefa_demorada.delay(2) print(f"ID da tarefa 1: {resultado1.id}") print(f"ID da tarefa 2: {resultado2.id}") print(f"ID da tarefa 3: {resultado3.id}") # Monitorando tarefas print("\n=== Monitorando progresso ===") tarefas = [resultado1, resultado2, resultado3] while any(not t.ready() for t in tarefas): for t in tarefas: status = "pronta" if t.ready() else "pendente" print(f"Tarefa {t.id[:8]}...: {status}") time.sleep(1) print("\n=== Resultados ===") for t in tarefas: print(f"Tarefa {t.id[:8]}...: {t.result}") # Exemplo com Chain (encadeamento de tarefas) print("\n=== Encadeamento de tarefas (Chain) ===") from celery import chain @celery_app.task def dobrar(x): return x * 2 @celery_app.task def adicionar_um(x): return x + 1 # Executa: dobrar -> adicionar_um cadeia = chain(dobrar.s(5), adicionar_um.s()) resultado = cadeia() print(f"Resultado do chain: {resultado.get()}") # (5*2)+1 = 11 # Exemplo com Group (tarefas paralelas) print("\n=== Grupo de tarefas paralelas (Group) ===") from celery import group tarefas_paralelas = group( tarefa_demorada.s(2), tarefa_demorada.s(3), tarefa_demorada.s(1) ) resultado_grupo = tarefas_paralelas.apply_async() resultados = resultado_grupo.get() print(f"Resultados do grupo: {resultados}") |
O método .s() cria uma assinatura de tarefa.
Assinaturas permitem composição e encadeamento.
Celery para tarefas periódicas (celery beat)
Celery Beat é o scheduler para tarefas periódicas.
Ele enfileira tarefas em intervalos regulares automaticamente.
Quando usar Beat? Em backups, limpeza de dados ou relatórios.
Por exemplo, enviar relatório todo dia às 8h.
A voz passiva é aplicada: “os intervalos são definidos no arquivo de configuração”.
Exemplo de configuração de tarefas periódicas:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# celery_config.py from celery.schedules import crontab # Configuração do Beat beat_schedule = { # A cada 30 segundos 'a-cada-30-segundos': { 'task': 'tasks.tarefa_periodica', 'schedule': 30.0, # segundos 'args': (), }, # Todo dia à meia-noite 'limpeza-diaria': { 'task': 'tasks.limpar_cache', 'schedule': crontab(hour=0, minute=0), 'args': (), }, # Toda segunda-feira às 10h 'relatorio-semanal': { 'task': 'tasks.gerar_relatorio', 'schedule': crontab(day_of_week=1, hour=10, minute=0), 'args': ('semanal',), }, # A cada 5 minutos 'monitoramento': { 'task': 'tasks.verificar_health', 'schedule': 300.0, 'args': (), }, } # Aplicando a configuração app.conf.beat_schedule = beat_schedule app.conf.timezone = 'America/Sao_Paulo' # tasks.py com tarefas periódicas @app.task def limpar_cache(): """Limpa cache do sistema.""" print("Executando limpeza de cache...") # lógica de limpeza return "Cache limpo" @app.task def gerar_relatorio(tipo): """Gera relatório periódico.""" print(f"Gerando relatório {tipo}...") return f"Relatório {tipo} gerado" @app.task def verificar_health(): """Verifica saúde do sistema.""" print("Verificando health check...") return "Sistema saudável" # Exemplo de tarefa com retry automático @app.task(bind=True, max_retries=3, default_retry_delay=60) def tarefa_com_retry(self, url): """Tarefa que tenta novamente em caso de falha.""" import requests try: response = requests.get(url, timeout=10) return response.status_code except Exception as e: print(f"Falha ao acessar {url}: {e}") # Tenta novamente após 60 segundos raise self.retry(exc=e) # Exemplo de tarefa com timeout @app.task(time_limit=30, soft_time_limit=25) def tarefa_limitada(): """Tarefa que para após 30 segundos.""" import time time.sleep(40) # Excede o timeout return "Nunca chegará aqui" |
Para rodar o Beat: celery -A tasks beat --loglevel=info.
Para rodar worker: celery -A tasks worker --loglevel=info.
A fórmula de throughput da fila:
\(T = \frac{N_{\text{tarefas}}}{N_{\text{workers}} \times \text{velocidade}}\)
Celery é essencial para aplicações web robustas.
Comece com tarefas simples e evolua para workflows complexos.
Sua aplicação ficará mais rápida e responsiva.