Support Questions

Find answers, ask questions, and share your expertise

Need to parse the updated records from RDBMS to Hive using NiFi.

avatar
Contributor

My Use Case is I need to load the data onto hive tables initially from Source RDBMS. And there after I will be getting updated records with Last Modified Date as a key from the source to Hive. Now I want to parse just the updated records and update it in the hive table. I don't need any history data at destination. And I want NiFi to do this stuff but not sqoop. Can you please recommend the best practice to do this ?

@Matt Burgess .

1 ACCEPTED SOLUTION

avatar
Master Guru

@Sai Krishna Makineni

For this use case there are couple of ways we can acheive

Method 1:-

Storing data into Hbase table using Primary key of RDBMS table as Row Key:-

Once you pull all the data from RDBMS with NiFi processors(executesql,Querydatabasetable..etc) we are going to have output from the processors in Avro format.

You can use ConvertAvroToJson processor and then use SplitJson Processor to split each record from array of json records.

Store all the records in Hbase table having Rowkey as the Primary key in the RDBMS table.

As when we get incremental load based on Last Modified Date field we are going to have updated records and newly added records from the RDBMS table.

If we got update for the existing rowkey then Hbase will overwrite the existing data for that record, for newly added records Hbase will add them as a new record in the table.

Then by using Hive-Hbase integration you can get the Hbase table data exposed using Hive.

https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

By using this method we are going to have Hbase table that will take care of all the upsert operations and we cannot expect same performance from hive-hbase table vs native hive table will perform faster,as hbase tables are not meant for sql kind of queries, hbase table is most efficient if you are accessing data based on Rowkey,

if we are going to have millions of records then we need to do some tuning to the hive queries

https://stackoverflow.com/questions/30074734/tuning-hive-queries-that-uses-underlying-hbase-table

Method 2:-

Store data into Hive then run deduplication:-

In this method you can pull all the incremental data from RDBMS and store the data into HDFS location in ORC format.

Create table on top the HDFS location after ingestion completes then run DeDuplication every time after ingestion gets completed, by using window function based on Last Modified Date and get only the new records for the specific key.

Write the final deduplicated data into target table, So our final table will be having only the newest set of records.

if we are going to have millions of records then this deduplication will be heavy process because everytime we are going to do deduplication on full dataset.

Once we deduplicated and stored in target table, then you will get good performance for the table compared to Hive-Hbase table.

As you can choose which way will be better suits for your case..!!

View solution in original post

4 REPLIES 4

avatar
Master Guru

@Sai Krishna Makineni

For this use case there are couple of ways we can acheive

Method 1:-

Storing data into Hbase table using Primary key of RDBMS table as Row Key:-

Once you pull all the data from RDBMS with NiFi processors(executesql,Querydatabasetable..etc) we are going to have output from the processors in Avro format.

You can use ConvertAvroToJson processor and then use SplitJson Processor to split each record from array of json records.

Store all the records in Hbase table having Rowkey as the Primary key in the RDBMS table.

As when we get incremental load based on Last Modified Date field we are going to have updated records and newly added records from the RDBMS table.

If we got update for the existing rowkey then Hbase will overwrite the existing data for that record, for newly added records Hbase will add them as a new record in the table.

Then by using Hive-Hbase integration you can get the Hbase table data exposed using Hive.

https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

By using this method we are going to have Hbase table that will take care of all the upsert operations and we cannot expect same performance from hive-hbase table vs native hive table will perform faster,as hbase tables are not meant for sql kind of queries, hbase table is most efficient if you are accessing data based on Rowkey,

if we are going to have millions of records then we need to do some tuning to the hive queries

https://stackoverflow.com/questions/30074734/tuning-hive-queries-that-uses-underlying-hbase-table

Method 2:-

Store data into Hive then run deduplication:-

In this method you can pull all the incremental data from RDBMS and store the data into HDFS location in ORC format.

Create table on top the HDFS location after ingestion completes then run DeDuplication every time after ingestion gets completed, by using window function based on Last Modified Date and get only the new records for the specific key.

Write the final deduplicated data into target table, So our final table will be having only the newest set of records.

if we are going to have millions of records then this deduplication will be heavy process because everytime we are going to do deduplication on full dataset.

Once we deduplicated and stored in target table, then you will get good performance for the table compared to Hive-Hbase table.

As you can choose which way will be better suits for your case..!!

avatar
Contributor

