Created on 11-29-2017 04:59 PM - edited 08-17-2019 10:05 AM
There are many ways to integrate Apache NiFi and Apache Spark.
Apache NiFi can feed Apache Spark Streaming via Site-to-Site (S2S) or Apache Kafka. If you want to execute a regular Apache Spark job, you can do that via Apache Livy, which is included in HDP 2.6+. This is also how Apache Zeppelin integrates with Apache Spark, so it is a secure and reasonable approach.
I use this approach when I want Spark to handle one step in the middle of an Apache NiFi flow.
Syntax for Calling a Job
This job is stored in HDFS as /apps/logs*jar with the class name com.dataflowdeveloper.logs.Logs.
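The call can be sketched in Python as a POST to Livy's batches endpoint. This is a minimal sketch, not the NiFi processor configuration used in this flow: the helper names are hypothetical, the host is the placeholder from the REST URL given in this article, and the jar path is whatever you pass in. The `file` and `className` fields are the ones Livy's batch API expects.

```python
import json
import urllib.request

# Placeholder host from this article; replace with your own Livy server.
LIVY_URL = "http://yourlivyapi:8999/batches/"


def build_batch_request(jar_path, class_name, args=None, livy_url=LIVY_URL):
    """Build (but do not send) a POST request asking Livy to run a batch job."""
    payload = {"file": jar_path, "className": class_name}
    if args:
        # Optional command-line arguments for the Spark job.
        payload["args"] = list(args)
    return urllib.request.Request(
        livy_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_batch(request):
    """Send the request and return Livy's JSON reply as a dict."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

For example, `submit_batch(build_batch_request("/apps/your-job.jar", "com.dataflowdeveloper.logs.Logs"))`, where `/apps/your-job.jar` is a made-up path standing in for your own jar in HDFS.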
Schema For Apache Livy Status Messages
{
  "type": "record",
  "name": "livystatus",
  "fields": [
    { "name": "id", "type": [ "null", "int" ] },
    { "name": "state", "type": [ "null", "string" ] },
    { "name": "appId", "type": [ "null", "string" ] },
    { "name": "driverLogUrl", "type": [ "null", "string" ] },
    { "name": "sparkUiUrl", "type": [ "null", "string" ] }
  ]
}
Example Apache Livy Status Message for an Apache Spark Job, Reformatted for Use
{
  "sparkUiUrl" : "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0022/",
  "id" : "19",
  "state" : "success",
  "driverLogUrl" : "http://princeton-14-2-1.field.hortonworks.com:8188/applicationhistory/logs/princeton-14-2-1.field.hortonworks.com:45454/container_e02_1511839325046_0022_01_000001/container_e02_1511839325046_0022_01_000001/livy",
  "appId" : "application_1511839325046_0022"
}
Apache Livy Status Monitoring Flow
QueryRecord Processor for Querying and Determining What to Do with the Result
Apache Ambari Screen for Turning off CSRF Protection for Apache Livy
Results in JSON from Apache Livy REST Call
Hortonworks Schema Registry
Apache Spark Job Submitted And Running
The Apache Spark Job Environment During the Run
Results from the Apache Spark job Shown in YARN Logs
Apache YARN Run Information on the Apache Spark Job
This is the status message for a completed job, and it carries some really useful information. The state is the most important field: once it reports success, you can go on with the other processing you need.
The driver log URL points you to the job's logs, which you could ingest with Apache NiFi.
The Spark UI URL points you to the Spark UI for the application, served through the YARN proxy.
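The decision on the state can be sketched as a small routing function. This is only an illustration of the idea, not the QueryRecord configuration from the flow: the route names are hypothetical, and the set of failure states is my assumption about what Livy reports for batches, so check it against your Livy version.

```python
# States treated as final in this sketch. "success" appears in this article;
# the failure states are an assumption and may differ between Livy versions.
FINISHED_OK = {"success"}
FINISHED_BAD = {"dead", "killed", "error"}


def route_for_state(state):
    """Map a Livy state string to a hypothetical routing decision."""
    if state in FINISHED_OK:
        return "continue"   # job finished, carry on with downstream processing
    if state in FINISHED_BAD:
        return "failure"    # job ended without success, alert or retry
    return "wait"           # still starting or running (or state missing), poll again
```

A status record with `"state": "running"` would therefore be sent back around for another poll.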
Raw REST JSON Message
{
  "from" : 0,
  "total" : 1,
  "sessions" : [ {
    "id" : 12,
    "state" : "running",
    "appId" : "application_1511839325046_0015",
    "appInfo" : {
      "driverLogUrl" : "http://princeton-14-2-1.field.hortonworks.com:8042/node/containerlogs/container_e02_1511839325046_0015_01_000001/livy",
      "sparkUiUrl" : "http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/"
    },
    "log" : [
      "\t diagnostics: [Tue Nov 28 19:24:09 +0000 2017] Scheduler has assigned a container for AM, waiting for AM container to be launched",
      "\t ApplicationMaster host: N/A",
      "\t ApplicationMaster RPC port: -1",
      "\t queue: default",
      "\t start time: 1511897049505",
      "\t final status: UNDEFINED",
      "\t tracking URL: http://princeton-14-2-1.field.hortonworks.com:8088/proxy/application_1511839325046_0015/",
      "\t user: livy",
      "17/11/28 19:24:09 INFO ShutdownHookManager: Shutdown hook called",
      "17/11/28 19:24:09 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8ada7f2-43d0-4823-8816-6e930101f2f1"
    ]
  } ]
}
REST URL: http://yourlivyapi:8999/batches/
You can find the Livy host and port in Ambari.
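To get from the raw REST message to the flat status record described by the schema, the nested `appInfo` fields have to be pulled up to the top level. A minimal Python sketch of that reshaping follows; the function name is hypothetical, and in the actual flow this step is done with NiFi processors instead.

```python
import json


def flatten_sessions(raw_json):
    """Turn Livy's raw /batches reply into flat status records."""
    reply = json.loads(raw_json)
    records = []
    for session in reply.get("sessions", []):
        # appInfo may be missing or null before YARN has accepted the job.
        app_info = session.get("appInfo") or {}
        records.append({
            "id": session.get("id"),
            "state": session.get("state"),
            "appId": session.get("appId"),
            "driverLogUrl": app_info.get("driverLogUrl"),
            "sparkUiUrl": app_info.get("sparkUiUrl"),
        })
    return records
```

Each returned dict has the five fields of the `livystatus` schema, with the `log` array dropped.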
My Apache Spark Job Configuration
My Apache Spark job needs some data.
hdfs dfs -mkdir -p /user/livy/data/
hdfs dfs -put access3.log /user/livy/data
hdfs dfs -chmod -R 777 /user/livy/data
The Run of the chosen Apache Spark Job
Log Type: stdout
Log Upload Time: Tue Nov 28 17:59:02 +0000 2017
Log Length: 34968
===== Log Count: 206857
LogRecord(194.187.168.230,-,-,20/Feb/2016:00:00:45 -0500,GET,200,187,-,Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)/*)
LogRecord(194.187.168.230,-,-,20/Feb/2016:00:00:46 -0500,GET,200,24810,-,Mozilla/5.0 (compatible; Qwantify/2.2w; +https://www.qwant.com/)/*)
LogRecord(192.0.84.33,-,-,20/Feb/2016:00:01:56 -0500,HEAD,200,0,-,jetmon/1.0 (Jetpack Site Uptime Monitor by WordPress.com))
LogRecord(66.249.64.8,-,-,20/Feb/2016:00:02:13 -0500,GET,200,28486,-,Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html))
LogRecord(192.0.84.33,-,-,20/Feb/2016:00:02:15 -0500,HEAD,200,0,-,jetmon/1.0 (Jetpack Site Uptime Monitor by WordPress.com))
root
 |-- clientIp: string (nullable = true)
 |-- clientIdentity: string (nullable = true)
 |-- user: string (nullable = true)
 |-- dateTime: string (nullable = true)
 |-- request: string (nullable = true)
 |-- statusCode: integer (nullable = true)
 |-- bytesSent: long (nullable = true)
 |-- referer: string (nullable = true)
 |-- userAgent: string (nullable = true)
+-------+-----------------+
|summary|        bytesSent|
+-------+-----------------+
|  count|           206857|
|   mean|28017.72503226867|
| stddev|137716.9426060656|
|    min|                0|
|    max|         15379053|
+-------+-----------------+
== Physical Plan ==
*SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).clientIp, true) AS clientIp#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).clientIdentity, true) AS clientIdentity#11, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).user, true) AS user#12, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).dateTime, true) AS dateTime#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).request, true) AS request#14, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).statusCode AS statusCode#15, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).bytesSent AS bytesSent#16L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).referer, true) AS referer#17, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.dataflowdeveloper.logs.LogRecord, true]).userAgent, true) AS userAgent#18]
+- Scan ExternalRDDScan[obj#9]
After writing results
===== Number of Log Records: 206857
Content Size Total: 5795662547, Avg: 28017, Min: 0, Max: 15379053
=====Status Code counts: [(404,21611),(200,170127),(302,1467),(206,87),(304,1260),(406,3242),(500,1106),(409,28),(301,4968),(403,2601),(407,123),(429,1),(405,236)]
=====IP Addresses Accessed > 10 times: [51.255.65.87,146.127.253.45,201.239.138.159,157.55.39.157,1.22.196.230,54.211.201.215,180.179.40.44,....]
My simple example Apache Spark job for parsing Apache logs is included in the GitHub repository referenced below.
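To give a feel for what such a job computes, here is a plain Python sketch of the same kind of parsing and summary, without Spark. It is not the job from the repository: the regular expression assumes the Apache combined log format, the function names are hypothetical, and only the field names are taken from the schema printed in the run output.

```python
import re
from collections import Counter

# Assumed Apache combined log format; the real job's parser is not shown here.
LOG_PATTERN = re.compile(
    r'^(\S+) (\S+) (\S+) \[([^\]]+)\] "(\S+)[^"]*" (\d{3}) (\S+) "([^"]*)" "([^"]*)"'
)


def parse_line(line):
    """Parse one access log line into a dict, or return None if it does not match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    ip, identity, user, date_time, method, status, size, referer, agent = match.groups()
    return {
        "clientIp": ip,
        "clientIdentity": identity,
        "user": user,
        "dateTime": date_time,
        "request": method,
        "statusCode": int(status),
        "bytesSent": 0 if size == "-" else int(size),  # "-" means no body was sent
        "referer": referer,
        "userAgent": agent,
    }


def summarize(lines):
    """Count records, total and min/max bytes sent, and hits per status code."""
    records = [r for r in map(parse_line, lines) if r is not None]
    sizes = [r["bytesSent"] for r in records]
    return {
        "count": len(records),
        "total": sum(sizes),
        "min": min(sizes) if sizes else 0,
        "max": max(sizes) if sizes else 0,
        "statusCounts": Counter(r["statusCode"] for r in records),
    }
```

Calling `summarize(open("access3.log"))` on a local copy of the log would give the record count, content size totals, and status code counts in the same spirit as the output above.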
Gotcha
You may need to disable CSRF protection for Apache Livy by setting the following property to false in Ambari, in the Apache Spark configuration area:
livy.server.csrf_protection.enabled
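If you would rather leave CSRF protection on, my understanding is that Livy accepts modifying requests (such as POST) as long as they carry an X-Requested-By header. A minimal Python sketch of adding that header follows; the helper name and the header value are hypothetical, and you should confirm the behavior against your Livy version before relying on it.

```python
import urllib.request


def with_csrf_header(request, requested_by="nifi"):
    """Add the X-Requested-By header that Livy's CSRF check looks for."""
    # The value is arbitrary in this sketch; "nifi" is just a label.
    request.add_header("X-Requested-By", requested_by)
    return request
```

In NiFi itself the equivalent would be sending that header with the HTTP call to Livy.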
Directories
Flow File
References:
Created on 03-26-2018 02:00 PM
Is a Kerberized server supported by LivySessionController?