Support Questions


How to copy data from a Hive table recurrently using NIFI?


Hello Everyone,

I want to copy all the content from a Hive table and transform it into a JSON file, but it must run recurrently in order to pick up any new content the Hive table may have.

I managed to use the "SelectHiveQL" processor to extract the data. The problem is that I can't collect only the data that was created after the last collection. Every time I access Hive it collects all the data, duplicating the information.


I also tried the "QueryDatabaseTable" and "GenerateTableFetch" processors but could not get them to work. Does anyone have a hint on how I can do this?

Thank you.

1 ACCEPTED SOLUTION

Master Guru

@Rui Pinheiro

The data duplication happens because the SelectHiveQL processor does not store state. Every time you execute the Hive query it returns the same data, plus any newly added records (if records got added to the table), which is why you are getting duplicates.

For this use case we need to store the last value somewhere in Hive/HBase/HDFS/DistributedMapCache. Then, before running the SelectHiveQL statement, pull that stored state value, keep it as an attribute, and use it in your SelectHiveQL statement.

Example:-

Let's say I have a table with the following columns: id(int), name(string), ts(timestamp), and I want to run the SelectHiveQL processor incrementally.

My Hive statement would be like below:

select * from table-name where ts > '${start_value}' and ts < '${current_time}'

We need to prepare start_value and current_time attributes before selecting hive data.

Flow:-

1. GenerateFlowFile // add a current_time attribute to the flowfile: right-click the processor, go to Configure, click the + sign, and add current_time with the value ${now():format("yyyy-MM-dd HH:mm:ss")}
2. SelectHiveQL/FetchHBaseRow/FetchHDFS/FetchDistributedMapCache processor // depending on where you have stored your state value
3. ExtractText/EvaluateJsonPath/UpdateAttribute processor // extract the stored value and keep it as a start_value attribute; this start_value will be the base value
4. UpdateAttribute processor // check the start_value attribute; on the first run the value will be empty, so assign a default value to it (like 1900-01-01 00:00:00)
5. SelectHiveQL processor // select * from table-name where ts > '${start_value}' and ts < '${current_time}'; now that the flowfile has the start_value and current_time attributes, the Hive statement runs using those attribute values
6. Fork the data set:
    6.1. Do your processing with the incremental Hive dataset
    6.2. Store the current_time attribute value using the PutHiveStreaming/PutHBaseCell/PutHDFS/PutDistributedMapCache processors // once you store the current_time value, the next run will pull this state value and it becomes your start_value
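Outside NiFi, the steps above can be sketched in Python as a rough illustration of the state-driven pattern — a local file stands in for HDFS/DistributedMapCache, and the file name, table name, and function names are all hypothetical:

```python
from datetime import datetime

STATE_FILE = "last_run_state.txt"       # hypothetical stand-in for HDFS/DistributedMapCache
DEFAULT_START = "1900-01-01 00:00:00"   # first-run default, as in step 4 above

def read_state():
    """Pull the stored state value; fall back to the default on the first run."""
    try:
        with open(STATE_FILE) as f:
            return f.read().strip() or DEFAULT_START
    except FileNotFoundError:
        return DEFAULT_START

def build_query(table, start_value, current_time):
    """Build the same bounded Hive statement used in step 5."""
    return (f"select * from {table} "
            f"where ts > '{start_value}' and ts < '{current_time}'")

def store_state(current_time):
    """Persist current_time so the next run uses it as start_value (step 6.2)."""
    with open(STATE_FILE, "w") as f:
        f.write(current_time)

# One run of the flow:
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
start_value = read_state()
query = build_query("my_table", start_value, current_time)
# ...execute the query, process the incremental dataset...
store_state(current_time)
```

Because both bounds are captured before the query runs, rows inserted while the query executes fall into the next run's window rather than being skipped or double-counted.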


If the answer helped to resolve your issue, click the Accept button below to accept it. That helps community users find solutions for these kinds of issues quickly.


6 REPLIES


Master Mentor

@Shu

Thank you for sharing such detailed and great answers, as always. 😄

Master Guru
@Jay Kumar SenSharma

Thank you, I'm glad you enjoyed it. 🙂

I’d be more than happy to help..!!

Master Guru
@Rui Pinheiro

For this to be done, the Hive table must have a timestamp for the moment every single row was created, right?

Yes, if your Hive table has a timestamp field then the process is pretty easy: pull the last state and store the new one.

(or)

If your Hive table doesn't have a timestamp field, then we need some column whose value always increases (like an id column that increments like a row number) in the dataset.

After the SelectHiveQL processor, fork the result set: keep one fork for processing and send the other fork to a QueryRecord processor. In the QueryRecord processor, add a max-row query property as

max_row

select * from FLOWFILE where id=(select MAX(id) from FLOWFILE)

Now we select the max id value from the flowfile content, extract that id value, and store it as your state.
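The max-id extraction can be sketched in Python — a rough simulation only, where a list of dicts stands in for the flowfile's record set and the sample ids are made up:

```python
# Hypothetical flowfile content: rows returned by SelectHiveQL, as dicts.
rows = [
    {"id": 101, "name": "a"},
    {"id": 204, "name": "b"},
    {"id": 157, "name": "c"},
]

# Equivalent of the QueryRecord query above:
# select * from FLOWFILE where id = (select MAX(id) from FLOWFILE)
max_id = max(row["id"] for row in rows)
max_rows = [row for row in rows if row["id"] == max_id]

# Extract the id value and store it as the state for the next run.
state = str(max_rows[0]["id"])  # -> "204"
```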

QueryRecord processor references:-

https://community.hortonworks.com/articles/121794/running-sql-on-flowfiles-using-queryrecord-process...

In addition, you can look into the Wait and Notify processors to make sure you only start processing the first forked dataset once the state has been stored.

Wait and Notify processors references:-

http://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/

(or)

If you don't have any column whose value is an increasing number, then you need to look into Hive pagination, i.e.:

SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rowid FROM mytable
) t
WHERE rowid > 0 AND rowid <= 20

Then store the max rowid value as your state (in this case 20). On the next run, 20 is the base value; add however many records you want to fetch to that base value, using the UpdateAttribute processor's plus function to compute the upper limit.
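The pagination bookkeeping can be sketched in Python — a rough illustration where the function name and table name are hypothetical, mirroring the stored-rowid-plus-page-size logic described above:

```python
def next_window(last_rowid, page_size):
    """Compute the next pagination bounds from the stored state.

    last_rowid: the max rowid stored after the previous run (the base value).
    page_size:  how many records to fetch this run (the "plus" amount).
    Returns the new upper bound (to store as state) and the Hive query.
    """
    lower = last_rowid
    upper = last_rowid + page_size
    query = (
        f"SELECT * FROM ("
        f"SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rowid FROM mytable"
        f") t WHERE rowid > {lower} AND rowid <= {upper}"
    )
    return upper, query

# First run already fetched rowid 1..20 and stored 20 as state;
# this computes the second page.
new_state, query = next_window(20, 20)
```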

Regarding point 3, ExtractText (extract the stored value and keep it as start_value): does the value that I copy to start_value come from the current time?

For the first run we can define start_value, i.e. check whether the start value is present (then leave it as is) or not (assign a default value like 1900-01-01 00:00:00). Now start_value is 1900-01-01 00:00:00 and current_time is 2018-04-06 12:00:00. Then we store our current_time (i.e. 2018-04-06 12:00:00) into HDFS/DistributedMapCache etc.

For the next run we pull the stored state (i.e. 2018-04-06 12:00:00) and assign that value as start_value.
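The two-run handoff above can be sketched in Python — the function name is hypothetical, and the dates are the ones from the example:

```python
def resolve_start_value(stored_state):
    """First-run check from step 4: if no state is present, use the default."""
    return stored_state if stored_state else "1900-01-01 00:00:00"

# Run 1: no stored state yet, so start_value falls back to the default.
run1_start = resolve_start_value(None)          # -> "1900-01-01 00:00:00"
run1_current = "2018-04-06 12:00:00"            # stored as state at end of run 1

# Run 2: the stored state from run 1 becomes the new start_value.
run2_start = resolve_start_value(run1_current)  # -> "2018-04-06 12:00:00"
```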


@Shu thank you for this great explanation.
For this to be done, the Hive table must have a timestamp for the moment every single row was created, right?

The other question is about point 3, ExtractText (extract the stored value and keep it as start_value): does the value that I'm going to copy to start_value come from the current time?

Master Guru
@Rui Pinheiro

Please see my answer in comments..!!