Member since 06-09-2017
01-07-2019
01:14 PM
@Ajay Tanpure thank you for the comment. When I look at the official guide (Official Guide :: Structured Streaming :: Deployment), I see that its spark-sql-kafka package version number is different from yours, and that is the key here:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 ...
So we have to find the correct version for your situation.
01-02-2019
03:21 PM
Excellent work, thank you!
06-13-2018
08:29 PM
Okay, so in preparation for the DataWorks Summit :: San Jose, I was going over the Spark 2 cluster we give our students (you know, testing the important labs, etc.), and lo and behold, I found a problem:
1. Testing my old code for Spark 2 Structured Streaming with Apache Kafka, I found it suddenly broken with an odd exception: "MicroBatchExecution: Query [id = blah blah blah ... java.lang.AbstractMethodError". But I have no abstract methods in my code, and this has always worked before ... so what the heck?:
18/06/12 19:14:59 ERROR MicroBatchExecution: Query [id = bf5215b6-6dec-4ed3-be11-991da4025bc8, runId = 78680de5-be6f-4b60-a635-1cc96003b054] terminated with error
java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.initializeLogIfNecessary(KafkaSourceProvider.scala:40)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.log(KafkaSourceProvider.scala:40)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.logInfo(KafkaSourceProvider.scala:40)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:269)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
2. A quick Google search showed I was not alone. My first clue was StackOverflow, where someone was in the same situation as me: suddenly, in Spark 2.3, there is a Kafka integration problem where before there was none. Uggh! (StackOverflow here...)
2.a. It was true: when I checked my HDP version, I was suddenly running Spark 2.3, whereas when my code worked before it was Spark 2.2. In fact, there is a clue right there in the opening of the spark-shell: it tells you that Spark 2.3 is using Scala version 2.11.8. That is well and good, but how do I fix my problem?
Spark version 2.3.0.2.6.5.0-292
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
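Both numbers in that banner matter when picking a package. As a rough sketch (BannerVersions and its methods are hypothetical helpers, not part of Spark), the Scala below reduces the HDP build string to its Apache Spark version and the full Scala version to the binary version that shows up in artifact names. It assumes the vendor build number is simply appended after the first three components, as in 2.3.0.2.6.5.0-292. Inside the spark-shell itself, spark.version should return that same build string.

```scala
// Hypothetical helpers: turn the version strings printed in the spark-shell
// banner into the two values that matter for a --packages coordinate.
object BannerVersions {
  // "2.3.0.2.6.5.0-292" -> "2.3.0". Assumption: the vendor (HDP) build
  // number is appended after the three-part Apache Spark version.
  def apacheSparkVersion(banner: String): String =
    banner.split('.').take(3).mkString(".")

  // "2.11.8" -> "2.11": Scala artifacts are suffixed with major.minor only.
  def scalaBinaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")
}

// Scala binary version of whatever JVM session runs this snippet.
println(BannerVersions.scalaBinaryVersion(scala.util.Properties.versionNumberString))
```

Dropping the patch number is the point: a package built for Scala 2.11 works with 2.11.8, but not with 2.10.x or 2.12.x.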
3. After researching some more and speaking to my HWX buddy Timothy Spann, he pointed me at an Apache NiFi article which helped me more: NiFi article here...
4. But even after reading that, I couldn't fix it. So then Tim (again) directed me to the Spark 2.3 Structured Streaming and Kafka integration docs: Apache Spark Streaming 2.3 and Kafka Integration doc here...
5. That was what I needed. The very bottom of that doc gave me what I needed to fix the code. When we start the spark-shell, we rely upon the dependencies given in --packages. What I used to use was this (note my versions):
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.1.0 --master local[2]
<=BUT THIS FAILS NOW
6. You need to change that to read as follows, and then you are golden. The main difference is the versioning: you need "0-10_2.11:2.3.0" to make it work, whew! Read the artifact name as: the connector for the Kafka 0.10 client API (0-10), built with Scala 2.11 (_2.11), for Spark 2.3.0 (:2.3.0), which will be running on Java 8 (1.8.0 to be precise, but 1.8 is called 8, man-o-man):
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master local[2]
<=THIS WORKS NOW!
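To make the fix mechanical, here is a small sketch that builds the coordinate from the running versions and lists what is wrong with an existing one. KafkaPackage and its methods are hypothetical names for illustration only. The sketch assumes, as in the fix above, that the connector version should equal the Apache Spark version exactly and that its Scala suffix should equal the runtime's Scala binary version.

```scala
// Hypothetical helper: build and sanity-check spark-sql-kafka coordinates.
object KafkaPackage {
  val Prefix = "org.apache.spark:spark-sql-kafka-0-10_"

  // coordinate("2.11", "2.3.0") gives the string that worked above.
  def coordinate(scalaBinary: String, sparkVersion: String): String =
    s"$Prefix$scalaBinary:$sparkVersion"

  // Lists the mismatches between `pkg` and the runtime; empty means it matches.
  def problems(pkg: String, scalaBinary: String, sparkVersion: String): List[String] =
    if (!pkg.startsWith(Prefix)) List(s"unexpected coordinate: $pkg")
    else pkg.stripPrefix(Prefix).split(':') match {
      case Array(s, v) =>
        List(
          if (s != scalaBinary) Some(s"Scala suffix _$s but runtime Scala is $scalaBinary") else None,
          if (v != sparkVersion) Some(s"package version $v but runtime Spark is $sparkVersion") else None
        ).flatten
      case _ => List(s"unexpected coordinate: $pkg")
    }
}

// The old package reports two mismatches on the Spark 2.3.0 / Scala 2.11 cluster.
KafkaPackage
  .problems("org.apache.spark:spark-sql-kafka-0-10_2.10:2.1.0", "2.11", "2.3.0")
  .foreach(println)
```

Checking both parts matters: fixing only the Scala suffix (for example _2.11:2.1.0) would still pull a connector compiled against an older Spark.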
7. The whole thing all together:
export SPARK_MAJOR_VERSION=2
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master local[2]
SPARK_MAJOR_VERSION is set to 2, using Spark2
Ivy Default Cache set to: /home/zeppelin/.ivy2/cache
The jars for the packages stored in: /home/zeppelin/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.5.0-292/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 in central
found org.apache.kafka#kafka-clients;0.10.0.1 in central
found net.jpountz.lz4#lz4;1.3.0 in central
found org.xerial.snappy#snappy-java;1.1.2.6 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.3.0/spark-sql-kafka-0-10_2.11-2.3.0.jar ...
[SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0!spark-sql-kafka-0-10_2.11.jar (104ms)
:: resolution report :: resolve 2862ms :: artifacts dl 112ms
:: modules in use:
net.jpountz.lz4#lz4;1.3.0 from central in [default]
org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   6   |   1   |   1   |   0   ||   6   |   1   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
1 artifacts copied, 5 already retrieved (396kB/24ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/12 20:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
18/06/12 20:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Spark context Web UI available at http://ip-172-30-8-172.us-west-2.compute.internal:4042
Spark context available as 'sc' (master = local[2], app id = local-1528833864176).
Spark session available as 'spark'.
Welcome to
SPARK version 2.3.0.2.6.5.0-292
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.ProcessingTime
scala> val kafkaVoterDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers","//ip-172-30-8-172.us-west-2.compute.internal:6667").option("subscribe", "voters").option("startingOffsets","earliest").load()
kafkaVoterDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
scala> val rawVoterQuery = kafkaVoterDF.writeStream.trigger(ProcessingTime("10 seconds")).outputMode("append").format("console").start()
warning: there was one deprecation warning; re-run with -deprecation for details
rawVoterQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@20276412
-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------------------+------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 67 65 6E 6...|voters| 0|0|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|1|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|2|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|3|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|4|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|5|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|6|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|7|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|8|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|9|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|10|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|11|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|12|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|13|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|14|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|15|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|16|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|17|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|18|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|19|2018-06-12 19:13:...|0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 20 rows
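Note that the key and value columns come back from Kafka as raw bytes, which is why the console shows hex. The Spark Kafka integration guide turns them into text with selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"). As a quick standalone check (ValueBytes is a hypothetical helper, not a Spark API), decoding the visible prefix 7B 22 67 65 6E as UTF-8 suggests each message starts a JSON object; the rest of the payload is truncated in the output, so nothing more can be read from it.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: decode the hex bytes the console sink prints for `value`.
object ValueBytes {
  // "7B 22" -> Array(0x7B, 0x22)
  def fromHex(hex: String): Array[Byte] =
    hex.trim.split("\\s+").map(h => Integer.parseInt(h, 16).toByte)

  def decode(hex: String): String =
    new String(fromHex(hex), StandardCharsets.UTF_8)
}

println(ValueBytes.decode("7B 22 67 65 6E")) // {"gen
```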
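8. So what is the takeaway from all this? Well, for one thing, there are headaches here. The hardest part is answering, "What version of Scala was used to build the Spark release you are using?" The long and short of it is that my --packages statement did not have the Scala version matching my Spark version. That mismatch is what the AbstractMethodError was really about: the JVM throws it when a call lands on a method that a class never implemented, typically because the class was compiled against an older version of a library. In the trace, Spark 2.3's Logging code calls into KafkaSourceProvider from the 2.1.0 connector. If you are anything like me, that can be a headache, so here is a tiny table that may help: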
Spark Version | Scala Version (don't worry about minor versions, just major)  | Java Version
1.6.1         | 2.10                                                          | 1.7
2.1.0         | 2.11 (2.10 is deprecated, may be removed in Spark 2.3.0)      | 1.7
2.2 to 2.3.0  | 2.11                                                          | 1.8
And here is another, in case that is not enough to figure it out. If they supported only one Scala version per major release of Spark, there would be a different problem: excluding people and forcing everyone to upgrade in lockstep. But if they support too many versions, there are further headaches, so they tend to support two Scala versions for each Spark release. And if you are not matching that in your work, boom, an exception is thrown. Ugggh!
Spark Version | Scala Version | Java Version
2.1           | 2.10          | 1.7
2.2           | 2.10          | 1.8
For details of this table see: Spark Scala version stuff in Spark 2.2 docs
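The first version table in this post can also be kept as a lookup. This is only a sketch: SparkScala is a hypothetical name, it keys on Spark major.minor, and it encodes just the default Scala version from the first table. The second table shows that Spark 2.1 and 2.2 could also run on Scala 2.10, which a single-valued map cannot express, so treat the result as "the usual choice", not "the only choice".

```scala
// Hypothetical lookup of the default Scala binary version per Spark release line.
object SparkScala {
  val defaultScala: Map[String, String] = Map(
    "1.6" -> "2.10",
    "2.1" -> "2.11",
    "2.2" -> "2.11",
    "2.3" -> "2.11"
  )

  // Uses only major.minor, so vendor builds such as "2.3.0.2.6.5.0-292" work too.
  // Releases not listed in the table return None.
  def scalaFor(sparkVersion: String): Option[String] =
    defaultScala.get(sparkVersion.split('.').take(2).mkString("."))
}

println(SparkScala.scalaFor("2.3.0.2.6.5.0-292"))
```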
For more information:
Laurent Weichberger, Big Data Bear, Hortonworks:
lweichberger@hortonworks.com
Come see me at the San Jose 2018 DataWorks Summit, Spark2 training...
06-12-2017
01:33 PM
This has been solved previously:
1. HIVE_AUX_JARS_PATH in hive-env.sh, or set in your shell (not the hive CLI), will override the value set by "hive.aux.jars.path" in hive-site.xml. However, calling "hive -hiveconf hive.aux.jars.path=..." will override the HIVE_AUX_JARS_PATH setting in hive-env.sh. Using "hive --auxpath ..." will append files to any settings you may have in HIVE_AUX_JARS_PATH.
2. Since we ship the Apache Hive binaries, hive.reloadable.aux.jars.path should be supported in any Hive shipped since HDP 2.2.0.
3. "hdfs:///" is supported by add jar; I doubt that hive.aux.jars.path supports it.
4. Wildcards are supported in neither hive.aux.jars.path nor add jar.
See the previous post by Deepesh (looks like a HWX worker) at https://community.hortonworks.com/questions/2390/methods-to-add-jars-to-hive.html
06-10-2017
11:39 PM
Awesome! Love this work, great job Jobin!
06-09-2017
10:57 PM
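I wrote about this in my Spark Structured Streaming blog here: https://www.linkedin.com/pulse/spark-21-structured-streaming-databricks-laurent-weichberger See this sample:
// Assumes `inactive` is a streaming DataFrame defined earlier (see the blog post).
// Write it out as Parquet files; the file sink needs a checkpoint directory,
// which also lets the query recover its progress after a restart.
val query = inactive.writeStream
  .format("parquet")
  .option("path", "/com/infotrellis/spark")
  .option("checkpointLocation", "/com/infotrellis/check")
  .start()
// Block the calling thread until the query stops or fails.
query.awaitTermination()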