Created 06-01-2018 06:49 PM
Hi,
How do we load a Hive managed table from a Hive external table using NiFi?
We have used NiFi --> Processor --> SelectHiveQL to pull data from the Hive external table. One of its properties, Output Format, has only 2 options, Avro & CSV; we selected Avro. If we use CSV, it asks for all the ingredients to generate a CSV, which we don't want, since we need to load the data into Hive, so we selected Avro.
Secondly, we used NiFi --> Processor --> PutHiveQL to load into Hive, giving all the required details, but it gives the error: can't recognise Avro format.
Looking forward.
Cheers.
Created on 06-01-2018 07:10 PM - edited 08-17-2019 08:34 PM
You don't have to use the SelectHiveQL processor.
Flow:
1. GenerateFlowFile
Configure the processor with your insert statement (a concrete example is shown after step 2):
insert into <db.name>.<internal.tab.name> select * from <db.name>.<external.tab.name>
2. PutHiveQL
Configure/enable the Hive controller service; this processor expects the flowfile content to be the HiveQL command to execute.
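For example, assuming a hypothetical database sales_db with an external table orders_ext and a managed table orders_managed, the flowfile content generated in step 1 would just be the plain HiveQL statement:
insert into sales_db.orders_managed select * from sales_db.orders_ext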
Created 06-01-2018 07:47 PM
Thanks for the quick response, but as per this processor's details it's not recommended for production, plus it will not help to do CDC (Change Data Capture).
- GenerateFlowFile 1.2.0.3.0.2.0-76 org.apache.nifi - nifi-standard-nar
This processor creates FlowFiles with random data or custom content. GenerateFlowFile is useful for load testing, configuration, and simulation.
There is another processor, GenerateTableFetch: it will do the same, asks for the database name and table names via Controller Services, but without writing any query, and it will also help with incremental data pull, which is great to have for CDC.
Next and last is the PutHiveQL processor, which does not ask for a target database or table name. So how do we push data into the target Hive tables using GenerateTableFetch as the source processor?
Please advise.
Created 06-01-2018 08:06 PM
Created on 06-01-2018 10:44 PM - edited 08-17-2019 08:34 PM
In the above flow the GenerateFlowFile processor acts as a trigger: it runs based on the scheduling strategy and generates your user-defined custom text as the content of the flowfile. In our case that is a valid HQL statement, which is then executed by the PutHiveQL processor. You can use this processor in a production environment as well.
The PutHiveQL processor is designed to run Hive DDL/DML statements, so the processor won't offer any target database (or) table name; it expects the incoming flowfile content to be a valid HiveQL statement and then executes that statement.
If you want to store the data into the final table incrementally, then look into the methods below.
Method1:
You can use the GenerateTableFetch (GTF) processor to fetch only the newly added rows from the table based on the maximum value column property, but here is the issue: the SQL statements generated by the GTF processor look something like the one below.
SELECT * FROM test_table WHERE ts > '2018-06-02 14:24:17.242' AND ts <= '2018-06-03 14:24:17.242' ORDER BY ts LIMIT 10000
As my max value column is ts and the SQL statement generated by the GTF processor has an ORDER BY clause, if your table is big then preparing and executing the query to pull the incremental rows will run for a long time.
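For comparison, a sketch of what the same incremental query looks like once the ORDER BY / LIMIT tail is stripped (for example with a ReplaceText processor, as discussed further down), keeping only the max-value-column bounds:
SELECT * FROM test_table WHERE ts > '2018-06-02 14:24:17.242' AND ts <= '2018-06-03 14:24:17.242'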
Once you have the SQL statements generated, use the SelectHiveQL processor to execute them, with the result set in Avro format.
Then use the PutHiveStreaming processor and configure it with your target DB, table name and partition column info.
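Note that PutHiveStreaming writes through the Hive Streaming API, so the target managed table generally has to be bucketed, stored as ORC, and have transactional=true. A minimal sketch of such a table, with hypothetical column names:
create table sales_db.orders_managed (
  order_id int,
  amount double,
  ts timestamp
)
partitioned by (order_date string)
clustered by (order_id) into 4 buckets
stored as orc
tblproperties ('transactional'='true');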
Flow: GenerateTableFetch --> SelectHiveQL --> PutHiveStreaming
By using this method you incrementally pull the data from the external table and store it into the final managed table.
(or)
Method2:
Use HiveQL queries, i.e.
insert into managed table select * from external table where ts >'${last_state_value}' and ts <'${new_state_value}'
then store the new state value in HDFS/Hive/HBase/DistributedCache; on the next run, fetch the stored state value, keep it as the last state value, define your new state value, and run the insert statement to do a dynamic-partition insert into the managed table.
In this step there is no ORDER BY clause, and if the table is partitioned on the ts column then you also skip the full table scan.
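A minimal sketch of the flowfile content PutHiveQL would execute in this method, assuming the hypothetical sales_db.orders_ext / sales_db.orders_managed tables from above, a ts max-value column, an order_date partition column (which must be selected last for dynamic partitioning), and PutHiveQL splitting the statements on its ';' statement delimiter; the two set commands are standard Hive dynamic-partition settings:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert into sales_db.orders_managed partition (order_date)
select order_id, amount, ts, order_date from sales_db.orders_ext
where ts > '${last_state_value}' and ts <= '${new_state_value}';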
Refer to this link regarding storing the state, fetching the state again, and storing data into the Hive managed table.
Created 06-02-2018 12:09 PM
I think in NiFi 1.2 the pagination is based on LIMIT and OFFSET, but Hive won't work with LIMIT and OFFSET.
A sample HQL statement would be something like the one below:
SELECT * FROM test_table WHERE ts > '2018-06-02 14:24:17.242' AND ts <= '2018-06-03 14:24:17.242' ORDER BY ts LIMIT 10000
I don't have a NiFi 1.2 setup to recreate your scenario, but if the statements generated by the GenerateTableFetch processor have the max value column's last state and new state in them, like the statement above, then use a ReplaceText processor to rewrite the SQL statements so that they work with Hive syntax.
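One possible ReplaceText configuration for that rewrite (a sketch, assuming the generated statements always end with the ORDER BY ... LIMIT tail):
Replacement Strategy: Regex Replace
Evaluation Mode: Entire text
Search Value: (?i)\s+order by .*$
Replacement Value: an empty string, so the ORDER BY / LIMIT / OFFSET tail is simply stripped and only the max-value-column bounds remain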
If the statements don't have the max value column conditions in them, like
select * from test_table order by ts limit 10000 offset 20000
then,
I have used NiFi 1.5 to generate the table fetch statements and they work fine with Hive, so either upgrade your NiFi version, (or) implement Method 2 to run incrementally, (or) get the last state and new state from the processor using the REST API and then use a ReplaceText processor to prepare your statements to run.
The second issue shows that your data doesn't contain the partition column; make sure the data set you are feeding to PutHiveStreaming has the partition column in it.
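In other words, whichever query feeds PutHiveStreaming has to return the configured partition column as one of its fields, for example (hypothetical column names, with order_date configured as the partition column on PutHiveStreaming):
select order_id, amount, ts, order_date from test_table where ts > '2018-06-02 14:24:17.242' and ts <= '2018-06-03 14:24:17.242'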
Created on 06-02-2018 09:25 AM - edited 08-17-2019 08:34 PM
Followed the same but facing issues.
Attached are the processor property screenshots: GenerateTableFetch properties, SelectHiveQL properties, PutHiveStreaming properties.
The errors are in SelectHiveQL and PutHiveStreaming.
Created 06-12-2018 11:36 AM
Yes, the issue was resolved; we were missing the select query in SelectHiveQL.
Created 08-21-2018 04:35 PM
Issue Resolved for me.
In HDP 3.0, please use PutHive3Streaming, PutHive3QL and SelectHive3QL.
Cheers.