Created on 06-13-2018 08:29 PM
Okay, so in preparation for the DataWorks Summit :: San Jose, I was going over the Spark 2 cluster we give our students (testing the important labs, etc.) and, lo and behold, I found a problem:
18/06/12 19:14:59 ERROR MicroBatchExecution: Query [id = bf5215b6-6dec-4ed3-be11-991da4025bc8, runId = 78680de5-be6f-4b60-a635-1cc96003b054] terminated with error
java.lang.AbstractMethodError
  at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.initializeLogIfNecessary(KafkaSourceProvider.scala:40)
  at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.log(KafkaSourceProvider.scala:40)
  at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.logInfo(KafkaSourceProvider.scala:40)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:269)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
  at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
  at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
  at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "stream execution thread for [id = bf5215b6-6dec-4ed3-be11-991da4025bc8, runId = 78680de5-be6f-4b60-a635-1cc96003b054]" java.lang.AbstractMethodError
  (same stack trace as above)
2. A quick Google search determined I was not alone. My first clue was StackOverflow, where someone had the same situation as me: suddenly in Spark 2.3 there is a Kafka integration problem, where before there was no problem. Uggh!: StackOverflow here...
2.a. It was true: when I checked my HDP version, I was suddenly running Spark 2.3, whereas when my code worked before it was Spark 2.2.
In fact, there is a clue right there in the opening banner of the spark-shell: it tells you Spark 2.3 is using Scala version 2.11.8. That is all well and good, but how to fix my problem?:
Spark version 2.3.0.2.6.5.0-292
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
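The banner is worth reading carefully, because the Scala binary version (2.11 here, not the full 2.11.8) is exactly the suffix you will need in the Kafka package name later. A small sketch of how to pull it out programmatically (the function name and regex are my own, not part of Spark):

```python
import re

def scala_binary_version(banner_line):
    """Extract the Scala binary version (major.minor) from a spark-shell
    banner line like 'Using Scala version 2.11.8 (...)'."""
    m = re.search(r"Using Scala version (\d+)\.(\d+)\.\d+", banner_line)
    if m is None:
        raise ValueError("no Scala version found in: %r" % banner_line)
    return "%s.%s" % (m.group(1), m.group(2))

banner = "Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)"
print(scala_binary_version(banner))  # -> 2.11
```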
3. After researching some more and speaking to my HWX buddy Timothy Spann, I was pointed at an Apache NiFi article which helped me more: NiFi article here...
4. But even after reading that I couldn't fix it. So then Tim (again) directed me to the Spark 2.3 Structured Streaming and Kafka integration docs: Apache Spark Streaming 2.3 and Kafka Integration doc here...
5. That was what I needed. The very bottom of that doc showed me how to fix the code. When we start the spark-shell, we rely upon the dependencies we give it. What I used to use was this (note my versions):
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.1.0 --master local[2] <=BUT THIS FAILS NOW
6. You need to change that to read as follows, and then you are golden. The main difference is the versioning: you need the version "0-10_2.11:2.3.0" to make it work, whew! Read that as: Spark version 2.3.0, built against the Scala 2.11 JARs (running on Java 8, or 1.8.0 to be precise, but 1.8 is called 8, man-o-man):
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master local[2] <=THIS WORKS NOW!
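As a sanity check, the artifact coordinate itself encodes both versions: spark-sql-kafka-0-10_&lt;scala-binary&gt;:&lt;spark-version&gt;. Here is a hypothetical sketch (helper name and return format are my own) that parses a coordinate and flags mismatches against the running environment:

```python
import re

def check_kafka_package(coordinate, spark_version, scala_binary):
    """Parse 'org.apache.spark:spark-sql-kafka-0-10_<scala>:<spark>' and
    return a list of mismatches against the running environment."""
    m = re.match(
        r"org\.apache\.spark:spark-sql-kafka-0-10_(\d+\.\d+):(\d+\.\d+\.\d+)$",
        coordinate)
    if m is None:
        return ["unrecognized coordinate: " + coordinate]
    pkg_scala, pkg_spark = m.group(1), m.group(2)
    problems = []
    if pkg_scala != scala_binary:
        problems.append("Scala %s package vs Scala %s runtime" % (pkg_scala, scala_binary))
    # HDP version strings like 2.3.0.2.6.5.0-292 start with the upstream x.y.z
    if not spark_version.startswith(pkg_spark):
        problems.append("Spark %s package vs Spark %s runtime" % (pkg_spark, spark_version))
    return problems

# The old coordinate fails both checks on the upgraded cluster:
print(check_kafka_package("org.apache.spark:spark-sql-kafka-0-10_2.10:2.1.0",
                          "2.3.0.2.6.5.0-292", "2.11"))
# The corrected coordinate passes:
print(check_kafka_package("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0",
                          "2.3.0.2.6.5.0-292", "2.11"))  # -> []
```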
7. The whole thing all together:
export SPARK_MAJOR_VERSION=2
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master local[2]
SPARK_MAJOR_VERSION is set to 2, using Spark2
Ivy Default Cache set to: /home/zeppelin/.ivy2/cache
The jars for the packages stored in: /home/zeppelin/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.5.0-292/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 in central
    found org.apache.kafka#kafka-clients;0.10.0.1 in central
    found net.jpountz.lz4#lz4;1.3.0 in central
    found org.xerial.snappy#snappy-java;1.1.2.6 in central
    found org.slf4j#slf4j-api;1.7.16 in central
    found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.3.0/spark-sql-kafka-0-10... ...
    [SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0!spark-sql-kafka-0-10_2.11.jar (104ms)
:: resolution report :: resolve 2862ms :: artifacts dl 112ms
    modules in use:
    net.jpountz.lz4#lz4;1.3.0 from central in [default]
    org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
    org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 from central in [default]
    org.slf4j#slf4j-api;1.7.16 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   6   |   1   |   1   |   0   ||   6   |   1   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    1 artifacts copied, 5 already retrieved (396kB/24ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/12 20:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
18/06/12 20:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Spark context Web UI available at http://ip-172-30-8-172.us-west-2.compute.internal:4042
Spark context available as 'sc' (master = local[2], app id = local-1528833864176).
Spark session available as 'spark'.
Welcome to SPARK version 2.3.0.2.6.5.0-292
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.ProcessingTime

scala> val kafkaVoterDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "//ip-172-30-8-172.us-west-2.compute.internal:6667").option("subscribe", "voters").option("startingOffsets", "earliest").load()
kafkaVoterDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> val rawVoterQuery = kafkaVoterDF.writeStream.trigger(ProcessingTime("10 seconds")).outputMode("append").format("console").start()
warning: there was one deprecation warning; re-run with -deprecation for details
rawVoterQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@20276412

-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 67 65 6E 6...|voters|        0|     0|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     1|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     2|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     3|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     4|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     5|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     6|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     7|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     8|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|     9|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    10|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    11|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    12|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    13|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    14|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    15|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    16|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    17|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    18|2018-06-12 19:13:...|            0|
|null|[7B 22 67 65 6E 6...|voters|        0|    19|2018-06-12 19:13:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 20 rows
Spark Version | Scala Version (don't worry about minor versions, just major) | Java Version
------------- | ------------------------------------------------------------ | ------------
1.6.1         | 2.10                                                         | 1.7
2.1.0         | 2.11 (2.10 is deprecated, may be removed in Spark 2.3.0)     | 1.7
2.2 to 2.3.0  | 2.11                                                         | 1.8
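The table above can be captured as a tiny lookup, handy when scripting a sanity check against a cluster. The dict below only encodes the rows from this article (plus the deprecated-but-still-supported Scala 2.10 entry for 2.1.0), so treat it as illustrative, not exhaustive:

```python
# Scala compatibility per Spark release, as listed in the table above.
COMPAT = {
    "1.6.1": {"scala": ["2.10"], "java": "1.7"},
    "2.1.0": {"scala": ["2.11", "2.10"], "java": "1.7"},  # 2.10 deprecated
    "2.2.0": {"scala": ["2.11"], "java": "1.8"},
    "2.3.0": {"scala": ["2.11"], "java": "1.8"},
}

def compatible(spark_version, scala_binary):
    """True if the given Scala binary version is supported by that Spark release."""
    info = COMPAT.get(spark_version)
    return info is not None and scala_binary in info["scala"]

print(compatible("2.3.0", "2.10"))  # -> False: the AbstractMethodError case above
print(compatible("2.3.0", "2.11"))  # -> True: the fixed --packages coordinate
```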
And here is some background, in case that is not enough to go on. If Spark supported only one Scala version per major release, there would be a different problem: excluding people and forcing everyone to upgrade in lockstep. But supporting too many versions creates its own headaches, so they tend to support two versions of Scala per Spark release. And if your build does not match one of them, boom, an Exception is thrown. Ugggh!:
Spark Version | Scala Version | Java Version
------------- | ------------- | ------------
2.1           | 2.10          | 1.7
2.2           | 2.10          | 1.8
For details of this table see:
Spark Scala version stuff in the Spark 2.2 docs

Laurent Weichberger, Big Data Bear, Hortonworks:
lweichberger@hortonworks.com
Come see me at the San Jose 2018 DataWorks Summit, Spark2 training...
Created on 01-04-2019 09:01 PM
@lweichberger Thanks for your article. It really helps a lot.
I am facing similar kind of issue with pyspark.
I am using Spark version 2.3.0.2.6.5.0-292. I want to read the Kafka stream. My code was working fine in Spark version 2.2.0 with this package: org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
Per the documentation, support for spark-streaming-kafka-0-8_2.11 is deprecated in Spark 2.3.0.
I am able to import the StreamingContext from pyspark, but I am not able to create the StreamingContext. I am getting the same Java AbstractMethodError. It seems something has changed underneath.
I am following the official guide from here: https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
# pyspark --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.2.6.5.0-292
      /_/

Using Python version 2.7.5 (default, Sep 15 2016 22:37:39)
SparkSession available as 'spark'.
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 5)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/streaming/context.py", line 61, in __init__
    self._jssc = jssc or self._initialize_context(self._sc, batchDuration)
  File "/usr/hdp/current/spark2-client/python/pyspark/streaming/context.py", line 65, in _initialize_context
    return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1428, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.java.JavaStreamingContext.
: java.lang.AbstractMethodError
  at org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:35)
  at org.apache.spark.streaming.scheduler.StreamingListenerBus.<init>(StreamingListenerBus.scala:30)
  at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:57)
  at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:184)
  at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:76)
  at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:130)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:238)
  at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
  at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
  at py4j.GatewayConnection.run(GatewayConnection.java:214)
  at java.lang.Thread.run(Thread.java:745)
>>>
Created on 01-07-2019 01:14 PM
@Ajay Tanpure thank you for the comment. When I look at that official guide:
Official Guide :: Structured Streaming :: Deployment
I see their spark-sql-kafka package version number is different from yours; that is the key here.
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 ...
So we have to find the correct version for your situation.
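One way to "find the correct version" mechanically is to derive the coordinate from the running cluster. The helper below is a hypothetical sketch (the name is mine), assuming the structured-streaming 0-10 package is what is wanted and that HDP-style version strings carry the upstream release in their first three components:

```python
def kafka_package_for(spark_version, scala_binary="2.11"):
    """Build the --packages coordinate matching a given Spark release.
    HDP-style versions like '2.3.0.2.6.5.0-292' are truncated to the
    upstream x.y.z part before being used as the artifact version."""
    upstream = ".".join(spark_version.split(".")[:3])
    return "org.apache.spark:spark-sql-kafka-0-10_%s:%s" % (scala_binary, upstream)

print(kafka_package_for("2.3.0.2.6.5.0-292"))
# -> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
```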
Created on 01-07-2019 07:18 PM
@lweichberger, I suspect the same about the version mismatch.
https://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html
As per the example given in the above link, when I create the streaming context:
ssc = StreamingContext(sc, 5)
I get the Java AbstractMethodError from py4j. Using spark-sql-kafka, I can get the structured stream with the new API, but I would like to fix the existing version issue.