
NiFi: Pull data from Hive External table and load into Hive Managed table?

Rising Star

Hi,

How to load Hive managed table from Hive external table using NiFi?

We used NiFi --> Processor --> SelectHiveQL to pull data from the Hive external table. One of its properties, Output Format, has only two options, Avro and CSV; we selected Avro. CSV asks for all the details needed to generate a CSV file, which we don't want, since we need to load into Hive.

Secondly, we used NiFi --> Processor --> PutHiveQL to load into Hive, giving all the required details, but it gives the error: Can't recognise Avro format.

Looking forward.

Cheers.

8 REPLIES

Master Guru

@Mustafa Ali Qizilbash

You don't have to use SelectHiveQL processor.

Flow:

1. GenerateFlowFile

Configure the processor with your insert statement:

insert into <db.name>.<internal.tab.name> select * from <db.name>.<external.tab.name>

[screenshot: 77491-gnf.png]

2. PutHiveQL

Configure/enable the Hive controller service; this processor expects the flowfile content to be the HiveQL command to execute.

[screenshot: 77490-flow.png]
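
For example, with made-up database and table names, the flowfile content handed to PutHiveQL would be a single statement like:

insert into mydb.managed_tab select * from mydb.external_tab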

Rising Star

Thanks for the quick response, but per this processor's documentation it is not recommended for production, and it will not help with CDC (Change Data Capture).

- GenerateFlowFile 1.2.0.3.0.2.0-76 org.apache.nifi - nifi-standard-nar
This processor creates FlowFiles with random data or custom content. GenerateFlowFile is useful for load testing, configuration, and simulation.

There is another processor, GenerateTableFetch: it does the same, asking for the database and table names via Controller Services but without any query, and it will also help with incremental data pulls, which is great to have for CDC.

Next and last is the PutHiveQL processor, which does not ask for a target database or table name. So how do we push data into target Hive tables when using GenerateTableFetch as the source processor?

Please advise.

Rising Star
A bit more info:
  • Source table is a simple Hive external table.
  • Target table is Hive, partitioned and clustered with bucketing, in ORC format (a sketch DDL follows below).
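
For reference, a target table of that shape would look roughly like the sketch below; the database, table, columns, and bucket count are all made-up, and note that Hive streaming ingest additionally requires a transactional table.

-- hypothetical DDL; all names and the bucket count are assumptions
CREATE TABLE mydb.managed_tab (
  id INT,
  name STRING,
  ts TIMESTAMP
)
PARTITIONED BY (dt STRING)
CLUSTERED BY (id) INTO 8 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');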

Master Guru
@Mustafa Ali Qizilbash

In the above flow, the GenerateFlowFile processor acts as the trigger: it runs based on its scheduling strategy and generates your user-defined custom text as the content of the flowfile, which in our case is a valid HQL statement that the PutHiveQL processor then executes. You can use this processor in production environments as well.

The PutHiveQL processor is designed to run Hive DDL/DML statements, so it won't offer any target database (or) table name; it expects the incoming flowfile content to be a valid HiveQL statement, which it then executes.

If you want to store the data into the final table incrementally, then look into the methods below.

Method 1:

You can use the GenerateTableFetch (GTF) processor to fetch only the newly added rows from the table, based on the Maximum-value Columns property. But here is the issue: the SQL statements generated by the GTF processor look something like the one below.

SELECT * FROM test_table WHERE ts > '2018-06-02 14:24:17.242' AND ts <= '2018-06-03 14:24:17.242' ORDER BY ts LIMIT 10000

Here my max-value column is ts, and the SQL statement generated by the GTF processor has an ORDER BY clause; if your table is big, preparing and executing that query to pull the incremental rows will run for a long time.
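
For Hive you would want the same window predicate without the costly ORDER BY, i.e. something like:

SELECT * FROM test_table WHERE ts > '2018-06-02 14:24:17.242' AND ts <= '2018-06-03 14:24:17.242'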

Once the SQL statements have been generated, use the SelectHiveQL processor to execute them, with the result set in Avro format.

Then use the PutHiveStreaming processor, configured with your target database, table name, and partition column info.
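
As a sketch, the relevant PutHiveStreaming properties would be filled in roughly like this (the host and names are made-up):

Hive Metastore URI: thrift://metastore-host:9083
Database Name: mydb
Table Name: managed_tab
Partition Columns: dt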

Flow:

[screenshot: 77492-flow.png]

By using this method you incrementally pull the data from the external table and store it into the final managed table.

(or)

Method 2:

Use HiveQL queries, i.e.

insert into <db.name>.<managed.tab.name> select * from <db.name>.<external.tab.name> where ts > '${last_state_value}' and ts <= '${new_state_value}'

Then store the new state value in HDFS/Hive/HBase/DistributedCache. On the next run, fetch the stored state value, use it as the last state value, define your new state value, and run the insert statement to do a dynamic-partition insert into the managed table.

In this approach there is no ORDER BY clause, and if the table is partitioned on the ts column you skip the full table scan as well.
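
A rough sketch of such a dynamic-partition insert, with made-up table and column names and assuming the managed table is partitioned by a dt column:

-- hypothetical names; the state values come from wherever you stored them
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert into table mydb.managed_tab partition (dt)
select id, name, ts, to_date(ts) as dt
from mydb.external_tab
where ts > '${last_state_value}' and ts <= '${new_state_value}';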

Refer to this link regarding storing the state, fetching the state again, and storing data into the Hive managed table.

Master Guru

@Mustafa Ali Qizilbash

I think in NiFi 1.2 the pagination is based on LIMIT and OFFSET, but Hive won't work with LIMIT and OFFSET.

Sample HQL statements would be something like the one below.

SELECT * FROM test_table WHERE ts > '2018-06-02 14:24:17.242' AND ts <= '2018-06-03 14:24:17.242' ORDER BY ts LIMIT 10000

I don't have a NiFi 1.2 setup to recreate your scenario, but if the statements generated by the GenerateTableFetch processor have the max-value column's last state and new state in them, like the statement above, then use the ReplaceText processor to rewrite the SQL statements so they work with Hive syntax.

If the statements don't have the max-value column conditions in them, and instead look like

select * from test_table order by ts limit 10000 offset 20000

then: I have used NiFi 1.5 to generate the table-fetch statements and they work fine with Hive, so either update your NiFi version, (or) implement Method 2 to run incrementally, (or) get the last state and new state from the processor using the REST API and then use the ReplaceText processor to prepare your statements to run.
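
One way to set up that ReplaceText step, as a sketch (the regex assumes ts is the max-value column and simply strips the trailing ORDER BY/LIMIT/OFFSET so Hive accepts the query):

Replacement Strategy: Regex Replace
Search Value: (?i)\s*ORDER BY ts LIMIT \d+( OFFSET \d+)?
Replacement Value: (set to empty string)
Evaluation Mode: Entire text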

The second error says the partition column doesn't exist in your data: make sure the data set you feed to PutHiveStreaming includes the partition column.
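
In other words, the query feeding PutHiveStreaming should select the partition column too, e.g. (made-up names):

select id, name, ts, dt from mydb.external_tab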

Rising Star

I followed the same flow but am facing issues.

GenerateTableFetch properties

[screenshot: 77509-getfetchtable-propoerties.jpeg]

SelectHiveQL properties

[screenshot: 77510-selecthiveql-properties.jpeg]

PutHiveStreaming properties

[screenshot: 77512-puthivestreaming-properties.jpeg]

Errors from SelectHiveQL and PutHiveStreaming:

[screenshot: 77513-selecthiveql.jpeg]

[screenshot: 77514-puthivestreaming.jpeg]

Rising Star

Yes, the issue was resolved; we had forgotten to set the select query in SelectHiveQL.
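
For anyone hitting the same thing, that means populating SelectHiveQL's HiveQL Select Query property with something like (made-up names):

select * from mydb.external_tab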

Rising Star

Issue Resolved for me.

In HDP 3.0, please use PutHive3Streaming, PutHive3QL and SelectHive3QL.

Cheers.