etl-tools

from pluginagentmarketplace/custom-plugin-data-engineer

Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools

npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill etl-tools

SKILL.md

ETL Tools & Data Orchestration

Production-grade data pipeline development with Apache Airflow, dbt, and modern orchestration patterns.
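
The Quick Start below focuses on the Airflow side. For dbt, here is a minimal sketch of scheduling a nightly dbt build from a DAG; the project path, profiles directory, and target name are illustrative assumptions, not part of this skill's shipped code:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="dbt_daily_build", schedule="0 3 * * *",
         start_date=datetime(2024, 1, 1), catchup=False):
    # dbt build runs models, tests, snapshots, and seeds in dependency order (dbt 1.0+)
    dbt_build = BashOperator(
        task_id="dbt_build",
        bash_command="dbt build --project-dir /opt/dbt --profiles-dir /opt/dbt --target prod",
    )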

Quick Start

# Apache Airflow 2.8+ TaskFlow API
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import pandas as pd

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["alerts@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}

@dag(
    dag_id="etl_pipeline_v2",
    schedule="0 2 * * *",  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["production", "etl"],
    default_args=default_args,
    doc_md="""
    ## Daily Sales ETL Pipeline

    Extracts from PostgreSQL, transforms, loads to S3.

    ### Data Quality Checks
    - Row count validation
    - Schema validation
    - Freshness check
    """
)
def etl_pipeline():

    @task
    def extract_sales(ds: str = None) -> dict:
        """Extract daily sales from PostgreSQL for the run date (ds = "YYYY-MM-DD")."""
        # Airflow injects the templated `ds` string because it is named in the signature
        hook = PostgresHook(postgres_conn_id="postgres_prod")
        query = """
            SELECT order_id, customer_id, product_id,
                   quantity, unit_price, order_date
            FROM orders
            WHERE order_date = %(date)s
        """
        df = hook.get_pandas_df(query, parameters={"date": ds})

        if df.empty:
            raise ValueError(f"No data for {ds}")

        # Persist the extract so downstream tasks pass a file path, not a large XCom;
        # assumes workers share a filesystem (or a single-worker deployment)
        path = f"/tmp/extract_{ds}.parquet"
        df.to_parquet(path, index=False)

        return {"path": path, "count": len(df)}

    @task
    def transform_sales(extract_result: dict) -> dict:
        """Apply business transformations."""
        df = pd.read_parquet(extract_result["path"])

        # Business logic
        df["total_amount"] = df["quantity"] * df["unit_price"]
        df["discount_tier"] = pd.cut(
            df["total_amount"],
            bins=[0, 100, 500, float("inf")],
            labels=["small", "medium", "large"]
        )

        output_path = extract_result["path"].replace("extract", "transform")
        df.to_parquet(output_path, index=False)

        return {"path": output_path, "count": len(df)}

    @task
    def load_to_s3(transform_result: dict, ds: str = None) -> str:
        """Load to S3 with Hive-style date partitioning."""
        s3_hook = S3Hook(aws_conn_id="aws_prod")

        # ds is "YYYY-MM-DD"; slice it into year/month/day partition keys
        s3_key = (
            f"sales/year={ds[:4]}/month={ds[5:7]}/"
            f"day={ds[8:10]}/data.parquet"
        )

        s3_hook.load_file(
            filename=transform_result["path"],
            key=s3_key,
            bucket_name="data-lake-prod",
            replace=True
        )

        return f"s3://data-lake-prod/{s3_key}"

    @task
    def validate_load(s3_path: str) -> bool:
        """Validate data was loaded correctly."""
        s3_hook = S3Hook(aws_conn_id="aws_prod")

        # get_key returns a boto3 S3.Object; confirm the upload landed with content
        key = s3_path.replace("s3://data-lake-prod/", "")
        s3_object = s3_hook.get_key(key, bucket_name="data-lake-prod")

        if s3_object.content_length < 100:
            raise ValueError(f"File too small: {s3_object.content_length} bytes")

        return True

    # DAG flow
    extracted = extract_sales()
    transformed = transform_sales(extracted)
    loaded = load_to_s3(transformed)
    validate_load(loaded)
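
    # A hedged sketch, not part of the original flow: doc_md above lists a
    # row-count validation, and the counts the tasks already return make it
    # cheap to add. The task name is illustrative.
    @task
    def check_row_count(extract_result: dict, transform_result: dict) -> bool:
        """Fail the run if rows were dropped between extract and transform."""
        if extract_result["count"] != transform_result["count"]:
            raise ValueError(
                f"Row count mismatch: extracted {extract_result['count']}, "
                f"transformed {transform_result['count']}"
            )
        return True

    check_row_count(extracted, transformed)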

# Instantiate DAG
etl_pipeline()
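
To exercise the pipeline without a scheduler, Airflow 2.5+ DAG objects expose dag.test(), which runs every task sequentially in the current process. A minimal sketch; it assumes the postgres_prod and aws_prod connections exist in your local Airflow metadata database:

# Local debugging only; the scheduler never executes this branch
if __name__ == "__main__":
    etl_pipeline().test()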

Core Concepts

1. Airflow Architecture

┌─────────────────────────────────────────────────────────────┐
│                        Airflow Architecture                  │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │  Scheduler   │───▶│   Executor   │───▶│   Workers    │  │
│  │              │    │ (Celery/K8s) │    │              │  │
│  └──────────────┘    └──────────────┘    └──────────────┘  │
│         │                                       │           │
│         ▼                                       ▼           │
│  ┌──────────────┐                       ┌──────────────┐   │
│  │   Metadata   │                       │    Logs      │   │
│  │   Database   │                       │   Storage    │   │
│  │  (Postgres)  │                       │    (S3)      │   │
│  └──────────────┘                       └──────────────┘   │
│         │                                                   │
│         ▼                                                   │
│  ┌──────────────┐                                          │
│  │   Webserver  │  ← UI for monitoring                     │
│  └──────────────┘                                          │
│                                                              │
└─────────────────────────────────────────────────────────────┘
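
The components above are wired together through Airflow's configuration, normally exported as environment variables by the deployment (docker-compose, Helm values, etc.) before any Airflow process starts. A sketch of the settings that correspond to the diagram, shown in Python only for readability; every value here is a placeholder:

import os

os.environ["AIRFLOW__CORE__EXECUTOR"] = "CeleryExecutor"  # Scheduler hands tasks to Celery workers
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = (
    "postgresql+psycopg2://airflow:airflow@postgres:5432/airflow"  # Metadata database
)
os.environ["AIRFLOW__LOGGING__REMOTE_LOGGING"] = "True"  # Ship task logs off the workers
os.environ["AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER"] = "s3://data-lake-prod/airflow-logs"
os.environ["AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID"] = "aws_prod"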

...
