Prerequisites
- Python 3.8 or higher
- pip
Installation
# Clone or navigate to the project directory
cd orion_framework
# Install basic dependencies
pip install pandas pyyaml click
# (Optional) If using Databricks
pip install databricks-sql-connector
Example 1 · Simple CSV Pipeline
1. Create directory structure
mkdir -p meu_pipeline/nodes
mkdir -p data/raw data/processed
2. Create catalog.yml
# meu_pipeline/catalog.yml
clientes_raw:
type: local_csv
path: data/raw/clientes.csv
clientes_processados:
type: local_csv
path: data/processed/clientes_processed.csv
3. Create sample data
echo "nome,email,idade
João Silva,joao@email.com,30
Maria Santos,maria@email.com,25
Pedro Costa,pedro@email.com,35" > data/raw/clientes.csv
4. Create nodes
meu_pipeline/nodes/extract.py
import pandas as pd
from ...infrastructure.config.context import OrionContext
def extract(context: OrionContext) -> pd.DataFrame:
    """Load the raw clients dataset from the catalog and report how many rows came in."""
    raw_df = context.catalog.load("clientes_raw")
    context.logger.info(f"Extraídos {len(raw_df)} registros")
    return raw_df
meu_pipeline/nodes/transform.py
import pandas as pd
from ...infrastructure.config.context import OrionContext
def transform(context: OrionContext, df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names and derive an age-category column on the dataframe."""
    context.logger.info(f"Transformando {len(df)} registros")

    # Lowercase and trim every column header in one pass.
    df.columns = [column.strip().lower() for column in df.columns]

    def _age_bucket(idade):
        # Under 30 counts as "jovem", everything else as "adulto".
        return "jovem" if idade < 30 else "adulto"

    df["idade_categoria"] = df["idade"].apply(_age_bucket)
    return df
meu_pipeline/nodes/load.py
import pandas as pd
from ...infrastructure.config.context import OrionContext
def load(context: OrionContext, df: pd.DataFrame) -> None:
    """Persist the processed dataframe through the catalog, then log the row count."""
    context.catalog.save("clientes_processados", df)
    context.logger.info(f"Salvos {len(df)} registros processados")
5. Create pipeline
meu_pipeline/pipeline.py
from ...application.pipeline.builder import PipelineBuilder
from .nodes.extract import extract
from .nodes.transform import transform
from .nodes.load import load
def create_pipeline():
    """Assemble the extract -> transform -> load pipeline and return the built object."""
    builder = PipelineBuilder("meu_pipeline_clientes")
    # Declare each node with its catalog inputs/outputs; order mirrors the data flow.
    wiring = (
        (extract, [], ["clientes_raw"]),
        (transform, ["clientes_raw"], ["clientes_processados"]),
        (load, ["clientes_processados"], []),
    )
    for node_fn, node_inputs, node_outputs in wiring:
        builder.add_node(node_fn, inputs=node_inputs, outputs=node_outputs)
    return builder.build()
6. Execute
orion run \
--module meu_pipeline.pipeline \
--catalog meu_pipeline/catalog.yml
7. Check result
cat data/processed/clientes_processed.csv
Pré-requisitos
- Python 3.8 ou superior
- pip
Instalação
# Clone ou navegue até o diretório do projeto
cd orion_framework
# Instale dependências básicas
pip install pandas pyyaml click
# (Opcional) Se for usar Databricks
pip install databricks-sql-connector
Exemplo 1 · Pipeline CSV simples
1. Criar estrutura de diretórios
mkdir -p meu_pipeline/nodes
mkdir -p data/raw data/processed
2. Criar catalog.yml
# meu_pipeline/catalog.yml
clientes_raw:
type: local_csv
path: data/raw/clientes.csv
clientes_processados:
type: local_csv
path: data/processed/clientes_processed.csv
3. Criar dados de exemplo
echo "nome,email,idade
João Silva,joao@email.com,30
Maria Santos,maria@email.com,25
Pedro Costa,pedro@email.com,35" > data/raw/clientes.csv
4. Criar nodes
meu_pipeline/nodes/extract.py
import pandas as pd
from ...infrastructure.config.context import OrionContext
def extract(context: OrionContext) -> pd.DataFrame:
    """Load the raw clients dataset from the catalog and report how many rows came in."""
    raw_df = context.catalog.load("clientes_raw")
    context.logger.info(f"Extraídos {len(raw_df)} registros")
    return raw_df
meu_pipeline/nodes/transform.py
import pandas as pd
from ...infrastructure.config.context import OrionContext
def transform(context: OrionContext, df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names and derive an age-category column on the dataframe."""
    context.logger.info(f"Transformando {len(df)} registros")

    # Lowercase and trim every column header in one pass.
    df.columns = [column.strip().lower() for column in df.columns]

    def _age_bucket(idade):
        # Under 30 counts as "jovem", everything else as "adulto".
        return "jovem" if idade < 30 else "adulto"

    df["idade_categoria"] = df["idade"].apply(_age_bucket)
    return df
meu_pipeline/nodes/load.py
import pandas as pd
from ...infrastructure.config.context import OrionContext
def load(context: OrionContext, df: pd.DataFrame) -> None:
    """Persist the processed dataframe through the catalog, then log the row count."""
    context.catalog.save("clientes_processados", df)
    context.logger.info(f"Salvos {len(df)} registros processados")
5. Criar pipeline
meu_pipeline/pipeline.py
from ...application.pipeline.builder import PipelineBuilder
from .nodes.extract import extract
from .nodes.transform import transform
from .nodes.load import load
def create_pipeline():
    """Assemble the extract -> transform -> load pipeline and return the built object."""
    builder = PipelineBuilder("meu_pipeline_clientes")
    # Declare each node with its catalog inputs/outputs; order mirrors the data flow.
    wiring = (
        (extract, [], ["clientes_raw"]),
        (transform, ["clientes_raw"], ["clientes_processados"]),
        (load, ["clientes_processados"], []),
    )
    for node_fn, node_inputs, node_outputs in wiring:
        builder.add_node(node_fn, inputs=node_inputs, outputs=node_outputs)
    return builder.build()
6. Executar
orion run \
--module meu_pipeline.pipeline \
--catalog meu_pipeline/catalog.yml
7. Verificar resultado
cat data/processed/clientes_processed.csv