Quick Start Guide

Get Started with Orion

Learn how to set up your environment, create a simple pipeline, and run your first data flow with the Orion Framework in minutes.

Prerequisites

  • Python 3.8 or higher
  • pip

Installation

# Clone or navigate to the project directory
cd orion_framework

# Install basic dependencies
pip install pandas pyyaml click

# (Optional) If using Databricks
pip install databricks-sql-connector
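Before moving on, you can confirm the core dependencies import cleanly:

```python
# Sanity check: the three core dependencies should import without error.
import click
import pandas
import yaml  # installed as `pyyaml`

print(pandas.__version__, yaml.__version__, click.__version__)
```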

Example 1 · Simple CSV Pipeline

1. Create directory structure

mkdir -p meu_pipeline/nodes
mkdir -p data/raw data/processed

# Package markers so meu_pipeline and meu_pipeline.nodes are importable
touch meu_pipeline/__init__.py meu_pipeline/nodes/__init__.py

2. Create catalog.yml

# meu_pipeline/catalog.yml
clientes_raw:
  type: local_csv
  path: data/raw/clientes.csv

clientes_processados:
  type: local_csv
  path: data/processed/clientes_processed.csv
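To build intuition for how a `local_csv` entry is consumed, here is an illustrative sketch that parses the catalog and dispatches on `type`. This is hypothetical, not Orion's actual catalog implementation:

```python
# Illustrative only: how a `local_csv` catalog entry could map to pandas calls.
# The real Orion catalog implementation may differ.
import yaml

CATALOG_YML = """
clientes_raw:
  type: local_csv
  path: data/raw/clientes.csv
"""

catalog = yaml.safe_load(CATALOG_YML)
entry = catalog["clientes_raw"]

if entry["type"] == "local_csv":
    # A real loader would call something like: pd.read_csv(entry["path"])
    print(f"would read CSV from {entry['path']}")
```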

3. Create sample data

echo "nome,email,idade
João Silva,joao@email.com,30
Maria Santos,maria@email.com,25
Pedro Costa,pedro@email.com,35" > data/raw/clientes.csv
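You can verify the sample file parses as expected with pandas (the same content is inlined here so the check is self-contained):

```python
import io

import pandas as pd

# Same content as data/raw/clientes.csv, inlined for a self-contained check.
csv_text = """nome,email,idade
João Silva,joao@email.com,30
Maria Santos,maria@email.com,25
Pedro Costa,pedro@email.com,35
"""

df = pd.read_csv(io.StringIO(csv_text))
print(df.shape)  # (3, 3)
```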

4. Create nodes

meu_pipeline/nodes/extract.py

import pandas as pd

# Absolute import; adjust if your Orion install uses a different package name
from orion_framework.infrastructure.config.context import OrionContext

def extract(context: OrionContext) -> pd.DataFrame:
    df = context.catalog.load("clientes_raw")
    context.logger.info(f"Extracted {len(df)} records")
    return df

meu_pipeline/nodes/transform.py

import pandas as pd

# Absolute import; adjust if your Orion install uses a different package name
from orion_framework.infrastructure.config.context import OrionContext

def transform(context: OrionContext, df: pd.DataFrame) -> pd.DataFrame:
    context.logger.info(f"Transforming {len(df)} records")
    df.columns = [c.strip().lower() for c in df.columns]
    df["idade_categoria"] = df["idade"].apply(
        lambda x: "jovem" if x < 30 else "adulto"
    )
    return df
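The age-bucketing rule in `transform` can be tried standalone on the sample ages:

```python
import pandas as pd

# The same rule transform() applies: under 30 → "jovem", otherwise "adulto".
df = pd.DataFrame({"idade": [30, 25, 35]})
df["idade_categoria"] = df["idade"].apply(
    lambda x: "jovem" if x < 30 else "adulto"
)
print(df["idade_categoria"].tolist())  # ['adulto', 'jovem', 'adulto']
```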

meu_pipeline/nodes/load.py

import pandas as pd

# Absolute import; adjust if your Orion install uses a different package name
from orion_framework.infrastructure.config.context import OrionContext

def load(context: OrionContext, df: pd.DataFrame) -> None:
    context.catalog.save("clientes_processados", df)
    context.logger.info(f"Saved {len(df)} processed records")

5. Create pipeline

meu_pipeline/pipeline.py

# Absolute import; adjust if your Orion install uses a different package name
from orion_framework.application.pipeline.builder import PipelineBuilder
from .nodes.extract import extract
from .nodes.transform import transform
from .nodes.load import load

def create_pipeline():
    builder = PipelineBuilder("meu_pipeline_clientes")
    builder.add_node(extract, inputs=[], outputs=["clientes_raw"])
    builder.add_node(transform, inputs=["clientes_raw"], outputs=["clientes_processados"])
    builder.add_node(load, inputs=["clientes_processados"], outputs=[])
    return builder.build()
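Under the hood, a builder like this typically resolves execution order from the declared inputs and outputs. A minimal sketch of that idea (hypothetical, not Orion's actual scheduler):

```python
# Illustrative sketch (not Orion's actual code): ordering nodes so each runs
# only after the datasets it consumes have been produced.
nodes = [
    ("extract", [], ["clientes_raw"]),
    ("transform", ["clientes_raw"], ["clientes_processados"]),
    ("load", ["clientes_processados"], []),
]

# Map each dataset to its producing node, then derive node-level dependencies.
produced_by = {out: name for name, _ins, outs in nodes for out in outs}
deps = {name: {produced_by[i] for i in ins} for name, ins, _outs in nodes}

order = []
remaining = dict(deps)
while remaining:
    ready = sorted(n for n, d in remaining.items() if d <= set(order))
    if not ready:
        raise ValueError("cycle detected in pipeline")
    order.extend(ready)
    for n in ready:
        del remaining[n]

print(order)  # ['extract', 'transform', 'load']
```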

6. Execute

orion run \
  --module meu_pipeline.pipeline \
  --catalog meu_pipeline/catalog.yml

7. Check result

cat data/processed/clientes_processed.csv
