Kafka class not found error when running Atlas hook in Oozie

Contributor

I am running a Spark action on Oozie in a Cloudera 7.1.4 cluster.

The action itself completes successfully, but the logs contain a stack trace showing that the Atlas hook failed. It looks like the application is looking for the Spark SQL Kafka 0.10 JAR.

 

<<< Invocation of Main class completed <<<

2021-03-08 09:31:00,968 [SparkExecutionPlanProcessor-thread] WARN  com.hortonworks.spark.atlas.sql.SparkExecutionPlanProcessor  - Caught exception during parsing event
java.lang.ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaRelation
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at com.hortonworks.spark.atlas.utils.ReflectionHelper$.classForName(ReflectionHelper.scala:113)
	at org.apache.spark.sql.kafka010.atlas.ExtractFromDataSource$.isKafkaRelation(ExtractFromDataSource.scala:187)
	at com.hortonworks.spark.atlas.sql.CommandsHarvester$KafkaEntities$.unapply(CommandsHarvester.scala:560)
	at com.hortonworks.spark.atlas.sql.CommandsHarvester$$anonfun$com$hortonworks$spark$atlas$sql$CommandsHarvester$$discoverInputsEntities$1.apply(CommandsHarvester.scala:260)
	at com.hortonworks.spark.atlas.sql.CommandsHarvester$$anonfun$com$hortonworks$spark$atlas$sql$CommandsHarvester$$discoverInputsEntities$1.apply(CommandsHarvester.scala:249)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at com.hortonworks.spark.atlas.sql.CommandsHarvester$.com$hortonworks$spark$atlas$sql$CommandsHarvester$$discoverInputsEntities(CommandsHarvester.scala:249)
	at com.hortonworks.spark.atlas.sql.CommandsHarvester$InsertIntoHadoopFsRelationHarvester$.harvest(CommandsHarvester.scala:76)
	at com.hortonworks.spark.atlas.sql.SparkExecutionPlanProcessor$$anonfun$2.apply(SparkExecutionPlanProcessor.scala:138)
	at com.hortonworks.spark.atlas.sql.SparkExecutionPlanProcessor$$anonfun$2.apply(SparkExecutionPlanProcessor.scala:97)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at com.hortonworks.spark.atlas.sql.SparkExecutionPlanProcessor.process(SparkExecutionPlanProcessor.scala:97)
	at com.hortonworks.spark.atlas.sql.SparkExecutionPlanProcessor.process(SparkExecutionPlanProcessor.scala:71)
	at com.hortonworks.spark.atlas.AbstractEventProcessor.eventProcess(AbstractEventProcessor.scala:97)
	at com.hortonworks.spark.atlas.AbstractEventProcessor$$anon$1.run(AbstractEventProcessor.scala:46)
Oozie Launcher, uploading action data to HDFS sequence file: hdfs://XXXX:8020/user/XXXX/oozie-oozi/0000007-210307120244183-oozie-oozi-W/XXXX--spark/action-data.seq
2021-03-08 09:31:01,011 [main] INFO  org.apache.hadoop.io.compress.CodecPool  - Got brand-new compressor [.deflate]
Stopping AM
2021-03-08 09:31:01,039 [main] INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl  - Waiting for application to be successfully unregistered.
Callback notification attempts left 0
Callback notification trying http://XXXX/oozie/callback?id=0000007-210307120244183-oozie-oozi-W@XXXX&status=SUCCEEDED
Callback notification to http://XXXX/oozie/callback?id=0000007-210307120244183-oozie-oozi-W@XXXX&status=SUCCEEDED succeeded
Callback notification succeeded
2021-03-08 09:31:01,174 [shutdown-hook-0] INFO  org.apache.spark.SparkContext  - Invoking stop() from shutdown hook
2021-03-08 09:31:01,175 [spark-listener-group-shared] INFO  com.hortonworks.spark.atlas.SparkAtlasEventTracker  - Receiving application end event - shutting down SAC
2021-03-08 09:31:01,179 [spark-listener-group-shared] INFO  com.hortonworks.spark.atlas.SparkAtlasEventTracker  - Done shutting down SAC
2021-03-08 09:31:01,810 [dispatcher-event-loop-4] INFO  org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint  - Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.30.10.77:33544) with ID 3

I can see that this JAR exists in the parcel on the Oozie host.

$ pwd
/opt/cloudera/parcels/CDH/jars

$ ls -l spark*kafka*
-rw-r--r--. 1 root root 538452 Oct  6 07:14 spark-sql-kafka-0-10_2.11-2.4.0.7.1.4.0-203.jar
-rw-r--r--. 1 root root 216594 Oct  6 07:15 spark-streaming-kafka-0-10_2.11-2.4.0.7.1.4.0-203.jar

Looking through the rest of the log, I can see that Oozie loads the Spark Streaming Kafka JAR but not the Spark SQL Kafka one.

2021-03-08 09:30:42,979 [main] INFO  org.apache.spark.deploy.yarn.Client  - Source and destination file systems are the same. Not copying hdfs://XXXX:8020/user/oozie/share/lib/lib_20210221143618/spark/spark-streaming-kafka-0-10_2.11-2.4.0.7.1.4.0-203.jar

How can I fix this?

1 ACCEPTED SOLUTION

Contributor

To summarise all of the above: assuming you need the Spark Atlas hook in your system, the solution is as follows.

 

  • Add the file atlas-application.properties and spark-sql-kafka-0-10_2.11-<version>.jar to the Oozie shared Spark library.
  • The library is located on HDFS at <home>/oozie/share/lib/lib_<date>/spark.
  • The application properties file can be found in various places, such as /etc/hive/conf.cloudera.hive/.
  • The JAR file can be found in the parcels directory, /opt/cloudera/parcels/CDH/jars/.
  • The copied files need to be owned by user oozie and group oozie (oozie.oozie) and be world-readable.
  • After any change to the shared library, you must restart the Oozie server before jobs can find the new files.
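
The steps above can be sketched as a small shell script. This is only a sketch under assumptions: the share lib path and the JAR version are the ones that appear in the logs earlier in this thread, and DRY_RUN, run and add_to_sharelib are hypothetical names introduced here for illustration. The hdfs commands must be run as a user that is allowed to write to the share lib and change ownership there (for example the HDFS superuser). By default the script only prints the commands it would run.

```shell
#!/bin/sh
# Sketch only: adjust these paths to your cluster. The values below are
# assumptions taken from the logs and listings quoted in this thread.
set -e

SHARELIB_SPARK="${SHARELIB_SPARK:-/user/oozie/share/lib/lib_20210221143618/spark}"
JAR="${JAR:-/opt/cloudera/parcels/CDH/jars/spark-sql-kafka-0-10_2.11-2.4.0.7.1.4.0-203.jar}"
PROPS="${PROPS:-/etc/hive/conf.cloudera.hive/atlas-application.properties}"
DRY_RUN="${DRY_RUN:-1}"   # hypothetical switch: 1 prints commands, 0 executes them

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Copy both files into the Spark share lib, then fix owner and permissions.
add_to_sharelib() {
  for src in "$JAR" "$PROPS"; do
    dest="$SHARELIB_SPARK/$(basename "$src")"
    run hdfs dfs -put -f "$src" "$dest"          # -f overwrites an existing copy
    run hdfs dfs -chown oozie:oozie "$dest"      # owned by user and group oozie
    run hdfs dfs -chmod 644 "$dest"              # world-readable
  done
}

add_to_sharelib
# Afterwards, restart the Oozie server (for example from Cloudera Manager).
```

Set DRY_RUN=0 to execute the commands once the printed output looks right for your cluster.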


8 REPLIES

Expert Contributor

Hello,

Have you included the JAR in the spark-submit/spark-shell command as below (comma-separated for multiple JARs)?
$ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar>

SME || Kafka | Schema Registry | SMM | SRM

Contributor

Hi Nanindin,

 

This is running inside Oozie as a Spark action, so I am not calling spark-submit directly.

I am not running the Atlas hook as part of my application; Oozie runs it itself as part of the teardown at the end of the task. So I would expect Oozie to be in control of its own environment.

 

Regards

Steve

Master Collaborator

Hello hindmasj,


Could you please check whether the Atlas service is enabled in the Spark service?

Spark --> Configuration --> Atlas Service 

If the Atlas service is enabled, then Spark internally requires the spark-sql-kafka-0-10_2.11-2.4.0.7.1.4.0-203.jar file.

Copy the JAR to HDFS and reference it in the Spark action in Oozie:

<jar>hdfs://host/path/to/spark-sql-kafka-0-10_2.11-2.4.0.7.1.4.0-203.jar</jar>

If the Atlas service is not required, please disable it and run the Spark job.

Contributor

Hi Ranga,


I checked, and the Atlas service is configured for Spark. I think we need it for Ranger? I considered your suggestion of adding the JAR to the workflow, but in that case we would need to add it to every workflow, and using the Atlas hook is an administrative decision, not a developer one.


Anyway, I did some reading around and saw that Oozie builds a shared library in oozie/share/lib/lib_<date>, and that all of the Spark JARs sit under it in a spark directory. The sql-kafka JAR is not in there. I tried to rebuild the library with Oozie -> Actions -> Install Oozie Share Lib, but the new library did not contain the required file either.


So I found the right JAR in parcels/CDH/jars and copied it to the library. That still did not work, and I noticed that the Spark job was still being built with the old library. At this point I should have restarted Oozie, but I did not know that, and Cloudera Manager did not indicate it was required. So I deleted the old library, and then the job was built with the new library. This, however, was a big mistake, as the new job could not register a Spark listener. The error was:

 

Caused by: org.apache.atlas.AtlasException: Failed to load application properties
	at org.apache.atlas.ApplicationProperties.get(ApplicationProperties.java:147)

 

It seems that the recreation of the share lib did not include the file atlas-application.properties. It took me a few hours, various service restarts, and redeployment of client configs before I discovered that this was the root cause. I manually added the file to the share lib and restarted Oozie again. After this I could run my job, and the error about the Atlas hook failing was gone.
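
A quick check along these lines may shorten that kind of search. It is a minimal sketch, not an Oozie feature: missing_from_listing is a hypothetical helper that reads a directory listing on standard input and prints each required name that does not appear in it. The share lib path in the usage comment is assumed from the log earlier in this thread.

```shell
# Hypothetical helper: read a directory listing on stdin and print every
# required name (given as arguments) that is not found anywhere in it.
missing_from_listing() {
  listing="$(cat)"
  for name in "$@"; do
    case "$listing" in
      *"$name"*) ;;            # present: print nothing
      *) echo "$name" ;;       # absent: report it
    esac
  done
}

# Example use against the share lib (path is an assumption, adjust as needed):
# hdfs dfs -ls /user/oozie/share/lib/lib_20210221143618/spark \
#   | missing_from_listing spark-sql-kafka-0-10 atlas-application.properties
```

An empty result means both files are present in the listing; any name printed is missing from the share lib directory.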

 

If I had any recommendations, they would be:

 

1. If you enable the Spark Atlas hook, Oozie should include the Atlas properties file in the share lib. In fact, it should probably do this by default.
2. Likewise, the share lib should include the spark-sql-kafka JAR.
3. When you rebuild the share lib, Cloudera Manager should flag Oozie as requiring a restart.

 

Regards
Steve

 

Master Collaborator

Hi Steve,

 

If you enable the Atlas service in Spark, there will be two flows:

Spark ---> Your application flow

Spark ---> Write Spark events to Kafka --> HBase --> This HBase data is visualised in the Atlas UI.

 

Please check with the admin team whether the Spark Atlas service is required. If it is not required, disable it in the Spark UI; you will then not see any issues in Oozie. Meanwhile, have you tried to submit the same job without Oozie?

Contributor

Hi Ranga,

 

I am pretty sure it is required as part of our data governance policy but thank you for the tip.

 

Regards

Steve

Master Collaborator

Please let me know if any further help is required on this issue.
