big-data

from pluginagentmarketplace/custom-plugin-data-engineer

Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools

npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill big-data

SKILL.md

Big Data & Distributed Computing

Production-grade big data processing with Apache Spark, distributed systems patterns, and petabyte-scale data engineering.

Quick Start

# PySpark 3.5+ modern DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark with optimal settings
spark = (SparkSession.builder
    .appName("ProductionETL")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate())

# Efficient data loading with schema enforcement
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", LongType(), False),
    StructField("event_type", StringType(), False),
    StructField("timestamp", TimestampType(), False),
    StructField("properties", StringType(), True)
])

df = (spark.read
    .schema(schema)
    .parquet("s3://bucket/events/")
    .filter(F.col("timestamp") >= F.current_date() - 30))

# Complex aggregation with window functions
window_spec = Window.partitionBy("user_id").orderBy("timestamp")

result = (df
    .withColumn("event_rank", F.row_number().over(window_spec))
    .withColumn("session_id", F.sum(
        F.when(
            F.col("timestamp") - F.lag("timestamp").over(window_spec) > F.expr("INTERVAL 30 MINUTES"),
            1
        ).otherwise(0)
    ).over(window_spec))
    .groupBy("user_id", "session_id")
    .agg(
        F.count("*").alias("event_count"),
        F.min("timestamp").alias("session_start"),
        F.max("timestamp").alias("session_end")
    ))

result.write.mode("overwrite").parquet("s3://bucket/sessions/")
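
A quick, optional way to validate the sessionization before relying on the output; this is a minimal check against the result DataFrame built above, using standard DataFrame methods:

# Inspect a few sessions and confirm the physical plan uses AdaptiveSparkPlan (AQE)
result.orderBy("user_id", "session_id").show(20, truncate=False)
result.explain(mode="formatted")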

Core Concepts

1. Spark Architecture Deep Dive

┌─────────────────────────────────────────────────────────┐
│                    Driver Program                        │
│  ┌─────────────────────────────────────────────────┐   │
│  │              SparkContext/SparkSession           │   │
│  │  - Creates execution plan (DAG)                  │   │
│  │  - Coordinates with Cluster Manager              │   │
│  │  - Schedules tasks                               │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────┐
│              Cluster Manager (YARN/K8s/Standalone)      │
└─────────────────────────────────────────────────────────┘
                           │
          ┌────────────────┼────────────────┐
          ▼                ▼                ▼
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │ Executor │    │ Executor │    │ Executor │
    │ ┌──────┐ │    │ ┌──────┐ │    │ ┌──────┐ │
    │ │Task 1│ │    │ │Task 2│ │    │ │Task 3│ │
    │ │Task 4│ │    │ │Task 5│ │    │ │Task 6│ │
    │ └──────┘ │    │ └──────┘ │    │ └──────┘ │
    │  Cache   │    │  Cache   │    │  Cache   │
    └──────────┘    └──────────┘    └──────────┘
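
The executor-level resources in this diagram are set on the session (or via spark-submit). A minimal sketch, assuming a YARN or Kubernetes cluster manager; the instance counts and memory sizes are illustrative, not tuned recommendations:

from pyspark.sql import SparkSession

# Illustrative sizing: 10 executors x 4 cores = 40 parallel task slots
spark = (SparkSession.builder
    .appName("ClusterSizingExample")
    .config("spark.executor.instances", "10")  # executor JVMs launched by the cluster manager
    .config("spark.executor.cores", "4")       # task slots per executor
    .config("spark.executor.memory", "8g")     # heap shared by tasks and cache on each executor
    .config("spark.driver.memory", "4g")       # driver builds the DAG and coordinates tasks
    .getOrCreate())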

2. Partition Optimization

from pyspark.sql import functions as F

# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")

# Rule of thumb: ~128MB per partition and 2-4 partitions per core
# For 100GB of data on 10 executors with 4 cores each:
#   by size:        100GB / 128MB ≈ 800 partitions
#   by parallelism: 40 cores * 4  = 160 partitions
# Pick a value between the two extremes, e.g. 200-400 partitions
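
# Hypothetical helper (not part of the original skill) encoding the rule of thumb above:
# compute the size-based count and the parallelism-based count, then settle in between.
def suggest_partitions(data_size_gb, executors, cores_per_executor,
                       target_mb=128, slots_per_core=4):
    by_size = int(data_size_gb * 1024 / target_mb)               # 100GB -> 800
    by_cores = executors * cores_per_executor * slots_per_core   # 10 * 4 * 4 -> 160
    return max(by_cores, min(by_size, by_cores * 2))             # 100GB, 10x4 cores -> 320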

# Repartition by key (for joins)
df_repartitioned = df.repartition(200, "user_id")

# Coalesce (reduce partitions without shuffle)
df_coalesced = df.coalesce(100)

# Optimal write partitioning: materialize year/month columns so partitionBy can use them
df.withColumn("year", F.year("date")) \
  .withColumn("month", F.month("date")) \
  .repartition("year", "month") \
  .write \
  .partitionBy("year", "month") \
  .mode("overwrite") \
  .parquet("s3://bucket/output/")

# Bucketing for repeated joins
df.write \
  .bucketBy(256, "user_id") \
  .sortBy("user_id") \
  .saveAsTable("bucketed_events")

3. Join Optimization Strategies

from pyspark.sql import functions as F

# Broadcast join (table below spark.sql.autoBroadcastJoinThreshold, 10MB by default; configurable, e.g. to 100MB)
small_df = spark.read.parquet("s3://bucket/dim_product/")  # 5MB
large_df = spark.read.parquet("s3://bucket/fact_sales/")   # 500GB

# Explicit broadcast hint
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "product_id")

# Increase broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

# Sort-Merge Join (for large tables)
# Both tables sorted and partitioned by join key
users = spark.read.parquet("users/").repartition(200, "user_id").sortWithinPartitions("user_id")
orders = spark.read.parquet("orders/").repartition(200, "user_id").sortWithinPartitions("user_id")
result = users.join(orders, "user_id")
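
# The physical plan should now show SortMergeJoin (both sides exceed the broadcast threshold)
result.explain()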

# Skewed join handling (salting technique)
# If user_id has heavily skewed hot keys, salt the key to spread the load
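
# A minimal sketch of the salting idea (illustrative; the original example is truncated here):
# append a random salt to the skewed side, replicate the other side once per salt value,
# and join on (user_id, salt) so each hot key spreads across many partitions.
NUM_SALTS = 16  # illustrative; size to the observed skew

salted_orders = orders.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))
salts = spark.range(NUM_SALTS).select(F.col("id").cast("int").alias("salt"))
salted_users = users.crossJoin(salts)  # in practice, replicate only the hot keys
result = salted_orders.join(salted_users, ["user_id", "salt"]).drop("salt")

# On Spark 3+, AQE can also split skewed sort-merge join partitions automatically
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")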

...