Data Warehouse (DW)
Modelagem Dimensional Proposto
Tabelas Fato
Fato_Combustivel
| Coluna | Tipo | Descrição | Relação |
|---|---|---|---|
| id_fato (PK) | INT | Chave primária | – |
| id_tempo (FK) | INT | Chave para dimensão de tempo | Dim_Tempo.id_tempo |
| id_localizacao (FK) | INT | Chave para dimensão de local | Dim_Localizacao.id |
| id_produto (FK) | INT | Chave para dimensão de produto | Dim_Produto.id |
| valor_medio_venda | DECIMAL | Valor médio de venda (ex: R$/litro) | – |
Dimensões
Dim_Tempo
| Coluna | Tipo | Descrição | Exemplo |
|---|---|---|---|
| id_tempo (PK) | INT | Chave primária | 1 |
| ano | INT | Ano completo | 2023 |
| mes | INT | Mês (1-12) | 7 |
| mes_ano | VARCHAR | Formato “MM/YYYY” | “07/2023” |
Dim_Localizacao
| Coluna | Tipo | Descrição | Exemplo |
|---|---|---|---|
| id (PK) | INT | Chave primária | 1 |
| pais | VARCHAR | País (ex: Brasil) | “Brasil” |
| regiao | VARCHAR | Região (ex: Sudeste) | “Sudeste” |
| estado | VARCHAR | UF (ex: SP) | “SP” |
| municipio | VARCHAR | Cidade | “São Paulo” |
Dim_Produto
| Coluna | Tipo | Descrição | Exemplo |
|---|---|---|---|
| id (PK) | INT | Chave primária | 1 |
| produto | VARCHAR | Nome do produto | “Gasolina” |
| unidade | VARCHAR | Unidade de medida | “Litro” |
Fluxo de Carga (ETL/ELT)
-
Extração:
-
APIs ou arquivos CSV, XLSX, JSON.
-
Ferramentas: Python (Pandas, Requests) para orquestração.
-
-
Transformação:
-
Limpeza (valores nulos, duplicatas).
-
Padronização (unidades de medida, nomes de campos).
-
Ferramentas: SQL.
-
-
Carga:
-
Load para o DW em cloud (BigQuery, Redshift, Snowflake) ou on-premise (SQL Server, PostgreSQL).
-
1. Tabela de Fato (Fato_Combustivel)
| Campo | Tipo | Descrição |
|---|---|---|
| id_fato | INT (PK) | Chave primária autoincremento. |
| id_tempo | INT (FK) | Chave para a dimensão de tempo. |
| id_localizacao | INT (FK) | Chave para a dimensão de local. |
| id_produto | INT (FK) | Chave para a dimensão de produto. |
| valor_medio_venda | DECIMAL(10,2) | Preço médio de venda (ex: 5.99). |
| volume_vendido | DECIMAL(10,2) | Opcional: Se tiver dados de volume. |
2. Dimensões
Dim_Tempo
| Campo | Tipo | Descrição |
|---|---|---|
| id_tempo | INT (PK) | Chave no formato AAAAMM (ex: 202307 = jul/2023). |
| ano | INT | Ano (2023, 2024…). |
| mes | INT | Mês (1-12). |
| mes_ano | VARCHAR(7) | Formato “MM/AAAA” (apresentação gráfico histórico). |
Dim_Localizacao
| Campo | Tipo | Descrição |
|---|---|---|
| id_localizacao | INT (PK) | Chave primária. |
| pais | VARCHAR(50) | Padronizado (ex: “Brasil”). |
| regiao | VARCHAR(50) | “Norte”, “Sudeste”, etc. |
| estado | VARCHAR(50) | “SP”, “RJ”, etc. |
Dim_Produto
| Campo | Tipo | Descrição |
|---|---|---|
| id_produto | INT (PK) | Chave primária. |
| produto | VARCHAR(50) | Nome (“Gasolina”, “Etanol”, …). |
| unidade | VARCHAR(10) | “R$/litro”, “R$/m³”, etc. |
Carga:
1. Carga Inicial (Full Load)
Objetivo: Popular todas as tabelas do DW com dados históricos completos pela primeira vez.
Passos:
Extrair dados brutos
Fontes: APIs, arquivos CSV/XLSX com histórico completo (ex: dados de 2010 a 2023).
|
1 2 3 4 5 6 7 8 9 10 11 12 |
import pandas as pd # Ler CSV df = pd.read_csv('dados_historicos.csv', parse_dates=['mes_ano']) # Limpeza e padronização df['mes_ano'] = df['mes_ano'].dt.strftime('%m/%Y') # Formato MM/YYYY df['unidade'] = df['unidade'].str.upper() # Padronizar (ex: Litro → LITRO) # Gerar IDs únicos para dimensões df['id_tempo'] = df['mes_ano'].str.replace('/', '').astype(int) # Ex: 012023 → 12023 df['id_localizacao'] = df['municipio'].str[:3] + df['estado'] |
Carregar Dimensões
Dim_Tempo:
|
1 2 3 4 5 6 7 8 |
INSERT INTO Dim_Tempo (id_tempo, ano, mes, mes_ano) SELECT DISTINCT CAST(REPLACE(mes_ano, '/', '') AS INT), -- Ex: 01/2020 → 12020 EXTRACT(YEAR FROM TO_DATE(mes_ano, 'MM/YYYY')), EXTRACT(MONTH FROM TO_DATE(mes_ano, 'MM/YYYY')), mes_ano FROM dados_historicos_temp; Dim_Localizacao e Dim_Produto: |
|
1 2 3 4 5 6 7 |
-- Evitar duplicatas com MERGE (ou INSERT ... ON CONFLICT) MERGE INTO Dim_Localizacao AS target USING (SELECT DISTINCT pais, regiao, estado, municipio FROM dados_historicos_temp) AS source ON (target.municipio = source.municipio AND target.estado = source.estado) WHEN NOT MATCHED THEN INSERT (pais, regiao, estado, municipio) VALUES (source.pais, source.regiao, source.estado, source.municipio); Carregar Fato_Combustivel |
|
1 2 3 4 5 6 7 8 9 10 11 |
INSERT INTO Fato_Combustivel (id_tempo, id_localizacao, id_produto, valor_medio_venda) SELECT dt.id_tempo, dl.id, dp.id, dht.valor_medio_venda FROM dados_historicos_temp dht JOIN Dim_Tempo dt ON dht.mes_ano = dt.mes_ano JOIN Dim_Localizacao dl ON dht.municipio = dl.municipio AND dht.estado = dl.estado JOIN Dim_Produto dp ON dht.produto = dp.produto; Registrar Log |
|
1 2 |
INSERT INTO Log_Carga (tabela_destino, data_execucao, linhas_afetadas, status) VALUES ('Fato_Combustivel', CURRENT_TIMESTAMP, (SELECT COUNT(*) FROM Fato_Combustivel), 'Sucesso'); |
2. Cargas Incrementais (Delta Load)
Objetivo: Atualizar o DW apenas com dados novos ou modificados (ex: novos meses).
Passos:
Identificar Deltas
Fonte: Novo arquivo CSV/XLSX ou API com dados apenas do último mês (ex: 06/2023).
Controle: Use uma tabela de controle para saber a última data carregada:
|
1 2 3 4 5 6 |
CREATE TABLE Controle_Carga ( ultima_data_carregada DATE ); -- Inserir após a carga inicial: INSERT INTO Controle_Carga VALUES ('2023-05-31'); Extrair e Transformar Dados Novos |
Filtrar apenas registros com datas posteriores a ultima_data_carregada.
Exemplo em Python:
|
1 2 3 4 |
# Ler novo arquivo df_novo = pd.read_csv('dados_junho_2023.csv') df_novo = df_novo[df_novo['mes_ano'] > ultima_data_carregada] # Filtrar deltas Carregar Dimensões (Se Necessário) |
Verificar se há novos municípios ou produtos:
|
1 2 3 4 5 6 7 8 9 |
-- Exemplo para Dim_Localizacao INSERT INTO Dim_Localizacao (pais, regiao, estado, municipio) SELECT DISTINCT pais, regiao, estado, municipio FROM dados_novos_temp WHERE NOT EXISTS ( SELECT 1 FROM Dim_Localizacao WHERE municipio = dados_novos_temp.municipio AND estado = dados_novos_temp.estado ); Carregar Fato_Combustivel |
|
1 2 3 4 5 6 7 8 9 10 11 |
INSERT INTO Fato_Combustivel (id_tempo, id_localizacao, id_produto, valor_medio_venda) SELECT dt.id_tempo, dl.id, dp.id, dnt.valor_medio_venda FROM dados_novos_temp dnt JOIN Dim_Tempo dt ON dnt.mes_ano = dt.mes_ano JOIN Dim_Localizacao dl ON dnt.municipio = dl.municipio AND dnt.estado = dl.estado JOIN Dim_Produto dp ON dnt.produto = dp.produto; Atualizar Controle |
|
1 2 |
UPDATE Controle_Carga SET ultima_data_carregada = (SELECT MAX(TO_DATE(mes_ano, 'MM/YYYY')) FROM dados_novos_temp); |
3. Automatização com Airflow (Opcional)
Para orquestrar as cargas incrementais mensais:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime def carregar_deltas(): # Lógica de carga incremental aqui pass dag = DAG('carga_combustivel', schedule_interval='0 0 1 * *') # Executar no 1º dia do mês task_carga = PythonOperator( task_id='carga_incremental', python_callable=carregar_deltas, dag=dag ) |
4. Boas Práticas
Teste de Idempotência: Garanta que a carga incremental possa ser reexecutada sem duplicar dados.
Backup: Antes da carga inicial, faça backup do DW.
Particionamento: Para grandes volumes, particione Fato_Combustivel por ano/mês.