Member since: 11-14-2017
Posts: 4
Kudos Received: 5
Solutions: 1
06-28-2018 04:09 PM
3 Kudos
In this article I will describe how NiFi interacts with Spark via Livy. This approach is necessary because we assume HDF and HDP are two separate clusters, so NiFi has no access to the Spark binaries and cannot submit Spark jobs directly through shell commands. There are two modes of Spark submission via Livy:
Batch: the REST endpoint is /batches.
Interactive: the REST endpoint is /sessions.

A batch invocation creates a YARN container and tears it down once the Spark job finishes, whereas in interactive mode the YARN container is created once and kept alive until it is explicitly told to close. You may go with either of them based on your SLAs. Be aware that batch invocations incur an additional time lag for YARN container creation on every call, whereas in interactive mode only the first call has that lag and subsequent invocations are faster. A minimal sketch contrasting the two submission styles is shown below.
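To make the difference concrete, here is a minimal sketch of both submission styles using Python's requests library. The Livy host/port, headers, file path, and sample statement are assumptions for illustration and are not taken from this flow.

# Sketch contrasting Livy's two submission styles.
# Host/port, headers, and paths are illustrative assumptions.
import requests

LIVY = "http://sandbox-hdp.hortonworks.com:8999"
HEADERS = {"Content-Type": "application/json", "X-Requested-By": "nifi"}

# Batch: one POST per job; the YARN container is torn down when the job ends.
requests.post(LIVY + "/batches",
              json={"file": "hdfs:///path/to/job.py"},  # hypothetical path
              headers=HEADERS)

# Interactive: create a session once, reuse it for many statements,
# and delete it explicitly when done.
session_id = requests.post(LIVY + "/sessions",
                           json={"kind": "pyspark"},
                           headers=HEADERS).json()["id"]
requests.post("%s/sessions/%d/statements" % (LIVY, session_id),
              json={"code": "print(1 + 1)"},
              headers=HEADERS)
requests.delete("%s/sessions/%d" % (LIVY, session_id), headers=HEADERS)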
Batch mode overview
Sample Spark code (Python based):

import sys
from pyspark import SparkContext, SparkConf

APP_NAME = "My PySpark Application"
SOURCE_DATA = "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/data/testdata.dat"
DESTINATION_LOCATION = "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/output"

def doMax(myrdd):
    # Keep the larger value per key.
    maxRdd = myrdd.reduceByKey(lambda x, y: y if (x < y) else x)
    maxRdd.saveAsTextFile(DESTINATION_LOCATION)

def doMin(myrdd):
    # Keep the smaller value per key.
    minRdd = myrdd.reduceByKey(lambda x, y: x if (x < y) else y)
    minRdd.saveAsTextFile(DESTINATION_LOCATION)

def doSum(myrdd):
    # Sum the values per key.
    sumRdd = myrdd.reduceByKey(lambda x, y: x + y)
    sumRdd.saveAsTextFile(DESTINATION_LOCATION)

def doAvg(myrdd):
    # Build (sum, count) pairs per key, then divide to get the average.
    compRdd = myrdd.combineByKey(
        (lambda x: (x, 1)),
        (lambda p, x: (p[0] + x, p[1] + 1)),
        (lambda a, b: (a[0] + b[0], a[1] + b[1])))
    avgRdd = compRdd.map(lambda item: (item[0], (item[1][0] / item[1][1])))
    avgRdd.saveAsTextFile(DESTINATION_LOCATION)

def main(sc, taskToDo):
    # Parse "key,value" lines into (int, int) pairs and dispatch on the requested task.
    myrdd = sc.textFile(SOURCE_DATA).map(lambda line: line.split(',')).map(lambda item: (int(item[0]), int(item[1])))
    if taskToDo == "sum":
        doSum(myrdd)
    elif taskToDo == "min":
        doMin(myrdd)
    elif taskToDo == "max":
        doMax(myrdd)
    else:
        doAvg(myrdd)

if __name__ == "__main__":
    conf = SparkConf().setAppName(APP_NAME)
    sc = SparkContext(conf=conf)
    main(sc, sys.argv[1])
The code is saved as 'mypi.py' (hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/codebase/mypi.py). The flow uses the following processors:

GenerateFlowFile config: (screenshot not included)
UpdateAttribute config: (screenshot not included)
InvokeHttp config: (screenshot not included) - one attribute is not visible in the screenshot; make sure 'Content-Type' is set to '${mime.type}'.

After this processor executes, the response is in JSON format:

{"id":50,"state":"starting","appId":null,"appInfo":{"driverLogUrl":null,"sparkUiUrl":null},"log":["stdout: ","\nstderr: ","\nYARN Diagnostics: "]}
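For reference, the request that produces a response like the one above (the InvokeHttp POST to Livy's /batches endpoint) would look roughly as follows in plain Python. This is a sketch only: the Livy host/port, header values, and the 'sum' argument are assumptions for illustration.

# Sketch of the Livy batch submission that the InvokeHttp processor performs.
# Host/port, headers, and the "sum" argument are illustrative assumptions.
import requests

resp = requests.post(
    "http://sandbox-hdp.hortonworks.com:8999/batches",
    json={
        "file": "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/codebase/mypi.py",
        "args": ["sum"],  # mypi.py reads sys.argv[1]: sum, min, max, or avg
    },
    headers={"Content-Type": "application/json", "X-Requested-By": "nifi"},
)
print(resp.json())  # e.g. {"id": 50, "state": "starting", ...}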
Extract 'id' from the response using EvaluateJsonPath. Then ping Livy for the status of the batch session, using the id extracted above, with another InvokeHttp. The response in the flowfile is again in JSON format:

{
"id": 52,
"state": "starting",
"appId": "application_1530193181085_0008",
"appInfo": {
"driverLogUrl": null,
"sparkUiUrl": null
},
"log": [
"\t ApplicationMaster RPC port: -1",
"\t queue: default",
"\t start time: 1530200888994",
"\t final status: UNDEFINED",
"\t tracking URL: http://sandbox-hdp.hortonworks.com:8088/proxy/application_1530193181085_0008/",
"\t user: livy",
"18/06/28 15:48:09 INFO ShutdownHookManager: Shutdown hook called",
"18/06/28 15:48:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-56d56fc0-026f-4f90-becd-1431d17680d5",
"\nstderr: ",
"\nYARN Diagnostics: "
]
}
Parse out the status using EvaluateJsonPath. This forms a cyclic loop that runs until the Spark job finishes: we keep looping until jsonStatus equals 'success', and we use RouteOnAttribute to decide. If the job is not yet complete, the flow routes back to the status-check InvokeHttp, which is throttled to run every 1 second. Once the isComplete predicate is satisfied, i.e. jsonStatus is 'success', we exit the loop. This is an example of implementing NiFi orchestration with Spark/Livy; the polling logic is sketched below in plain Python.
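The loop built from InvokeHttp, EvaluateJsonPath, and RouteOnAttribute is equivalent to the following standalone sketch. The Livy host/port and the handling of failure states are assumptions, and NiFi of course implements this with processors rather than a script.

# Sketch of the status-polling loop that the NiFi flow builds with
# InvokeHttp + EvaluateJsonPath + RouteOnAttribute.
# Livy host/port and the failure handling are illustrative assumptions.
import time
import requests

LIVY = "http://sandbox-hdp.hortonworks.com:8999"
HEADERS = {"Content-Type": "application/json", "X-Requested-By": "nifi"}

def wait_for_batch(batch_id, interval_seconds=1):
    # Poll /batches/{id} every second until the job reaches a final state.
    while True:
        state = requests.get("%s/batches/%d" % (LIVY, batch_id),
                             headers=HEADERS).json()["state"]
        if state == "success":          # the RouteOnAttribute 'isComplete' case
            return state
        if state in ("dead", "error"):  # assumed failure handling, not in the article
            raise RuntimeError("Livy batch %d failed with state %s" % (batch_id, state))
        time.sleep(interval_seconds)    # the 1-second throttle on the processor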
05-23-2018 01:59 PM
1 Kudo
Finally I was able to get it working. You need to pass 'spark.yarn.dist.pyFiles' in the conf when creating the session. An example:

curl -X POST --data '{"kind":"pyspark", "conf":{ "spark.yarn.dist.pyFiles" : "hdfs://sandbox-hdp.hortonworks.com:8020/user/skekatpu/pw/codebase"} }' -H "Content-Type: application/json" -H "X-Requested-By: someuserid" http://localhost:8999/sessions

...where 'codebase' is an HDFS folder containing the .py modules. (The same call expressed in Python is sketched below.) Felix: Yes, we have some flows that work with batches as well, but this particular one needs interactive connectivity to Livy, hence /sessions needs to be used.
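For anyone following along outside curl, the same session creation and module import can be sketched with Python's requests library. The host, headers, HDFS path, and 'splitter' module name mirror the thread above; everything else is illustrative rather than part of the original answer.

# Sketch of the working approach using Python's requests library.
import requests

LIVY = "http://localhost:8999"
HEADERS = {"Content-Type": "application/json", "X-Requested-By": "someuserid"}

# Create an interactive pyspark session, distributing the HDFS codebase folder
# via spark.yarn.dist.pyFiles so its .py modules are importable.
session = requests.post(
    LIVY + "/sessions",
    json={"kind": "pyspark",
          "conf": {"spark.yarn.dist.pyFiles":
                   "hdfs://sandbox-hdp.hortonworks.com:8020/user/skekatpu/pw/codebase"}},
    headers=HEADERS,
).json()

# Once the session is up, the module can be imported in a statement.
requests.post(
    "%s/sessions/%d/statements" % (LIVY, session["id"]),
    json={"code": "from splitter import getWords"},
    headers=HEADERS,
)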
05-23-2018 12:25 AM
1 Kudo
Sorry, it didn't work. Here's the request to create the session:

curl -X POST --data '{"kind":"pyspark", "conf":{ "spark.submit.pyFiles" : "/user/skekatpu/pw/codebase/splitter.py"} }' -H "Content-Type: application/json" -H "X-Requested-By: root" http://localhost:8999/sessions

I retried it using the fully qualified HDFS name (hdfs:///sandbox-hdp.hortonworks.com/user/skekatpu/pw/codebase/splitter.py), and it still didn't work. Response:

{
  "id": 1,
  "code": "import splitter ",
  "state": "available",
  "output": {
    "status": "error",
    "execution_count": 1,
    "ename": "ImportError",
    "evalue": "No module named splitter",
    "traceback": [
      "Traceback (most recent call last):\n",
      "ImportError: No module named splitter\n"
    ]
  },
  "progress": 1.0
}
05-22-2018 07:18 PM
1 Kudo
Platform: HDP 2.6.4

If I set --py-files in pyspark (shell mode), it works fine. However, if I set the pyFiles parameter in Livy's curl request, it returns the error "No module found". I was able to replicate this issue on the HDP sandbox as well. Example:

Create the Livy/Spark session:

curl -X POST --data '{"kind": "pyspark", "pyFiles" : ["/some hdfs location/splitter.py"]}' -H "Content-Type: application/json" -H "X-Requested-By: root" http://localhost:8999/sessions

Submit a Livy/Spark statement (based on the response above, I extracted the session id, which was 71):

curl -X POST --data '{"code": "from splitter import getWords"}' -H "Content-Type: application/json" -H "X-Requested-By: root" http://localhost:8999/sessions/71/statements

Check the statement status:

curl -X GET -H "Content-Type: application/json" -H "X-Requested-By: root" http://localhost:8999/sessions/71/statements
Response: {
"id": 0,
"code": "from splitter import getWords",
"state": "available",
"output": {
"status": "error",
"execution_count": 0,
"ename": "ImportError",
"evalue": "No module named splitter",
"traceback": [
"Traceback (most recent call last):\n",
"ImportError: No module named splitter\n"
]
},
"progress": 1.0
}

Any ideas? The pyspark shell works fine, but Livy does not. Please suggest. Thank you.
Labels: Apache Spark