Over the past year, I’ve been building a fair number of Spark ETL pipelines at work (via pyspark). The complexity of the pipelines I build has been growing, and that complexity demanded a better understanding of Spark’s inner workings.
After a lot of reading, YouTube videos, and docs, I think I have a better grasp of Spark’s memory model. A lot of the information online can be quite confusing, and frankly, some of it is incorrect or out of date. That being said, I may be wrong about some things myself and would love feedback. As I continue to optimize my Spark pipelines, hopefully I’ll pick up more information and correct anything here that turns out to be wrong.
What does the executor container’s memory look like?
On a very high level, the executor container’s memory can be segmented into the following sections:
- JVM memory
  - Heap
  - Off-heap
- Non-JVM memory
Notice that in the above sentence, I italicize the word “container”. A source of my confusion about the executor’s memory model was the spark.executor.memory parameter in conjunction with the other memory parameters (e.g. spark.executor.memoryOverhead). Specifying spark.executor.memory=4g allocates 4 GB of memory for the JVM heap only, not 4 GB for the container as a whole.
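As a rough sketch of how these parameters fit together, assuming a recent Spark version running on YARN (the sizes are made up purely for illustration, not a tuning recommendation):

```python
from pyspark.sql import SparkSession

# Illustrative sizes only -- not a tuning recommendation.
spark = (
    SparkSession.builder
    .appName("executor-memory-example")
    .config("spark.executor.memory", "4g")          # JVM heap only
    .config("spark.executor.memoryOverhead", "1g")  # non-JVM memory (see below)
    .getOrCreate()
)

# On YARN, the memory requested per executor container is roughly:
#   spark.executor.memory (4g)
# + spark.executor.memoryOverhead (1g)
# + spark.memory.offHeap.size (0 here, disabled by default)
# + spark.executor.pyspark.memory (0 here, unset by default)
# = ~5g per executor container
```

The same values can of course be passed as `--conf` flags to spark-submit instead of being set on the builder.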
JVM memory
JVM memory is made up of heap and off-heap memory, and looks as follows:
Heap
Spark utilizes the heap for most of its operations. Spark’s heap usage looks as follows:
There are three main segments:
- Spark memory
  - Execution - used for computation during task execution (e.g. shuffles, joins, etc.)
  - Storage - used for storing cached data
- User memory - used for storing user-defined data structures, UDFs, etc.
- Reserved memory - reserved for the system to store Spark’s internal objects
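To make the split more concrete, here is a rough sizing sketch assuming the unified memory manager defaults (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5, and a fixed 300 MB of reserved memory). Treat the numbers as approximations, not exact runtime values:

```python
def heap_breakdown(executor_memory_mb: int,
                   memory_fraction: float = 0.6,
                   storage_fraction: float = 0.5) -> dict:
    """Approximate the heap split for a given spark.executor.memory (in MB)."""
    reserved_mb = 300                                    # fixed reserved memory
    usable_mb = executor_memory_mb - reserved_mb
    spark_memory_mb = usable_mb * memory_fraction        # execution + storage
    user_memory_mb = usable_mb * (1 - memory_fraction)   # user data structures, UDFs
    return {
        "reserved_mb": reserved_mb,
        "spark_memory_mb": spark_memory_mb,
        # the execution/storage boundary is soft -- they can borrow from each other
        "storage_mb": spark_memory_mb * storage_fraction,
        "execution_mb": spark_memory_mb * (1 - storage_fraction),
        "user_memory_mb": user_memory_mb,
    }

# e.g. spark.executor.memory = 4g
print(heap_breakdown(4 * 1024))
# ~2278 MB Spark memory (~1139 MB storage / ~1139 MB execution), ~1518 MB user, 300 MB reserved
```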
Off-heap
Spark’s off-heap memory is simply the JVM off-heap, which is sized via spark.memory.offHeap.size. It is disabled by default and can be enabled via the spark.memory.offHeap.enabled param.
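If you do want Spark to use JVM off-heap memory for execution and storage, both settings need to be set together; something like the following (the size is illustrative):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.memory.offHeap.enabled", "true")  # off by default
    .config("spark.memory.offHeap.size", "2g")       # must be > 0 when enabled
    .getOrCreate()
)
```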
Although off-heap usage is disabled by default, it’s still possible for Spark to use off-heap memory via the memory overhead (see below). This is due to spark.network.io.preferDirectBufs, which is enabled by default and allows off-heap (direct) buffers to be used during shuffles and cache block transfers.
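As a hedged example, if those direct buffers are suspected of eating into the memory overhead, the flag can be flipped off so that heap buffers are preferred instead, trading some network performance for more predictable overhead usage:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.network.io.preferDirectBufs", "false")  # prefer on-heap buffers
    .getOrCreate()
)
```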
Non-JVM memory
The non-JVM memory looks as follows:
There are two segments:
- memory overhead
- pyspark memory
I find this area the most confusing since the documentation is rather light and online resources don’t cover it well.
Memory Overhead
The memory overhead, according to the Spark docs, “accounts for things like VM overheads, interned strings, other native overheads, etc”. By default, the memory overhead grows in proportion to the executor memory: roughly max(10% of spark.executor.memory, 384 MB).
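A small worked example of that default, assuming the usual max(10% of executor memory, 384 MB) rule; the exact factor is configurable and can vary across Spark versions and resource managers:

```python
def default_memory_overhead_mb(executor_memory_mb: int,
                               overhead_factor: float = 0.10,
                               minimum_mb: int = 384) -> int:
    """Approximate default spark.executor.memoryOverhead when not set explicitly."""
    return max(int(executor_memory_mb * overhead_factor), minimum_mb)

print(default_memory_overhead_mb(4 * 1024))  # 4g executor -> ~409 MB overhead
print(default_memory_overhead_mb(2 * 1024))  # 2g executor -> 384 MB (floor kicks in)
```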
Pyspark Memory
This is not set by default (via spark.executor.pyspark.memory). This means that the pyspark worker processes do not have a memory limit and instead share the memory overhead space.
This is an area of potential OOMs, as the pyspark processes may try to use more than the available memory overhead. It’s recommended to either set the pyspark memory yourself or provide enough memory overhead.
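A sketch of those two options (sizes are illustrative, not recommendations):

```python
from pyspark.sql import SparkSession

# Option 1: cap the Python workers explicitly.
spark = (
    SparkSession.builder
    .config("spark.executor.pyspark.memory", "2g")  # hard limit for Python workers
    .getOrCreate()
)

# Option 2: leave spark.executor.pyspark.memory unset, but size the overhead
# generously enough to absorb whatever the Python workers end up using, e.g.:
# .config("spark.executor.memoryOverhead", "4g")
```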
So how does one actually set executor container memory correctly?
This is a whole ‘nother beast which I’ll cover in a later post.