Created on 02-16-2017 08:01 PM - edited 08-17-2019 02:24 PM
Welcome back, folks! In this tutorial, I'm going to demonstrate how to import existing Spark workflows into WFM and execute them, as well as how to create your own Spark workflows. As of today, Apache Spark 2.x is not supported by the Apache Oozie bundled with HDP. There is community work on making Spark 2 run in Oozie, but it has not been released yet, so I'm going to concentrate on Spark 1.6.3 today.
First things first: I'm going to import a workflow into WFM from the Oozie examples at https://github.com/apache/oozie/tree/master/examples/src/main/apps/spark
My cluster setup is:
Luckily, for the Spark action in a Kerberos environment I didn't need to add anything else (such as a credential).
The first thing I need is the dfs.nameservices property from HDFS:
Ambari > HDFS > Configs
I'm going to use that for the nameNode variable.
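A minimal sketch of that lookup in Python, assuming you have a local copy of hdfs-site.xml (the excerpt below is illustrative; `mycluster` is the nameservice that appears in the HDFS paths later in this post). For an HA cluster, the nameNode value is `hdfs://` followed by the nameservice ID:

```python
import xml.etree.ElementTree as ET

# Illustrative hdfs-site.xml excerpt; on a real node you would read the file
# from the HDFS client configuration directory instead.
HDFS_SITE = """<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
</configuration>"""


def get_property(xml_text, name):
    """Return the value of a Hadoop-style <property> element by name."""
    root = ET.fromstring(xml_text)
    for prop in root.findall("property"):
        if prop.findtext("name") == name:
            return prop.findtext("value", default="").strip()
    raise KeyError(name)


def name_node_uri(xml_text):
    """Build the Oozie nameNode value from the HA nameservice ID."""
    # dfs.nameservices may list several comma-separated IDs; this takes the first.
    nameservice = get_property(xml_text, "dfs.nameservices").split(",")[0].strip()
    return f"hdfs://{nameservice}"


print(name_node_uri(HDFS_SITE))  # hdfs://mycluster
```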
I'm ready to import this workflow into WFM; for the details, please review one of my earlier tutorials.
I'm presented with a spark action node.
Click on spark-node and hit the gear icon to preview its properties.
Let's also review the arguments for input and output, as well as the RM and NameNode settings. Also notice the prepare step: we can choose to delete a directory if it exists.
We're going to leave everything as is.
When we submit the workflow, we're going to supply the nameNode and resourceManager addresses. My properties are below.
Notice that both jobTracker and resourceManager appear. Ignore jobTracker: it was in the original workflow, so it was inherited, and going forward we're only concerned with RM. Also, the nameNode value is built from the dfs.nameservices property I looked up earlier (dfs.nameservices is an hdfs-site.xml property).
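For reference, here is a minimal sketch that writes such submission properties out in job.properties format. The ResourceManager host and port, the application path, and the `oozie.use.system.libpath` setting are assumptions for illustration, not the values from my cluster:

```python
# Submission properties for the imported Spark example. Values marked
# "hypothetical" are placeholders, not the ones from the original cluster.
PROPS = {
    "nameNode": "hdfs://mycluster",            # hdfs:// + dfs.nameservices
    "resourceManager": "rm.example.com:8050",  # hypothetical RM host:port
    "jobTracker": "rm.example.com:8050",       # inherited from the original example
    "oozie.use.system.libpath": "true",        # assumption: use the Oozie Spark sharelib
    "oozie.wf.application.path": "${nameNode}/user/aervits/examples/apps/spark",  # hypothetical
}


def render_properties(props):
    """Render a dict as key=value lines in the job.properties format."""
    return "".join(f"{key}={value}\n" for key, value in props.items())


print(render_properties(PROPS), end="")
```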
Once the job completes, you can navigate to the output directory and see that the file was copied.
```
hdfs dfs -ls /user/aervits/examples/output-data/spark/
Found 3 items
-rw-r--r-- 3 aervits hdfs 0 2017-02-16 17:16 /user/aervits/examples/output-data/spark/_SUCCESS
-rw-r--r-- 3 aervits hdfs 706 2017-02-16 17:16 /user/aervits/examples/output-data/spark/part-00000
-rw-r--r-- 3 aervits hdfs 704 2017-02-16 17:16 /user/aervits/examples/output-data/spark/part-00001
```
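If you want to script that check, here is a minimal sketch that parses the `hdfs dfs -ls` listing (pasted in as a string here; in practice you would capture the command's output) and treats the `_SUCCESS` marker plus at least one `part-` file as a completed run. The helper names are hypothetical:

```python
# Output of `hdfs dfs -ls` from above, pasted in as a string for illustration.
LISTING = """Found 3 items
-rw-r--r-- 3 aervits hdfs 0 2017-02-16 17:16 /user/aervits/examples/output-data/spark/_SUCCESS
-rw-r--r-- 3 aervits hdfs 706 2017-02-16 17:16 /user/aervits/examples/output-data/spark/part-00000
-rw-r--r-- 3 aervits hdfs 704 2017-02-16 17:16 /user/aervits/examples/output-data/spark/part-00001
"""


def parse_ls(text):
    """Map file name -> size in bytes from `hdfs dfs -ls` output."""
    files = {}
    for line in text.splitlines():
        fields = line.split()
        # Entry lines have 8 fields: perms, replication, owner, group, size,
        # date, time, path. Paths containing spaces are not handled here.
        if len(fields) != 8:
            continue
        files[fields[7].rsplit("/", 1)[-1]] = int(fields[4])
    return files


def output_looks_complete(files):
    """True if the _SUCCESS marker and at least one part- file are present."""
    return "_SUCCESS" in files and any(name.startswith("part-") for name in files)


files = parse_ls(LISTING)
print(output_looks_complete(files))  # True
```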
In my case, the sample input was a book excerpt in the examples directory:
```
hdfs dfs -cat /user/aervits/examples/output-data/spark/part-00000
To be or not to be, that is the question;
Whether 'tis nobler in the mind to suffer
The slings and arrows of outrageous fortune,
Or to take arms against a sea of troubles,
And by opposing, end them. To die, to sleep;
No more; and by a sleep to say we end
The heart-ache and the thousand natural shocks
That flesh is heir to ? 'tis a consummation
```
Next up, I'm going to demonstrate authoring a new Spark action instead of importing one. I'm following this guide, http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.3/bk_spark-component-guide/content/run-sample-..., to show how to add its Pi job to an Oozie workflow via WFM.
First, create a workflow directory on HDFS along with a lib folder, then upload the Spark examples jar to that lib folder.
```
hdfs dfs -mkdir -p oozie/spark/lib
cd /usr/hdp/current/spark-client/lib
# the exact jar file name depends on your HDP build
hdfs dfs -put spark-examples*.jar oozie/spark/lib
```
Next, let's add a Spark action in WFM and edit it. Fill out the properties as shown below and make sure to select Yarn Cluster; Yarn Client mode in Oozie will be deprecated soon. Notice that you can pass the Spark options on their own line.
I also need to add an argument to the SparkPi job; in this case it's 10.
If you haven't figured it out already, I'm trying to recreate the following command in Oozie:
```
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master yarn-client \
  --num-executors 1 \
  --driver-memory 512m \
  --executor-memory 512m \
  --executor-cores 1 \
  lib/spark-examples*.jar 10
```
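To make the mapping from that command to the WFM form explicit, here is a rough sketch that splits it into the Spark action's fields. The helper and field names are hypothetical, and it assumes every option other than `--class` and `--master` takes exactly one value, which holds for this command but not for spark-submit in general:

```python
import shlex

SUBMIT = (
    "./bin/spark-submit --class org.apache.spark.examples.SparkPi "
    "--master yarn-client --num-executors 1 --driver-memory 512m "
    "--executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10"
)


def to_spark_action(command, master="yarn-cluster"):
    """Split a spark-submit command into Oozie Spark action fields (sketch)."""
    tokens = shlex.split(command)[1:]  # drop the spark-submit executable itself
    fields = {"master": master, "class": None, "jar": None, "args": []}
    opts = []
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        if fields["jar"] is not None:
            # Everything after the application jar is an application argument.
            fields["args"].append(tok)
        elif tok == "--class":
            fields["class"] = tokens[i + 1]
            i += 1
        elif tok == "--master":
            # Dropped: the action uses `master` (yarn-cluster) instead.
            i += 1
        elif tok.startswith("--"):
            # Assumes each remaining option takes exactly one value.
            opts += [tok, tokens[i + 1]]
            i += 1
        else:
            fields["jar"] = tok
        i += 1
    fields["spark_opts"] = " ".join(opts)
    return fields


action = to_spark_action(SUBMIT)
print(action["spark_opts"])
# --num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1
```

In the WFM form, the jar field should point at the copy uploaded to oozie/spark/lib on HDFS rather than the local `lib/` path.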
Aside from changing yarn-client to yarn-cluster, everything else is the same as in the command above. I'd like to preview my XML now.
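The XML preview should resemble the output of the following minimal sketch, which builds a one-action workflow with ElementTree. Child elements follow the order of the Oozie spark-action schema; the schema versions (`uri:oozie:workflow:0.5`, `uri:oozie:spark-action:0.1`), the kill message, and the jar file name are assumptions, so compare against what WFM actually generates on your cluster:

```python
import xml.etree.ElementTree as ET

WF_NS = "uri:oozie:workflow:0.5"         # assumed workflow schema version
SPARK_NS = "uri:oozie:spark-action:0.1"  # assumed spark-action schema version


def spark_workflow(name, main_class, jar, spark_opts, args, master="yarn-cluster"):
    """Build a single-action Oozie workflow that runs one Spark job (sketch)."""
    app = ET.Element("workflow-app", {"xmlns": WF_NS, "name": name})
    ET.SubElement(app, "start", {"to": "spark-node"})
    action = ET.SubElement(app, "action", {"name": "spark-node"})
    spark = ET.SubElement(action, "spark", {"xmlns": SPARK_NS})
    # Emitted in schema order: job-tracker, name-node, master, name, class, jar, spark-opts.
    for tag, text in [
        ("job-tracker", "${resourceManager}"),
        ("name-node", "${nameNode}"),
        ("master", master),
        ("name", name),
        ("class", main_class),
        ("jar", jar),
        ("spark-opts", spark_opts),
    ]:
        ET.SubElement(spark, tag).text = text
    for arg in args:
        ET.SubElement(spark, "arg").text = arg
    ET.SubElement(action, "ok", {"to": "end"})
    ET.SubElement(action, "error", {"to": "kill"})
    kill = ET.SubElement(app, "kill", {"name": "kill"})
    ET.SubElement(kill, "message").text = "Spark failed: ${wf:errorMessage(wf:lastErrorNode())}"
    ET.SubElement(app, "end", {"name": "end"})
    return ET.tostring(app, encoding="unicode")


wf_xml = spark_workflow(
    name="SparkPi",
    main_class="org.apache.spark.examples.SparkPi",
    # Placeholder jar name: use the file you actually uploaded to oozie/spark/lib.
    jar="${nameNode}/user/aervits/oozie/spark/lib/spark-examples.jar",
    spark_opts="--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1",
    args=["10"],
)
print(wf_xml)
```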
I'm ready to submit the job and run it.
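WFM submits the job for you, but if you ever want to script the submission, the Oozie Web Services API accepts a POST to `/v1/jobs?action=start` with the job properties as an XML configuration body. This sketch only builds the request; the Oozie host, RM address, and application path are hypothetical. On a Kerberized cluster like mine the call also needs SPNEGO authentication, which plain `urllib` does not do, whereas the `oozie job -config job.properties -run` CLI handles it:

```python
import urllib.request
from xml.sax.saxutils import escape

OOZIE_URL = "http://oozie.example.com:11000/oozie"  # hypothetical Oozie server URL


def build_submit_request(props, start=True):
    """Build (but do not send) an Oozie REST request that submits a workflow."""
    # Oozie expects the job properties as a Hadoop-style XML configuration.
    body = "<configuration>" + "".join(
        f"<property><name>{escape(k)}</name><value>{escape(v)}</value></property>"
        for k, v in props.items()
    ) + "</configuration>"
    # action=start submits and starts the job in one call.
    url = f"{OOZIE_URL}/v1/jobs" + ("?action=start" if start else "")
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/xml;charset=UTF-8"},
        method="POST",
    )


req = build_submit_request({
    "user.name": "aervits",
    "nameNode": "hdfs://mycluster",
    "resourceManager": "rm.example.com:8050",  # hypothetical RM address
    "oozie.wf.application.path": "hdfs://mycluster/user/aervits/oozie/spark",  # hypothetical
})
# urllib.request.urlopen(req) would send it on an unsecured cluster.
```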
Next, I'm going to demonstrate how to run a PySpark job in Oozie via WFM. The code I'm going to run is below:
```python
import sys

from pyspark import SparkContext, SparkConf

# Input and output HDFS paths are passed as the first and second arguments.
datain = sys.argv[1]
dataout = sys.argv[2]

conf = SparkConf().setAppName('counts_with_pyspark')
sc = SparkContext(conf=conf)

text_file = sc.textFile(str(datain))
counts = (text_file.flatMap(lambda line: line.split(" "))
          .map(lambda word: (word, 1))
          .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile(str(dataout))
```
It's taken from http://spark.apache.org/examples.html; I only added the ability to pass the input and output directories on the command line.
To make sure it works, I'm going to run the code with the following command:
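To see what the three transformations compute without a cluster, here is a local sketch that uses `collections.Counter` in place of Spark. It is not the Spark code path, just the same splitting and counting logic applied to a Python list:

```python
from collections import Counter


def word_counts(lines):
    """Local stand-in for flatMap(split(" ")) -> map((w, 1)) -> reduceByKey(+)."""
    counts = Counter()
    for line in lines:
        # Split on single spaces only, as the PySpark job does: no lower-casing
        # or punctuation stripping, so "be" and "be," are different words.
        counts.update(line.split(" "))
    return counts


counts = word_counts(["To be or not to be, that is the question;"])
print(counts["be"], counts["be,"], counts["To"], counts["to"])  # 1 1 1 1
```

The case and punctuation sensitivity explains entries such as `death,` in the job's output.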
```
/usr/hdp/current/spark-client/bin/spark-submit counts.py \
  hdfs://mycluster/user/aervits/examples/input-data/text/ \
  hdfs://mycluster/user/aervits/pyspark-output
```
This will produce output in the pyspark-output HDFS directory with the number of occurrences of each distinct word. The expected output is below:
```
hdfs dfs -cat pyspark-output/part-00000 | less
(u'and', 7)
(u'slings', 1)
(u'fardels', 1)
(u'mind', 1)
(u'natural', 1)
(u'sea', 1)
(u'For', 2)
(u'arrows', 1)
(u'is', 2)
(u'ills', 1)
(u'resolution', 1)
(u'merit', 1)
(u'death,', 1)
(u'say', 1)
(u'pause.', 1)
(u'bare', 1)
(u'Devoutly', 1)
```
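The `u'...'` prefixes indicate that the job ran under Python 2, where the string form of a (unicode, int) tuple is what `saveAsTextFile` writes. If you want to check the counts programmatically, a small sketch can parse those lines back with `ast.literal_eval`, which also accepts the `u` prefix in Python 3. The helper name is hypothetical:

```python
import ast


def parse_counts(lines):
    """Parse lines such as "(u'and', 7)" into a {word: count} dict."""
    result = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # literal_eval safely evaluates the tuple literal without running code.
        word, count = ast.literal_eval(line)
        result[word] = count
    return result


sample = ["(u'and', 7)", "(u'slings', 1)", "(u'For', 2)", "(u'death,', 1)"]
print(parse_counts(sample)["and"])  # 7
```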
Next, I'm ready to add a Spark action node to WFM and edit it by populating the properties below.
Notice that I'm passing the Spark options, as well as yarn-cluster as the deployment mode. Next, I need to configure the input/output arguments and the prepare step.
The prepare step deletes the output directory so that I can re-run my workflow without deleting it manually. Nothing new here: I'm passing the input and output directories as arguments to the action.
I'm ready to preview the XML.
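A minimal sketch of the `<spark>` element the preview should contain for this job, built with ElementTree. The HDFS paths come from the commands in this post; the Spark options are assumed to match the SparkPi example, the schema version is an assumption, and putting the `.py` file in `<jar>` with no `<class>` element reflects my understanding of the Oozie Spark action, so double-check it against the WFM preview:

```python
import xml.etree.ElementTree as ET

SPARK_NS = "uri:oozie:spark-action:0.1"  # assumed spark-action schema version


def pyspark_action(script, input_dir, output_dir, spark_opts, master="yarn-cluster"):
    """Build the <spark> element for a PySpark script (sketch)."""
    spark = ET.Element("spark", {"xmlns": SPARK_NS})
    ET.SubElement(spark, "job-tracker").text = "${resourceManager}"
    ET.SubElement(spark, "name-node").text = "${nameNode}"
    # Prepare step: delete the output directory so the workflow can be re-run.
    prepare = ET.SubElement(spark, "prepare")
    ET.SubElement(prepare, "delete", {"path": output_dir})
    ET.SubElement(spark, "master").text = master
    ET.SubElement(spark, "name").text = "counts_with_pyspark"
    # For PySpark, <jar> points at the .py file and <class> is omitted.
    ET.SubElement(spark, "jar").text = script
    ET.SubElement(spark, "spark-opts").text = spark_opts
    for arg in (input_dir, output_dir):
        ET.SubElement(spark, "arg").text = arg
    return ET.tostring(spark, encoding="unicode")


pyspark_xml = pyspark_action(
    script="${nameNode}/user/aervits/oozie/pyspark/lib/counts.py",
    input_dir="${nameNode}/user/aervits/examples/input-data/text",
    output_dir="${nameNode}/user/aervits/pyspark-output",
    # Assumed options, copied from the SparkPi command above.
    spark_opts="--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1",
)
print(pyspark_xml)
```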
The last step is to create the lib directory in the pyspark workflow directory and upload counts.py there.
```
hdfs dfs -mkdir oozie/pyspark/lib
hdfs dfs -put counts.py oozie/pyspark/lib/
```
Now I'm ready to submit the job, and luckily it succeeds.
As usual, you can find my code here