etl-tools
from pluginagentmarketplace/custom-plugin-data-engineer
Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools
```bash
npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill etl-tools
```
# ETL Tools & Data Orchestration
Production-grade data pipeline development with Apache Airflow, dbt, and modern orchestration patterns.
## Quick Start
```python
# Apache Airflow 2.8+ TaskFlow API
from datetime import datetime, timedelta

import pandas as pd
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["alerts@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
}


@dag(
    dag_id="etl_pipeline_v2",
    schedule="0 2 * * *",  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["production", "etl"],
    default_args=default_args,
    doc_md="""
    ## Daily Sales ETL Pipeline
    Extracts from PostgreSQL, transforms, loads to S3.

    ### Data Quality Checks
    - Row count validation
    - Schema validation
    - Freshness check
    """,
)
def etl_pipeline():
    @task
    def extract_sales(ds: str = None) -> dict:
        """Extract daily sales from PostgreSQL.

        `ds` (the logical date as YYYY-MM-DD) is injected from the Airflow context.
        """
        hook = PostgresHook(postgres_conn_id="postgres_prod")
        query = """
            SELECT order_id, customer_id, product_id,
                   quantity, unit_price, order_date
            FROM orders
            WHERE order_date = %(date)s
        """
        df = hook.get_pandas_df(query, parameters={"date": ds})
        if df.empty:
            raise ValueError(f"No data for {ds}")
        output_path = f"/tmp/extract_{ds}.parquet"
        df.to_parquet(output_path, index=False)
        return {"path": output_path, "count": len(df)}

    @task
    def transform_sales(extract_result: dict) -> dict:
        """Apply business transformations."""
        df = pd.read_parquet(extract_result["path"])
        # Business logic
        df["total_amount"] = df["quantity"] * df["unit_price"]
        df["discount_tier"] = pd.cut(
            df["total_amount"],
            bins=[0, 100, 500, float("inf")],
            labels=["small", "medium", "large"],
        )
        output_path = extract_result["path"].replace("extract", "transform")
        df.to_parquet(output_path, index=False)
        return {"path": output_path, "count": len(df)}

    @task
    def load_to_s3(transform_result: dict, ds: str = None) -> str:
        """Load to S3 with partitioning."""
        s3_hook = S3Hook(aws_conn_id="aws_prod")
        s3_key = f"sales/year={ds[:4]}/month={ds[5:7]}/day={ds[8:10]}/data.parquet"
        s3_hook.load_file(
            filename=transform_result["path"],
            key=s3_key,
            bucket_name="data-lake-prod",
            replace=True,
        )
        return f"s3://data-lake-prod/{s3_key}"

    @task
    def validate_load(s3_path: str) -> bool:
        """Validate data was loaded correctly."""
        s3_hook = S3Hook(aws_conn_id="aws_prod")
        # Check the object exists and has content
        key = s3_path.replace("s3://data-lake-prod/", "")
        metadata = s3_hook.get_key(key, bucket_name="data-lake-prod")
        if metadata.content_length < 100:
            raise ValueError(f"File too small: {metadata.content_length} bytes")
        return True

    # DAG flow
    extracted = extract_sales()
    transformed = transform_sales(extracted)
    loaded = load_to_s3(transformed)
    validate_load(loaded)


# Instantiate DAG
etl_pipeline()
```
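The DAG's `doc_md` lists row-count, schema, and freshness checks, but the pipeline above only verifies the size of the uploaded S3 object. Below is a minimal sketch of what a dedicated quality task could look like; the task name `validate_quality`, the required-column set, and the two-day freshness window are illustrative assumptions, not part of the plugin.

```python
# Sketch only: relies on `task` and `pd` imported in the Quick Start block,
# and is meant to live inside etl_pipeline() alongside the other tasks.
@task
def validate_quality(transform_result: dict) -> bool:
    """Hypothetical data-quality gate: row count, schema, and freshness checks."""
    df = pd.read_parquet(transform_result["path"])

    # Row count validation: extract already fails on empty data,
    # but re-check after transformation as a safety net.
    if df.empty:
        raise ValueError("Transformed dataset is empty")

    # Schema validation: required columns must survive the transform
    required = {"order_id", "customer_id", "total_amount", "discount_tier"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns after transform: {missing}")

    # Freshness check: newest order must be recent (2-day window is an assumption)
    latest = pd.to_datetime(df["order_date"]).max()
    if (pd.Timestamp.now() - latest).days > 2:
        raise ValueError(f"Stale data: latest order_date is {latest}")

    return True
```

Placed inside `etl_pipeline()`, it can be wired in as `validate_quality(transformed)` alongside `load_to_s3`, and the whole DAG can be exercised locally with `airflow dags test etl_pipeline_v2 2024-01-01` before deploying.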
## Core Concepts

### 1. Airflow Architecture
```
┌────────────────────────────────────────────────────────────┐
│                    Airflow Architecture                    │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │  Scheduler   │───▶│   Executor   │───▶│   Workers    │  │
│  │              │    │ (Celery/K8s) │    │              │  │
│  └──────────────┘    └──────────────┘    └──────────────┘  │
│          │                                       │         │
│          ▼                                       ▼         │
│  ┌──────────────┐                        ┌──────────────┐  │
│  │   Metadata   │                        │     Logs     │  │
│  │   Database   │                        │   Storage    │  │
│  │  (Postgres)  │                        │     (S3)     │  │
│  └──────────────┘                        └──────────────┘  │
│          │                                                 │
│          ▼                                                 │
│  ┌──────────────┐                                          │
│  │  Webserver   │  ← UI for monitoring                     │
│  └──────────────┘                                          │
│                                                            │
└────────────────────────────────────────────────────────────┘
```
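The Executor → Workers hop in the diagram is where per-task settings such as `queue` and `pool` take effect: `queue` (honored by the CeleryExecutor) routes a task to a specific worker group, while `pool` caps how many task instances run concurrently regardless of executor. A minimal sketch, assuming a `high_memory` Celery queue and an `etl_db` pool already exist on the deployment (both names are hypothetical):

```python
from airflow.decorators import task


@task(
    queue="high_memory",  # Celery queue; assumes a worker is subscribed to it
    pool="etl_db",        # Airflow pool (created via UI/CLI) capping concurrent runs
    retries=5,
)
def heavy_transform(extract_result: dict) -> dict:
    """Illustrative task showing how worker routing and concurrency are declared."""
    # Business logic would go here; routing and throttling are handled by Airflow.
    return extract_result
```

With the LocalExecutor the `queue` argument is simply ignored, so the same DAG file runs unchanged in local development.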
...