Okay, so in preparation for the DataWorks Summit :: San Jose, I was going over the Spark 2 cluster we give our students - you know, testing the important labs, etc. - and lo and behold I found a problem:

  1. Testing my old code for Spark 2 Structured Streaming with Apache Kafka, it suddenly broke with an odd exception: "MicroBatchExecution: Query [id = blah blah blah ... java.lang.AbstractMethodError." But I have no abstract methods in my code, and this has always worked before ... so what the heck?:
18/06/12 19:14:59 ERROR MicroBatchExecution: Query [id = bf5215b6-6dec-4ed3-be11-991da4025bc8, runId = 78680de5-be6f-4b60-a635-1cc96003b054] terminated with error
java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.initializeLogIfNecessary(KafkaSourceProvider.scala:40)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.log(KafkaSourceProvider.scala:40)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.logInfo(KafkaSourceProvider.scala:40)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:269)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "stream execution thread for [id = bf5215b6-6dec-4ed3-be11-991da4025bc8, runId = 78680de5-be6f-4b60-a635-1cc96003b054]" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.initializeLogIfNecessary(KafkaSourceProvider.scala:40)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.log(KafkaSourceProvider.scala:40)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.logInfo(KafkaSourceProvider.scala:40)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$ConfigUpdater.set(KafkaSourceProvider.scala:269)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:277)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1$$anonfun$applyOrElse$1.apply(MicroBatchExecution.scala:77)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:75)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:75)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:265)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)


2. A quick Google search determined I am not alone. My first clue was StackOverflow, where someone had the same situation as me: suddenly in Spark 2.3 there is a Kafka integration problem, where before there was none. Uggh!: StackOverflow here...

2.a. It was true: when I checked my HDP version, I was suddenly running Spark 2.3, whereas when my code worked before it was Spark 2.2.

In fact, there is a clue right there in the spark-shell banner: it tells you Spark 2.3 is using Scala version 2.11.8. That is all well and good, but how to fix my problem?:

Spark version 2.3.0.2.6.5.0-292
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
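
If you want to double-check from inside a running shell rather than squinting at the banner, something like this should work (the values in the comments are just what this cluster reports; yours will differ):

scala> spark.version                                // e.g. 2.3.0.2.6.5.0-292 on this cluster
scala> scala.util.Properties.versionNumberString    // e.g. 2.11.8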

3. After I researched some more and spoke to my HWX buddy Timothy Spann, he pointed me at an Apache NiFi article which helped me more: NiFi article here...

4. But even after reading that I couldn't fix it. So then Tim (again) directed me to the Spark 2.3 Structured Streaming and Kafka integration docs: Apache Spark Streaming 2.3 and Kafka Integration doc here...

5. That was what I needed. The very bottom of that doc gave me what I needed to fix the code. When we start the spark-shell, we rely upon the dependencies we pass in. What I used to use was this (note my versions):

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.10:2.1.0 --master local[2]
<=BUT THIS FAILS NOW

6. You need to fix that to read this, and then you are golden. Note the main difference is the versioning: you need "0-10_2.11:2.3.0" to make it work, whew! That should be read as: Spark version 2.3.0, built with the Scala 2.11 JARs (which will be running on Java 8 - or 1.8.0 to be precise, but 1.8 is called 8, man-o-man):

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master local[2]
<=THIS WORKS NOW!
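
To unpack that coordinate (it is just the standard Maven group:artifact:version pattern, with Scala's binary-version suffix tacked onto the artifact name):

org.apache.spark            <- Maven groupId
spark-sql-kafka-0-10_2.11   <- the Structured Streaming Kafka connector (Kafka 0.10+ client API), compiled against Scala 2.11
2.3.0                       <- connector version, which needs to track the Spark version you are actually running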

7. The whole thing all together:

export SPARK_MAJOR_VERSION=2
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master local[2]
SPARK_MAJOR_VERSION is set to 2, using Spark2
Ivy Default Cache set to: /home/zeppelin/.ivy2/cache
The jars for the packages stored in: /home/zeppelin/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.5.0-292/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 in central
found org.apache.kafka#kafka-clients;0.10.0.1 in central
found net.jpountz.lz4#lz4;1.3.0 in central
found org.xerial.snappy#snappy-java;1.1.2.6 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.3.0/spark-sql-kafka-0-10... ...
[SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0!spark-sql-kafka-0-10_2.11.jar (104ms)
:: resolution report :: resolve 2862ms :: artifacts dl 112ms
:: modules in use:
net.jpountz.lz4#lz4;1.3.0 from central in [default]
org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.0 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
---------------------------------------------------------------------
|                  |            modules            ||   artifacts   |
|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
|      default     |   6   |   1   |   1   |   0   ||   6   |   1   |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
1 artifacts copied, 5 already retrieved (396kB/24ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/12 20:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
18/06/12 20:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Spark context Web UI available at http://ip-172-30-8-172.us-west-2.compute.internal:4042
Spark context available as 'sc' (master = local[2], app id = local-1528833864176).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0.2.6.5.0-292
      /_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.ProcessingTime
scala> val kafkaVoterDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers","//ip-172-30-8-172.us-west-2.compute.internal:6667").option("subscribe", "voters").option("startingOffsets","earliest").load()
kafkaVoterDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
scala> val rawVoterQuery = kafkaVoterDF.writeStream.trigger(ProcessingTime("10 seconds")).outputMode("append").format("console").start()
warning: there was one deprecation warning; re-run with -deprecation for details
rawVoterQuery: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@20276412
-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------------------+------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[7B 22 67 65 6E 6...|voters| 0|0|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|1|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|2|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|3|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|4|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|5|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|6|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|7|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|8|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|9|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|10|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|11|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|12|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|13|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|14|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|15|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|16|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|17|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|18|2018-06-12 19:13:...|0|
|null|[7B 22 67 65 6E 6...|voters| 0|19|2018-06-12 19:13:...|0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 20 rows
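
Side note: the key and value columns come back as raw bytes (that is the hex you see above). If you want readable JSON, a sketch like the following should work against the same kafkaVoterDF - it also switches to Trigger.ProcessingTime, since the bare ProcessingTime import used above is flagged as deprecated in 2.3 (untested sketch; column names are taken from the schema printed above):

import org.apache.spark.sql.streaming.Trigger

// Cast the binary Kafka payload to a string so the console sink prints readable JSON
val voterJsonDF = kafkaVoterDF.selectExpr("CAST(value AS STRING) AS json", "topic", "partition", "offset")

val jsonQuery = voterJsonDF.writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))   // non-deprecated form of the trigger
  .outputMode("append")
  .format("console")
  .option("truncate", false)                       // print full rows instead of clipping them
  .start()

// jsonQuery.stop()   // stop the query when you are done poking at it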

8. So what is the takeaway from all this? Well, for one, there are headaches here. The hardest part is answering, "What version of Scala was used to build the Spark release you are using?" So here is a tiny table that may help. The long and short of it is that the --packages statement I had didn't have the Scala version matching my Spark version. If you are anything like me, that can be a headache:
Spark Version   | Scala Version (don't worry about minor versions, just major)   | Java Version
1.6.1           | 2.10                                                           | 1.7
2.1.0           | 2.11 (v 2.10 is deprecated, may be removed in Spark 2.3.0)     | 1.7
2.2 to 2.3.0    | 2.11                                                           | 1.8

And here is one more table, in case that is not enough to figure it out. If they only supported one Scala version per major release of Spark, there would be a different problem of excluding people and forcing everyone to upgrade in lockstep. But supporting too many versions creates further headaches, so they tend to support two versions of Scala for each Spark release. And if you are not matching one of them in your work, boom, an Exception is thrown. Ugggh!:

Spark Version   | Scala Version   | Java Version
2.1             | 2.10            | 1.7
2.2             | 2.10            | 1.8

For details of this table see:

Spark Scala version stuff in Spark 2.2 docs
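
If you would rather derive the coordinate from inside a running spark-shell than memorize the tables, here is a rough sketch (it assumes the connector on Maven Central uses the plain Apache version, so the HDP build suffix has to be trimmed off spark.version):

// Scala binary version, e.g. "2.11" from "2.11.8"
val scalaBinary = scala.util.Properties.versionNumberString.split('.').take(2).mkString(".")

// Plain Apache Spark version, e.g. "2.3.0" from the HDP string "2.3.0.2.6.5.0-292"
val sparkBase = spark.version.split('.').take(3).mkString(".")

println(s"org.apache.spark:spark-sql-kafka-0-10_${scalaBinary}:${sparkBase}")
// org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0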
For more information:
  • Laurent Weichberger, Big Data Bear, Hortonworks:

    lweichberger@hortonworks.com

    Come see me at the San Jose 2018 DataWorks Summit, Spark2 training...

Comments

@lweichberger Thanks for your article. It really helps a lot.

I am facing a similar kind of issue with pyspark.

I am using Spark version 2.3.0.2.6.5-292. I want to read the Kafka stream. My code was working fine in Spark version 2.2.0 with this package: org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0

Support for spark-streaming-kafka-0-8_2.11 is deprecated in Spark 2.3.0 per the documentation.

I am able to import StreamingContext from pyspark, but I am not able to create the StreamingContext. I am getting the same java.lang.AbstractMethodError. It seems something has changed underneath.

I am following the official guide from here: https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html

# pyspark --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.2.6.5.0-292
      /_/


Using Python version 2.7.5 (default, Sep 15 2016 22:37:39)
SparkSession available as 'spark'.
>>> from pyspark.streaming import StreamingContext 
>>> ssc = StreamingContext(sc, 5)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/streaming/context.py", line 61, in __init__
    self._jssc = jssc or self._initialize_context(self._sc, batchDuration)
  File "/usr/hdp/current/spark2-client/python/pyspark/streaming/context.py", line 65, in _initialize_context
    return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1428, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.java.JavaStreamingContext.
: java.lang.AbstractMethodError
	at org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:35)
	at org.apache.spark.streaming.scheduler.StreamingListenerBus.<init>(StreamingListenerBus.scala:30)
	at org.apache.spark.streaming.scheduler.JobScheduler.<init>(JobScheduler.scala:57)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:184)
	at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:76)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:130)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)

>>> 

@Ajay Tanpure thank you for the comment. When I look at that official guide:

Official Guide :: Structured Streaming :: Deployment

I see that their spark-sql-kafka package version number is different from yours; that is the key here.

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 ...

So we have to find the correct version for your situation.
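
For Structured Streaming from pyspark, the same coordinate should apply, so something along these lines (untested, but it mirrors the spark-shell fix above) would be the place to start:

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --master local[2]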

@lweichberger, I suspect the same about the version mismatch.
https://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html

As per the example given in the above link, when I create the streaming context

ssc = StreamingContext(sc, 5)

I get the java.lang.AbstractMethodError from py4j. Using spark-sql-kafka, I can get the structured stream with the new API, but I would like to fix the existing version issue.