62414-spark-nifi-flowoverview.png

Version: HDF 3.1, HDP 2.6.4, PySpark 2.2.0, Python 2.7, Apache NiFi 1.5.


I push my power data from a local Apache NiFi 1.5 server over Site-to-Site HTTP to a cloud-hosted HDF 3.1 cluster. This cluster has a Remote Input that passes the information on to a version-controlled Process Group called "Spark-Kafka-Streaming". Once inside, I set a schema name and data type, then push the data to Kafka 1.0 hosted in HDF 3.1.

62423-nifipushtokafkaflow.png

The PublishKafkaRecord_1_0 settings are super easy. We use the JsonTreeReader and the supplied schema to read the JSON file into records. I chose the JsonRecordSetWriter to push JSON out. I could just as easily have used Apache Avro, CSV, or another format. I chose JSON because it is easy to work with in Apache Spark and good for debug display.
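The schema itself is not shown in this article, so the following is only a sketch of what an Avro-style schema for a JsonTreeReader might look like. The record name and the choice of fields are assumptions, picked from the example data at the end of the article; the actual schema is whatever the flow template supplies.

```python
import json

# Hypothetical Avro-style schema covering a few fields from the example data.
# The record name "smartPlug" and this field subset are assumptions for
# illustration only; they are not taken from the author's flow.
schema = {
    "type": "record",
    "name": "smartPlug",
    "fields": [
        {"name": "time", "type": ["null", "string"], "default": None},
        {"name": "current", "type": ["null", "double"], "default": None},
        {"name": "voltage", "type": ["null", "double"], "default": None},
        {"name": "power", "type": ["null", "double"], "default": None},
    ],
}

# Serialize to the JSON text form that a schema text property would hold.
schema_text = json.dumps(schema, indent=2)
print(schema_text)
```

Making each field a union with "null" lets records that lack a field still be read, which is convenient while debugging.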

62424-publishkafkarecord1-0.png

This method and code should keep working for several later versions, but I have not confirmed it on earlier ones.

This article shows how to connect Apache NiFi with Apache Spark via Kafka using Spark Streaming. The example code is in PySpark.

I run the streaming Spark code two different ways for testing:

The first way is via Apache Zeppelin. You will need to load the Apache Spark Kafka Streaming package into Apache Zeppelin.

62415-runningpysparkinzeppelin.png

62419-zeppelindisplayingarow.png

62417-zeppelinconfiguration.png

To add Kafka streaming support, we just add a dependency to the spark2 interpreter and restart the interpreter with the restart button. There is no need to restart Apache Zeppelin or a server.

The other way I run this is with spark-submit, using YARN as the master in cluster mode. As you can see here, I also include the Spark Streaming Kafka package.

/usr/hdp/current/spark2-client/bin/spark-submit  --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kafka_test.py


My example PySpark program is really basic but shows you the integration. This is forked from the standard Spark example.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# One streaming context with a 5-second batch interval
sc = SparkContext(appName="kafkaTest")
ssc = StreamingContext(sc, 5)

print("Connected to spark streaming")

def process(time, rdd):
    # Called once per batch with the batch time and that batch's RDD
    print("========= %s =========" % str(time))
    if not rdd.isEmpty():
        print("count: %d" % rdd.count())
        print("first: %s" % str(rdd.first()))

# Arguments: ZooKeeper quorum, consumer group id, {topic: partitions to consume}
kafkaStream = KafkaUtils.createStream(ssc, "server:2181", "pysparkclient1", {"smartPlug": 1})
kafkaStream.pprint()
kafkaStream.foreachRDD(process)

ssc.start()
ssc.awaitTermination()


This program processes a batch every 5 seconds and receives the Kafka JSON messages as an RDD. If the RDD is not empty, I run a count and get the first row.
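Each element of the stream is a (key, value) tuple whose value is the JSON string published by NiFi. As a minimal sketch of a next step, the helper below parses one such tuple and keeps a few power fields. The function name parse_reading and the choice of fields are assumptions for illustration; the field names come from the example data at the end of the article.

```python
import json

def parse_reading(record):
    """Turn one (key, value) Kafka record into a small dict of power fields.

    Returns None when the value is missing or is not valid JSON.
    """
    _key, value = record
    try:
        msg = json.loads(value)
    except (TypeError, ValueError):
        return None
    return {
        "time": msg.get("time"),
        "power": msg.get("power"),
        "voltage": msg.get("voltage"),
        "current": msg.get("current"),
    }

# Stand-in for one record from the stream, using values from the example data.
sample = (None, '{"current": 0.664822, "voltage": 121.700245, '
                '"power": 77.280039, "time": "02/16/2018 12:20:08"}')
reading = parse_reading(sample)
print(reading)
```

In the streaming job this could be applied with something like kafkaStream.map(parse_reading).filter(lambda r: r is not None) before calling foreachRDD, so that malformed messages are dropped rather than failing the batch.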

You can see the application running in Apache YARN UI.

62416-yarnapplication.png

From Apache Ambari we can monitor the data moving through the Kafka Broker topics.

62418-monitoringkafka.png

We can also monitor the Spark job via the URL supplied in the output of the submit.

62420-sparkstreamingmonitoring.png

We can see the STDOUT of the submitted Spark job here in the YARN logs.

62421-yarnoutput.png


Example PySpark Run

[root@princeton0 demo]# ./submit.sh
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.4.0-91/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.2.0 in central
    found org.apache.kafka#kafka_2.11;0.8.2.1 in central
    found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
    found com.yammer.metrics#metrics-core;2.2.0 in central
    found org.slf4j#slf4j-api;1.7.16 in central
    found org.scala-lang.modules#scala-parser-combinators_2.11;1.0.2 in central
    found com.101tec#zkclient;0.3 in central
    found log4j#log4j;1.2.17 in central
    found org.apache.kafka#kafka-clients;0.8.2.1 in central
    found net.jpountz.lz4#lz4;1.3.0 in central
    found org.xerial.snappy#snappy-java;1.1.2.6 in central
    found org.apache.spark#spark-tags_2.11;2.2.0 in central
    found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 3452ms :: artifacts dl 21ms
    :: modules in use:
    com.101tec#zkclient;0.3 from central in [default]
    com.yammer.metrics#metrics-core;2.2.0 from central in [default]
    log4j#log4j;1.2.17 from central in [default]
    net.jpountz.lz4#lz4;1.3.0 from central in [default]
    org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
    org.apache.kafka#kafka_2.11;0.8.2.1 from central in [default]
    org.apache.spark#spark-streaming-kafka-0-8_2.11;2.2.0 from central in [default]
    org.apache.spark#spark-tags_2.11;2.2.0 from central in [default]
    org.scala-lang.modules#scala-parser-combinators_2.11;1.0.2 from central in [default]
    org.scala-lang.modules#scala-xml_2.11;1.0.2 from central in [default]
    org.slf4j#slf4j-api;1.7.16 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   13  |   2   |   2   |   0   ||   13  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 13 already retrieved (0kB/23ms)
18/02/17 01:03:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/02/17 01:03:01 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
18/02/17 01:03:01 INFO RMProxy: Connecting to ResourceManager at princeton0.field.hortonworks.com/172.26.200.216:8050
18/02/17 01:03:01 INFO Client: Requesting a new application from cluster with 1 NodeManagers
18/02/17 01:03:02 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (43008 MB per container)
18/02/17 01:03:02 INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/02/17 01:03:02 INFO Client: Setting up container launch context for our AM
18/02/17 01:03:02 INFO Client: Setting up the launch environment for our AM container
18/02/17 01:03:02 INFO Client: Preparing resources for our AM container
18/02/17 01:03:04 INFO Client: Use hdfs cache file as spark.yarn.archive for HDP, hdfsCacheFile:hdfs://princeton0.field.hortonworks.com:8020/hdp/apps/2.6.4.0-91/spark2/spark2-hdp-yarn-archive.tar.gz
18/02/17 01:03:04 INFO Client: Source and destination file systems are the same. Not copying hdfs://princeton0.field.hortonworks.com:8020/hdp/apps/2.6.4.0-91/spark2/spark2-hdp-yarn-archive.tar.gz
18/02/17 01:03:04 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.kafka_kafka_2.11-0.8.2.1.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.spark_spark-tags_2.11-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.spark-project.spark_unused-1.0.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/com.yammer.metrics_metrics-core-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/com.101tec_zkclient-0.3.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/com.101tec_zkclient-0.3.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.8.2.1.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.kafka_kafka-clients-0.8.2.1.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.slf4j_slf4j-api-1.7.16.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/log4j_log4j-1.2.17.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/log4j_log4j-1.2.17.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/net.jpountz.lz4_lz4-1.3.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.xerial.snappy_snappy-java-1.1.2.6.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/opt/demo/kafka_test.py -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/kafka_test.py
18/02/17 01:03:05 INFO Client: Uploading resource file:/usr/hdp/current/spark2-client/python/lib/pyspark.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/pyspark.zip
18/02/17 01:03:06 INFO Client: Uploading resource file:/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/py4j-0.10.4-src.zip
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/com.101tec_zkclient-0.3.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.8.2.1.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/log4j_log4j-1.2.17.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar added multiple times to distributed cache.
18/02/17 01:03:06 INFO Client: Uploading resource file:/tmp/spark-bc1bedca-6201-4715-812e-cd06f8e6efac/__spark_conf__9099337700911844616.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/__spark_conf__.zip
18/02/17 01:03:06 INFO SecurityManager: Changing view acls to: root
18/02/17 01:03:06 INFO SecurityManager: Changing modify acls to: root
18/02/17 01:03:06 INFO SecurityManager: Changing view acls groups to:
18/02/17 01:03:06 INFO SecurityManager: Changing modify acls groups to:
18/02/17 01:03:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
18/02/17 01:03:06 INFO Client: Submitting application application_1517883514475_0424 to ResourceManager
18/02/17 01:03:06 INFO YarnClientImpl: Submitted application application_1517883514475_0424
18/02/17 01:03:07 INFO Client: Application report for application_1517883514475_0424 (state: ACCEPTED)
18/02/17 01:03:07 INFO Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1518829386408
     final status: UNDEFINED
     tracking URL: http://princeton0.field.hortonworks.com:8088/proxy/application_1517883514475_0424/
     user: root

Source

https://github.com/tspannhw/nifi-sparkstreaming-kafka

spark-kafka-streaming.xml

Reference

Using the data from here: https://community.hortonworks.com/articles/155326/monitoring-energy-usage-utilizing-apache-nifi-pyth...

Example Data

{"day19": 2.035, "day20": 1.191, "day21": 0.637, "day22": 1.497, "day23": 1.151, "day24": 1.227, "day25": 1.387, "day26": 1.138, "day27": 1.204, "day28": 1.401, "day29": 1.288, "day30": 1.439, "day31": 0.126, "day1": 1.204, "day2": 1.006, "day3": 1.257, "day4": 1.053, "day5": 1.597, "day6": 1.642, "day7": 1.439, "day8": 1.178, "day9": 1.259, "day10": 0.995, "day11": 0.569, "day12": 1.287, "day13": 1.371, "day14": 1.404, "day15": 1.588, "day16": 0.474, "day17": 1.438, "day18": 1.056, "sw_ver": "1.1.1 Build 160725 Rel.164033", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "060BFEA28A8CD1E67146EB5B2B599CC8", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim Spann's MiniFi Controller SmartPlug - Desk1", "icon_hash": "", "relay_state": 1, "on_time": 5778, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -35, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "month1": 32.674, "month2": 19.323, "current": 0.664822, "voltage": 121.700245, "power": 77.280039, "total": 0.158, "time": "02/16/2018 12:20:08", "ledon": true, "systemtime": "02/16/2018 12:20:08"}


Comments

@Timothy Spann Great article, but I am a bit confused about the difference between Spark Streaming and Spark Structured Streaming integration with Kafka. Which one is advised for similar use cases? Is Spark Streaming planned to be deprecated soon?

@balalaika OK, with no code changes, here is the Structured version. Structured Streaming only just became GA in Spark 2.2, which is HDP 2.6.4 and above. It works fine, though it is a little different from the old style. They will probably keep the old one until 2.5 or maybe 3.0. Both styles are nice. Another option is to use Apache Beam or Streaming Analytics Manager.

https://community.hortonworks.com/content/kbentry/174105/hdp-264-hdf-31-apache-spark-structured-stre...