data-engineering
from pluginagentmarketplace/custom-plugin-data-engineer
Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools
npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill data-engineering
SKILL.md
Data Engineering Fundamentals
Core data engineering concepts, patterns, and practices for building production data platforms.
Quick Start
# Production Data Pipeline Pattern
from dataclasses import dataclass
from datetime import datetime
from typing import Generator
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class PipelineConfig:
    source: str
    destination: str
    batch_size: int = 10000
    retry_count: int = 3


class DataPipeline:
    """Production-ready data pipeline with error handling."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.metrics = {"extracted": 0, "transformed": 0, "loaded": 0, "errors": 0}

    def extract(self) -> Generator[list[dict], None, None]:
        """Extract data in batches from the source."""
        logger.info(f"Extracting from {self.config.source}")
        offset = 0
        while True:
            batch = self._fetch_batch(offset, self.config.batch_size)
            if not batch:
                break
            self.metrics["extracted"] += len(batch)
            yield batch
            offset += self.config.batch_size

    def transform(self, batch: list[dict]) -> list[dict]:
        """Apply transformations with validation."""
        transformed = []
        for record in batch:
            try:
                cleaned = self._clean_record(record)
                enriched = self._enrich_record(cleaned)
                transformed.append(enriched)
            except Exception as e:
                logger.warning(f"Transform error: {e}")
                self.metrics["errors"] += 1
        self.metrics["transformed"] += len(transformed)
        return transformed

    def load(self, batch: list[dict]) -> None:
        """Load to the destination with retry logic."""
        for attempt in range(self.config.retry_count):
            try:
                self._write_batch(batch)
                self.metrics["loaded"] += len(batch)
                return
            except Exception as e:
                if attempt == self.config.retry_count - 1:
                    raise
                logger.warning(f"Load attempt {attempt + 1} failed: {e}")
                time.sleep(2 ** attempt)  # simple exponential backoff between retries

    def run(self) -> dict:
        """Execute the full ETL pipeline."""
        start_time = datetime.now()
        logger.info("Pipeline started")
        for batch in self.extract():
            transformed = self.transform(batch)
            if transformed:
                self.load(transformed)
        duration = (datetime.now() - start_time).total_seconds()
        self.metrics["duration_seconds"] = duration
        logger.info(f"Pipeline completed: {self.metrics}")
        return self.metrics

    # Hooks to override for a concrete source and destination.

    def _fetch_batch(self, offset: int, limit: int) -> list[dict]:
        """Read one batch from the source; override with real I/O."""
        raise NotImplementedError

    def _write_batch(self, batch: list[dict]) -> None:
        """Write one batch to the destination; override with real I/O."""
        raise NotImplementedError

    def _clean_record(self, record: dict) -> dict:
        """Identity by default; override with cleaning logic."""
        return record

    def _enrich_record(self, record: dict) -> dict:
        """Identity by default; override with enrichment logic."""
        return record
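To use the pattern, subclass DataPipeline and override the I/O hooks. A minimal usage sketch, assuming a hypothetical in-memory source and a print-based sink (the subclass and sample data below are illustrative, not part of the plugin):

# Usage sketch: hypothetical in-memory source, print-based sink
class ListPipeline(DataPipeline):
    SAMPLE = [{"id": i, "value": i * 10} for i in range(25)]

    def _fetch_batch(self, offset: int, limit: int) -> list[dict]:
        return self.SAMPLE[offset:offset + limit]

    def _write_batch(self, batch: list[dict]) -> None:
        print(f"writing {len(batch)} records to {self.config.destination}")

    def _enrich_record(self, record: dict) -> dict:
        return {**record, "processed_at": datetime.now().isoformat()}


config = PipelineConfig(source="memory", destination="stdout", batch_size=10)
metrics = ListPipeline(config).run()  # returns the metrics dict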
Core Concepts
1. Data Architecture Patterns
┌─────────────────────────────────────────────────────────────────┐
│                    Modern Data Architecture                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Sources        Ingestion         Storage        Consumption    │
│  ───────        ─────────         ───────        ───────────    │
│  ┌───────┐      ┌──────────┐      ┌───────┐      ┌────────────┐ │
│  │ APIs  │─────▶│ Airbyte  │─────▶│ Raw   │─────▶│ BI Tools   │ │
│  │ DBs   │      │ Fivetran │      │ Layer │      │ Dashboards │ │
│  │ Files │      │ Kafka    │      │ (S3)  │      └────────────┘ │
│  │ SaaS  │      └──────────┘      └───┬───┘                     │
│  └───────┘                            │                         │
│                                       ▼                         │
│                               ┌───────────────┐                 │
│                               │   Transform   │                 │
│                               │  (dbt/Spark)  │                 │
│                               └───────┬───────┘                 │
│                                       │                         │
│                                       ▼                         │
│                               ┌───────────────┐   ┌───────────┐ │
│                               │   Warehouse   │──▶│   ML/AI   │ │
│                               │  (Snowflake)  │   │ Pipelines │ │
│                               └───────────────┘   └───────────┘ │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
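Read left to right, the diagram is an ordered sequence of stages, each owned by a tool layer. A minimal sketch of the same flow as data, using the (illustrative, not prescribed) tool names from the diagram:

# The diagram's stages as a simple ordered map (tool names are illustrative)
ARCHITECTURE = {
    "ingestion": ["Airbyte", "Fivetran", "Kafka"],     # sources -> raw layer (S3)
    "transform": ["dbt", "Spark"],                     # raw layer -> warehouse
    "consumption": ["BI dashboards", "ML pipelines"],  # warehouse -> end users
}

for stage, tools in ARCHITECTURE.items():
    print(f"{stage:>12}: {', '.join(tools)}")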
2. ETL vs ELT
# ETL Pattern (Transform before load)
# Best for: Sensitive data, complex transformations, limited storage
def etl_pipeline():
    raw_data = extract_from_source()
    cleaned_data = transform_and_clean(raw_data)  # Transform first
    load_to_destination(cleaned_data)

# ELT Pattern (Load then transform)
# Best for: Cloud warehouses, large scale, exploratory analysis
def elt_pipeline():
    raw_data = extract_from_source()
    load_to_staging(raw_data)   # Load raw first
    transform_in_warehouse()    # Transform inside the warehouse (SQL/dbt)
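The transform_in_warehouse step is where ELT differs in practice: the heavy lifting runs as SQL inside the warehouse rather than in the pipeline process. A minimal sketch, assuming a DB-API-style connection and illustrative staging/analytics table names (CREATE OR REPLACE TABLE is Snowflake-style syntax):

# Sketch: ELT transform executed in the warehouse (assumes a DB-API connection)
def transform_in_warehouse(conn) -> None:
    """Build a cleaned table from raw staging data using warehouse SQL."""
    conn.cursor().execute("""
        CREATE OR REPLACE TABLE analytics.orders_clean AS
        SELECT
            order_id,
            LOWER(TRIM(customer_email)) AS customer_email,
            CAST(order_total AS DECIMAL(12, 2)) AS order_total,
            order_date
        FROM staging.orders_raw
        WHERE order_id IS NOT NULL
    """)
    conn.commit()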