
There are many ways to integrate Apache NiFi and Apache Spark.

Apache NiFi can send data to Apache Spark Streaming via S2S (Apache NiFi's Site-to-Site) or Apache Kafka. To execute a regular (batch) Apache Spark job instead, you can submit it through Apache Livy, which is included in HDP 2.6 and later. Apache Zeppelin uses the same mechanism to integrate with Apache Spark, so it is a secure and reasonable approach.

I use this approach when I want Spark to handle one step in the middle of an Apache NiFi flow.

42797-livy-submit-flow.png

Syntax for Calling a Job

The job JAR is stored in HDFS as /apps/logs*jar, and its main class is com.dataflowdeveloper.logs.Logs.
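As a rough sketch of what the submit step sends: Livy's batch endpoint (POST /batches) takes a JSON body whose `file` field names the application JAR and whose `className` field names the main class. The helper below only builds that body. `build_batch_payload` is a hypothetical helper, and the JAR path is a placeholder because the exact file name is not spelled out above; the actual processor configuration is the one in the screenshot.

```python
import json


def build_batch_payload(jar_path, class_name, args=None):
    """Return the JSON body for a Livy POST /batches request (sketch)."""
    # "file" and "className" are fields of Livy's batch request body.
    payload = {"file": jar_path, "className": class_name}
    if args:
        # Optional command-line arguments handed to the main class.
        payload["args"] = [str(a) for a in args]
    return json.dumps(payload)


# Hypothetical path: replace with the real location of the job JAR in HDFS.
body = build_batch_payload("/apps/your-logs.jar", "com.dataflowdeveloper.logs.Logs")
print(body)
```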

42800-livy-calltosparkjob.png

Schema For Apache Livy Status Messages

{
 "type": "record",
 "name": "livystatus",
 "fields": [
  {
   "name": "id",
   "type": [
    "null",
    "int"
   ]
  },
  {
   "name": "state",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "appId",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "driverLogUrl",
   "type": [
    "null",
    "string"
   ]
  },
  {
   "name": "sparkUiUrl",
   "type": [
    "null",
    "string"
   ]
  }
 ]
}

Example Apache Livy Status Message for the Apache Spark Job, Reformatted for Use

{
  "sparkUiUrl" : "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0022/",
  "id" : "19",
  "state" : "success",
  "driverLogUrl" : "http://princeton-14-2-1.field.hortonworks.com:8188/applicationhistory/logs/princeton-14-2-1.field.hortonworks.com:45454/container_e02_1511839325046_0022_01_000001/container_e02_1511839325046_0022_01_000001/livy",
  "appId" : "application_1511839325046_0022"
}

Apache Livy Status Monitoring Flow

42801-livy-spark-monitoring.png

QueryRecord Processor for Querying the Status and Deciding What to Do with the Result

42810-livy-queryrecord.png
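The routing decision can be illustrated outside NiFi. A QueryRecord processor could, for example, carry a query such as `SELECT * FROM FLOWFILE WHERE state = 'success'` on one of its relationships; the exact queries in this flow are only visible in the screenshot. The Python function below is a hypothetical stand-in that groups flattened livystatus records in a similar way. The bucket names and the set of failure states are assumptions.

```python
from collections import defaultdict

# Assumption: Livy reports these states for batches that ended without success.
FAILED_STATES = {"dead", "killed", "error"}


def route_by_state(records):
    """Split livystatus records into success / failed / pending buckets."""
    routes = defaultdict(list)
    for record in records:
        # state is nullable in the schema, so treat None as an empty string.
        state = (record.get("state") or "").lower()
        if state == "success":
            routes["success"].append(record)
        elif state in FAILED_STATES:
            routes["failed"].append(record)
        else:
            # Anything else (starting, running, ...) is still in progress.
            routes["pending"].append(record)
    return dict(routes)
```

Records in the pending bucket would be checked again later, which mirrors looping a FlowFile back to the status check.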


Apache Ambari Screen for Turning off CSRF Protection for Apache Livy

42802-livy-csrf.png

Results in JSON from Apache Livy REST Call

42803-livy-restresults.png

Hortonworks Schema Registry

42804-livy-schema.png

Apache Spark Job Submitted And Running

42805-livy-yarn-sparkjob.png

42806-livy-sparkprocess.png

The Apache Spark Job Environment During the Run

42807-livy-spark.png

Results from the Apache Spark Job Shown in YARN Logs

42808-livy-sparklog-yarn.png

Apache YARN Run Information on the Apache Spark Job

42809-livy-yarn.png

This is the status message of a completed job, and it carries some very useful information. The state is the most important field: once it reaches success, you can continue with the rest of your processing.

The driver log URL points to the driver logs; you could ingest these with Apache NiFi.

The Spark UI URL points, through the YARN proxy, to the Spark UI of the running application.
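Because only a terminal state tells you the job is done, the status check has to be repeated until the state settles. The loop below sketches that logic in plain Python. `wait_for_batch` is hypothetical, and `fetch_state` is injected so it could wrap an HTTP call to Livy's GET /batches/{id}/state endpoint or a test stub. The set of terminal states is an assumption.

```python
import time

# Assumption: a Livy batch in one of these states will not change again.
TERMINAL_STATES = {"success", "dead", "killed", "error"}


def wait_for_batch(fetch_state, poll_seconds=10.0, timeout_seconds=600.0, sleep=time.sleep):
    """Poll fetch_state() until it returns a terminal state or the timeout expires.

    fetch_state is any zero-argument callable returning the current state string.
    """
    waited = 0.0
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"batch still '{state}' after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

Injecting `sleep` keeps the loop testable; in NiFi, the processor's run schedule plays the same role.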

Raw REST JSON Message

{
  "from" : 0,
  "total" : 1,
  "sessions" : [ {
    "id" : 12,
    "state" : "running",
    "appId" : "application_1511839325046_0015",
    "appInfo" : {
      "driverLogUrl" : "http://princeton-14-2-1.field.hortonworks.com:8042/node/containerlogs/container_e02_1511839325046_0015_01_000001/livy",
      "sparkUiUrl" : "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/"
    },
    "log" : [ "\t diagnostics: [Tue Nov 28 19:24:09 +0000 2017] Scheduler has assigned a container for AM, waiting for AM container to be launched", "\t ApplicationMaster host: N/A", "\t ApplicationMaster RPC port: -1", "\t queue: default", "\t start time: 1511897049505", "\t final status: UNDEFINED", "\t tracking URL: http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/", "\t user: livy", "17/11/28 19:24:09 INFO ShutdownHookManager: Shutdown hook called", "17/11/28 19:24:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8ada7f2-43d0-4823-8816-6e930101f2f1" ]
  } ]
}
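In the raw message, driverLogUrl and sparkUiUrl are nested under appInfo, while the livystatus schema expects them at the top level. A minimal flattening step, sketched in Python with a hypothetical `flatten_batches` helper, could look like this; missing values become None, which matches the nullable fields in the schema.

```python
import json


def flatten_batches(raw_json):
    """Turn a Livy GET /batches response into flat livystatus records."""
    response = json.loads(raw_json)
    records = []
    for batch in response.get("sessions", []):
        # driverLogUrl and sparkUiUrl live under appInfo in the raw message.
        app_info = batch.get("appInfo") or {}
        records.append({
            "id": batch.get("id"),
            "state": batch.get("state"),
            "appId": batch.get("appId"),
            "driverLogUrl": app_info.get("driverLogUrl"),
            "sparkUiUrl": app_info.get("sparkUiUrl"),
        })
    return records
```

The log array is dropped here because the schema has no field for it.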

REST URL: http://yourlivyapi:8999/batches/

You can look up the Livy server host and port in Ambari.
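Besides the list endpoint above, Livy exposes per-batch endpoints that are handy for monitoring. The sketch below only assembles those URLs; `batch_urls` is a hypothetical helper, and `yourlivyapi:8999` is the placeholder host from the line above.

```python
def batch_urls(base_url, batch_id):
    """Return the Livy batch endpoints used to monitor one batch."""
    root = base_url.rstrip("/") + "/batches/"
    return {
        "list": root,                        # GET: all batches
        "batch": f"{root}{batch_id}",        # GET: one batch; DELETE: kill it
        "state": f"{root}{batch_id}/state",  # GET: {"id": ..., "state": ...}
        "log": f"{root}{batch_id}/log",      # GET: log lines (from/size params)
    }


urls = batch_urls("http://yourlivyapi:8999", 19)
print(urls["state"])
```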

My Apache Spark Job Configuration

My Apache Spark job reads its input log file from HDFS, so upload the file first:

hdfs dfs -mkdir -p /user/livy/data/
hdfs dfs -put access3.log /user/livy/data
hdfs dfs -chmod -R 777 /user/livy/data

Output from a Run of the Apache Spark Job

Log Type: stdout
Log Upload Time: Tue Nov 28 17:59:02 +0000 2017
Log Length: 34968
===== Log Count: 206857
LogRecord(194.187.168.230,-,-,20/Feb/2016:00:00:45 -0500,GET,200,187,-,Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)/*)
LogRecord(194.187.168.230,-,-,20/Feb/2016:00:00:46 -0500,GET,200,24810,-,Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)/*)
LogRecord(192.0.84.33,-,-,20/Feb/2016:00:01:56 -0500,HEAD,200,0,-,jetmon/1.0 (Jetpack Site Uptime Monitor by WordPress.com))
LogRecord(66.249.64.8,-,-,20/Feb/2016:00:02:13 -0500,GET,200,28486,-,Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html))
LogRecord(192.0.84.33,-,-,20/Feb/2016:00:02:15 -0500,HEAD,200,0,-,jetmon/1.0 (Jetpack Site Uptime Monitor by WordPress.com))
root
 |-- clientIp: string (nullable = true)
 |-- clientIdentity: string (nullable = true)
 |-- user: string (nullable = true)
 |-- dateTime: string (nullable = true)
 |-- request: string (nullable = true)
 |-- statusCode: integer (nullable = true)
 |-- bytesSent: long (nullable = true)
 |-- referer: string (nullable = true)
 |-- userAgent: string (nullable = true)

+-------+-----------------+
|summary|        bytesSent|
+-------+-----------------+
|  count|           206857|
|   mean|28017.72503226867|
| stddev|137716.9426060656|
|    min|                0|
|    max|         15379053|
+-------+-----------------+

== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).clientIp, true) AS clientIp#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).clientIdentity, true) AS clientIdentity#11, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).user, true) AS user#12, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).dateTime, true) AS dateTime#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).request, true) AS request#14, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).statusCode AS statusCode#15, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).bytesSent AS bytesSent#16L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).referer, true) AS referer#17, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).userAgent, true) AS userAgent#18]
+- Scan ExternalRDDScan[obj#9]
After writing results
===== Number of Log Records: 206857  Content Size Total: 5795662547, Avg: 28017, Min: 0, Max: 15379053
=====Status Code counts: [(404,21611),(200,170127),(302,1467),(206,87),(304,1260),(406,3242),(500,1106),(409,28),(301,4968),(403,2601),(407,123),(429,1),(405,236)]
=====IP Addresses Accessed > 10 times: [51.255.65.87,146.127.253.45,201.239.138.159,157.55.39.157,1.22.196.230,54.211.201.215,180.179.40.44,....]

My simple example Apache Spark job for parsing Apache logs is in the GitHub repository referenced below.
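The parsing idea can be sketched in Python. This is not the author's Spark job (that code is on GitHub); it is an assumed regular expression for the standard Apache combined log format, mapped onto the field names from the schema printed above, with `parse_log_line` as a hypothetical helper. The LogRecord lines above show only the HTTP method in `request`, so the original job appears to split the request line further than this sketch does.

```python
import re

# Apache combined log format: host ident user [time] "request" status bytes "referer" "agent"
COMBINED_LOG = re.compile(
    r'^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"'
)


def parse_log_line(line):
    """Parse one combined-format line into the job's field names, or return None."""
    match = COMBINED_LOG.match(line)
    if match is None:
        return None
    ip, ident, user, ts, request, status, size, referer, agent = match.groups()
    return {
        "clientIp": ip,
        "clientIdentity": ident,
        "user": user,
        "dateTime": ts,
        "request": request,
        "statusCode": int(status),
        # "-" means no body was sent; count it as 0 bytes.
        "bytesSent": 0 if size == "-" else int(size),
        "referer": referer,
        "userAgent": agent,
    }
```

Returning None for unparseable lines lets a caller filter them out before computing statistics such as the bytesSent summary.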

Gotcha

You may need to disable the following property (set it to false) in Ambari, under the Apache Spark section:

livy.server.csrf_protection.enabled
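If you would rather keep CSRF protection on, Livy accepts POST and DELETE requests when the client sends an `X-Requested-By` header. The sketch below builds, but does not send, such a submission request with the Python standard library. `submit_request` and the `nifi` header value are hypothetical; in NiFi the equivalent would be an extra request header on the processor that calls Livy.

```python
import json
import urllib.request


def submit_request(base_url, payload, requested_by="nifi"):
    """Build (but do not send) a POST /batches request that passes Livy's CSRF check."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # With csrf_protection enabled, Livy expects this header on POST/DELETE.
            "X-Requested-By": requested_by,
        },
        method="POST",
    )


# urllib.request.urlopen(req) would actually send it.
req = submit_request("http://yourlivyapi:8999", {"file": "/apps/your-logs.jar",
                                                 "className": "com.dataflowdeveloper.logs.Logs"})
```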

Log Files

  • /var/log/livy2/livy-livy-server.out

Flow File

livynifiintegration.xml

References:
