Created 01-27-2017 06:23 PM
I want to use NiFi to ingest a CSV file (GetFile) into Hive (PutHiveQL).
Since PutHiveQL needs a SQL statement, how can I generate it?
A solution would be GetFile -> InferAvroSchema -> ConvertCSVToAvro -> ConvertAvroToJSON -> ConvertJSONToSQL -> PutHiveQL.
This looks complex and resource-consuming. Any suggestions?
Created 01-27-2017 06:49 PM
NiFi is best suited to ingesting live streaming data at thousands of records per second. For your use case, why not simply import the file into a staging area in Hadoop, create a temp table over it, and then do an INSERT ... SELECT using Hive? While inserting, simply change the format to ORC.
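For illustration, a minimal Hive sketch of that approach (the table names, columns, and HDFS path below are placeholders, not from the original post):

CREATE EXTERNAL TABLE staging_my_table (
  col1 STRING,
  col2 STRING,
  col3 INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/staging/my_table';

-- rewrite the staged text data as ORC in a single statement
CREATE TABLE my_table STORED AS ORC AS
SELECT * FROM staging_my_table;

The CTAS statement both creates the ORC table and converts the data; alternatively, create the ORC table explicitly and use INSERT INTO ... SELECT from the staging table.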
Created 01-27-2017 06:52 PM
Hi @mqureshi
I don't agree with your statement. If that's the case, why are there processors like GetFile, GetHDFS, QueryDatabaseTable, etc.?
Created 01-27-2017 07:11 PM
@Joe Harvy There are many use cases where you will need to get a file, change its format, extract/drop records, filter JSON, and so on. Your use case does not seem to be one of them. But fair enough, if you don't agree and would still like to go down the path you want, I am sure somebody will give you a better answer that can validate your approach.
Created 01-27-2017 07:18 PM
Do you need to convert to JSON? From Avro you can use the Kite dataset processor (StoreInKiteDataset) and store the data in Hive as Parquet.
Created 01-27-2017 07:33 PM
Configure the 'StoreInKiteDataset' processor by setting the 'Target dataset URI' property to the URI of your Hive table; the Avro-formatted data will then be written to a Parquet-formatted Hive table. For example, I'm writing to a Hive table named weblogs_parquet: dataset:hive://ip-172-31-2-102.us-west-2.compute.internal:9083/weblogs_parquet
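For reference, one possible end-to-end flow using this processor, starting from the CSV in the question (a sketch only; the metastore host, port, and dataset name are whatever your 'Target dataset URI' points at):

GetFile -> InferAvroSchema -> ConvertCSVToAvro -> StoreInKiteDataset

with StoreInKiteDataset configured as, e.g.:

Target dataset URI: dataset:hive://<metastore-host>:9083/weblogs_parquet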
Learn more about Kite Datasets at http://kitesdk.org/docs/current/
Created 01-27-2017 10:11 PM
If the CSV doesn't need any work done to it and you just want to put a Hive table over the CSV(s), you can use the following flow:
GetFile -> PutHDFS -> ReplaceText -> PutHiveQL
GetFile: Retrieves the CSV file(s). You could also use ListFile -> FetchFile for this.
PutHDFS: Puts the CSV file(s) onto the HDFS file system
ReplaceText: Replace the content of the flow file with a HiveQL DDL statement, such as "LOAD DATA INPATH ..." or "CREATE TABLE IF NOT EXISTS ..." (see the sketch after this list)
PutHiveQL: Execute the DDL command.
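As a rough sketch of what the ReplaceText replacement value could look like (the table name, columns, and HDFS paths are placeholders, not from the original post; each statement would go into its own flow file for PutHiveQL):

CREATE EXTERNAL TABLE IF NOT EXISTS my_table (
  col1 STRING,
  col2 STRING,
  col3 INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/landing/csv/my_table'

or, if the target table already exists and PutHDFS dropped the file elsewhere:

LOAD DATA INPATH '/landing/incoming/${filename}' INTO TABLE my_table

(${filename} here is NiFi Expression Language referring to the flow file's filename attribute, evaluated by ReplaceText.)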
Alternatively, if you want to insert each row individually (note: this is slower), and you know the schema of the CSV file, you could use the following flow:
GetFile -> SplitText -> ExtractText -> ReplaceText -> PutHiveQL
GetFile: Retrieves the CSV file(s). You could also use ListFile -> FetchFile for this.
SplitText: Split the CSV file into one line/row per flow file
ExtractText: Extract each column value into an attribute. There is an example of this in the Working_With_CSV template.
ReplaceText: Replace the content of the flow file with a HiveQL statement, using NiFi Expression Language to insert the column values, e.g. a Replacement Value of "INSERT INTO myTable VALUES ('${col1}', '${col2}', ${col3})". Note the use of quotes around columns whose values are string literals. You could also use JDBC parameters and flow file attributes; see the PutHiveQL documentation for details (your Replacement Value would be "INSERT INTO myTable VALUES (?,?,?)" and you'd need attributes carrying the JDBC type and value for each column). A sketch of the parameterized variant follows this list.
PutHiveQL: Execute the INSERT command(s).
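To make the parameterized variant concrete, here is a rough sketch following the three-column example above (myTable and the col1/col2/col3 attributes come from that example; the hiveql.args.N.type/hiveql.args.N.value attribute names are the convention described in the PutHiveQL documentation, and the type codes are java.sql.Types constants, e.g. 12 = VARCHAR, 4 = INTEGER):

ReplaceText Replacement Value:
INSERT INTO myTable VALUES (?, ?, ?)

Flow file attributes (set with UpdateAttribute, for example):
hiveql.args.1.type = 12
hiveql.args.1.value = ${col1}
hiveql.args.2.type = 12
hiveql.args.2.value = ${col2}
hiveql.args.3.type = 4
hiveql.args.3.value = ${col3}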
If instead you need the data in a different format (Avro, JSON, ORC, etc.), then your flow will be more complex (as in your example above). NiFi is highly modular, so although getting CSV into Hive sounds "simple", there are actually a number of smaller operations to perform (conversions, input/output, etc.), and thus there may be several processors in your flow. Your example illustrates this modularity in terms of the format(s) the processors expect: if you want to auto-generate the SQL (versus hand-crafting it with ReplaceText), then ConvertJSONToSQL is your option, but that requires JSON, and there is no ConvertCSVToJSON processor at present, so you need the additional conversion processors. There is a Jira case to add the ability to do arbitrary format/type conversions, which would avoid the need for a chain of conversion processors (as in your example above).
Created 01-29-2017 11:03 AM
Thanks for your detailed answer. Your first suggestion looks interesting. I'll give it a try.
I still have a question on ConvertJSONToSQL, if you can help:
https://community.hortonworks.com/questions/80362/jdbc-connection-pool-for-convertjsontosql.html
Created 05-22-2018 12:05 PM
I added an answer to that question, but it is likely unsatisfying, as it is an open issue: the Hive driver used in the Hive processors is based on Apache Hive 1.2.x, which does not support a handful of the JDBC API methods used by those processors.