Created on 03-11-2017 07:20 AM - edited 08-17-2019 01:48 PM
In this article, we will create a NiFi flow that reads files from HDFS and inserts their contents into Hive using the PutHiveStreaming processor.
Before moving to NiFi, we need to update some configuration in Hive.
To enable Hive streaming, we need to set the following properties (a sample hive-site.xml snippet follows the list):
- hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
- hive.compactor.initiator.on = true
- hive.compactor.worker.threads > 0
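These are cluster-side settings; as a sketch, they would typically go into hive-site.xml (for example via Ambari), followed by a restart of the Hive services. The worker-thread count of 1 below is only an illustrative value:

```xml
<!-- hive-site.xml fragment to enable Hive streaming -->
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <!-- must be greater than 0; 1 is just an example value -->
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
```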
Coming to NiFi, we will make use of the following processors:
- 1. ListHDFS + FetchHDFS processors – while configuring the List and Fetch HDFS processors, make sure that both run on the primary node only, so that flow files are not duplicated across nodes.
- 2. ConvertJSONToAvro processor – the PutHiveStreaming processor accepts input in the Avro format only, so any JSON input must first be converted to Avro (an example schema follows the flow below).
- 3. PutHiveStreaming processor
Let's construct the NiFi flow as below: ListHDFS --> FetchHDFS --> ConvertJSONToAvro --> PutHiveStreaming
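The ConvertJSONToAvro processor needs an Avro schema that matches both the incoming JSON fields and the target Hive table columns. A minimal sketch, using hypothetical id/name/city fields that line up with the example table DDL shown later:

```json
{
  "type": "record",
  "name": "users",
  "fields": [
    {"name": "id",   "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "city", "type": "string"}
  ]
}
```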
Configuring the PutHiveStreaming processor
Set the values for these properties as follows:
- Hive Metastore URI – should be of the format thrift://<Hive Metastore host>:9083. Note that the Hive Metastore host is not necessarily the same as the HiveServer2 host.
- Hive Configuration Resources – paths to the Hadoop and Hive configuration files. We need to copy the Hadoop and Hive configuration files, i.e. hdfs-site.xml, core-site.xml, and hive-site.xml, to all the NiFi hosts.
- Database Name – the database to which you want to connect.
- Table Name – the table into which you want to insert the data. Note that the target table must satisfy the following (see the example DDL after this list):
- a. ORC is currently the only supported format, so the table must be created with "stored as orc".
- b. transactional = "true" must be set in the table's create statement.
- c. The table must be bucketed but not sorted, so it must include "clustered by (colName) into (n) buckets".
- Auto-Create Partitions – if set to true, Hive partitions will be created automatically.
- Kerberos Principal – the Kerberos principal name.
- Kerberos Keytab – the path to the Kerberos keytab file.
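Putting requirements a–c together, here is a sketch of a compatible CREATE TABLE statement; the table and column names are hypothetical, while the ORC storage, bucketing clause, and transactional property are the actual requirements:

```sql
-- Hypothetical table satisfying the PutHiveStreaming requirements:
-- bucketed (not sorted), stored as ORC, and transactional.
CREATE TABLE users (
  id   INT,
  name STRING,
  city STRING
)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
```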
This completes the configuration. Now we can start the processors and begin inserting data into Hive from HDFS.
Created on 05-22-2017 03:01 PM
Thanks for the nice article!
I would like to use the PutHiveStreaming processor, but I always get the error below. My flow is pretty simple: I read from an SQL database (SQL Server) using the QueryDatabaseTable processor and write the result to Hive using the PutHiveStreaming processor.
I'm using NiFi 1.2.0.
The error:
```
2017-05-22 13:46:38,518 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] failed to process due to java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed(); rolling back session: {}
java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2017-05-22 13:46:38,519 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] failed to process session due to java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed(): {}
java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] Processor Administratively Yielded for 1 sec due to processing failure
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] due to uncaught Exception: java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```
Any help would be much appreciated.
Created on 05-23-2017 03:22 AM
Hey,
Can you please tell me which version of NiFi you are using? Also, have you made the required changes to Hive to enable streaming support?
Created on 05-23-2017 05:55 AM
Hi,
The problem was indeed with Hive's streaming config. Thanks a lot! Now I'm able to push data to Hive.
Created on 06-26-2017 09:00 AM
Hi,
I'm using Kerberos for MapReduce; which principal is needed?
Created on 10-23-2017 04:35 PM
@mkalyanpur How would this be different for a Kerberized HDP environment? I'm having a lot of trouble connecting to Kerberized HDP 2.5 and 2.6 from NiFi 1.2.0; both PutHiveStreaming and PutHiveQL are failing. For PutHiveQL, here is the detail on the error I get: https://community.hortonworks.com/questions/142110/nifi-processor-puthiveql-cannot-connect-to-kerber...
For PutHiveStreaming, I get the error below:
```
2017-10-23 11:32:23,841 INFO [put-hive-streaming-0] hive.metastore Trying to connect to metastore with URI thrift://server.domain.com:9083
2017-10-23 11:32:23,856 INFO [put-hive-streaming-0] hive.metastore Connected to metastore.
2017-10-23 11:32:23,885 INFO [Timer-Driven Process Thread-7] hive.metastore Trying to connect to metastore with URI thrift://server.domain.com:9083
2017-10-23 11:32:23,895 INFO [Timer-Driven Process Thread-7] hive.metastore Connected to metastore.
2017-10-23 11:32:24,730 WARN [put-hive-streaming-0] o.a.h.h.m.RetryingMetaStoreClient MetaStoreClient lost connection. Attempting to reconnect.
org.apache.thrift.TApplicationException: Internal error processing open_txns
    at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_open_txns(ThriftHiveMetastore.java:3834)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.open_txns(ThriftHiveMetastore.java:3821)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.openTxns(HiveMetaStoreClient.java:1841)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
    at com.sun.proxy.$Proxy231.openTxns(Unknown Source)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl$1.run(HiveEndPoint.java:525)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.openTxnImpl(HiveEndPoint.java:522)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:504)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:461)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatchImpl(HiveEndPoint.java:345)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.access$500(HiveEndPoint.java:243)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl$2.run(HiveEndPoint.java:332)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl$2.run(HiveEndPoint.java:329)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatch(HiveEndPoint.java:328)
    at org.apache.nifi.util.hive.HiveWriter.lambda$nextTxnBatch$2(HiveWriter.java:259)
    at org.apache.nifi.util.hive.HiveWriter.lambda$null$3(HiveWriter.java:368)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.nifi.util.hive.HiveWriter.lambda$callWithTimeout$4(HiveWriter.java:368)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```
The strange thing is that, using the same core-site, hdfs-site, and hive-site config files and the same principal and keytab, NiFi can connect to HDFS and HBase without any issues; it's only the Hive connection that errors. Even a sample Java program that connects to Hive using the Kerberos principal and keytab works fine.
Thanks for your time.
Created on 05-27-2019 07:27 AM
Hey @Jean Rossier, I am stuck on the same issue. Can you please share what you changed in the config to get it up and running? Thanks.
Created on 05-27-2019 07:28 AM
Hey @Raj B, I am stuck on the same error. Did you find a solution to this? Kindly share any insights.
Created on 05-27-2019 09:48 AM
@Sumit Das In my case, the problem was that Hive was not properly configured to support streaming. Basically, transactions must be enabled, but some other properties must be set as well. More info here: https://community.hortonworks.com/articles/49949/test-7.html
The table must also satisfy some conditions (stored as ORC, transactional, bucketed).