
In this article I will describe how NiFi interacts with Spark via Livy. This is necessary because we are working under the assumption that HDF and HDP are two separate clusters, so NiFi cannot access the Spark binaries and therefore cannot submit Spark jobs directly through shell commands.

There are two modes of Spark submissions via Livy:

  • Batch: REST endpoint is /batches
  • Interactive: REST endpoint is /sessions

A batch invocation creates a YARN container and tears it down once the Spark job finishes, whereas in interactive mode the YARN container is created once and kept alive until it is explicitly closed. Choose between them based on your SLAs: batch invocations incur a time lag for YARN container creation on every submission, whereas in interactive mode only the first call has that lag and subsequent invocations are faster.
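To make the distinction concrete, here are sketches of the JSON request bodies the two endpoints accept. The HDFS paths are the ones used later in this article; the `"args"` value and the `"kind"` choice are illustrative assumptions:

```python
import json

# Batch mode: POST to http://<livy-host>:8998/batches
# Runs one self-contained job; the YARN container is torn down afterwards.
batch_payload = {
    "file": "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/codebase/mypi.py",
    "args": ["sum"],
}

# Interactive mode: POST to http://<livy-host>:8998/sessions
# Creates a long-lived session; code is then POSTed to
# /sessions/<id>/statements until the session is explicitly deleted.
session_payload = {
    "kind": "pyspark",
}

print(json.dumps(batch_payload))
print(json.dumps(session_payload))
```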


Batch mode

Overview:

77854-screen-shot-2018-06-28-at-93222-am.png

Sample Spark code (Python-based):

import sys
from pyspark import SparkContext, SparkConf

APP_NAME = "My PySpark Application"
SOURCE_DATA = "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/data/testdata.dat"
DESTINATION_LOCATION = "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/output"


def doMax(myrdd):
    maxRdd = myrdd.reduceByKey(lambda x, y: y if (x < y) else x)
    maxRdd.saveAsTextFile(DESTINATION_LOCATION)


def doMin(myrdd):
    minRdd = myrdd.reduceByKey(lambda x, y: x if (x < y) else y)
    minRdd.saveAsTextFile(DESTINATION_LOCATION)


def doSum(myrdd):
    sumRdd = myrdd.reduceByKey(lambda x, y: x + y)
    sumRdd.saveAsTextFile(DESTINATION_LOCATION)


def doAvg(myrdd):
    # combineByKey builds (sum, count) pairs per key; divide to get the average
    compRdd = myrdd.combineByKey(
        (lambda x: (x, 1)),
        (lambda p, x: (p[0] + x, p[1] + 1)),
        (lambda a, b: (a[0] + b[0], a[1] + b[1])))
    avgRdd = compRdd.map(lambda item: (item[0], item[1][0] / item[1][1]))
    avgRdd.saveAsTextFile(DESTINATION_LOCATION)


def main(sc, taskToDo):
    # Each input line is "key,value"; parse into (int, int) pairs
    myrdd = sc.textFile(SOURCE_DATA) \
              .map(lambda line: line.split(',')) \
              .map(lambda item: (int(item[0]), int(item[1])))
    if taskToDo == "sum":
        doSum(myrdd)
    elif taskToDo == "min":
        doMin(myrdd)
    elif taskToDo == "max":
        doMax(myrdd)
    else:
        doAvg(myrdd)


if __name__ == "__main__":
    conf = SparkConf().setAppName(APP_NAME)
    sc = SparkContext(conf=conf)
    main(sc, sys.argv[1])


The code is saved as 'mypi.py' in HDFS (hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/codebase/mypi.py).

GenerateFlowFile config:

77857-screen-shot-2018-06-28-at-111842-am.png

UpdateAttribute config:

77858-screen-shot-2018-06-28-at-112004-am.png

InvokeHTTP config:

77859-screen-shot-2018-06-28-at-112100-am.png

One attribute is not visible in the snapshot: be sure to set 'Content-Type' to '${mime.type}'.
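For reference, the POST that InvokeHTTP issues to Livy can be sketched with Python's standard library. The port (8998 is Livy's default) and the `"args"` value are assumptions; adjust for your cluster:

```python
import json
import urllib.request

# Assumed Livy batch endpoint; 8998 is the default Livy port
LIVY_URL = "http://sandbox-hdp.hortonworks.com:8998/batches"

payload = {
    "file": "hdfs://sandbox-hdp.hortonworks.com:8020/user/root/skekatpu/codebase/mypi.py",
    "args": ["sum"],
}

# Build the same request InvokeHTTP sends; note the Content-Type header
req = urllib.request.Request(
    LIVY_URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(req) would actually submit the batch; not executed here.
```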

After this processor executes, the response is in JSON format:

{"id":50,"state":"starting","appId":null,"appInfo":{"driverLogUrl":null,"sparkUiUrl":null},"log":["stdout: ","\nstderr: ","\nYARN Diagnostics: "]}

Extract 'id' using EvaluateJSONPath:

77860-screen-shot-2018-06-28-at-112445-am.png
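The extraction EvaluateJSONPath performs with an expression like `$.id` is equivalent to the following plain-Python sketch, using the response shown above:

```python
import json

# Sample Livy response from the POST to /batches (copied from above)
response = ('{"id":50,"state":"starting","appId":null,'
            '"appInfo":{"driverLogUrl":null,"sparkUiUrl":null},'
            '"log":["stdout: ","\\nstderr: ","\\nYARN Diagnostics: "]}')

parsed = json.loads(response)
batch_id = parsed["id"]   # what EvaluateJSONPath stores as a flowfile attribute
print(batch_id)
```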

Ping Livy for the status of the batch session, using the session id extracted above, via InvokeHTTP:

77861-screen-shot-2018-06-28-at-112625-am.png

The response in the flowfile is in JSON format:

{
  "id": 52,
  "state": "starting",
  "appId": "application_1530193181085_0008",
  "appInfo": {
    "driverLogUrl": null,
    "sparkUiUrl": null
  },
  "log": [
    "\t ApplicationMaster RPC port: -1",
    "\t queue: default",
    "\t start time: 1530200888994",
    "\t final status: UNDEFINED",
    "\t tracking URL: http://sandbox-hdp.hortonworks.com:8088/proxy/application_1530193181085_0008/",
    "\t user: livy",
    "18/06/28 15:48:09 INFO ShutdownHookManager: Shutdown hook called",
    "18/06/28 15:48:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-56d56fc0-026f-4f90-becd-1431d17680d5",
    "\nstderr: ",
    "\nYARN Diagnostics: "
  ]
}

Parse out the status using EvaluateJSONPath:

77864-screen-shot-2018-06-28-at-115528-am.png

We make this a cyclic loop that runs until the Spark job finishes, i.e. until jsonStatus equals 'success'. We use RouteOnAttribute for this:

77865-screen-shot-2018-06-28-at-120403-pm.png

If the job is not yet complete, we route back to the InvokeHTTP status check, which we throttle to run once per second. Once the isComplete predicate is satisfied, i.e. jsonStatus is 'success', we exit the loop.
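The loop the flow implements (check status, wait a second, check again until a terminal state) can be sketched as a small helper. Here `fetch_state` stands in for the InvokeHTTP call and is a hypothetical hook, not a Livy API; the canned state list at the bottom replaces real HTTP responses:

```python
import time

# Terminal batch states reported by Livy
TERMINAL_STATES = {"success", "error", "dead", "killed"}

def wait_for_batch(fetch_state, poll_secs=1, sleep=time.sleep):
    """Poll until the batch reaches a terminal state, mirroring the
    RouteOnAttribute loop in the NiFi flow. fetch_state() should return
    the 'state' field from GET /batches/<id>."""
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        sleep(poll_secs)

# Example with canned states instead of real HTTP calls:
states = iter(["starting", "running", "success"])
final = wait_for_batch(lambda: next(states), sleep=lambda s: None)
```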

77866-screen-shot-2018-06-28-at-120644-pm.png

This completes an example of orchestrating Spark jobs from NiFi via Livy.
