data-engineering

from pluginagentmarketplace/custom-plugin-data-engineer

Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools

1 star · 1 fork · Updated Jan 5, 2026
npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill data-engineering

SKILL.md

Data Engineering Fundamentals

Core data engineering concepts, patterns, and practices for building production data platforms.

Quick Start

# Production Data Pipeline Pattern
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
import logging

# Configure the root logger to emit INFO and above. basicConfig() does nothing
# if the root logger already has handlers, so an application that configured
# logging earlier keeps its own setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by the pipeline code below.
logger = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    """Settings for a single ``DataPipeline`` run.

    Raises:
        ValueError: If ``batch_size`` or ``retry_count`` is less than 1.
    """

    # Identifier of the system records are read from; the pipeline logs it
    # when extraction starts.
    source: str
    # Identifier of the system records are written to.
    destination: str
    # Number of records requested per fetch; also the step by which the
    # extraction offset advances.
    batch_size: int = 10000
    # Total number of write attempts made per batch before the error is
    # re-raised (the pipeline loops ``range(retry_count)``).
    retry_count: int = 3

    def __post_init__(self) -> None:
        # A non-positive batch size would leave the extraction offset stuck,
        # and a non-positive retry count would make the load step skip the
        # write without reporting anything. Fail loudly at construction time.
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {self.batch_size}")
        if self.retry_count < 1:
            raise ValueError(f"retry_count must be >= 1, got {self.retry_count}")

class DataPipeline:
    """Batch ETL driver: extract -> transform -> load, with running metrics.

    This class only implements the control flow. The four hooks it calls --
    ``_fetch_batch(offset, limit)``, ``_clean_record(record)``,
    ``_enrich_record(record)`` and ``_write_batch(batch)`` -- are not defined
    here and must be provided by a subclass (or attached to the instance).

    Attributes:
        config: Pipeline settings. ``source``, ``batch_size`` and
            ``retry_count`` are read by the methods below.
        metrics: Counters ``extracted``, ``transformed``, ``loaded`` and
            ``errors``; ``run`` adds ``duration_seconds``. The counters are
            initialised once in ``__init__`` and therefore accumulate across
            repeated ``run`` calls on the same instance.
    """

    def __init__(self, config: "PipelineConfig"):
        self.config = config
        self.metrics = {"extracted": 0, "transformed": 0, "loaded": 0, "errors": 0}

    def extract(self) -> Generator[list[dict], None, None]:
        """Yield successive batches fetched from the source.

        Calls ``_fetch_batch(offset, batch_size)`` with an offset that grows by
        ``batch_size`` each round, and stops at the first empty batch.

        Yields:
            One batch (a list of records) per fetch.

        Raises:
            ValueError: If ``config.batch_size`` is less than 1, because the
                offset would then never advance.
        """
        batch_size = self.config.batch_size
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")

        logger.info("Extracting from %s", self.config.source)
        offset = 0
        while True:
            batch = self._fetch_batch(offset, batch_size)
            if not batch:
                break
            self.metrics["extracted"] += len(batch)
            yield batch
            offset += batch_size

    def transform(self, batch: list[dict]) -> list[dict]:
        """Clean and enrich every record in ``batch``.

        A record whose cleaning or enrichment raises is logged, counted in
        ``metrics["errors"]`` and dropped; the rest of the batch continues.

        Args:
            batch: Records to transform.

        Returns:
            The successfully transformed records, in input order.
        """
        transformed = []
        for record in batch:
            try:
                cleaned = self._clean_record(record)
                enriched = self._enrich_record(cleaned)
            except Exception as e:
                # Deliberately broad: one bad record must not abort the batch.
                logger.warning("Transform error: %s", e)
                self.metrics["errors"] += 1
            else:
                transformed.append(enriched)
        self.metrics["transformed"] += len(transformed)
        return transformed

    def load(self, batch: list[dict]) -> None:
        """Write ``batch`` to the destination, retrying on failure.

        Makes up to ``config.retry_count`` attempts. The count is clamped to at
        least one attempt so that a non-positive value cannot cause the write
        to be skipped silently.

        Args:
            batch: Records to write.

        Raises:
            Exception: Whatever ``_write_batch`` raised on the final attempt.
        """
        attempts = max(1, self.config.retry_count)
        for attempt in range(1, attempts + 1):
            try:
                self._write_batch(batch)
            except Exception as e:
                if attempt == attempts:
                    raise
                logger.warning("Load attempt %d failed: %s", attempt, e)
            else:
                self.metrics["loaded"] += len(batch)
                return

    def run(self) -> dict:
        """Execute the full extract -> transform -> load cycle.

        Batches that end up empty after transformation are not loaded.

        Returns:
            The ``metrics`` dict, including ``duration_seconds``.

        Raises:
            Exception: Propagates any error from extraction or from a load
                that failed on its last attempt. ``duration_seconds`` is still
                recorded in ``metrics`` in that case.
        """
        start_time = datetime.now()
        logger.info("Pipeline started")

        try:
            for batch in self.extract():
                transformed = self.transform(batch)
                if transformed:
                    self.load(transformed)
        finally:
            # Record the elapsed time even when the run aborts, so callers
            # inspecting ``metrics`` after a failure see how long it ran.
            duration = (datetime.now() - start_time).total_seconds()
            self.metrics["duration_seconds"] = duration

        logger.info("Pipeline completed: %s", self.metrics)
        return self.metrics

Core Concepts

1. Data Architecture Patterns

┌─────────────────────────────────────────────────────────────────┐
│                    Modern Data Architecture                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  Sources          Ingestion        Storage         Consumption  │
│  ────────         ─────────        ───────         ───────────  │
│  ┌──────┐        ┌─────────┐      ┌───────┐       ┌──────────┐ │
│  │ APIs │───────▶│ Airbyte │─────▶│ Raw   │──────▶│ BI Tools │ │
│  │ DBs  │        │ Fivetran│      │ Layer │       │ Dashboards│ │
│  │ Files│        │ Kafka   │      │ (S3)  │       └──────────┘ │
│  │ SaaS │        └─────────┘      └───┬───┘                    │
│  └──────┘                             │                         │
│                                       ▼                         │
│                               ┌───────────────┐                 │
│                               │  Transform    │                 │
│                               │  (dbt/Spark)  │                 │
│                               └───────┬───────┘                 │
│                                       │                         │
│                                       ▼                         │
│                               ┌───────────────┐   ┌──────────┐ │
│                               │   Warehouse   │──▶│ ML/AI    │ │
│                               │  (Snowflake)  │   │ Pipelines│ │
│                               └───────────────┘   └──────────┘ │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

2. ETL vs ELT

# ETL Pattern (Transform before load)
# Best for: Sensitive data, complex transformations, limited storage

def etl_pipeline():
    """Run the ETL sequence: extract, transform, then load."""
    # The transform step sits between extract and load, so the destination
    # only ever receives the transformed data.
    load_to_destination(transform_and_clean(extract_from_source()))

# ELT Pattern (Load then transform)
# Best for: Cloud warehouses, large scale, exploratory analysis

def elt_pipeline():
    raw_data = extract_from_source()
    load_to_staging(raw_data)  # Load raw first

...
Read full content

Repository Stats

Stars: 1
Forks: 1
License: Other