07-03-2018
02:14 AM
Repo Description
This project reimplements the HDF IoT Trucking Apps with Spark Structured Streaming.
trucking-iot, Hadoop Summit 2017
This project provides a couple of sample applications which leverage stream-stream join, window aggregation, and deduplication, respectively. The project depends on the HDP version of Spark, so you may want to modify the Spark version before building the project to accommodate your installation of Spark. The apps depend on Truck Geo Events as well as Truck Speed Events, which are well explained in the README.md of sam-trucking-data-utils. The apps also assume both truck geo events and truck speed events are ingested into Kafka topics, which can easily be done via sam-trucking-data-utils. A detailed explanation of how to build and run is provided in the repo's README.md file.
Repo Info
Github Repo URL: https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming
Github account name: HeartSaVioR
Repo name: iot-trucking-app-spark-structured-streaming
06-25-2018
03:11 AM
The topic of the document
This document describes how states are stored in memory per operator, so that you can determine how much memory the application will need and plan appropriate heap memory for the executors.
Memory usage of UnsafeRow object
UnsafeRow is an unsafe implementation of Row which is backed by raw memory instead of Java objects. The raw objects for the key and value of state are UnsafeRow, so we need to understand how an UnsafeRow object consumes memory in order to forecast how much memory the overall state will consume.

Structure of the raw memory
The raw memory is composed in the following format: [null bit set] [values] [variable length portion]
- null bit set
  - used for null tracking
  - aligned to 8-byte word boundaries
  - stores one bit per field
- values
  - stores one 8-byte word per field
  - if the type of the field is a fixed-length primitive type (long, double, int, etc.), the value is stored directly
  - if the type of the field is non-primitive or variable-length, the value is a reference to the actual value in the raw memory:
    - higher 32 bits: relative offset (starting from the base offset)
    - lower 32 bits: size of the actual value
- variable length portion
  - the raw bytes of non-primitive or variable-length values are placed here
For example, suppose the fields in the schema are (int, double, string backed by a 1000-byte array); then the length of the underlying raw memory would be 8 + 8 + 8 + 8 + 1000 = 1032 bytes.
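To sanity-check this kind of calculation, here is a minimal sketch using Spark's internal Catalyst API; the schema below is the hypothetical (int, double, string) example above, and the column names are made up:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical schema matching the example: (int, double, string backed by 1000 bytes)
val schema = StructType(Seq(
  StructField("i", IntegerType),
  StructField("d", DoubleType),
  StructField("s", StringType)))

val toUnsafe = UnsafeProjection.create(schema)
val row = toUnsafe(InternalRow(1, 1.0d, UTF8String.fromBytes(new Array[Byte](1000))))

// 8 (null bit set) + 3 * 8 (one word per field) + 1000 (variable length portion) = 1032
println(row.getSizeInBytes)  // expected: 1032
```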
I have tested various kinds of UnsafeRow schemas with Spark 2.4.0-SNAPSHOT (based on https://github.com/apache/spark/commit/fa2ae9d2019f839647d17932d8fea769e7622777):
https://gist.github.com/HeartSaVioR/556ea7db6740fa2fce7dae72a75d9618
Please note the difference between the size of a row object and the size of a copied row object. UnsafeRow doesn't allow updating variable-length values, so there's no issue there, but some internal row implementations expand their underlying raw memory to a multiple of the current size when there's not enough space.

Key/value format of state for operators

NOTE: The format is based on Spark 2.3.0 and is tied to implementation details, so it may be subject to change.

StateStoreSaveExec
- Operations: agg(), count(), mean(), max(), avg(), min(), sum()
- Format of key-value
  - key: UnsafeRow containing the group-by fields
  - value: UnsafeRow containing the key fields plus additional fields for the aggregation results
  - Note: avg() (and aliases of avg) takes 2 fields - sum and count - in the value UnsafeRow
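As an illustration, a minimal streaming aggregation which is planned with StateStoreSaveExec; the socket source and the query itself are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("agg-state-sketch").getOrCreate()
import spark.implicits._

// Hypothetical source of text lines, split into words
val words = spark.readStream
  .format("socket").option("host", "localhost").option("port", 9999)
  .load().as[String].flatMap(_.split(" "))

// State key: UnsafeRow(value); state value: UnsafeRow(value, sum, count),
// since avg() keeps both sum and count per the note above
val avgLengths = words.groupBy($"value").agg(avg(length($"value")))
```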
FlatMapGroupsWithStateExec
- Operations: mapGroupsWithState() / flatMapGroupsWithState()
- Format of key-value
  - key: UnsafeRow containing the group-by fields
  - value: UnsafeRow containing the fields of the intermediate state schema defined by the end user
    - the value no longer contains the redundant key part
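A minimal sketch of a stateful operation whose state value row holds only the user-defined state (a single Long running count here); Event and the 'events' dataset are hypothetical, and spark.implicits._ is assumed to be in scope:

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical input type; 'events' stands in for any streaming Dataset[Event]
case class Event(user: String)

val counts = events
  .groupByKey(_.user)  // state key: UnsafeRow(user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
    (user: String, rows: Iterator[Event], state: GroupState[Long]) =>
      // state value: just the running count, with no redundant key part
      val newCount = state.getOption.getOrElse(0L) + rows.size
      state.update(newCount)
      (user, newCount)
  }
```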
StreamingDeduplicateExec
- Operations: dropDuplicates()
- Format of key-value
  - key: UnsafeRow containing the fields specified for deduplication (the parameters)
  - value: EMPTY_ROW defined in StreamingDeduplicateExec
    - UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
    - null reference: 16 bytes
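For example (with a hypothetical 'events' DataFrame carrying an eventId column):

```scala
// State key: UnsafeRow(eventId); state value: the shared EMPTY_ROW above,
// so the per-entry state cost is dominated by the key
val deduped = events.dropDuplicates("eventId")
```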
StreamingSymmetricHashJoinExec
Only applies to stream-stream joins, since stream-static joins don't require state. Spark maintains two individual states per input side - keyWithIndexToValue and keyWithNumValues - to store multiple input rows which have the same values for the joining keys.
- Operations: stream-stream join
- Format of key-value for keyWithIndexToValue
  - key: UnsafeRow containing the joining fields + an index number (starting from 0)
  - value: the input row itself
- Format of key-value for keyWithNumValues
  - key: UnsafeRow containing the joining fields
  - value: UnsafeRow containing the count (only one field)
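A minimal stream-stream join sketch; the impressions/clicks streaming DataFrames and column names are hypothetical. Each input side keeps its own keyWithIndexToValue and keyWithNumValues state keyed on the joining fields:

```scala
import org.apache.spark.sql.functions.expr

// Watermarks bound how long the join state is retained on each side
val impressionsWm = impressions.withWatermark("impressionTime", "2 hours")
val clicksWm = clicks.withWatermark("clickTime", "3 hours")

val joined = impressionsWm.join(clicksWm, expr("""
  clickAdId = impressionAdId AND
  clickTime >= impressionTime AND
  clickTime <= impressionTime + interval 1 hour
"""))
```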
The impact of storing multiple versions from HDFSBackedStateStoreProvider

HDFSBackedStateStoreProvider (the only state store provider implementation which Apache Spark provides) stores multiple versions of the state in an in-memory hash map. By default it stores more than 100 versions of the state, depending on how the maintenance operation runs.
When the map for version N is loaded, it loads the state map for version N - 1 and does a shallow copy of it. This behavior brings a couple of things to note:
- Actual key and value objects (UnsafeRow) will be shared between the maps for multiple batches, unless a key is updated to a new value.
- We normally expect only the value to be updated when the key already exists in the map, so although both key and value are copied before being put into the map, the copied key is only put into the map when the key doesn't already exist there.
- Heavy updates on the state between batches will prevent row objects from being shared and incur heavy memory usage due to storing multiple versions.
- Map entries and references must be maintained per version. If there's a huge number of key-value pairs in the state, this may add some overhead, proportional to the count rather than the size, but it is expected to be still much smaller than the actual key and value objects.
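To make the sharing behavior concrete, a toy sketch with plain Scala maps (not the actual provider code); the strings stand in for UnsafeRow objects:

```scala
// Version N-1 of the state map
val versionN1 = Map("key1" -> "row1", "key2" -> "row2")

// Version N is a shallow copy with one update: key2's row object is shared
// with version N-1, while key1 points to a new object only version N holds
val versionN = versionN1 + ("key1" -> "row1-updated")

assert(versionN("key2") eq versionN1("key2"))  // same object, no extra memory
```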
UPDATE on Spark 2.4

SPARK-24717 addressed the concern of storing multiple versions in memory: the default number of state versions kept in memory is now 2 instead of 100. The default number of state versions kept in storage is still 100. SPARK-24763 addressed the concern of storing redundant columns in the value for StateStoreSaveExec: the key columns are removed from the value part of the state, which makes the state size smaller.
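The number of retained versions can be tuned via the corresponding configurations; the names below are from SQLConf as of the SPARK-24717 patch:

```scala
// Versions of state retained in the in-memory map (default 2 since Spark 2.4)
spark.conf.set("spark.sql.streaming.maxBatchesToRetainInMemory", "2")
// Versions of state retained in checkpoint storage (default 100)
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "100")
```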
02-28-2018
03:21 AM
I didn't mean you need to use the REST API. Just as storm-hbase deals with Kerberos authentication, a custom implementation can deal with it as well. I just meant that it may not be a good idea to generalize this in storm-hbase, so you may need to implement a custom version by copying and modifying the code.
02-27-2018
07:44 AM
The current implementation of HBaseBolt requires a table name to handle the delegation token, hence we may not want to modify storm-hbase to make the table flexible. If you don't use security, it should be straightforward to implement your own bolt based on the current HBaseBolt. Both HBaseBolt and AbstractHBaseBolt require a table name as well as a mapper which doesn't handle the table name, so you may want to copy the code and modify it a bit: remove the table name requirement, let the mapper handle the table name too, and initialize an HBaseClient instance per table name. If you plan to deal with lots of tables, you may also want to ensure that only a couple of HBaseClient instances are retained at once, as in the sketch below. Actually, this may be the signal that you would want to work with the HBase API directly and implement a custom bolt.
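As a rough illustration of keeping only a couple of clients around, a minimal LRU-style cache sketch; TableClientCache, createClient, and close are hypothetical names, not storm-hbase APIs:

```scala
import scala.collection.mutable

// Hypothetical bounded cache: keeps at most maxSize clients per bolt instance,
// closing the least-recently-used client on overflow. C stands in for HBaseClient.
class TableClientCache[C](maxSize: Int, createClient: String => C, close: C => Unit) {
  private val cache = mutable.LinkedHashMap.empty[String, C]

  def get(table: String): C = synchronized {
    val client = cache.remove(table).getOrElse(createClient(table))
    cache.put(table, client)  // re-insert so this entry becomes most-recently-used
    if (cache.size > maxSize) {
      val (oldTable, oldClient) = cache.head  // least-recently-used entry
      cache.remove(oldTable)
      close(oldClient)
    }
    client
  }
}
```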
02-27-2018
07:31 AM
Could you share the error part of the SAM log? It would also be very helpful if you could open the developer console in your browser and see which line raises the error in the console tab. Thanks in advance.
11-16-2017
12:24 AM
Sorry I missed this; we announced HDF 3.0.2 four days ago.
- Announcement: https://community.hortonworks.com/articles/147515/hortonworks-data-flow-hdf-version-302-release-anno.html
- Documentation: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.2/index.html
Hope this helps! P.S. Please vote for my answer and mark it as accepted so the question is marked as answered. Thanks!
10-10-2017
11:13 PM
@yang xiao @Gunar Mueller I provided the patch, and I now believe the fix will be included in HDF 3.0.2. You can still try the workaround via my instructions above, but when placing the artifacts you need to check that the artifacts (jar/pom) are placed along with the metafile (_remote.repositories), and that the repository name is either 'hwx-public' or 'hwx-private' unless the artifact came from the public Maven repository. The best bet is pulling the artifacts via SAM with internet access, so that they are downloaded to the local Maven repository and the repository name in _remote.repositories is properly set (important!). Then copy the downloaded artifacts (in the local Maven repository) to the machines which don't have internet access.
10-10-2017
11:10 PM
I believe the fix will be included in HDF 3.0.2. Before that version comes out, you can try the workaround below:
1. Check whether there's a .m2 directory in the 'storm' user's home directory.
   1.a. If it exists, place the artifacts in that directory.
   1.b. If it doesn't exist, go to 2.
2. Check whether there's a 'local-repo' directory in either the SAM (streamline) or Storm installation directory.
   2.a. If it exists, place the artifacts in that directory.
   2.b. If it doesn't, create a .m2 directory in the 'storm' user's home directory and apply 1.a.
Please note that the artifacts (jar/pom) should be placed along with the metafile (_remote.repositories), and the repository name should be either 'hwx-public' or 'hwx-private' unless the artifact came from the public Maven repository. The best bet is pulling the artifacts via SAM with internet access, so that they are downloaded to the local Maven repository and the repository name in _remote.repositories is properly set (important!). Then copy the downloaded artifacts (in the local Maven repository) to the machines which don't have internet access.
09-04-2017
03:10 AM
Sorry, there was a miss on the Storm side: the necessary patch (STORM-2598) was not pulled into the HDF 3.0.0 version line of Storm. The patch has also not been released officially by Apache Storm yet (targeted for 1.2.0). Please try the workaround below:
1. Check whether there's a .m2 directory in the 'storm' user's home directory.
   1.a. If it exists, place the artifacts in that directory.
   1.b. If it doesn't exist, go to 2.
2. Check whether there's a 'local-repo' directory in either the SAM (streamline) or Storm installation directory.
   2.a. If it exists, place the artifacts in that directory.
   2.b. If it doesn't, create a .m2 directory in the 'storm' user's home directory and apply 1.a.
I just filed the issue and will try to provide a patch shortly.
06-29-2017
01:02 AM
That was a bug on the Storm side, tracked and fixed in HDP 2.6.1.1 and HDF 3.0.0.0. Please upgrade your HDP cluster or use an HDF Storm cluster, and let me know if the problem persists. Unfortunately this is a bug in "uploading" artifacts, where the uploaded artifact blobs have a bad ACL, so you actually need to delete the artifact blobs manually after upgrading.