How do I filter a stream by a column using Spark?

Contributor

I have a stream of records that I want to write into HBase. The stream contains different types of records that I need to write to different HBase tables (e.g., a patient record goes to the patient table, an order record goes to the order table). Since the number of tables is finite, I could brute-force the application by creating a filter for each table and then handling the resulting streams of records individually. However, I'd like to do something a little more elegant, where I create something akin to a Map<ColumnValue, List<Record>>.
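
For concreteness, a rough Scala sketch of the two approaches described above: one filter per table versus keying the stream by the routing column. The `Record` case class, its fields, and the socket source are made up for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object RouteByColumn {
  // Hypothetical record shape; the real fields come from your source.
  case class Record(recordType: String, rowKey: String, columns: Map[String, String])

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("route-by-type"), Seconds(5))

    // Placeholder source; substitute Kafka, Flume, etc.
    val records: DStream[Record] = ssc.socketTextStream("localhost", 9999).map(parse)

    // Brute force: one filtered stream (and one write path) per target table.
    val patients = records.filter(_.recordType == "patient")
    val orders   = records.filter(_.recordType == "order")

    // More generic: key by the routing column once, which gives per-batch groups
    // akin to Map<ColumnValue, List<Record>>.
    val byType: DStream[(String, Iterable[Record])] =
      records.map(r => (r.recordType, r)).groupByKey()

    byType.print()
    ssc.start()
    ssc.awaitTermination()
  }

  // Trivial stand-in parser: "recordType,rowKey,col=val;col=val"
  private def parse(line: String): Record = {
    val Array(recordType, rowKey, cols) = line.split(",", 3)
    val columns = cols.split(";").map(_.split("=", 2)).collect { case Array(c, v) => c -> v }.toMap
    Record(recordType, rowKey, columns)
  }
}
```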

1 ACCEPTED SOLUTION

Guru

@Adam Doyle

If I understand your question correctly, you could try a state management function with updateStateByKey (http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams), where the key is the recordType field (I am assuming this is a String). You would need to initialize all of the target HBase tables at startup and put the handle object for each table into a map, also keyed by the recordType string. The function itself would just contain the logic to persist the record: look up the HBase table object in the map you created earlier, build the HBase Put object by looping through the columns and values of the record, and execute the Put against the table you got from the table map. The stateful function is typically used to keep a running aggregate; however, because it actually partitions the DStream (I believe by creating separate DStreams) based on the key you provide, it should allow you to write generic logic where you look up the specifics (like target table and columns) at run time.
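
A minimal Scala sketch of the look-up-the-table-at-run-time idea. One adaptation to call out: rather than doing the writes inside an updateStateByKey state function, this sketch groups by recordType and does the Puts inside foreachRDD / foreachPartition, because HBase Connection and Table handles are not serializable and generally have to be created on the executors rather than at driver startup. The Record shape, the recordType-to-table-name map, and the column family are assumptions.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream
import scala.collection.JavaConverters._

object HBaseRouter {
  // Assumed record shape, routing map (recordType -> HBase table name), and column family.
  case class Record(recordType: String, rowKey: String, columns: Map[String, String])
  val tableNames: Map[String, String] = Map("patient" -> "patient", "order" -> "order")
  val columnFamily: Array[Byte] = Bytes.toBytes("cf")

  def writeByType(records: DStream[Record]): Unit =
    records.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // HBase handles are not serializable, so create the connection on the executor
        // instead of shipping one built at driver startup.
        val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
        try {
          // Group this partition's records by type so each table gets one batched put.
          partition.toSeq.groupBy(_.recordType).foreach { case (recordType, recs) =>
            val table = connection.getTable(TableName.valueOf(tableNames(recordType)))
            try {
              val puts = recs.map { r =>
                val put = new Put(Bytes.toBytes(r.rowKey))
                // Loop through the record's columns and values, as described above.
                r.columns.foreach { case (qualifier, value) =>
                  put.addColumn(columnFamily, Bytes.toBytes(qualifier), Bytes.toBytes(value))
                }
                put
              }
              table.put(puts.asJava)
            } finally {
              table.close()
            }
          }
        } finally {
          connection.close()
        }
      }
    }
}
```

With something like this, each micro-batch lands its patient records in the patient table and its order records in the order table without a separate filter per table.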


2 REPLIES

Guru

Ultimately, if you want to write to different tables in Spark, you will probably end up writing different DataFrames / RDDs from the filter. Alternatively, you could write to multiple HBase tables based on a column within a mapPartitions call; that way you can have a generic HBase function that is parameterised by column, though you may have issues achieving nice batch sizes for each HBase write.
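
As a rough illustration (not code from this reply) of the "generic HBase function parameterised by column/table" idea applied to the filtered-streams approach; the Record shape, table names, and column family are assumptions.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream
import scala.collection.JavaConverters._

object GenericHBaseWriter {
  // Assumed record shape; adapt to your actual schema.
  case class Record(recordType: String, rowKey: String, columns: Map[String, String])

  // Generic write, parameterised by target table and column family.
  def writeTo(tableName: String, columnFamily: String)(records: DStream[Record]): Unit =
    records.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
        val table = connection.getTable(TableName.valueOf(tableName))
        try {
          val puts = partition.map { r =>
            val put = new Put(Bytes.toBytes(r.rowKey))
            r.columns.foreach { case (qualifier, value) =>
              put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Bytes.toBytes(value))
            }
            put
          }.toList
          // The batch is whatever this partition of the filtered stream contains.
          if (puts.nonEmpty) table.put(puts.asJava)
        } finally {
          table.close()
          connection.close()
        }
      }
    }

  // One filtered stream per table, all reusing the same generic writer.
  def route(records: DStream[Record]): Unit = {
    writeTo("patient", "cf")(records.filter(_.recordType == "patient"))
    writeTo("order", "cf")(records.filter(_.recordType == "order"))
  }
}
```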

If your use case is simply routing a stream of records from another source to parameterised locations in HBase, NiFi may actually be a better fit; but if you are doing significant or complex batch processing before landing into HBase, the Spark approach will help.
