This document describes how state is stored in memory for each operator, so that you can estimate how much memory the application will need and plan appropriate heap memory for the executors.
UnsafeRow is an unsafe implementation of Row which is backed by raw memory instead of Java objects. The key and value of each state entry are UnsafeRow objects, so we need to understand how an UnsafeRow consumes memory in order to forecast how much memory the overall state will consume.
The raw memory is laid out in the following format: [null bit set] [values] [variable length portion]. The null bit set occupies 8 bytes per 64 fields (word-aligned), each field takes a fixed 8-byte slot in the values region (for variable-length types this slot stores the offset and size of the data), and the variable-length data itself is appended at the end.
For example, suppose the fields in the schema are (int, double, string backed by a 1000-byte byte array). The length of the underlying raw memory would then be 8 (null bit set) + 8 (int) + 8 (double) + 8 (offset and size of the string) + 1000 (string data) = 1032 bytes.
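As a quick sanity check, a minimal sketch like the following (illustrative values, run from a Spark shell) builds such a row and prints its size via getSizeInBytes:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Schema matching the example above: (int, double, string backed by 1000 bytes)
val projection = UnsafeProjection.create(Array[DataType](IntegerType, DoubleType, StringType))
val row = projection.apply(InternalRow(1, 1.0d, UTF8String.fromBytes(new Array[Byte](1000))))
// Expected: 8 (null bit set) + 3 * 8 (fixed-width slots) + 1000 (string data) = 1032
println(row.getSizeInBytes)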
I have tested various kinds of UnsafeRow schemas with Spark 2.4.0-SNAPSHOT (based on https://github.com/apache/spark/commit/fa2ae9d2019f839647d17932d8fea769e7622777):
https://gist.github.com/HeartSaVioR/556ea7db6740fa2fce7dae72a75d9618
Please note the difference between the size of the row object and the size of the copied row object. UnsafeRow doesn't allow updating variable-length values, so this isn't an issue for UnsafeRow itself, but some internal row implementations expand the underlying raw memory to a multiple of its current size when there is not enough space.
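As a rough, assumption-laden illustration of that difference, one can compare the estimated object sizes of a projected row and of its copy using Spark's SizeEstimator developer API (the exact numbers depend on the projection's internal write buffer):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SizeEstimator

val proj = UnsafeProjection.create(Array[DataType](IntegerType, StringType))
val projected = proj.apply(InternalRow(1, UTF8String.fromString("some value")))

// The projected row may point into a reused, over-allocated write buffer,
// whereas copy() allocates a byte array trimmed to the row's actual size.
println(SizeEstimator.estimate(projected))         // size of the row object
println(SizeEstimator.estimate(projected.copy()))  // size of the copied row object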
NOTE: The format is based on Spark 2.3.0 and is tied to implementation details, so it may be subject to change.
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
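For reference, here is a self-contained version of that snippet (imports added; the size noted in the comment is my own reading of the UnsafeRow layout described above):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, NullType}

// Build the single-field "empty" row from the line above and inspect its size.
val emptyRow = UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
println(emptyRow.getSizeInBytes)  // expected 16: 8-byte null bit set + one 8-byte field slot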
This only applies to stream-stream joins, because stream-static joins don't require state.
Spark maintains two separate states for each input side, keyWithIndexToValue and keyToNumValues, in order to store multiple input rows that have the same values for the join keys.
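For context, here is a minimal stream-stream join sketch (the rate source and column names are purely illustrative); each input side of such a join gets its own pair of state stores keyed by the join key:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("stream-stream-join").getOrCreate()
import spark.implicits._

// Two streaming inputs; both sides of the join are buffered in state.
val left = spark.readStream.format("rate").load()
  .select($"value".as("id"), $"timestamp".as("leftTime"))
val right = spark.readStream.format("rate").load()
  .select($"value".as("id"), $"timestamp".as("rightTime"))

// Inner stream-stream join on "id": state is maintained for the left input
// and for the right input independently.
val joined = left.join(right, Seq("id"))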
HDFSBackedStateStoreProvider (the only state store provider implementation that Apache Spark provides) stores multiple versions of the state in an in-memory hash map. By default it can keep more than 100 versions of the state, depending on when the maintenance operation runs.
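As a sketch (assuming the retention is governed by spark.sql.streaming.minBatchesToRetain, whose default is 100), the number of retained versions can be tuned when building the session, trading off the ability to recover from older checkpointed batches:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("stateful-app")
  // Default is 100; lowering it keeps fewer state versions around,
  // at the cost of only being able to recover from more recent batches.
  .config("spark.sql.streaming.minBatchesToRetain", "10")
  .getOrCreate()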
When the map for version N is being loaded, it loads the state map for version N - 1 and does a shallow copy of that map. This behavior brings a couple of things to note: