In this article we will create a NiFi flow that reads files from HDFS and inserts their contents into Hive using the PutHiveStreaming processor.

Before moving on to NiFi, we need to update some configuration in Hive.

To enable Hive streaming, the following properties must be set:

  • hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
  • hive.compactor.initiator.on = true
  • hive.compactor.worker.threads > 0
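Before starting on the flow, it can be worth confirming that these three settings are actually present in the hive-site.xml the cluster uses. The following is a minimal sketch, assuming the standard Hadoop-style `<configuration>/<property>` layout; the helper names and the `SAMPLE` document are made up for illustration and are not part of Hive or NiFi.

```python
import xml.etree.ElementTree as ET

# The three properties listed above, each paired with a small validity check.
REQUIRED = {
    "hive.txn.manager": lambda v: v == "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
    "hive.compactor.initiator.on": lambda v: v.lower() == "true",
    "hive.compactor.worker.threads": lambda v: v.isdigit() and int(v) > 0,
}


def load_properties(xml_text):
    """Parse Hadoop-style configuration XML into a {name: value} dict."""
    root = ET.fromstring(xml_text)
    props = {}
    for prop in root.findall("property"):
        name = prop.findtext("name")
        value = prop.findtext("value")
        if name is not None:
            props[name.strip()] = (value or "").strip()
    return props


def missing_streaming_settings(xml_text):
    """Return the sorted names of required properties that are absent or wrong."""
    props = load_properties(xml_text)
    return sorted(
        name for name, is_ok in REQUIRED.items()
        if name not in props or not is_ok(props[name])
    )


# Hypothetical hive-site.xml content: worker threads is 0, so it gets reported.
SAMPLE = """
<configuration>
  <property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
  </property>
  <property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.compactor.worker.threads</name>
    <value>0</value>
  </property>
</configuration>
"""

if __name__ == "__main__":
    print(missing_streaming_settings(SAMPLE))
```

An empty list means all three properties look fine; anything else names the settings to fix before PutHiveStreaming is likely to work.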

In NiFi we will use the following processors:

  • ListHDFS + FetchHDFS processors – When configuring the List and Fetch HDFS processors, we need to make sure that both run on the primary node only, so that flow files are not duplicated across nodes.
  • ConvertJSONToAvro processor – The PutHiveStreaming processor supports input in Avro format only, so any JSON input needs to be converted to Avro first.
  • PutHiveStreaming processor
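Converting JSON to Avro requires an Avro record schema describing the fields. As a rough illustration of what such a schema looks like, the sketch below derives one from a single flat JSON object. The function names, the record name, and the sample record are hypothetical; the mapping of Python types to Avro types is a simplifying assumption (every field is made nullable, and nested objects or arrays are rejected), so treat the output as a starting point rather than the schema NiFi itself would produce.

```python
import json


def avro_type(value):
    """Map a flat JSON value to an Avro primitive type name (simplified)."""
    # bool must be tested before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"cannot infer an Avro type for {value!r}")


def infer_record_schema(name, record):
    """Build an Avro record schema (as a dict) from one flat JSON object."""
    fields = [
        # Each field is a nullable union with a null default.
        {"name": key, "type": ["null", avro_type(value)], "default": None}
        for key, value in record.items()
    ]
    return {"type": "record", "name": name, "fields": fields}


if __name__ == "__main__":
    # Hypothetical input line, as it might appear in a file fetched from HDFS.
    line = '{"id": 7, "name": "sensor-a", "active": true, "reading": 21.5}'
    schema = infer_record_schema("reading", json.loads(line))
    print(json.dumps(schema, indent=2))
```

The printed JSON could then be reviewed and adjusted by hand before being supplied to the conversion processor.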

Let's construct the NiFi flow as follows: ListHDFS --> FetchHDFS --> ConvertJSONToAvro --> PutHiveStreaming

13520-flow.png

Configuring the PutHiveStreaming processor

13531-screen-shot-2017-03-11-at-110018-am.png

Set the values for the properties shown above as follows:

  1. Hive Metastore URI – Should be of the form thrift://<Hive Metastore host>:9083. Note that the Hive Metastore host is not necessarily the same as the HiveServer2 host.
  2. Hive Configuration Resources – Paths to the Hadoop and Hive configuration files. We need to copy these files, i.e. hdfs-site.xml, core-site.xml and hive-site.xml, to all the NiFi hosts.
  3. Database Name – The database to which you want to connect.
  4. Table Name – The table into which you want to insert the data. Note that the table must meet the following requirements:
    a. ORC is the only storage format currently supported, so the table must be created with "stored as orc".
    b. transactional = "true" must be set in the table properties of the create statement.
    c. The table must be bucketed but not sorted, so it must have "clustered by (colName) into (n) buckets".
  5. Auto-create partitions – If set to true, Hive partitions will be created automatically.
  6. Kerberos Principal – The Kerberos principal name.
  7. Kerberos Keytab – The path to the Kerberos keytab.
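The three table requirements in item 4 can be easy to get wrong by hand, so here is a small sketch that assembles a create statement satisfying them. The function name, the database, table and column names are all hypothetical, and the statement is only printed, not executed; it assumes the usual Hive clause order (CLUSTERED BY, then STORED AS, then TBLPROPERTIES).

```python
def streaming_table_ddl(database, table, columns, bucket_column, buckets):
    """Build a CREATE TABLE statement for a table usable with Hive streaming.

    columns is a list of (name, hive_type) pairs; bucket_column must be one
    of those names. No SORTED BY clause is emitted, since the table must be
    bucketed but not sorted.
    """
    names = [name for name, _ in columns]
    if bucket_column not in names:
        raise ValueError(f"bucket column {bucket_column!r} is not a table column")
    if buckets < 1:
        raise ValueError("the number of buckets must be at least 1")

    column_sql = ",\n  ".join(f"`{name}` {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE TABLE `{database}`.`{table}` (\n  {column_sql}\n)\n"
        f"CLUSTERED BY (`{bucket_column}`) INTO {buckets} BUCKETS\n"
        "STORED AS ORC\n"
        "TBLPROPERTIES ('transactional' = 'true')"
    )


if __name__ == "__main__":
    # Hypothetical database, table and columns, for illustration only.
    print(streaming_table_ddl(
        "demo_db",
        "events",
        [("id", "INT"), ("message", "STRING")],
        bucket_column="id",
        buckets=4,
    ))
```

The generated statement can be pasted into Beeline or the Hive CLI and adapted, for example by adding a PARTITIONED BY clause if the table is partitioned.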

This completes the configuration. Now we can start the processors to insert data from HDFS into Hive.
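Before starting the processors, a quick sanity check of the values typed into PutHiveStreaming can save a round of debugging. The sketch below validates the shape of the metastore URI and checks that the configuration resources list names the expected files. The helper names are hypothetical, the list of expected file names is an assumption based on a typical Hadoop client setup, and treating the resources value as a comma-separated list is likewise an assumption; nothing here contacts Hive or reads the files.

```python
import posixpath
from urllib.parse import urlparse


def check_metastore_uri(uri):
    """Return a list of problems found in a thrift://host:port metastore URI."""
    problems = []
    parsed = urlparse(uri)
    if parsed.scheme != "thrift":
        problems.append("scheme must be thrift")
    if not parsed.hostname:
        problems.append("host is missing")
    try:
        port = parsed.port  # raises ValueError for a non-numeric port
    except ValueError:
        port = None
    if port is None:
        problems.append("port is missing or invalid")
    return problems


def missing_config_files(resources,
                         expected=("core-site.xml", "hdfs-site.xml", "hive-site.xml")):
    """Return the expected file names absent from a comma-separated path list."""
    paths = [p.strip() for p in resources.split(",") if p.strip()]
    present = {posixpath.basename(p) for p in paths}
    return [name for name in expected if name not in present]


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    print(check_metastore_uri("thrift://hive-metastore:9083"))
    print(missing_config_files("/etc/hadoop/conf/core-site.xml, /etc/hive/conf/hive-site.xml"))
```

Both helpers return an empty list when nothing looks wrong, so they can be combined into a single pre-flight check.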

Comments
Explorer

Thanks for the nice article !

I would like to use the PutHiveStreaming processor, but I always get the error below. My flow is pretty simple: I'm reading from a SQL database (SQL Server) using the QueryDatabaseTable processor and writing the result to Hive using the PutHiveStreaming processor.

I'm using NiFi 1.2.0.

The error:

2017-05-22 13:46:38,518 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] failed to process due to java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed(); rolling back session: {}
java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
	at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
	at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
2017-05-22 13:46:38,519 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] failed to process session due to java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed(): {}
java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
	at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
	at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] Processor Administratively Yielded for 1 sec due to processing failure
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding PutHiveStreaming[id=3056469a-015c-1000-a2bd-db25847b37ca] due to uncaught Exception: java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
2017-05-22 13:46:38,519 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: TransactionBatch TxnIds=[101...200] on endPoint = {metaStoreUri='thrift://hive-metastore:9083', database='resultdb', table='sqldump', partitionVals=[] } has been closed()
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.checkIsClosed(HiveEndPoint.java:738)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:777)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:734)
	at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:113)
	at org.apache.nifi.util.hive.HiveWriter$1.call(HiveWriter.java:110)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Any help would be much appreciated.

Contributor

Hey,

Can you please tell me which version of NiFi you are using? Also, have you made the required changes to Hive to enable streaming support?

Explorer

Hi,

The problem was indeed with Hive's streaming config. Thanks a lot! Now I'm able to push data to Hive.

Rising Star

Hi,

I'm using Kerberos for MapReduce. Which principal is needed?

Expert Contributor

@mkalyanpur How would this be different for a Kerberized HDP environment? I'm having a lot of trouble connecting to Kerberized HDP 2.5 and 2.6 from NiFi 1.2.0; neither PutHiveStreaming nor PutHiveQL is working. For PutHiveQL, the details of the error I get are here: https://community.hortonworks.com/questions/142110/nifi-processor-puthiveql-cannot-connect-to-kerber...

For PutHiveStreaming, I get the error below:

2017-10-23 11:32:23,841 INFO [put-hive-streaming-0] hive.metastore Trying to connect to metastore with URI thrift://server.domain.com:9083
2017-10-23 11:32:23,856 INFO [put-hive-streaming-0] hive.metastore Connected to metastore.
2017-10-23 11:32:23,885 INFO [Timer-Driven Process Thread-7] hive.metastore Trying to connect to metastore with URI thrift://server.domain.com:9083
2017-10-23 11:32:23,895 INFO [Timer-Driven Process Thread-7] hive.metastore Connected to metastore.
2017-10-23 11:32:24,730 WARN [put-hive-streaming-0] o.a.h.h.m.RetryingMetaStoreClient MetaStoreClient lost connection. Attempting to reconnect.
org.apache.thrift.TApplicationException: Internal error processing open_txns
	at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_open_txns(ThriftHiveMetastore.java:3834)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.open_txns(ThriftHiveMetastore.java:3821)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.openTxns(HiveMetaStoreClient.java:1841)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
	at com.sun.proxy.$Proxy231.openTxns(Unknown Source)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl$1.run(HiveEndPoint.java:525)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.openTxnImpl(HiveEndPoint.java:522)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:504)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:461)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatchImpl(HiveEndPoint.java:345)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.access$500(HiveEndPoint.java:243)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl$2.run(HiveEndPoint.java:332)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl$2.run(HiveEndPoint.java:329)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatch(HiveEndPoint.java:328)
	at org.apache.nifi.util.hive.HiveWriter.lambda$nextTxnBatch$2(HiveWriter.java:259)
	at org.apache.nifi.util.hive.HiveWriter.lambda$null$3(HiveWriter.java:368)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
	at org.apache.nifi.util.hive.HiveWriter.lambda$callWithTimeout$4(HiveWriter.java:368)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

The strange thing is that, using the same core-site, hdfs-site and hive-site config files and the same principal and keytab, NiFi can connect to HDFS and HBase without any issues; only the Hive connection fails. Even a sample Java program that connects to Hive using the Kerberos principal and keytab works fine.

Thanks for your time.

Contributor

Hey @Jean Rossier, I am stuck on the same issue. Can you please share what you changed in the config to get it up and running? Thanks.

Contributor

Hey @Raj B, I am stuck on the same error. Did you find a solution to this? Kindly share any insights.

Explorer

@Sumit Das In my case, the problem was that Hive was not properly configured to support streaming. Basically, transactions must be enabled, but some other properties must be set as well. More info here: https://community.hortonworks.com/articles/49949/test-7.html

The table must also meet some conditions (stored as ORC, transactional, bucketed).