
From CSV to Hive via NiFi

Contributor

I want to use NiFi to ingest CSV files (GetFile) into Hive (PutHiveQL).

Since PutHiveQL expects a SQL statement, how can I generate it?

One solution would be GetFile -> InferAvroSchema -> ConvertCSVToAvro -> ConvertAvroToJSON -> ConvertJSONToSQL -> PutHiveQL.

This looks complex and resource-intensive. Any suggestions?

1 ACCEPTED SOLUTION

Master Guru

If the CSV doesn't need any work done to it and you just want to put a Hive table over the CSV(s), you can use the following flow:

GetFile -> PutHDFS -> ReplaceText -> PutHiveQL

GetFile: Retrieves the CSV file(s). You could also use ListFile -> FetchFile for this.

PutHDFS: Puts the CSV file(s) onto HDFS.

ReplaceText: Replace the content of the flow file with a HiveQL DDL statement, such as "LOAD DATA INPATH ..." or "CREATE TABLE IF NOT EXISTS ..." (see the sketch after this list)

PutHiveQL: Execute the DDL command.
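
For example, a minimal DDL sketch for the ReplaceText step, assuming a three-column CSV landed in /tmp/nifi/staging (the table name, column names, and path are hypothetical):

    -- expose the CSV files landed by PutHDFS as a Hive table
    CREATE EXTERNAL TABLE IF NOT EXISTS my_csv_table (
      col1 STRING,
      col2 STRING,
      col3 INT
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    LOCATION '/tmp/nifi/staging';

    -- alternatively, move a landed file into an existing table's directory
    LOAD DATA INPATH '/tmp/nifi/staging/myfile.csv' INTO TABLE existing_table;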

Alternatively, if you want to insert each row individually (note: this is slower), and you know the schema of the CSV file, you could use the following flow:

GetFile -> SplitText -> ExtractText -> ReplaceText -> PutHiveQL

GetFile: Retrieves the CSV file(s). You could also use ListFile -> FetchFile for this.

SplitText: Split the CSV file into one line/row per flow file

ExtractText: Extract each column value into an attribute. There is an example of this in the Working_With_CSV template.

ReplaceText: Replace the content of the flow file with a HiveQL statement, using NiFi Expression Language to insert the column values; for example, a Replacement Value of "INSERT INTO myTable VALUES ('${col1}', '${col2}', ${col3})". Note the use of quotes around columns whose values are string literals. You could also use JDBC parameters and flow file attributes (see the PutHiveQL documentation for details): your Replacement Value would be "INSERT INTO myTable VALUES (?,?,?)" and you would need attributes supplying the JDBC types and values for your columns. Both variants are sketched after this list.

PutHiveQL: Execute the INSERT command(s).
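
A minimal sketch of the key settings for this per-row flow, assuming each split line looks like a,b,42 (the property name "csv", the table name "myTable", and the column types are illustrative):

    ExtractText -- add a dynamic property whose value is a regex;
    -- capture groups become flow file attributes csv.1, csv.2, csv.3
        csv = ^([^,]*),([^,]*),([^,]*)$

    ReplaceText -- Replacement Value:
        INSERT INTO myTable VALUES ('${csv.1}', '${csv.2}', ${csv.3})

    -- Parameterized alternative: Replacement Value plus attributes
    -- (set with, e.g., UpdateAttribute) read by PutHiveQL
        INSERT INTO myTable VALUES (?, ?, ?)
        hiveql.args.1.type = 12        -- JDBC VARCHAR
        hiveql.args.1.value = ${csv.1}
        hiveql.args.2.type = 12
        hiveql.args.2.value = ${csv.2}
        hiveql.args.3.type = 4         -- JDBC INTEGER
        hiveql.args.3.value = ${csv.3}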

If instead you need the data in a different format (Avro, JSON, ORC, etc.), then your flow will be more complex, like your example above. NiFi is highly modular, so although a flow may do something "simple" like getting CSV into Hive, there are actually a number of smaller operations to perform (conversions, input/output, etc.), and thus there may be several processors in your flow. Your example illustrates this modularity in terms of the format(s) the processors expect: if you want to auto-generate the SQL (versus hand-writing it with ReplaceText), then ConvertJSONToSQL is your option, but it requires JSON, and there is no ConvertCSVToJSON processor at present, so you need the additional conversion processors. There is a Jira case to add the ability to do arbitrary format/type conversions, which would avoid the need for a chain of conversion processors (as in your example).


11 REPLIES

Super Guru

@Joe Harvy

NiFi is best used for ingesting live streaming data at thousands of records per second. For your use case, why not simply land the file in a staging area in Hadoop, create a temporary table over it, and then do an INSERT ... SELECT in Hive? While inserting, simply change the format to ORC.
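
For instance, a minimal HiveQL sketch of that approach (the table names, column names, and path are hypothetical):

    -- staging table over the raw CSV files
    CREATE EXTERNAL TABLE staging_csv (col1 STRING, col2 STRING, col3 INT)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/tmp/staging/csv';

    -- final table in ORC format, populated via INSERT ... SELECT
    CREATE TABLE final_orc (col1 STRING, col2 STRING, col3 INT) STORED AS ORC;
    INSERT INTO TABLE final_orc SELECT * FROM staging_csv;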

Contributor

Hi @mqureshi

I don't agree with that statement. If that were the case, why are there processors like GetFile, GetHDFS, QueryDatabaseTable, etc.?

Super Guru

@Joe Harvy There are many use cases where you will need to get a file, change its format, extract or drop records, filter JSON, and so on. Your use case does not seem to be one of them. But fair enough: if you don't agree and would still like to go down your chosen path, I am sure somebody will give you a better answer that validates your approach.

Binu Mathew

Do you need to convert to JSON? From Avro you can use the Kite DataSet processor and store in Hive as Parquet:

https://community.hortonworks.com/articles/70257/hdfnifi-to-convert-row-formatted-text-files-to-col....

Contributor

@Binu Mathew

Thanks for your reply. No, I don't need the data in JSON; I just need to ingest it directly into Hive. How do you ingest data into Hive in your suggestion? PutHiveQL expects a SQL statement.

Binu Mathew

Configure the StoreInKiteDataset processor to point at your Hive table: set the property 'Target dataset URI' to the table's dataset URI, and your Avro-formatted data will be converted to a Parquet-formatted Hive table. For example, I'm writing to a Hive table named weblogs_parquet - dataset:hive://ip-172-31-2-102.us-west-2.compute.internal:9083/weblogs_parquet
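
Generalizing from that example, the property value follows this pattern (the metastore host, port, and table name are placeholders to fill in for your cluster):

    Target dataset URI: dataset:hive://<metastore-host>:<metastore-port>/<table_name>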

Learn more about Kite Datasets at http://kitesdk.org/docs/current/


Contributor

Hi @Matt Burgess

Thanks for your detailed answer. Your first suggestion looks interesting. I'll give it a try.

I still have a question about ConvertJSONToSQL, if you can help:

https://community.hortonworks.com/questions/80362/jdbc-connection-pool-for-convertjsontosql.html

Master Guru

I added an answer to that question, but it is likely unsatisfying, as it is an open issue: the Hive driver used by the Hive processors is based on Apache Hive 1.2.x, which does not support a handful of JDBC API methods that those processors use.