Created on 03-11-2017 07:20 AM - edited 08-17-2019 01:48 PM
In this article, we will create a flow that reads files from HDFS and inserts their contents into Hive using the PutHiveStreaming processor.
Before moving to NiFi, we need to update some configurations in Hive.
To enable Hive streaming, we need to update the following properties:
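For reference, the Apache Hive "Streaming Data Ingest" documentation lists the settings below as prerequisites for streaming (the two compactor settings apply to the metastore service). The Python sketch only renders them as hive-site.xml entries. The worker-thread count of 1 is an assumed example value (the documented requirement is simply a value greater than 0), and the helper name `to_hive_site_xml` is hypothetical.

```python
import xml.etree.ElementTree as ET

# Hive settings required by the streaming API, per the Hive
# "Streaming Data Ingest" docs. The worker-thread count is an example;
# any value greater than 0 starts compaction workers.
HIVE_STREAMING_PROPS = {
    "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
    "hive.support.concurrency": "true",
    "hive.compactor.initiator.on": "true",
    "hive.compactor.worker.threads": "1",
}


def to_hive_site_xml(props: dict[str, str]) -> str:
    """Render Hive properties as a hive-site.xml <configuration> block (hypothetical helper)."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    ET.indent(root)  # pretty-print; requires Python 3.9+
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    print(to_hive_site_xml(HIVE_STREAMING_PROPS))
```

The printed `<property>` elements would be merged into the existing `<configuration>` element of hive-site.xml, followed by a restart of the affected Hive services.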
On the NiFi side, we will use the following processors:
Let's construct the NiFi flow as follows: ListHDFS --> FetchHDFS --> ConvertJSONToAvro --> PutHiveStreaming
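ConvertJSONToAvro needs an Avro schema that describes the incoming JSON records, and it is generally safest to keep the Avro field names identical to the target Hive column names so that PutHiveStreaming can map the records onto the table. The sketch below builds such a schema as JSON text, assuming a hypothetical helper `avro_schema_for` and a small, hypothetical Hive-to-Avro type mapping that covers only a few primitive types.

```python
import json

# Hypothetical mapping from Hive column types to Avro primitive types;
# only a handful of common types are covered in this sketch.
HIVE_TO_AVRO = {
    "string": "string",
    "int": "int",
    "bigint": "long",
    "boolean": "boolean",
    "float": "float",
    "double": "double",
}


def avro_schema_for(table: str, columns: list[tuple[str, str]]) -> str:
    """Build an Avro record schema whose fields mirror the Hive table's columns.

    Each field is declared as a union with "null" and a null default,
    which makes every column nullable.
    """
    fields = []
    for name, hive_type in columns:
        avro_type = HIVE_TO_AVRO.get(hive_type.lower())
        if avro_type is None:
            raise ValueError(f"no Avro mapping for Hive type {hive_type!r}")
        fields.append({"name": name, "type": ["null", avro_type], "default": None})
    return json.dumps({"type": "record", "name": table, "fields": fields}, indent=2)


if __name__ == "__main__":
    print(avro_schema_for("employees", [("id", "int"), ("name", "string")]))
```

The resulting JSON text could be pasted into ConvertJSONToAvro's schema property; making every field nullable is a design choice for this sketch, not a requirement of either processor.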
Configuring the PutHiveStreaming processor
Set the property values as follows:
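As a hedged orientation sketch, the core PutHiveStreaming settings are shown below as a Python dict. The property names are those displayed by the NiFi 1.x processor; every value is a hypothetical placeholder (9083 is the usual Hive metastore port). `check_metastore_uri` is a hypothetical helper that rejects a value not of the form `thrift://host:port`.

```python
from urllib.parse import urlparse

# Core PutHiveStreaming properties (NiFi 1.x display names).
# All values are hypothetical placeholders.
put_hive_streaming = {
    "Hive Metastore URI": "thrift://metastore-host:9083",
    "Hive Configuration Resources": "/etc/hive/conf/hive-site.xml",
    "Database Name": "default",
    "Table Name": "my_table",
    "Transactions per Batch": "100",
}


def check_metastore_uri(uri: str) -> None:
    """Raise ValueError unless uri is a single thrift://host:port URI."""
    parsed = urlparse(uri)
    # parsed.port itself raises ValueError if the port is not numeric.
    if parsed.scheme != "thrift" or not parsed.hostname or parsed.port is None:
        raise ValueError(f"expected thrift://host:port, got {uri!r}")


if __name__ == "__main__":
    check_metastore_uri(put_hive_streaming["Hive Metastore URI"])
```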
This completes the configuration. Now we can start the processors to insert data from HDFS into Hive.
Created on 05-22-2017 03:01 PM
Thanks for the nice article !
I would like to use the PutHiveStreaming processor, but I always get the error below. My flow is pretty simple: I'm reading from a SQL database (SQL Server) using the QueryDatabaseTable processor and writing the result to Hive using the PutHiveStreaming processor.
I'm using NiFi 1.2.0.
The error:
2017-05-22 13:46:38,518 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] failed to process due to java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed(); rolling back session: {}
java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2017-05-22 13:46:38,519 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] failed to process session due to java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed(): {}
java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] Processor Administratively Yielded for 1 sec due to processing failure
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] due to uncaught Exception: java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
    at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Any help would be much appreciated.
Created on 05-23-2017 03:22 AM
Hey
Can you please tell me which version of NiFi you are using? Also, did you make the required changes to Hive to enable streaming support?
Created on 05-23-2017 05:55 AM
Hi,
The problem was indeed with Hive's streaming configuration. Thanks a lot! Now I'm able to push data to Hive.
Created on 06-26-2017 09:00 AM
Hi,
I'm using Kerberos for MapReduce. Which principal is needed?
Created on 10-23-2017 04:35 PM
@mkalyanpur How would this be different for a Kerberized HDP environment? I'm having a lot of trouble connecting to Kerberized HDP 2.5 and 2.6 from NiFi 1.2.0; neither PutHiveStreaming nor PutHiveQL works. For PutHiveQL, here are the details of the error I get: https://community.hortonworks.com/questions/142110/nifi-processor-puthiveql-cannot-connect-to-kerber...
For PutHiveStreaming, I get the error below:
2017-10-23 11:32:23,841 INFO [put-hive-streaming-0] hive.metastore Trying to connect to metastore with URI thrift://server.domain.com:9083
2017-10-23 11:32:23,856 INFO [put-hive-streaming-0] hive.metastore Connected to metastore.
2017-10-23 11:32:23,885 INFO [Timer-Driven Process Thread-7] hive.metastore Trying to connect to metastore with URI thrift://server.domain.com:9083
2017-10-23 11:32:23,895 INFO [Timer-Driven Process Thread-7] hive.metastore Connected to metastore.
2017-10-23 11:32:24,730 WARN [put-hive-streaming-0] o.a.h.h.m.RetryingMetaStoreClient MetaStoreClient lost connection. Attempting to reconnect.
org.apache.thrift.TApplicationException: Internal error processing open_txns
    at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_open_txns(ThriftHiveMetastore.java:3834)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.open_txns(ThriftHiveMetastore.java:3821)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.openTxns(HiveMetaStoreClient.java:1841)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
    at com.sun.proxy.$Proxy231.openTxns(Unknown Source)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl$1.run(HiveEndPoint.java:525)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.openTxnImpl(HiveEndPoint.java:522)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:504)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:461)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatchImpl(HiveEndPoint.java:345)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.access$500(HiveEndPoint.java:243)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl$2.run(HiveEndPoint.java:332)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl$2.run(HiveEndPoint.java:329)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatch(HiveEndPoint.java:328)
    at org.apache.nifi.util.hive.HiveWriter.lambda$nextTxnBatch$2(HiveWriter.java:259)
    at org.apache.nifi.util.hive.HiveWriter.lambda$null$3(HiveWriter.java:368)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.nifi.util.hive.HiveWriter.lambda$callWithTimeout$4(HiveWriter.java:368)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
The strange thing is that, using the same core-site, hdfs-site, and hive-site config files and the same principal and keytab, NiFi can connect to HDFS and HBase without any issues; only the Hive connection fails. Even a sample Java program that connects to Hive using the Kerberos principal and keytab works fine.
Thanks for your time.
Created on 05-27-2019 07:27 AM
Hey @Jean Rossier, I am stuck on the same issue. Can you please share what you changed in the config to get it up and running? Thanks.
Created on 05-27-2019 07:28 AM
Hey @Raj B, I am stuck on the same error. Did you find a solution to this? Kindly share any insights.
Created on 05-27-2019 09:48 AM
@Sumit Das In my case, the problem was that Hive was not properly configured to support streaming. Basically, transactions must be enabled, but some other properties must be set as well. More info here: https://community.hortonworks.com/articles/49949/test-7.html
The table must also meet some conditions: it must be stored as ORC, transactional, and bucketed.
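Those three table conditions can be expressed in HiveQL DDL. The sketch below is a hypothetical helper, `streaming_table_ddl`, that renders a CREATE TABLE statement which is bucketed (`CLUSTERED BY ... INTO n BUCKETS`), stored as ORC, and flagged transactional through `TBLPROPERTIES`. The database, table, columns, and bucket count in the usage line are placeholders.

```python
def streaming_table_ddl(
    database: str,
    table: str,
    columns: list[tuple[str, str]],
    bucket_column: str,
    buckets: int = 4,
) -> str:
    """Return HiveQL DDL for a bucketed, ORC-backed, transactional table.

    Hypothetical helper; the default bucket count is an arbitrary example.
    """
    names = [name for name, _ in columns]
    if bucket_column not in names:
        raise ValueError(f"bucket column {bucket_column!r} is not one of {names}")
    if buckets < 1:
        raise ValueError("buckets must be a positive integer")
    column_defs = ",\n  ".join(f"{name} {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE TABLE {database}.{table} (\n  {column_defs}\n)\n"
        f"CLUSTERED BY ({bucket_column}) INTO {buckets} BUCKETS\n"
        "STORED AS ORC\n"
        "TBLPROPERTIES ('transactional'='true')"
    )


if __name__ == "__main__":
    print(streaming_table_ddl("mydb", "events", [("id", "INT"), ("payload", "STRING")], "id"))
```

The generated statement can then be run in Beeline or the Hive CLI before pointing PutHiveStreaming at the table.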