The topic of the document

This document describes how state is stored in memory for each operator, so that you can estimate how much memory the application will need and plan appropriate heap memory for executors.

Memory usage of UnsafeRow object

UnsafeRow is an unsafe implementation of Row which is backed by raw memory instead of Java objects. The raw objects for state keys and values are UnsafeRow, so we need to understand how an UnsafeRow object consumes memory in order to forecast how much memory the overall state will consume.

Structure of the raw memory

The raw memory is laid out in the following format: [null bit set] [values] [variable length portion]

  • null bit set
    • used for null tracking
    • aligned to 8-byte word boundaries
    • stores one bit per field
  • values
    • stores one 8-byte word per field
    • if the field is a fixed-length primitive type (long, double, int, etc.)
      • the value is stored directly
    • if the field is a non-primitive or variable-length type
      • the word holds reference information for the actual value in the variable length portion
      • higher 32 bits: relative offset (starting from the base offset)
      • lower 32 bits: size of the actual value
  • variable length portion
    • the raw bytes of non-primitive or variable-length fields are placed here

For example, suppose the fields in the schema are (int, double, string backed by a 1000-byte byte array). Then the length of the underlying raw memory would be (8 + 8 + 8 + 8 + 1000) = 1032 bytes: 8 bytes for the null bit set, one 8-byte word for each of the three fields, and 1000 bytes for the variable-length portion.
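
To sanity-check this arithmetic, here is a minimal sketch (not part of the original measurements; it uses Spark's internal Catalyst classes, so the class locations reflect Spark 2.3/2.4 and may move in later versions) that builds an UnsafeRow with the schema above and prints its size:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    val schema = StructType(Seq(
      StructField("i", IntegerType),
      StructField("d", DoubleType),
      StructField("s", StringType)))

    // convert a generic internal row into an UnsafeRow
    val toUnsafe = UnsafeProjection.create(schema)
    val row = toUnsafe(
      InternalRow(1, 1.0d, UTF8String.fromBytes(Array.fill[Byte](1000)('a'))))

    // 8 (null bit set) + 3 * 8 (fixed-length slots) + 1000 (variable-length portion) = 1032
    println(row.getSizeInBytes)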

I have tested various kinds of UnsafeRow schemas with Spark 2.4.0-SNAPSHOT (based on https://github.com/apache/spark/commit/fa2ae9d2019f839647d17932d8fea769e7622777):

https://gist.github.com/HeartSaVioR/556ea7db6740fa2fce7dae72a75d9618

Please note the difference between the size of the row object and the size of the copied row object. UnsafeRow doesn't allow updating variable-length values, so this isn't an issue for UnsafeRow itself, but some internal row implementations expand the underlying raw memory to a multiple of its current size when there isn't enough space.

Key/value format of state for operators

NOTE: The format is based on Spark 2.3.0 and reflects implementation details, so it may be subject to change.

StateStoreSaveExec

  • Operations: agg(), count(), mean(), max(), avg(), min(), sum()
  • Format of key-value
    • key: UnsafeRow containing group-by fields
    • value: UnsafeRow containing the key fields and additional fields for the aggregation results
  • Note: avg() (and aliases of avg) takes two fields - sum and count - in the value UnsafeRow (a minimal example follows this list)
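
As an illustration (not from the original article; the socket source and column names are invented for this sketch), the following streaming aggregation is backed by StateStoreSaveExec, with the group-by column as the state key and the aggregation results in the state value:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder.appName("agg-state-sketch").getOrCreate()
    import spark.implicits._

    val words = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999).load()
      .as[String].flatMap(_.split(" "))

    // On Spark 2.3: key = UnsafeRow(word), value = UnsafeRow(word, cnt, avg sum, avg count)
    // (avg() occupies two fields in the value: sum and count)
    val counted = words.groupBy($"value".as("word"))
      .agg(count("*").as("cnt"), avg(length($"value")).as("avg_len"))

    val query = counted.writeStream.outputMode("update").format("console").start()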

FlatMapGroupsWithStateExec

  • Operations: mapGroupsWithState() / flatMapGroupsWithState()
  • Format of key-value
    • key: UnsafeRow containing group-by fields
    • value: UnsafeRow containing the fields of the intermediate (state) data schema defined by the end user (see the sketch after this list)
    • the value no longer contains a redundant copy of the key
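
A hedged sketch of how mapGroupsWithState() maps onto this layout (Event, SessionInfo, and the socket source are invented here), assuming the grouping key becomes the state key and the user-defined state class becomes the value schema:

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

    case class Event(userId: String, count: Long)
    case class SessionInfo(totalCount: Long)

    val spark = SparkSession.builder.appName("mgws-state-sketch").getOrCreate()
    import spark.implicits._

    val events: Dataset[Event] = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999).load()
      .as[String]
      .map { line => val p = line.split(","); Event(p(0), p(1).toLong) }

    val totals = events
      .groupByKey(_.userId)                              // state key: UnsafeRow(userId)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
        (userId: String, rows: Iterator[Event], state: GroupState[SessionInfo]) =>
          val old = state.getOption.getOrElse(SessionInfo(0L))
          val updated = SessionInfo(old.totalCount + rows.map(_.count).sum)
          state.update(updated)                          // state value: UnsafeRow(totalCount)
          (userId, updated.totalCount)
      }

    val query = totals.writeStream.outputMode("update").format("console").start()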

StreamingDeduplicateExec

  • Operations: dropDuplicates()
  • Format of key-value
    • key: UnsafeRow containing the fields specified as deduplication parameters (see the sketch after this list)
    • value: EMPTY_ROW defined in StreamingDeduplicateExec
      • created via UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
      • a single null field: 16 bytes (8-byte null bit set + one 8-byte field slot)
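
For example (a minimal sketch with an invented socket source): only the deduplication columns end up in the state key, and the per-key value is the fixed 16-byte EMPTY_ROW described above.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("dedup-state-sketch").getOrCreate()

    val lines = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999).load()

    // state key: UnsafeRow(value); state value: EMPTY_ROW
    val deduped = lines.dropDuplicates("value")

    val query = deduped.writeStream.outputMode("append").format("console").start()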

StreamingSymmetricHashJoinExec

Only applied to stream-stream joins, because stream-static joins don't require state.

Spark maintains two separate states per input side, keyWithIndexToValue and keyWithNumValues, to store multiple input rows that have the same values for the joining keys (see the sketch after the list below).

  • Operations: stream - stream join
  • Format of key-value for keyWithIndexToValue
    • key: UnsafeRow containing joining fields + index number (starting from 0)
    • value: input row itself
  • Format of key-value for keyWithNumValues
    • key: UnsafeRow containing joining fields
    • value: UnsafeRow containing count (only one field)
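
The sketch below (not from the article; the rate sources and column names are invented) shows an inner stream-stream join. For each side, rows are buffered under (adId, index) in keyWithIndexToValue, and the number of buffered rows per adId is tracked in keyWithNumValues:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("join-state-sketch").getOrCreate()
    import spark.implicits._

    val impressions = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .select($"value".as("adId"), $"timestamp".as("impressionTime"))

    val clicks = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .select($"value".as("adId"), $"timestamp".as("clickTime"))

    // keyWithIndexToValue: (adId, index) -> buffered input row
    // keyWithNumValues:    adId -> number of buffered rows
    // note: without a watermark and time-range condition, this state grows without bound
    val joined = impressions.join(clicks, Seq("adId"))

    val query = joined.writeStream.outputMode("append").format("console").start()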

The impact of storing multiple versions in HDFSBackedStateStoreProvider

HDFSBackedStateStoreProvider (the only state store provider implementation that Apache Spark ships with) stores multiple versions of the state in an in-memory hash map. By default it keeps on the order of 100 versions of the state, and can hold more depending on how the maintenance operation runs.

When the map for version N is loaded, the provider loads the state map for version N - 1 and makes a shallow copy of it. This behavior brings a couple of things to note (see the sketch after this list):

  1. Actual key and value objects (UnsafeRow) are shared between the maps for multiple batches, unless a key is updated to a new value.
    1. We normally expect only the value to be updated when a key already exists in the map, so although both key and value are copied before being put into the map, the copied key is only put into the map when the key doesn't already exist there.
  2. Heavy updates on state between batches prevent row objects from being shared, which incurs heavy memory usage because multiple versions are stored.
  3. Map entries and references must be maintained per version. If there is a huge number of key-value pairs in the state, this can add some overhead driven by the number of entries rather than their size; it is still expected to be much smaller than the actual key and value objects.
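
The following is a conceptual sketch (plain Scala, not Spark's actual code; String and Array[Byte] stand in for the UnsafeRow keys and values) of why a shallow copy lets unchanged entries share objects across versions:

    import java.util.concurrent.ConcurrentHashMap

    val versionNminus1 = new ConcurrentHashMap[String, Array[Byte]]()
    versionNminus1.put("key-1", Array[Byte](1, 2, 3))
    versionNminus1.put("key-2", Array[Byte](4, 5, 6))

    // shallow copy: a new map with its own entries, but the same key/value object references
    val versionN = new ConcurrentHashMap[String, Array[Byte]](versionNminus1)

    // an entry that is not updated shares its value object across both versions
    assert(versionN.get("key-2") eq versionNminus1.get("key-2"))

    // updating a key in version N replaces only version N's reference;
    // version N - 1 still points at the old object, so heavy updates keep both objects alive
    versionN.put("key-1", Array[Byte](7, 8, 9))
    assert(!(versionN.get("key-1") eq versionNminus1.get("key-1")))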

UPDATE on Spark 2.4

  • SPARK-24717 addressed the concern of storing multiple versions in memory: the default number of state versions kept in memory is now 2 instead of 100. The default number of state versions kept in storage is still 100 (see the configuration example after this list).
  • SPARK-24763 addressed the concern of storing redundant key columns in the value for StateStoreSaveExec: the key columns are removed from the value part of the state, which makes the state smaller.
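
For example, the number of in-memory versions can be tuned before starting the query. The configuration name below is written from memory of SPARK-24717, so please verify it against your Spark version's SQLConf:

    // Assumed config name from SPARK-24717 (default 2 on Spark 2.4); lowering it trades
    // executor memory for more frequent reloads of state from the checkpoint files.
    spark.conf.set("spark.sql.streaming.maxBatchesToRetainInMemory", 1)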