Querying ACID Tables from Hive LLAP/Spark

Explorer

I have a transactional (ACID) table in Hive and can query it fine from the Hive shell. However, I am unable to query it through LLAP or the Spark shell. When I set all the parameters required for querying ACID tables (transaction manager, compactor threads, etc.) in the beeline session I open against Hive Interactive Server, the settings are accepted, but when I run the query everything just hangs (a sketch of these settings appears after the error output below). After a very long time I get the following error message:

ERROR : Status: Failed
ERROR : Dag received [DAG_TERMINATE, SERVICE_PLUGIN_ERROR] in RUNNING state.
ERROR : Error reported by TaskScheduler [[2:LLAP]][SERVICE_UNAVAILABLE] No LLAP Daemons are running
ERROR : Vertex killed, vertexName=Reducer 2, vertexId=vertex_1506697113479_0016_1_03, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1009, Vertex vertex_1506697113479_0016_1_03 [Reducer 2] killed/failed due to:DAG_TERMINATED]
ERROR : Vertex killed, vertexName=Map 3, vertexId=vertex_1506697113479_0016_1_00, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1506697113479_0016_1_00 [Map 3] killed/failed due to:DAG_TERMINATED]
ERROR : Vertex killed, vertexName=Reducer 4, vertexId=vertex_1506697113479_0016_1_01, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1506697113479_0016_1_01 [Reducer 4] killed/failed due to:DAG_TERMINATED]
ERROR : Vertex killed, vertexName=Map 1, vertexId=vertex_1506697113479_0016_1_02, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:354, Vertex vertex_1506697113479_0016_1_02 [Map 1] killed/failed due to:DAG_TERMINATED]
ERROR : DAG did not succeed due to SERVICE_PLUGIN_ERROR. failedVertices:0 killedVertices:4
ERROR : FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Dag received [DAG_TERMINATE, SERVICE_PLUGIN_ERROR] in RUNNING state.Error reported by TaskScheduler [[2:LLAP]][SERVICE_UNAVAILABLE] No LLAP Daemons are runningVertex killed, vertexName=Reducer 2, vertexId=vertex_1506697113479_0016_1_03, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1009, Vertex vertex_1506697113479_0016_1_03 [Reducer 2] killed/failed due to:DAG_TERMINATED]Vertex killed, vertexName=Map 3, vertexId=vertex_1506697113479_0016_1_00, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1506697113479_0016_1_00 [Map 3] killed/failed due to:DAG_TERMINATED]Vertex killed, vertexName=Reducer 4, vertexId=vertex_1506697113479_0016_1_01, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1506697113479_0016_1_01 [Reducer 4] killed/failed due to:DAG_TERMINATED]Vertex killed, vertexName=Map 1, vertexId=vertex_1506697113479_0016_1_02, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:354, Vertex vertex_1506697113479_0016_1_02 [Map 1] killed/failed due to:DAG_TERMINATED]DAG did not succeed due to SERVICE_PLUGIN_ERROR. failedVertices:0 killedVertices:4
Error: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Dag received [DAG_TERMINATE, SERVICE_PLUGIN_ERROR] in RUNNING state.Error reported by TaskScheduler [[2:LLAP]][SERVICE_UNAVAILABLE] No LLAP Daemons are runningVertex killed, vertexName=Reducer 2, vertexId=vertex_1506697113479_0016_1_03, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1009, Vertex vertex_1506697113479_0016_1_03 [Reducer 2] killed/failed due to:DAG_TERMINATED]Vertex killed, vertexName=Map 3, vertexId=vertex_1506697113479_0016_1_00, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1506697113479_0016_1_00 [Map 3] killed/failed due to:DAG_TERMINATED]Vertex killed, vertexName=Reducer 4, vertexId=vertex_1506697113479_0016_1_01, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:1, Vertex vertex_1506697113479_0016_1_01 [Reducer 4] killed/failed due to:DAG_TERMINATED]Vertex killed, vertexName=Map 1, vertexId=vertex_1506697113479_0016_1_02, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to DAG_TERMINATED, failedTasks:0 killedTasks:354, Vertex vertex_1506697113479_0016_1_02 [Map 1] killed/failed due to:DAG_TERMINATED]DAG did not succeed due to SERVICE_PLUGIN_ERROR. failedVertices:0 killedVertices:4 (state=08S01,code=2)
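
For reference, a minimal sketch of the kind of ACID-related session settings referred to above. The property values are illustrative assumptions, not the exact configuration used in this post:

-- Typical session-level settings for querying Hive ACID tables from beeline
-- (values are illustrative; the compactor settings normally live in hive-site.xml
--  on the metastore rather than in the client session)
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.compactor.initiator.on=true;
SET hive.compactor.worker.threads=1;
SET hive.exec.dynamic.partition.mode=nonstrict;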

Similarly for Spark, I open the spark-shell and run the following setup:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.sql.parquet.compression.codec", "lzo").getOrCreate()
import spark.implicits._   // must come after the session is created

spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "lzo")
spark.sqlContext.setConf("spark.sql.crossJoin.enabled", "true")

and run the query as:

val q1 = spark.sql("select * from <hive_transaction_table>")
q1.show()

It gives me an error:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#159L])
   +- HiveTableScan MetastoreRelation avlino_bm, subscriber_dim_test_parquet
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:112)
  at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:235)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:141)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:368)
  at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:114)
  at org.apache.spark.sql.execution.SparkPlan$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$anonfun$org$apache$spark$sql$Dataset$execute$1$1.apply(Dataset.scala:2378)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2780)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$execute$1(Dataset.scala:2377)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$collect(Dataset.scala:2384)
  at org.apache.spark.sql.Dataset$anonfun$head$1.apply(Dataset.scala:2120)
  at org.apache.spark.sql.Dataset$anonfun$head$1.apply(Dataset.scala:2119)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2810)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2119)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2334)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:638)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:597)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:606)
  ... 52 elided
Caused by: java.lang.RuntimeException: serious problem
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:202)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:261)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:84)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 85 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NumberFormatException: For input string: "0000035_0000"
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
  ... 122 more
Caused by: java.lang.NumberFormatException: For input string: "0000035_0000"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at org.apache.hadoop.hive.ql.io.AcidUtils.parseDelta(AcidUtils.java:310)
  at org.apache.hadoop.hive.ql.io.AcidUtils.getAcidState(AcidUtils.java:379)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.call(OrcInputFormat.java:634)
  at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$FileGenerator.call(OrcInputFormat.java:620)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Is anyone else facing the same issue? How can I access transactional tables from Hive LLAP or Spark?

3 REPLIES

Re: Querying ACID Tables from Hive LLAP/Spark

Master Collaborator

For access through Hive Interactive Server, the Hive LLAP daemons are not running; see the following snippet from the log you posted:

ERROR : Error reported by TaskScheduler [[2:LLAP]][SERVICE_UNAVAILABLE] No LLAP Daemons are running

I would suggest restarting your HiveServer Interactive and then trying again.

For Spark access, there have been some issues with reading data in transactional tables. Can you run a MAJOR compaction from the Hive CLI (make sure the Hive Metastore has ACID enabled) before querying through the Spark shell? A sketch of the compaction commands follows below.
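
As a rough sketch of running that compaction from the Hive CLI (the database and table names below are placeholders, not taken from this thread):

-- Request a major compaction on the ACID table and monitor its progress.
-- For a partitioned table, compaction is requested per partition, e.g.
--   ALTER TABLE your_db.your_acid_table PARTITION (dt='2017-09-29') COMPACT 'MAJOR';
ALTER TABLE your_db.your_acid_table COMPACT 'MAJOR';
SHOW COMPACTIONS;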


Re: Querying ACID Tables from Hive LLAP/Spark

Explorer

Thanks, restarting HiveServer Interactive did help.
Regarding Spark, I think the problem is with the naming convention of the delta files. When I ran the same query in Spark after performing a major compaction, it was able to run. But how realistic is performing a major compaction on the table after every update in a production cluster? Is there a workaround?
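
For reference, Hive's compactor can also trigger compactions automatically once enough delta files accumulate, so a manual major compaction after every update should not be needed. A minimal sketch of the relevant properties, shown as SET statements for readability; they normally belong in hive-site.xml on the metastore, and the values here are only illustrative defaults:

-- Compactor settings controlling automatic compaction (values illustrative)
SET hive.compactor.initiator.on=true;        -- metastore schedules compactions automatically
SET hive.compactor.worker.threads=2;         -- worker threads that run the compaction jobs
SET hive.compactor.delta.num.threshold=10;   -- minor compaction after this many delta directories
SET hive.compactor.delta.pct.threshold=0.1;  -- major compaction when deltas reach 10% of base size
SET hive.compactor.check.interval=300;       -- seconds between checks for tables needing compaction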


Re: Querying ACID Tables from Hive LLAP/Spark

New Contributor

Spark does not support Hive transactional tables very well.

There is an open ticket regarding these issues in the Spark project's JIRA: https://issues.apache.org/jira/browse/SPARK-15348

"Spark does not support any feature of hive's transactional tables,you cannot use spark to delete/update a table and it also has problems reading the aggregated data when no compaction was done."
