Created 04-28-2017 12:31 PM
Context: Flume picks up files from a local directory and writes them to a transactional Hive table.
Problem: Sometimes the Flume agent fails with an exception. For example:
pr 2017 15:48:02,126 ERROR [hive-f_sink-call-runner-0] (org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.markDead:755) - Fatal error on TxnIds=[841192...841291] on endPoint = {metaStoreUri='thrift://node02.hdp:9083', database='default', table='f', partitionVals=[] }; cause Unable to abort invalid transaction id : 841253: No such transaction txnid:841253
org.apache.hive.hcatalog.streaming.TransactionError: Unable to abort invalid transaction id : 841253: No such transaction txnid:841253
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.abortImpl(HiveEndPoint.java:934)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.abort(HiveEndPoint.java:893)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.markDead(HiveEndPoint.java:752)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.commit(HiveEndPoint.java:852)
        at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:345)
        at org.apache.flume.sink.hive.HiveWriter$6.call(HiveWriter.java:342)
        at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:429)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: NoSuchTxnException(message:No such transaction txnid:841253)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$abort_txn_result$abort_txn_resultStandardScheme.read(ThriftHiveMetastore.java)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$abort_txn_result$abort_txn_resultStandardScheme.read(ThriftHiveMetastore.java)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$abort_txn_result.read(ThriftHiveMetastore.java)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_abort_txn(ThriftHiveMetastore.java:3898)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.abort_txn(ThriftHiveMetastore.java:3885)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.rollbackTxn(HiveMetaStoreClient.java:1885)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:155)
        at com.sun.proxy.$Proxy8.rollbackTxn(Unknown Source)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.abortImpl(HiveEndPoint.java:922)
        ... 10 more
Can anyone explain why HCatalog is trying to abort the transaction? Setup: HDP-2.5
Flume conf:
f.sources = f_dir
f.channels = f_channel
f.sinks = f_sink
f.sources.f_dir.type = spooldir
f.sources.f_dir.spoolDir = /var/data/f_data/History
f.sources.f_dir.deletePolicy = immediate
f.sources.f_dir.channels = f_channel
f.sources.f_dir.deserializer.maxLineLength = 150000
f.channels.f_channel.type = file
f.channels.f_channel.capacity = 20000
f.channels.f_channel.transactionCapacity = 15000
f.sinks.f_sink.type = hive
f.sinks.f_sink.hive.metastore = thrift://node02.hdp:9083
f.sinks.f_sink.hive.database = default
f.sinks.f_sink.hive.table = f
f.sinks.f_sink.channel = f_channel
f.sinks.f_sink.serializer = JSON
f.sinks.f_sink.HDFSEventSink.batchSize = 2000
f.sinks.f_sink.callTimeout = 80000
f.sinks.f_sink.heartBeatInterval = 10000
The table is just some JSON fields, clustered by a monotonically increasing timestamp into 30 buckets. The timestamp values definitely do not repeat.
Hive table props:
inputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
outputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
compressed: false
numBuckets: 30
serdeInfo: SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{serialization.format=1})
bucketCols: [firsttimestamp]
sortCols: []
parameters: {}
skewedInfo: SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{})
storedAsSubDirectories: false
partitionKeys: []
parameters: {totalSize=5498078799, last_modified_time=1490710346, numRows=1027277, rawDataSize=28307100383, compactorthreshold.hive.compactor.delta.num.threshold=1, numFiles=270, transient_lastDdlTime=1490710346, last_modified_by=root, transactional=true}
viewOriginalText: null
viewExpandedText: null
tableType: MANAGED_TABLE
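For reference, a rough CREATE TABLE sketch consistent with the properties above; everything except firsttimestamp (the bucketing column) and the table properties is a placeholder, not the real schema:

-- Hypothetical reconstruction of the DDL from the metadata above;
-- column names/types other than firsttimestamp are placeholders.
CREATE TABLE default.f (
  firsttimestamp BIGINT,   -- bucketing column from bucketCols:[firsttimestamp]
  payload STRING           -- placeholder for the remaining JSON fields
)
CLUSTERED BY (firsttimestamp) INTO 30 BUCKETS
STORED AS ORC
TBLPROPERTIES (
  'transactional' = 'true',
  'compactorthreshold.hive.compactor.delta.num.threshold' = '1'
);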
Sorry for my english 😃
Created 05-15-2017 07:19 AM
The problem was with Hive transaction timeouts.
In Flume agent conf:
hive_sink.heartBeatInterval = 10000
In Hive conf:
hive.txn.timeout = 300
Hive closed the transactions before any heartbeat was received: heartBeatInterval is specified in seconds, so the sink only sent a heartbeat every 10000 seconds, while the Metastore aborts transactions that have not been heartbeated within hive.txn.timeout = 300 seconds. By the time the sink tried to commit, the transaction had already been aborted, and the follow-up abort attempt failed with NoSuchTxnException.
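For anyone hitting the same error, a minimal sketch of how the two settings could be aligned so heartbeats arrive well before the Metastore times the transaction out (the 100-second value is just an illustration, not a tuned recommendation):

# Flume agent conf: heartBeatInterval is in seconds; keep it well below hive.txn.timeout
f.sinks.f_sink.heartBeatInterval = 100

# Hive conf (hive-site.xml): also in seconds; must be larger than the heartbeat interval
hive.txn.timeout = 300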