Conceitos fundamentais: broker, worker e tasks
O broker é o intermediário entre produtores e consumidores. Redis e RabbitMQ são os brokers mais comuns para Celery. Workers são processos que executam as tarefas da fila. Tasks são funções decoradas com@app.task.
Quando usar cada componente? Broker para comunicação entre serviços.
Worker para processamento paralelo de tarefas.
A voz passiva é aplicada: “as tarefas são serializadas em JSON”.
Exemplo básico de configuração e tarefa Celery:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# tasks.py from celery import Celery import time # Configuração do Celery com Redis como broker app = Celery( 'tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0' ) @app.task def tarefa_demorada(segundos): """Tarefa que simula processamento demorado.""" print(f"Iniciando tarefa por {segundos}s") for i in range(segundos): time.sleep(1) print(f"Progresso: {i+1}/{segundos}") return f"Tarefa concluída após {segundos}s" @app.task def enviar_email(destinatario, assunto, corpo): """Simula envio de e-mail.""" print(f"Enviando e-mail para {destinatario}") time.sleep(2) # Simula envio return f"E-mail enviado para {destinatario}" @app.task def processar_imagem(caminho, operacao): """Simula processamento de imagem.""" print(f"Processando {caminho} com operação '{operacao}'") time.sleep(3) return f"Imagem {caminho} processada" # Tarefa com agendamento @app.task def tarefa_periodica(): """Tarefa executada periodicamente.""" print("Executando tarefa periódica...") return "Tarefa periódica concluída" # Como executar: # Terminal 1: celery -A tasks worker --loglevel=info # Terminal 2: python -c "from tasks import tarefa_demorada; tarefa_demorada.delay(5)" |
celery -A tasks worker inicia o worker.
Enfileirando e monitorando tarefas
Para enfileirar uma tarefa, use o método.delay().
Ele retorna um objeto AsyncResult para acompanhamento.
Use .get() para aguardar o resultado (bloqueante).
Use .ready() para verificar se a tarefa terminou.
Quando usar essas funções? Em aplicações web assíncronas.
Também em scripts que precisam monitorar progresso.
A voz passiva é aplicada: “os resultados são armazenados no backend”.
Exemplo de enfileiramento e monitoramento:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# enfileirar.py from tasks import tarefa_demorada, enviar_email from celery.result import AsyncResult import time # Enfileirando tarefas (não bloqueia) print("=== Enfileirando tarefas ===") resultado1 = tarefa_demorada.delay(3) resultado2 = enviar_email.delay("user@exemplo.com", "Olá", "Conteúdo do e-mail") resultado3 = tarefa_demorada.delay(2) print(f"ID da tarefa 1: {resultado1.id}") print(f"ID da tarefa 2: {resultado2.id}") print(f"ID da tarefa 3: {resultado3.id}") # Monitorando tarefas print("\n=== Monitorando progresso ===") tarefas = [resultado1, resultado2, resultado3] while any(not t.ready() for t in tarefas): for t in tarefas: status = "pronta" if t.ready() else "pendente" print(f"Tarefa {t.id[:8]}...: {status}") time.sleep(1) print("\n=== Resultados ===") for t in tarefas: print(f"Tarefa {t.id[:8]}...: {t.result}") # Exemplo com Chain (encadeamento de tarefas) print("\n=== Encadeamento de tarefas (Chain) ===") from celery import chain @celery_app.task def dobrar(x): return x * 2 @celery_app.task def adicionar_um(x): return x + 1 # Executa: dobrar -> adicionar_um cadeia = chain(dobrar.s(5), adicionar_um.s()) resultado = cadeia() print(f"Resultado do chain: {resultado.get()}") # (5*2)+1 = 11 # Exemplo com Group (tarefas paralelas) print("\n=== Grupo de tarefas paralelas (Group) ===") from celery import group tarefas_paralelas = group( tarefa_demorada.s(2), tarefa_demorada.s(3), tarefa_demorada.s(1) ) resultado_grupo = tarefas_paralelas.apply_async() resultados = resultado_grupo.get() print(f"Resultados do grupo: {resultados}") |
.s() cria uma assinatura de tarefa.
Assinaturas permitem composição e encadeamento.
Celery para tarefas periódicas (celery beat)
Celery Beat é o scheduler para tarefas periódicas. Ele enfileira tarefas em intervalos regulares automaticamente. Quando usar Beat? Em backups, limpeza de dados ou relatórios. Por exemplo, enviar relatório todo dia às 8h. A voz passiva é aplicada: “os intervalos são definidos no arquivo de configuração”. Exemplo de configuração de tarefas periódicas:|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# celery_config.py from celery.schedules import crontab # Configuração do Beat beat_schedule = { # A cada 30 segundos 'a-cada-30-segundos': { 'task': 'tasks.tarefa_periodica', 'schedule': 30.0, # segundos 'args': (), }, # Todo dia à meia-noite 'limpeza-diaria': { 'task': 'tasks.limpar_cache', 'schedule': crontab(hour=0, minute=0), 'args': (), }, # Toda segunda-feira às 10h 'relatorio-semanal': { 'task': 'tasks.gerar_relatorio', 'schedule': crontab(day_of_week=1, hour=10, minute=0), 'args': ('semanal',), }, # A cada 5 minutos 'monitoramento': { 'task': 'tasks.verificar_health', 'schedule': 300.0, 'args': (), }, } # Aplicando a configuração app.conf.beat_schedule = beat_schedule app.conf.timezone = 'America/Sao_Paulo' # tasks.py com tarefas periódicas @app.task def limpar_cache(): """Limpa cache do sistema.""" print("Executando limpeza de cache...") # lógica de limpeza return "Cache limpo" @app.task def gerar_relatorio(tipo): """Gera relatório periódico.""" print(f"Gerando relatório {tipo}...") return f"Relatório {tipo} gerado" @app.task def verificar_health(): """Verifica saúde do sistema.""" print("Verificando health check...") return "Sistema saudável" # Exemplo de tarefa com retry automático @app.task(bind=True, max_retries=3, default_retry_delay=60) def tarefa_com_retry(self, url): """Tarefa que tenta novamente em caso de falha.""" import requests try: response = requests.get(url, timeout=10) return response.status_code except Exception as e: print(f"Falha ao acessar {url}: {e}") # Tenta novamente após 60 segundos raise self.retry(exc=e) # Exemplo de tarefa com timeout @app.task(time_limit=30, soft_time_limit=25) def tarefa_limitada(): """Tarefa que para após 30 segundos.""" import time time.sleep(40) # Excede o timeout return "Nunca chegará aqui" |
celery -A tasks beat --loglevel=info.
Para rodar worker: celery -A tasks worker --loglevel=info.
A fórmula de throughput da fila:
\(T = \frac{N_{\text{tarefas}}}{N_{\text{workers}} \times \text{velocidade}}\)
Celery é essencial para aplicações web robustas.
Comece com tarefas simples e evolua para workflows complexos.
Sua aplicação ficará mais rápida e responsiva.