# Apache Spark Fundamentals
Spark is a distributed compute engine that runs operations in parallel over partitions of the data. Its main abstraction, the **DataFrame** (built on top of RDDs), describes *what* to compute rather than *how*, which lets Spark optimize queries declaratively before running them.
## The Execution Model: Catalyst and Tungsten
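Spark does not execute DataFrame code as written, or immediately. Transformations are lazy: they only build up a query plan, and nothing runs until an action such as `count()`, `collect()`, or a write is called. The plan then passes through the **Catalyst Optimizer**:
1. **Logical Plan**: A tree representation of the computation. Catalyst first resolves column names and types, then rewrites the tree with rule-based optimizations such as predicate pushdown and constant folding.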
2. **Physical Plan**: Selection of the best strategy (e.g., Broadcast Hash Join vs. Sort Merge Join).
3. **Whole-Stage Code Generation (Tungsten)**: Generates Java code for an entire stage, compiled at runtime into JVM bytecode, that collapses multiple operators into a single function, reducing function call overhead and improving CPU cache locality.
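To make the operator-collapsing idea concrete, here is a pure-Python analogy; it is not the code Spark generates. The first version chains separate scan, filter, and project iterators, so every row costs one function call per operator. The second writes the same pipeline as a single loop, which is the shape whole-stage code generation aims for. The operator functions and sample rows are hypothetical.

```python
from typing import Callable, Iterable, Iterator, Sequence

Row = dict


# Iterator ("Volcano") style: each operator is its own generator that pulls
# rows from its child, so every row costs one call per operator.
def scan(rows: Iterable[Row]) -> Iterator[Row]:
    for row in rows:
        yield row


def filter_op(child: Iterator[Row], predicate: Callable[[Row], bool]) -> Iterator[Row]:
    for row in child:
        if predicate(row):
            yield row


def project_op(child: Iterator[Row], columns: Sequence[str]) -> Iterator[Row]:
    for row in child:
        yield {c: row[c] for c in columns}


def volcano_plan(rows: Iterable[Row]) -> list:
    return list(project_op(filter_op(scan(rows), lambda r: r["qty"] > 1),
                           ["product_id", "qty"]))


# Fused style: the same scan -> filter -> project pipeline written as one loop,
# with no calls between operators.
def fused_plan(rows: Iterable[Row]) -> list:
    out = []
    for row in rows:
        if row["qty"] > 1:
            out.append({"product_id": row["product_id"], "qty": row["qty"]})
    return out


orders = [
    {"order_id": 1, "product_id": "a", "qty": 3},
    {"order_id": 2, "product_id": "b", "qty": 1},
    {"order_id": 3, "product_id": "a", "qty": 2},
]
print(volcano_plan(orders) == fused_plan(orders))  # both plans return the same rows
```

The results are identical; only the number of calls per row differs, which is the overhead the fused form removes.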
## Partitions and the Shuffle Problem
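- **Partitions**: The unit of parallelism; each task processes one partition. When reading files, Spark splits the input into partitions of up to 128MB by default (`spark.sql.files.maxPartitionBytes`).
- **Narrow Transformations**: Operations like `map`, `filter`, or `select`, where each output partition depends on a single input partition, so no data moves between executors. Low cost.
- **Wide Transformations (Shuffles)**: Operations like `groupBy`, `join`, or `distinct` that must redistribute rows across the network so that all rows with the same key end up in the same partition. Shuffles write intermediate data to disk and transfer it between executors, which makes them the primary performance bottleneck in most jobs.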
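A minimal pure-Python model of the difference, under simplifying assumptions: partitions are lists of `(key, value)` pairs, `zlib.crc32` stands in for the Murmur3 hash Spark uses to assign keys to shuffle partitions, and Spark's map-side partial aggregation is left out. The filter touches each partition independently, while the count by key must first route every row to the partition that owns its key.

```python
import zlib

NUM_SHUFFLE_PARTITIONS = 2


def partition_of(key: str, num_partitions: int) -> int:
    # Stand-in for Spark's Murmur3-based hash partitioning.
    return zlib.crc32(key.encode()) % num_partitions


# Input already split into partitions (e.g., one per file split).
input_partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("c", 4)],
    [("b", 5), ("a", 6)],
]

# Narrow: the filter runs inside each partition; no rows move.
filtered = [[(k, v) for k, v in part if v > 1] for part in input_partitions]

# Wide: counting by key needs every row for a key in one place, so each row
# is routed to the partition that owns its key. This exchange is the shuffle.
shuffled = [[] for _ in range(NUM_SHUFFLE_PARTITIONS)]
for part in filtered:
    for key, value in part:
        shuffled[partition_of(key, NUM_SHUFFLE_PARTITIONS)].append((key, value))

# Each shuffle partition aggregates its own keys independently; a key never
# spans two partitions, so the per-partition results can simply be merged.
counts = {}
for part in shuffled:
    local = {}
    for key, _ in part:
        local[key] = local.get(key, 0) + 1
    counts.update(local)

print(sorted(counts.items()))  # [('a', 2), ('b', 2), ('c', 1)]
```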
## Concrete Example: Handling Data Skew with Salting
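Data skew occurs when one partition has significantly more records than others (e.g., joining orders on a popular `product_id`, where every order for that product hashes to the same partition). The task processing that partition becomes a straggler: it runs far longer than the rest, may spill to disk or run out of memory (OOM), and leaves the other executors idle while the stage waits for it.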
**The Salting Strategy**:
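1. Add a random "salt" (an integer in `[0, SALT_RANGE)`) to the join key on the skewed side.
2. Replicate the non-skewed side once per salt value, so every salted key still has a match.
3. Join on the salted key: rows for the hot key are now spread over up to `SALT_RANGE` partitions.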
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Assumes two existing DataFrames:
#   orders   - skewed table (key: product_id)
#   products - non-skewed table (key: product_id)
spark = SparkSession.builder.getOrCreate()

SALT_RANGE = 10

# 1. Salt the skewed side: each row gets a random salt in [0, SALT_RANGE)
skewed_df = orders.withColumn("salt", (F.rand() * SALT_RANGE).cast("int"))
skewed_df = skewed_df.withColumn(
    "salted_key",
    F.concat(F.col("product_id").cast("string"), F.lit("_"), F.col("salt").cast("string")),
)

# 2. Replicate the non-skewed side
# Create a DataFrame with the numbers 0 to SALT_RANGE - 1
salt_df = spark.range(SALT_RANGE).withColumnRenamed("id", "salt")
# Cross join so every product exists once for every salt value
replicated_products = products.crossJoin(salt_df)
replicated_products = replicated_products.withColumn(
    "salted_key",
    F.concat(F.col("product_id").cast("string"), F.lit("_"), F.col("salt").cast("string")),
).drop("product_id", "salt")  # avoid duplicate column names after the join

# 3. Join on the salted key, then drop the helper columns
result = skewed_df.join(replicated_products, "salted_key").drop("salted_key", "salt")
```
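The effect of salting on partition load can be checked without a cluster. This pure-Python sketch assumes 4 hash partitions, a hypothetical skewed `orders` list in which product `p1` accounts for almost every row, and `zlib.crc32` as the partitioning hash. All `p1` rows initially land in one partition; salting turns `p1` into up to `SALT_RANGE` distinct keys, and replicating `products` once per salt value guarantees every order still finds exactly one match.

```python
import random
import zlib
from collections import Counter

NUM_PARTITIONS = 4
SALT_RANGE = 10


def partition_of(key) -> int:
    # Stand-in for Spark's hash partitioning of the join key.
    return zlib.crc32(repr(key).encode()) % NUM_PARTITIONS


random.seed(0)

# Hypothetical data: product "p1" accounts for 1000 of the 1010 orders.
orders = [("p1", i) for i in range(1000)] + [(f"p{n}", n) for n in range(2, 12)]
products = {f"p{n}": f"name-{n}" for n in range(1, 12)}

# Without salting, every "p1" order hashes to the same partition.
plain_load = Counter(partition_of(pid) for pid, _ in orders)

# 1. Salt the skewed side with a random integer in [0, SALT_RANGE).
salted_orders = [(pid, random.randrange(SALT_RANGE), oid) for pid, oid in orders]
salted_load = Counter(partition_of((pid, salt)) for pid, salt, _ in salted_orders)

# 2. Replicate the small side once per salt value.
salted_products = {
    (pid, salt): name for pid, name in products.items() for salt in range(SALT_RANGE)
}

# 3. Join on (product_id, salt): every order still matches exactly one product.
joined = [(oid, pid, salted_products[(pid, salt)]) for pid, salt, oid in salted_orders]

print("largest partition before salting:", max(plain_load.values()))
print("largest partition after salting: ", max(salted_load.values()))
```

The trade-off is that the replicated side grows by a factor of `SALT_RANGE`, so the salt range should be only as large as needed to break up the hot keys.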
## Performance Tuning Checklist
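- **Broadcast Joins**: Spark broadcasts a join side automatically when it is smaller than `spark.sql.autoBroadcastJoinThreshold` (10MB by default). For larger tables that still fit comfortably in driver and executor memory (a common rule of thumb is up to ~100MB), use `F.broadcast(small_df)` to avoid a shuffle.
- **Adaptive Query Execution (AQE)**: Ensure `spark.sql.adaptive.enabled=true` (the default since Spark 3.2). AQE re-optimizes the plan at runtime using shuffle statistics: it coalesces small partitions, switches sort-merge joins to broadcast joins when one side turns out to be small, and splits skewed partitions (`spark.sql.adaptive.skewJoin.enabled`).
- **Shuffle Partition Tuning**: Set `spark.sql.shuffle.partitions` (200 by default) to roughly 2-3x the total number of executor cores, or let AQE merge small post-shuffle partitions via `spark.sql.adaptive.coalescePartitions.enabled`.
- **Serialization**: Use Kryo serialization (`spark.serializer=org.apache.spark.serializer.KryoSerializer`) for faster, more compact serialization of RDD data and serialized caches. DataFrame operations mostly use Tungsten's binary row format instead, so the gain there is smaller.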
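One way to keep these settings together is to generate them programmatically. This sketch assumes a hypothetical cluster with 40 executor cores and uses the upper end of the 2-3x rule; the helper names and `my_job.py` are hypothetical, while the configuration keys are the ones listed above. It prints a `spark-submit` command line built from `--conf key=value` pairs.

```python
def tuning_confs(total_cores: int, partitions_per_core: int = 3) -> dict:
    """Return the checklist settings as Spark config key/value strings.

    `total_cores` is the total number of executor cores in the cluster;
    `partitions_per_core` follows the 2-3x rule of thumb.
    """
    if not 2 <= partitions_per_core <= 3:
        raise ValueError("rule of thumb is 2-3 partitions per core")
    return {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.sql.shuffle.partitions": str(total_cores * partitions_per_core),
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    }


def to_submit_args(confs: dict) -> list:
    """Render settings as `--conf key=value` pairs for spark-submit."""
    args = []
    for key, value in sorted(confs.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# Example: 10 executors x 4 cores = 40 cores -> 120 shuffle partitions
args = to_submit_args(tuning_confs(total_cores=40))
print("spark-submit " + " ".join(args) + " my_job.py")
```

The same pairs can be passed to `SparkSession.builder.config(key, value)` instead. `spark.serializer` is read when the SparkContext starts, so it has to be set at submit or session-creation time rather than changed on a running session.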
## Memory Management
Spark splits executor heap memory into:
- **Storage Memory**: For cached data (`.cache()`, `.persist()`) and broadcast variables.
- **Execution Memory**: For shuffles, joins, sorts, and aggregations.
- **User Memory**: For user-defined objects and data structures.
- **Reserved Memory**: Fixed overhead (300MB).
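If you see `ExecutorLost` or `OutOfMemoryError` errors, check the **Storage** tab of the Spark UI to see whether cached data is crowding out execution memory, and `unpersist()` DataFrames you no longer need. Execution can evict cached blocks only down to a protected storage share, so heavy caching can still leave joins and aggregations short of memory. Lost executors can also mean the cluster manager killed a container for exceeding its off-heap allowance (`spark.executor.memoryOverhead`).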
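A back-of-the-envelope sketch of how the heap is carved up, assuming the default `spark.memory.fraction` (0.6) and `spark.memory.storageFraction` (0.5) and ignoring off-heap memory and `spark.executor.memoryOverhead`. The function name is hypothetical. Storage and execution share the unified region and can borrow from each other; `storage_protected` is the amount of cached data that execution cannot evict. Spark computes these from the JVM's reported maximum heap, so real figures come out slightly lower.

```python
RESERVED_MB = 300  # fixed reserved memory


def unified_memory_layout(heap_mb: float,
                          memory_fraction: float = 0.6,
                          storage_fraction: float = 0.5) -> dict:
    """Approximate on-heap regions (MB) for one executor."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction  # shared by execution and storage
    return {
        "reserved": RESERVED_MB,
        "unified": unified,
        # Cached blocks within this share are immune to eviction by execution.
        "storage_protected": unified * storage_fraction,
        # Everything outside the unified region is user memory.
        "user": usable * (1 - memory_fraction),
    }


layout = unified_memory_layout(heap_mb=4096)  # e.g. a 4 GB executor heap
for region, size_mb in layout.items():
    print(f"{region:>17}: {size_mb:7.1f} MB")
```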