Posted 06-28-2018 04:09 PM
In this article I will describe how NiFi interacts with Spark via Livy. This is necessary because we are working on the assumption that HDF and HDP are two separate clusters: NiFi cannot access the Spark binaries, so it cannot submit Spark jobs directly through shell commands. Livy supports two modes of Spark submission:
Batch: REST endpoint is /batches
Interactive: REST endpoint is /sessions

A batch invocation creates a YARN container and tears it down once the Spark job finishes, whereas in interactive mode the YARN container is created once and kept alive until it is explicitly told to close. Choose between them based on your SLAs. Be aware that batch invocations incur additional latency from YARN container creation on every submission, whereas in interactive mode only the first call pays that cost and subsequent invocations are faster.

Batch mode

Overview: Sample Spark code (Python based):

import sys
from pyspark import SparkContext, SparkConf

APP_NAME = "My PySpark Application"
SOURCE_DATA = "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/data/testdata.dat"
DESTINATION_LOCATION = "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/output"

def doMax(myrdd):
    # Keep the larger value per key
    maxRdd = myrdd.reduceByKey(lambda x, y: y if (x < y) else x)
    maxRdd.saveAsTextFile(DESTINATION_LOCATION)

def doMin(myrdd):
    # Keep the smaller value per key
    minRdd = myrdd.reduceByKey(lambda x, y: x if (x < y) else y)
    minRdd.saveAsTextFile(DESTINATION_LOCATION)

def doSum(myrdd):
    sumRdd = myrdd.reduceByKey(lambda x, y: x + y)
    sumRdd.saveAsTextFile(DESTINATION_LOCATION)

def doAvg(myrdd):
    # combineByKey accumulates (sum, count) per key; the map divides them
    compRdd = myrdd.combineByKey(
        (lambda x: (x, 1)),
        (lambda p, x: (p[0] + x, p[1] + 1)),
        (lambda a, b: (a[0] + b[0], a[1] + b[1])))
    avgRdd = compRdd.map(lambda item: (item[0], item[1][0] / item[1][1]))
    avgRdd.saveAsTextFile(DESTINATION_LOCATION)

def main(sc, taskToDo):
    # Input lines are "key,value" pairs of integers
    myrdd = (sc.textFile(SOURCE_DATA)
               .map(lambda line: line.split(','))
               .map(lambda item: (int(item[0]), int(item[1]))))
    if taskToDo == "sum":
        doSum(myrdd)
    elif taskToDo == "min":
        doMin(myrdd)
    elif taskToDo == "max":
        doMax(myrdd)
    else:
        doAvg(myrdd)

if __name__ == "__main__":
    conf = SparkConf().setAppName(APP_NAME)
    sc = SparkContext(conf=conf)
    main(sc, sys.argv[1])
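For reference, the JSON body that gets POSTed to Livy's /batches endpoint can be sketched as below. This is a minimal sketch: `build_batch_payload` is a hypothetical helper (not part of NiFi or Livy), the Livy port 8998 is the default and may differ in your cluster, and the HDFS path matches the example above.

```python
import json

# Assumption: Livy runs on its default port 8998 on the HDP sandbox host
LIVY_BATCHES_URL = "http://sandbox-hdp.hortonworks.com:8998/batches"

def build_batch_payload(py_file, args):
    """Build the JSON body for a Livy POST /batches request.

    'file' and 'args' are standard fields of the Livy batch API:
    'file' is the HDFS path of the script, 'args' become sys.argv.
    """
    return {"file": py_file, "args": args}

payload = build_batch_payload(
    "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/codebase/mypi.py",
    ["sum"])
body = json.dumps(payload)
# POST `body` to LIVY_BATCHES_URL with header Content-Type: application/json
```

In the flow below, InvokeHTTP performs this POST for us; the payload itself is prepared in the upstream GenerateFlowFile/UpdateAttribute processors.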
The code is in 'mypi.py' (hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/codebase/mypi.py).

GenerateFlowFile config:
UpdateAttribute config:
InvokeHTTP config: one attribute is not visible in the snapshot, so make sure to set 'Content-Type' to '${mime.type}'.

After this processor executes, the response is in JSON format:

{"id":50,"state":"starting","appId":null,"appInfo":{"driverLogUrl":null,"sparkUiUrl":null},"log":["stdout: ","\nstderr: ","\nYARN Diagnostics: "]}
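The EvaluateJsonPath step below pulls the session id out of that response with the JSONPath expression $.id. In plain Python, the equivalent extraction on the sample response looks like this:

```python
import json

# Sample Livy response from the POST to /batches (as shown above)
response = ('{"id":50,"state":"starting","appId":null,'
            '"appInfo":{"driverLogUrl":null,"sparkUiUrl":null},'
            '"log":["stdout: ","\\nstderr: ","\\nYARN Diagnostics: "]}')

session_id = json.loads(response)["id"]   # JSONPath equivalent: $.id
print(session_id)  # -> 50
```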
Extract 'id' using EvaluateJsonPath:

Ping Livy for the status of the batch session, using the session id created above, with another InvokeHTTP. The response flowfile is again in JSON format:

{
"id": 52,
"state": "starting",
"appId": "application_1530193181085_0008",
"appInfo": {
"driverLogUrl": null,
"sparkUiUrl": null
},
"log": [
"\t ApplicationMaster RPC port: -1",
"\t queue: default",
"\t start time: 1530200888994",
"\t final status: UNDEFINED",
"\t tracking URL: http://sandbox-hdp.hortonworks.com:8088/proxy/application_1530193181085_0008/",
"\t user: livy",
"18/06/28 15:48:09 INFO ShutdownHookManager: Shutdown hook called",
"18/06/28 15:48:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-56d56fc0-026f-4f90-becd-1431d17680d5",
"\nstderr: ",
"\nYARN Diagnostics: "
]
}
Parse out the status using EvaluateJsonPath: we loop until the Spark job finishes, i.e. until jsonStatus equals 'success'. We use RouteOnAttribute for this: if the job is not yet complete, route back to the InvokeHTTP processor that fetches the status. We throttle that processor to run every 1 second. Once the isComplete predicate is satisfied, i.e. jsonStatus is 'success', we exit the loop. This is an example of implementing NiFi orchestration with Spark via Livy.
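The status-check loop above can be sketched in plain Python as follows. This is a minimal sketch: `fetch_state` is a hypothetical stand-in for the InvokeHTTP call (GET /batches/&lt;id&gt; followed by EvaluateJsonPath on $.state), and the canned states simulate Livy's responses.

```python
import time

def wait_for_batch(session_id, fetch_state, poll_interval=1.0):
    """Poll until the Livy batch reports 'success'.

    Mirrors the RouteOnAttribute loop: not complete -> poll again
    after poll_interval seconds; complete -> exit.
    """
    while True:
        state = fetch_state(session_id)
        if state == "success":        # the isComplete predicate
            return state
        time.sleep(poll_interval)

# Simulated Livy responses for the sample session id 50
states = iter(["starting", "running", "success"])
final = wait_for_batch(50, lambda sid: next(states), poll_interval=0.01)
print(final)  # -> success
```

A production loop would also exit on Livy's terminal failure states such as 'dead' or 'error', rather than polling forever.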