@Shu Do you have any idea about Joins in HBase without moving the data to Hive. From the above said scenario 1 I want to join different tables in HBase only but not going to Hive. Seriously method 1 is an awesome approach so far.

avatar
Master Guru

@Sai Krishna Makineni
Option1:-

In hbase as we can have lookup joins by denormalizing all the table while storing into Hbase.

Let's take you are having employee table as employeeid 1, name hcc, salary 1000, departmentid 100 and we need to have department name needs to be added to the employee record.

so we need to have departments table will having departmentid as rowkey

let's consider we are having departmentid 100 departmentname kb,

Once we got employee record in avro format then use convertAvroToJson processor, we can extract all the json values and keep as attributes and by using fetchHbase processor we can look for departmentid and extract the departmentname keep it as attribute. then by using AttributesToJson processor we can recreate the final record as follows

{"employeeid" :"1", "name": "hcc", "salary" :"1000","departmentid": "100","departmentname" :"kb"}

For more reference:-

http://hbase.apache.org/0.94/book/joins.html

Option2:-

Phoenix:-

You can create phoenix table will uses indexing on top of Hbase table,queries will be performed way faster compared to hive as you can compare performance between Hive-Hbase vs Phoenix-Hbase.

https://phoenix.apache.org/performance.html

https://stackoverflow.com/questions/29405629/apache-phoenix-vs-hive-spark

https://www.quora.com/How-does-Apache-Phoenix-reduce-latency-over-Hive

https://stackoverflow.com/questions/28360888/apache-phoenix-vs-hbase-native-api

Option3:-

Spark On Hbase:-

Even we can load Hbase tables directly by not creating any phoenix (or) hive tables on top into spark by using Spark on Hbase connector. By using this connector you can load all the required tables into spark then perform all joins in spark.

Please refer below link for more details

https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/

avatar

Hello @Sai Krishna Makineni

Updating the existing records on HDFS/HBase has always been a trivial use case and the implementation vary greatly depending on per use case basis. There have been multiple options mentioned in previous answers and have there pros and cons. Follows my inputs.

1. Storing the data in HBase and exposing it using Hive is a very bad idea! Suddenly firing queries without using RowKey column is a sin and joins are a strict no-no. You need to understand NoSQL databases are not meant for your usual RDBMS like operations. In RDBMS world, you first think of your design and later the queries come into the picture. In NoSQL, first, you think of your queries and then design your schema accordingly. So having your data in HBase and then relying on HBase to deduplicate the data using "upserts" and keeping your data there is the only good thing about that solution. Your "random" SQL operations and joins requirements will be marginalized.

Is there still hope using HBase?

Maybe may not be. This really depends on your data size. And even more on your requirements. For example, are your table a few gigs in size? Is your business OK with a certain level of stale data? If the answers to above questions are YES, you may still can go ahead with using HBase for deduplication, you don't need to do anything, HBase will automatically take care of it by its upsert logic, and can them dump the contents of that HBase table(s) onto HDFS, creating a Hive table on top of that, let's say, once a day. Your data will be a day stale but you will get a far better performance than using HBase+Hive or even HBase+Phoenix solutions.


2. Deduplication using Hive can have its own consequences. Beware! Things can really go out of hand very easily here. Let's say if you are using a window function like RankOver() every time you dump some new file into HDFS and using the "Last Modified Date", picking up the "latest" record and end up keeping it and deleting rest of them. The problem is, with every passing day, your data will only grow in size and the time/resources taken to get this job is going to increase and at a certain point, you may not want that operation to even trigger since either it will eat a substantial amount of your cluster resources or will end up taking a lot of time to get processed, which may not make any business sense to you. And if you have a large table beforehand, I don't think this is even an option.

Can you use Hive for deduplication at all?

Like the first point of using HBase. Totally depends on your use case. If you have a few columns, which are good candidates for being considered as Partitioning Columns or Bucketing columns, you should for sure use them. Let me give you an example! You identify latest records using LastModifiedTS. Also, your table has a column called CreatedTS. You should always use the CreatedTS column as partitioning column if it fits well in the use case. You say what's the benefit? Next time you have a data dump on HDFS from NiFi, simply identify unique CreatedTS values from that table. Now you only pick-up partitions from the existing table which corresponds to these CreatedTS values. You will realize that you are using only a fraction of the existing data for "Upsert" operations using windowing operations as compared to using the entire table for a RankOver. The operation is sustainable over a longer period of time and will take way less time/resources for getting the job done. This is the best option that you can use, if applicable, in your ingestion pattern.