This document describes how state is stored in memory for each operator, so you can estimate how much memory the application will need and plan appropriate heap memory for executors.
Memory usage of UnsafeRow object
UnsafeRow is an unsafe implementation of Row, backed by raw memory instead of Java objects. Both the key and the value of state are stored as UnsafeRow objects, so understanding how an UnsafeRow object consumes memory helps forecast how much memory the overall state will consume.
Structure of the raw memory
Raw memory is laid out in the following format: [null bit set] [values] [variable length portion]
null bit set
used for null tracking
aligned to 8 byte word boundaries
stores one bit per field
values
stores one 8 byte word per field
if the type of the field is a fixed-length primitive type (long, double, int, etc.)
the value is stored directly
if the field is of a non-primitive or variable-length type
the stored word is reference information pointing to the actual value in the variable length portion
higher 32 bits: relative offset (starting from base offset)
lower 32 bits: size of actual values
variable length portion
the raw bytes of non-primitive or variable-length values are placed here
For example, if the fields in the schema are (int, double, string backed by 1000 bytes of byte array), the length of the underlying raw memory would be 8 (null bit set) + 8 + 8 + 8 (one word per field) + 1000 (variable length portion) = 1032 bytes.
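The arithmetic above can be sketched in code. This is a hypothetical size calculator, not Spark's actual API: the helper names (`bitSetBytes`, `rawSize`, `offsetAndSize`) are illustrative, and the rounding rules follow the layout described in this section.

```java
// Hypothetical size calculator for UnsafeRow's raw memory layout; these
// helpers are illustrative sketches, not Spark's actual API.
public class UnsafeRowSizeSketch {
    // null bit set: one bit per field, rounded up to 8-byte words
    static int bitSetBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    // variable-length values are rounded up to a multiple of 8 bytes
    static int roundToWord(int numBytes) {
        return ((numBytes + 7) / 8) * 8;
    }

    // total raw-memory bytes: null bit set + one 8-byte word per field
    // + word-aligned variable length portion
    static int rawSize(int numFields, int[] varLengths) {
        int total = bitSetBytes(numFields) + numFields * 8;
        for (int len : varLengths) total += roundToWord(len);
        return total;
    }

    // reference word for a variable-length field:
    // higher 32 bits = relative offset, lower 32 bits = size
    static long offsetAndSize(int offset, int size) {
        return ((long) offset << 32) | (long) size;
    }

    public static void main(String[] args) {
        // (int, double, string of 1000 bytes) => 8 + 3*8 + 1000 = 1032
        System.out.println(rawSize(3, new int[]{1000}));  // prints 1032
    }
}
```

The same calculator also shows why a row with, say, 65 fields pays two 8-byte words of null-tracking overhead instead of one.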
Note the difference between the size of a row object and the size of a copied row object. UnsafeRow doesn't allow updating variable-length values, so there's no issue for UnsafeRow itself, but some internal row implementations expand the underlying raw memory to a multiple of its current size when there is not enough space.
Key/value format of state for operators
NOTE: The format is based on Spark 2.3.0 and tied to implementation details, so it may be subject to change.
Operations: stream - stream join
Only applies to stream - stream join, because stream - static join doesn't require state.
Spark maintains two separate states per input side, keyWithIndexToValue and keyWithNumValues, to store multiple input rows that have the same values for the joining keys.
Format of key-value for keyWithIndexToValue
key: UnsafeRow containing joining fields + index number (starting from 0)
value: input row itself
Format of key-value for keyWithNumValues
key: UnsafeRow containing joining fields
value: UnsafeRow containing count (only one field)
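The two stores above can be sketched as a pair of maps. This is an illustrative model only: the real stores hold UnsafeRow keys and values, and the composite-key encoding here (a string with a `#` separator) is a stand-in for the (joining fields + index) key described above.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the two state stores kept per input side of a
// stream-stream join; types and the composite-key encoding are hypothetical,
// not Spark's actual implementation.
public class JoinStateSketch {
    // keyWithIndexToValue: (join key, index) -> input row
    final Map<String, Object[]> keyWithIndexToValue = new HashMap<>();
    // keyWithNumValues: join key -> count of rows stored for that key
    final Map<String, Long> keyWithNumValues = new HashMap<>();

    // append an input row under its join key; indices start from 0
    void append(String joinKey, Object[] inputRow) {
        long n = keyWithNumValues.getOrDefault(joinKey, 0L);
        keyWithIndexToValue.put(joinKey + "#" + n, inputRow);  // composite key
        keyWithNumValues.put(joinKey, n + 1);
    }
}
```

Appending two rows with the same join key produces entries at indices 0 and 1 in keyWithIndexToValue, and a count of 2 in keyWithNumValues, mirroring how the join state accumulates matching rows per key.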
The impact of storing multiple versions from HDFSBackedStateStoreProvider
HDFSBackedStateStoreProvider (the only state provider implementation that Apache Spark provides) stores multiple versions of state in an in-memory hash map. By default it can store more than 100 versions of state, depending on how the maintenance operation runs.
When the map for version N is loaded, it loads the state map for version N - 1 and does a shallow copy of it. This behavior brings a couple of things to note:
The actual key and value objects (UnsafeRow) are shared between the maps for multiple batches, unless a key is updated to a new value.
We normally expect only the value to be updated when the key already exists in the map, so although both the key and the value are copied before being put into the map, the copied key is stored only when the key doesn't already exist in the map.
Heavy updates on state between batches prevent row objects from being shared, and incur heavy memory usage due to storing multiple versions.
Map entries and references must be maintained per version. With a huge number of key-value pairs in state, this may incur some overhead, driven by the entry count rather than the entry size, but it is still expected to be much smaller than the actual key and value objects.
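The sharing behavior described above can be demonstrated with a minimal sketch. The Row class is a stand-in for UnsafeRow (identity matters, so reference equality is the point), and `newVersion` models the shallow copy from version N - 1 to version N; this is an illustration of the mechanism, not Spark's actual code.

```java
import java.util.HashMap;

// Sketch of HDFSBackedStateStoreProvider-style versioning: loading version N
// shallow-copies the map for version N - 1, so unchanged entries share the
// same row objects across versions. Illustrative, not Spark's actual code.
public class VersionedMapSketch {
    // stand-in for an UnsafeRow; identity (reference equality) matters here
    static final class Row {
        final byte[] bytes;
        Row(byte[] bytes) { this.bytes = bytes; }
    }

    // build the map for version N from the map for version N - 1
    static HashMap<String, Row> newVersion(HashMap<String, Row> prev) {
        // HashMap's copy constructor copies entries, not Row objects:
        // both versions reference the identical Row instances
        return new HashMap<>(prev);
    }
}
```

After copying, updating key "a" in the new version replaces only that entry; every untouched key still points at the identical Row object in both versions, which is exactly why heavy updates between batches defeat the sharing and blow up memory usage.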
UPDATE on Spark 2.4
SPARK-24717 addressed the concern of storing multiple versions in memory: the default number of state versions kept in memory is now 2 instead of 100. The default number of versions kept in storage is still 100.
SPARK-24763 addressed the concern of storing redundant columns in the value for StateStoreSaveExec: the key columns are removed from the value part of the state, which makes the state smaller.
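The idea behind SPARK-24763 can be sketched as follows: since the key columns already live in the state key, they can be dropped from the stored value and the full row reconstructed on read. The method names here are illustrative, and plain Object arrays stand in for UnsafeRow.

```java
import java.util.Arrays;

// Sketch of the SPARK-24763 idea: the key columns are already present in the
// state key, so only the non-key columns need to be stored as the value, and
// the full row is rebuilt by concatenation on read. Names are illustrative.
public class TrimmedValueSketch {
    // store only the non-key columns as the state value
    static Object[] trimValue(Object[] fullRow, int numKeyCols) {
        return Arrays.copyOfRange(fullRow, numKeyCols, fullRow.length);
    }

    // reconstruct the original row by concatenating key and value columns
    static Object[] restoreRow(Object[] key, Object[] value) {
        Object[] row = new Object[key.length + value.length];
        System.arraycopy(key, 0, row, 0, key.length);
        System.arraycopy(value, 0, row, key.length, value.length);
        return row;
    }
}
```

For a row (k1, v1, v2) keyed by k1, the stored value shrinks from three columns to two, and the saving compounds across every key-value pair and every retained version of the state.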