Big Data & Distributed Computing
Production-grade big data processing with Apache Spark, distributed systems patterns, and petabyte-scale data engineering.
Install: npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill big-data
Quick Start
# PySpark 3.5+ modern DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Initialize Spark with optimal settings
spark = (SparkSession.builder
.appName("ProductionETL")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate())
# Efficient data loading with schema enforcement
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
schema = StructType([
StructField("event_id", StringType(), False),
StructField("user_id", LongType(), False),
StructField("event_type", StringType(), False),
StructField("timestamp", TimestampType(), False),
StructField("properties", StringType(), True)
])
df = (spark.read
.schema(schema)
.parquet("s3://bucket/events/")
.filter(F.col("timestamp") >= F.current_date() - 30))
# Complex aggregation with window functions
window_spec = Window.partitionBy("user_id").orderBy("timestamp")
result = (df
.withColumn("event_rank", F.row_number().over(window_spec))
.withColumn("session_id", F.sum(
F.when(
F.col("timestamp") - F.lag("timestamp").over(window_spec) > F.expr("INTERVAL 30 MINUTES"),
1
).otherwise(0)
).over(window_spec))
.groupBy("user_id", "session_id")
.agg(
F.count("*").alias("event_count"),
F.min("timestamp").alias("session_start"),
F.max("timestamp").alias("session_end")
))
result.write.mode("overwrite").parquet("s3://bucket/sessions/")
Core Concepts
1. Spark Architecture Deep Dive
┌─────────────────────────────────────────────────────────┐
│ Driver Program │
│ ┌─────────────────────────────────────────────────┐ │
│ │ SparkContext/SparkSession │ │
│ │ - Creates execution plan (DAG) │ │
│ │ - Coordinates with Cluster Manager │ │
│ │ - Schedules tasks │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Cluster Manager (YARN/K8s/Standalone) │
└─────────────────────────────────────────────────────────┘
│
┌────────────────┼────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Executor │ │ Executor │ │ Executor │
│ ┌──────┐ │ │ ┌──────┐ │ │ ┌──────┐ │
│ │Task 1│ │ │ │Task 2│ │ │ │Task 3│ │
│ │Task 4│ │ │ │Task 5│ │ │ │Task 6│ │
│ └──────┘ │ │ └──────┘ │ │ └──────┘ │
│ Cache │ │ Cache │ │ Cache │
└──────────┘ └──────────┘ └──────────┘
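The layout above maps onto a handful of session settings: the builder runs in the driver program, and the executor knobs describe the worker boxes in the diagram. A minimal sketch with illustrative values (they only take effect when the job is submitted to a real cluster manager such as YARN or Kubernetes, not in local mode):

from pyspark.sql import SparkSession
# Driver program: builds the DAG and asks the cluster manager for executors
spark = (SparkSession.builder
    .appName("ArchitectureDemo")
    .config("spark.executor.instances", "3")   # three executor JVMs, as in the diagram
    .config("spark.executor.cores", "2")       # concurrent tasks per executor
    .config("spark.executor.memory", "8g")     # heap shared by tasks and cached blocks
    .config("spark.memory.fraction", "0.6")    # share of heap for execution + storage (cache)
    .getOrCreate())
# Default parallelism the scheduler uses when splitting a stage into tasks
print(f"Default parallelism: {spark.sparkContext.defaultParallelism}")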
2. Partition Optimization
from pyspark.sql import functions as F
# Check current partitioning
print(f"Partitions: {df.rdd.getNumPartitions()}")
# Rule of thumb: 128MB per partition, 2-4 partitions per core
# For 100GB data on 10 executors with 4 cores each:
# 100GB / 128MB ≈ 800 partitions by size; 40 cores * 4 = 160 partitions by parallelism
# Pick a value between the two estimates, e.g. 200-400 partitions (see the helper sketch below)
# Repartition by key (for joins)
df_repartitioned = df.repartition(200, "user_id")
# Coalesce (reduce partitions without shuffle)
df_coalesced = df.coalesce(100)
# Optimal write partitioning: derive year/month columns, then partition the output by them
df.withColumn("year", F.year("date")) \
    .withColumn("month", F.month("date")) \
    .repartition("year", "month") \
    .write \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .parquet("s3://bucket/output/")
# Bucketing for repeated joins
df.write \
.bucketBy(256, "user_id") \
.sortBy("user_id") \
.saveAsTable("bucketed_events")
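The sizing rule of thumb at the top of this section is easy to wrap in a helper so it gets applied consistently. A sketch under the same assumptions; the function name, defaults, and blending heuristic are ours, not part of any Spark API:

def suggest_partitions(data_size_gb: float, total_cores: int,
                       mb_per_partition: int = 128, partitions_per_core: int = 3) -> int:
    """Blend the two rules of thumb: ~128MB per partition, 2-4 partitions per core."""
    by_size = int(data_size_gb * 1024 / mb_per_partition)   # 100GB -> ~800
    by_cores = total_cores * partitions_per_core            # 40 cores -> ~120
    # Stay at or above the core-based floor, but cap well below the size-based estimate
    return max(by_cores, min(by_size, by_cores * 3))

# 100GB on 10 executors x 4 cores lands in the few-hundred range suggested above
df_tuned = df.repartition(suggest_partitions(100, 40), "user_id")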
3. Join Optimization Strategies
from pyspark.sql import functions as F
# Broadcast join: small table under the auto-broadcast threshold (10MB by default; raised to 100MB below)
small_df = spark.read.parquet("s3://bucket/dim_product/") # 5MB
large_df = spark.read.parquet("s3://bucket/fact_sales/") # 500GB
# Explicit broadcast hint
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "product_id")
# Increase broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024) # 100MB
# Sort-Merge Join (for large tables)
# Both tables sorted and partitioned by join key
users = spark.read.parquet("users/").repartition(200, "user_id").sortWithinPartitions("user_id")
orders = spark.read.parquet("orders/").repartition(200, "user_id").sortWithinPartitions("user_id")
result = users.join(orders, "user_id")
# Skewed join handling (salting technique)
# If user_id is skewed (a few hot keys dominate), salt the key so the hot keys spread across partitions
...
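The salting example is truncated above; here is a minimal sketch of the technique, reusing the users/orders frames from the sort-merge example. The salt column and the factor of 16 are illustrative choices:

from pyspark.sql import functions as F

SALT_BUCKETS = 16  # illustrative; size it to the observed skew

# Salt the skewed (large) side so each hot user_id spreads over SALT_BUCKETS partitions
orders_salted = orders.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))

# Replicate the other side once per salt value so every salted row still finds its match
salts = spark.range(SALT_BUCKETS).select(F.col("id").cast("int").alias("salt"))
users_replicated = users.crossJoin(salts)

# Join on the original key plus the salt, then drop the helper column
deskewed = (orders_salted
    .join(users_replicated, ["user_id", "salt"])
    .drop("salt"))

With adaptive query execution enabled (as in the Quick Start config), spark.sql.adaptive.skewJoin.enabled (on by default in Spark 3.x) also mitigates many skewed sort-merge joins automatically.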