
Part 1: https://community.hortonworks.com/articles/82964/getting-started-with-apache-ambari-workflow-design....

Part 2: https://community.hortonworks.com/articles/82967/apache-ambari-workflow-designer-view-for-apache-oo....

Part 3: https://community.hortonworks.com/articles/82988/apache-ambari-workflow-designer-view-for-apache-oo-...

Part 4: https://community.hortonworks.com/articles/83051/apache-ambari-workflow-designer-view-for-apache-oo-...

Part 5: https://community.hortonworks.com/articles/83361/apache-ambari-workflow-manager-view-for-apache-ooz....

Part 6: https://community.hortonworks.com/articles/83787/apache-ambari-workflow-manager-view-for-apache-ooz-...

Part 8: https://community.hortonworks.com/articles/84394/apache-ambari-workflow-manager-view-for-apache-ooz-...

Part 9: https://community.hortonworks.com/articles/85091/apache-ambari-workflow-manager-view-for-apache-ooz-...

Part 10: https://community.hortonworks.com/articles/85354/apache-ambari-workflow-manager-view-for-apache-ooz-...

Part 11: https://community.hortonworks.com/articles/85361/apache-ambari-workflow-manager-view-for-apache-ooz-...

Part 12: https://community.hortonworks.com/articles/131389/apache-ambari-workflow-manager-view-for-apache-ooz...

Welcome back, folks. In this tutorial, I'm going to demonstrate how to import existing Spark workflows and execute them in WFM, as well as how to create your own Spark workflows. As of today, Apache Spark 2.x is not supported in the Apache Oozie bundled with HDP. There is community work on making Spark2 run in Oozie, but it is not released yet, so I'm going to concentrate on Spark 1.6.3 today.

First things first: I'm going to import a workflow into WFM from the Oozie examples at https://github.com/apache/oozie/tree/master/examples/src/main/apps/spark

My cluster setup is:

Ambari 2.5.0

HDP 2.6

HDFS HA

RM HA

Oozie HA

Kerberos

Luckily, the Spark action did not need anything extra in a Kerberos environment (i.e., I didn't have to add a credential).

The first thing I need is the dfs.nameservices property from HDFS:

Ambari > HDFS > Configs

12589-01-dfsnameservices.png

I'm going to use that for the nameNode variable.

I'm ready to import this workflow into WFM. For the details, please review one of my earlier tutorials.

I'm presented with a Spark action node:

12590-02-spark-node.png

Click on the spark-node and hit the gear icon to preview the properties.

12592-6a-preview-spark-node.png

Let's also review the arguments for input and output, as well as the RM and NameNode. Also notice the prepare step, where we can choose to delete a directory if it exists.

12594-6b.png

We're going to leave everything as is.

12591-03-preview-sample-workflow.png

When we submit the workflow, we're going to supply the nameNode and resourceManager addresses. Below are my properties:

12593-4-job-config.png

Notice that jobTracker and resourceManager both appear. Ignore jobTracker: it was in the original workflow, so it was inherited. We're concerned about the RM going forward. Also, the nameNode value is the dfs.nameservices property from hdfs-site.xml, as I stated earlier.

Once the job completes, you can navigate to the output directory and see that the file was copied.
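To make the job configuration concrete, here is a minimal Python sketch that assembles the properties as text. The nameservice `mycluster` matches the HDFS URIs used later in this article, but the ResourceManager host and port, the application path, and the helper names are all assumptions for illustration, not values read from my cluster.

```python
def build_job_properties(nameservice, resource_manager, app_path):
    """Return Oozie job properties as a dict of strings.

    nameservice is the dfs.nameservices value; with HDFS HA the
    nameNode address is the logical nameservice, not a single host.
    """
    name_node = "hdfs://" + nameservice
    return {
        "nameNode": name_node,
        "resourceManager": resource_manager,
        "oozie.use.system.libpath": "true",
        "oozie.wf.application.path": name_node + app_path,
    }


def to_properties_text(props):
    """Render the dict in key=value form, one property per line."""
    return "\n".join("{}={}".format(key, value) for key, value in props.items())


# Hypothetical values: replace with your own nameservice, RM address and path.
props = build_job_properties(
    "mycluster",
    "rm.example.com:8050",
    "/user/aervits/examples/apps/spark",
)
print(to_properties_text(props))
```

WFM collects these values in the submit dialog, so you would only need something like this when submitting from the command line or a script.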

hdfs dfs -ls /user/aervits/examples/output-data/spark/
Found 3 items
-rw-r--r--   3 aervits hdfs          0 2017-02-16 17:16 /user/aervits/examples/output-data/spark/_SUCCESS
-rw-r--r--   3 aervits hdfs        706 2017-02-16 17:16 /user/aervits/examples/output-data/spark/part-00000
-rw-r--r--   3 aervits hdfs        704 2017-02-16 17:16 /user/aervits/examples/output-data/spark/part-00001
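If you want to check the result from a script instead of by eye, the listing can be parsed. This is a rough sketch that assumes the eight-column `hdfs dfs -ls` layout shown above and paths without spaces; the function names are mine.

```python
def parse_hdfs_ls(listing):
    """Return the paths found in the text output of `hdfs dfs -ls`."""
    paths = []
    for line in listing.splitlines():
        fields = line.split()
        # File rows have 8 columns: permissions, replication, owner,
        # group, size, date, time, path. The "Found N items" header
        # has fewer columns and is skipped.
        if len(fields) >= 8:
            paths.append(fields[-1])
    return paths


def job_succeeded(paths):
    """The job wrote a _SUCCESS marker if it finished cleanly."""
    return any(path.endswith("/_SUCCESS") for path in paths)


sample = """Found 3 items
-rw-r--r--   3 aervits hdfs          0 2017-02-16 17:16 /user/aervits/examples/output-data/spark/_SUCCESS
-rw-r--r--   3 aervits hdfs        706 2017-02-16 17:16 /user/aervits/examples/output-data/spark/part-00000
-rw-r--r--   3 aervits hdfs        704 2017-02-16 17:16 /user/aervits/examples/output-data/spark/part-00001"""

paths = parse_hdfs_ls(sample)
print(job_succeeded(paths), [p for p in paths if "/part-" in p])
```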

In my case, the sample input was a book in the examples directory:

hdfs dfs -cat /user/aervits/examples/output-data/spark/part-00000
To be or not to be, that is the question;
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles,
And by opposing, end them. To die, to sleep;
No more; and by a sleep to say we end
The heart-ache and the thousand natural shocks
That flesh is heir to ? 'tis a consummation

Next up, I'm going to demonstrate authoring a new Spark action instead of importing one. I'm following the guide at http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.3/bk_spark-component-guide/content/run-sample-... to demonstrate how to add this Pi job to an Oozie workflow via WFM.

First, you need to create a workflow directory on HDFS along with a lib folder, then upload the Spark examples jar to that lib folder.

hdfs dfs -mkdir -p oozie/spark/lib
cd /usr/hdp/current/spark-client/lib
hdfs dfs -put spark-examples-1.6.3.2.6.0.0-502-hadoop2.7.3.2.6.0.0-502.jar oozie/spark/lib

Next, let's add a Spark action to WFM and edit it. Fill out the properties as below and make sure to select Yarn Cluster, since Yarn Client in Oozie will be deprecated soon. Notice that you can pass each Spark option on its own line.

12595-05-config-spark-node-for-pi.png

I also need to add an argument to the SparkPi job; in this case it's 10.

12596-06-config-spark-node-for-pi2.png

If you haven't figured it out already, I'm trying to recreate the following command in Oozie:

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10

Aside from changing yarn-client to yarn-cluster, everything else is as in the command above. I'd like to preview my XML now.
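To show how the command line maps onto the fields of the Spark action form, here is a small Python sketch that splits the command into class, master, Spark options, jar and arguments. The function name and the returned field names are my own, and the parser assumes every `--option` is followed by exactly one value, which holds for the command above but not for every spark-submit flag.

```python
import shlex


def split_spark_submit(command):
    """Split a spark-submit command line into Spark action style fields."""
    tokens = shlex.split(command)[1:]  # drop the spark-submit executable
    fields = {"class": None, "master": None, "spark_opts": [], "jar": None, "args": []}
    i = 0
    while i < len(tokens):
        token = tokens[i]
        if fields["jar"] is not None:
            # Everything after the jar is an application argument.
            fields["args"].append(token)
            i += 1
        elif token == "--class":
            fields["class"] = tokens[i + 1]
            i += 2
        elif token == "--master":
            fields["master"] = tokens[i + 1]
            i += 2
        elif token.startswith("--"):
            # Any other option goes to the Spark options field with its value.
            fields["spark_opts"].extend([token, tokens[i + 1]])
            i += 2
        else:
            # First bare token is the application jar.
            fields["jar"] = token
            i += 1
    fields["spark_opts"] = " ".join(fields["spark_opts"])
    return fields


COMMAND = (
    "./bin/spark-submit --class org.apache.spark.examples.SparkPi "
    "--master yarn-client --num-executors 1 --driver-memory 512m "
    "--executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10"
)

fields = split_spark_submit(COMMAND)
for key, value in fields.items():
    print(key, "=", value)
```

In WFM, the master value is the only field I change (to yarn-cluster); the rest is entered as parsed.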

12598-07-preview-xml.png
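For readers who cannot see the screenshot, the sketch below builds an action of roughly the same shape with Python's standard library. It follows the element order of the Oozie `uri:oozie:spark-action:0.1` schema, but it is not the exact XML that WFM generated: the `${resourceManager}` reference, the jar location under my HDFS home directory, the application name and the `end`/`kill` transition names are assumptions.

```python
import xml.etree.ElementTree as ET


def build_spark_action(name, master, app_class, jar, spark_opts, args):
    """Build an Oozie <action> element that wraps a Spark action."""
    action = ET.Element("action", {"name": name})
    spark = ET.SubElement(action, "spark", {"xmlns": "uri:oozie:spark-action:0.1"})
    # Assumed variable names; they are resolved from the job configuration.
    ET.SubElement(spark, "job-tracker").text = "${resourceManager}"
    ET.SubElement(spark, "name-node").text = "${nameNode}"
    ET.SubElement(spark, "master").text = master
    ET.SubElement(spark, "name").text = name
    ET.SubElement(spark, "class").text = app_class
    ET.SubElement(spark, "jar").text = jar
    ET.SubElement(spark, "spark-opts").text = spark_opts
    for value in args:
        ET.SubElement(spark, "arg").text = value
    # Hypothetical node names for the success and failure transitions.
    ET.SubElement(action, "ok", {"to": "end"})
    ET.SubElement(action, "error", {"to": "kill"})
    return action


action = build_spark_action(
    name="spark-pi",
    master="yarn-cluster",
    app_class="org.apache.spark.examples.SparkPi",
    jar="${nameNode}/user/${wf:user()}/oozie/spark/lib/"
        "spark-examples-1.6.3.2.6.0.0-502-hadoop2.7.3.2.6.0.0-502.jar",
    spark_opts="--num-executors 1 --driver-memory 512m "
               "--executor-memory 512m --executor-cores 1",
    args=["10"],
)
print(ET.tostring(action, encoding="unicode"))
```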

I'm ready to submit the job and run it.

Next I'm going to demonstrate how to run a PySpark job in Oozie via WFM. The code I'm going to run is below

from pyspark import SparkContext, SparkConf
import sys

# Input and output locations are passed on the command line.
datain = sys.argv[1]
dataout = sys.argv[2]

conf = SparkConf().setAppName('counts_with_pyspark')
sc = SparkContext(conf=conf)

# Split each line into words, pair each word with 1, then sum the pairs per word.
text_file = sc.textFile(str(datain))
counts = (text_file.flatMap(lambda line: line.split(" "))
          .map(lambda word: (word, 1))
          .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile(str(dataout))

It's taken from http://spark.apache.org/examples.html; I only added the option to pass the input and output directories from the command line.

I'm going to run the code with the following command to make sure it works:
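If you want to see what the three transformations compute without a cluster, here is a plain-Python sketch of the same logic. It is not Spark code and does not distribute anything; it only mirrors the semantics on an in-memory list, and the function name is mine.

```python
from collections import Counter


def word_counts(lines):
    """Count words the way the flatMap / map / reduceByKey chain does."""
    # flatMap: every line contributes zero or more words.
    words = (word for line in lines for word in line.split(" "))
    # map + reduceByKey: pair each word with 1 and sum per word.
    return dict(Counter(words))


print(word_counts(["to be or", "not to be"]))
```

Like the PySpark version, this splits on a single space, so punctuation stays attached to words (for example `death,`) and consecutive spaces produce empty-string entries.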

/usr/hdp/current/spark-client/bin/spark-submit counts.py hdfs://mycluster/user/aervits/examples/input-data/text/ hdfs://mycluster/user/aervits/pyspark-output

This will produce output in the pyspark-output HDFS directory with a count for each distinct word. Expected output is below:

hdfs dfs -cat pyspark-output/part-00000 | less
(u'and', 7)
(u'slings', 1)
(u'fardels', 1)
(u'mind', 1)
(u'natural', 1)
(u'sea', 1)
(u'For', 2)
(u'arrows', 1)
(u'is', 2)
(u'ills', 1)
(u'resolution', 1)
(u'merit', 1)
(u'death,', 1)
(u'say', 1)
(u'pause.', 1)
(u'bare', 1)
(u'Devoutly', 1)
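Each output line is the text form of a Python tuple, so it can be read back safely with `ast.literal_eval`. A minimal sketch, assuming the lines look exactly like the ones above; the helper names are hypothetical.

```python
import ast


def parse_count_line(line):
    """Turn one saved line such as "(u'and', 7)" into a (word, count) pair."""
    word, count = ast.literal_eval(line.strip())
    return word, count


def top_words(lines, n):
    """Return the n most frequent words from saved output lines."""
    pairs = [parse_count_line(line) for line in lines if line.strip()]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)[:n]


sample_lines = ["(u'and', 7)", "(u'slings', 1)", "(u'For', 2)", "(u'is', 2)"]
print(top_words(sample_lines, 2))
```

`literal_eval` only evaluates literals, so unlike `eval` it will not execute code that happens to be in the file.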

Next, I'm ready to add a Spark action node to WFM and edit it by populating the properties below.

12605-09-config-pyspark.png

Notice I'm passing the Spark options as well as yarn-cluster as deployment mode. Next I need to configure input/output and prepare step.

12606-10-configure-pyspark2.png

I need the prepare step to delete the output directory so that I can re-run my workflow without deleting it manually. Nothing new here: I'm passing the input and output as arguments to the action.

I'm ready to preview the XML.

12607-11-preview-pyspark-xml.png

The last step here is to create the lib directory in the pyspark workflow directory and upload the counts.py file there.

hdfs dfs -mkdir -p oozie/pyspark/lib
hdfs dfs -put counts.py oozie/pyspark/lib/

Now I am ready to submit the job. Luckily, it succeeds.

12608-12-job-success.png

12609-13-job-result.png

As usual, you can find my code here:

https://github.com/dbist/oozie/tree/master/apps/pyspark

https://github.com/dbist/oozie/tree/master/apps/spark


Comments

Unfortunately, it does not work anymore. I've been told that Oozie PySpark is not supported anymore. Is that true?

Launcher ERROR, reason: Main class [org.apache.oozie.action.hadoop.SparkMain], exit code [1]

Truth is, I tried lots of different solutions, but the result is the same...

It doesn't work. I tried many times but the result was the same: KILLED. I got the following error:

Launcher exception: org/apache/spark/deploy/SparkSubmit java.lang.NoClassDefFoundError: org/apache/spark/deploy/SparkSubmit