Created on 08-18-2016 09:45 AM - edited 08-19-2019 04:30 AM
Hello, I'm using NiFi 0.7.0 and I have 2 questions regarding the Hive processors in NiFi.
1. I created a basic flow, and using the "SelectHiveQL" processor I'm able to extract data from Hive. However, I have no idea where the output resides. When I choose CSV as the output format, the execution fails. When I choose Avro it succeeds, but I don't know what Avro is or where to find it on my NiFi machine. So where exactly can I find the output of my query?
2. After executing the "SelectHiveQL" processor I wanted to insert the data into a different table in Hive. I tried using the "PutHiveQL" processor, but it doesn't have any field for writing my insert command. So I used the "SelectHiveQL" processor to insert the data with an "insert select" statement (see attached screenshot).
This works, but it seems like a workaround. There must be a proper way to insert data into Hive. Any ideas? Adi
Created 08-18-2016 04:04 PM
The output format refers to the format of the data in its intermediate stage (the FlowFile), not to a file written somewhere on the OS. If you want to write the output to a file, you need to continue your flow with another processor, e.g. PutFile. To insert into Hive, use ReplaceText to build a HiveQL statement (an INSERT, for example), either with parameters (if you want prepared statements) or with hard-coded values. PutHiveQL is the processor to use; it has no field for the statement because the HiveQL must be prepared by another processor and passed to it.
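As a rough illustration of the two options (hard-coded values versus a prepared statement), the sketch below builds both forms in Python. The table name `my_table` and both helper functions are hypothetical. The `hiveql.args.N.type` and `hiveql.args.N.value` attribute names are, as far as I know, the ones PutHiveQL reads for statement parameters, with the type given as a JDBC type code (12 is VARCHAR); check them against the documentation for your NiFi version.

```python
JDBC_VARCHAR = 12  # value of java.sql.Types.VARCHAR


def hard_coded_insert(table, column, value):
    """Build an INSERT with the value written directly into the statement.

    No escaping is done here, so this is only safe for trusted values.
    """
    return f"INSERT INTO {table} ({column}) VALUES ('{value}')"


def parameterized_insert(table, column, value):
    """Build an INSERT with a '?' placeholder plus the attributes describing it."""
    statement = f"INSERT INTO {table} ({column}) VALUES (?)"
    attributes = {
        "hiveql.args.1.type": str(JDBC_VARCHAR),  # assumed attribute name
        "hiveql.args.1.value": value,             # assumed attribute name
    }
    return statement, attributes


print(hard_coded_insert("my_table", "event_time", "2222222"))
print(parameterized_insert("my_table", "event_time", "2222222"))
```

In a flow, the statement text would be the FlowFile content produced by ReplaceText, and the attributes would be set by an upstream processor such as UpdateAttribute.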
Created on 08-21-2016 03:50 PM - edited 08-19-2019 04:30 AM
@Constantin Stanca Hi, and thank you for replying. Just to be clear: I have two tables, staging and permanent. I want to select from the staging table and insert the results into the permanent table. So I created a process group with a SelectHiveQL processor that queries a specific column (event_time).
In that process group I created a sub process group for the insert process.
I started the process group with an Input port and connected it to a ReplaceText processor with the following Replacement Value: INSERT INTO nifitest (event_time) VALUES ('${event_time}') and of course connected that to a "PutHiveQL" processor.
The flow ends successfully, but the column is blank. Not NULL, but blank. No values whatsoever. I tried all kinds of syntax and even the "UpdateAttribute" processor, but nothing seems to work. I am able to insert hard-coded values into the column using:
INSERT INTO nifitest (event_time) VALUES ('2222222')
It seems as if the select output isn't transferred to the sub-process group or that my syntax is incorrect.
Could you advise?
Thanks in advance! Adi
Created 08-23-2016 07:00 PM
@Adi Jabkowsky You are trying to use the Avro data directly as text. You first need to convert the Avro to a text format and then extract the column value into an attribute before using the ReplaceText processor.
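One way to see why the column comes out blank rather than NULL: the query result sits in the FlowFile content as Avro, so no `event_time` attribute exists, and NiFi Expression Language evaluates a reference to a missing attribute as an empty string. The Python sketch below only imitates that substitution. `expand` is a hypothetical helper, not a NiFi API, and it handles plain `${name}` references only.

```python
import re


def expand(template, attributes):
    """Replace each ${name} with the attribute value, or '' if it is missing."""
    return re.sub(
        r"\$\{(\w+)\}",
        lambda match: attributes.get(match.group(1), ""),
        template,
    )


template = "INSERT INTO nifitest (event_time) VALUES ('${event_time}')"

# FlowFile straight from SelectHiveQL: the data is in the content, not in attributes.
without_attribute = expand(template, {})

# FlowFile after the value has been extracted into an attribute.
with_attribute = expand(template, {"event_time": "2222222"})

print(without_attribute)
print(with_attribute)
```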
You can use a processor pipeline like this:
1. Convert the Avro output to JSON (ConvertAvroToJSON).
2. Split the JSON to handle multiple rows from the output (SplitJson).
3. Use the EvaluateJsonPath processor to get the individual column from the output and set it as an attribute on the FlowFile.
4. Use the ReplaceText processor to generate an INSERT statement, and then use the PutHiveQL processor.
The other option is to generate the output in CSV format and then use a regular expression to read the column values.
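Steps 1 to 4 above can be approximated outside NiFi to check the logic. The sketch below is plain Python, not NiFi configuration: it assumes the Avro records have already been converted to a JSON array (step 1), and the sample rows and the function name `build_inserts` are made up for illustration.

```python
import json

# Assumed result of step 1 (Avro converted to JSON); the rows are invented sample data.
converted = '[{"event_time": "2016-08-18 09:45:00"}, {"event_time": "2016-08-18 09:46:00"}]'


def build_inserts(json_text, table="nifitest", column="event_time"):
    statements = []
    # Step 2: split the array so each record is handled on its own.
    for record in json.loads(json_text):
        # Step 3: pull the column value out of the record
        # (roughly what a JSON path such as $.event_time selects).
        value = record[column]
        # Step 4: substitute the value into the INSERT template.
        statements.append(f"INSERT INTO {table} ({column}) VALUES ('{value}')")
    return statements


for statement in build_inserts(converted):
    print(statement)
```

In the flow itself, step 3 would typically be an EvaluateJsonPath property such as `event_time` with the value `$.event_time` and the Destination set to flowfile-attribute, so that `${event_time}` resolves in ReplaceText.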
Created 08-24-2016 05:50 AM
@Shishir Saxena Thank you so much for the answer and examples. I will try it.
Created 10-02-2016 03:30 AM
How can I handle Chinese with this method?
Created 12-02-2016 06:57 PM
What error(s) are you seeing? If the error mentions Avro and your column names are in Chinese, it's likely that Avro does not accept them. This may be alleviated in NiFi 1.1.0 with NIFI-2262, but that would just replace non-Avro-compatible characters with underscores, so you may face a "duplicate field" exception. In that case you would need column aliases in your SELECT statement to give the columns Avro-compatible names.
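To illustrate the "duplicate field" problem: Avro names must start with a letter or underscore and may contain only ASCII letters, digits, and underscores. The sketch below imitates an underscore substitution in Python. The exact normalization NiFi applies may differ, and the column names and the helper `to_avro_name` are hypothetical.

```python
import re


def to_avro_name(column):
    """Replace every character Avro does not allow in a name with '_'."""
    name = re.sub(r"[^A-Za-z0-9_]", "_", column)
    # A name may not start with a digit, so prefix an underscore in that case.
    if re.match(r"[0-9]", name):
        name = "_" + name
    return name


columns = ["姓名", "年龄", "event_time"]
normalized = [to_avro_name(c) for c in columns]

# Names that occur more than once after normalization would collide in the Avro schema.
duplicates = {n for n in normalized if normalized.count(n) > 1}

print(normalized)
print(duplicates)
```

Both Chinese column names collapse to the same underscore-only name here, which is the kind of collision that aliases in the SELECT statement (for example `姓名` AS `name` and `年龄` AS `age`) would avoid.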
Created 12-02-2016 05:35 AM
SelectHiveQL -> ReplaceText -> PutHiveQL
SelectHiveQL => HiveQL Select Query -> select * from tablename limit 1
ReplaceText => Search Value -> (.*)
ReplaceText => Replacement Value -> insert into newtable select * from oldtable