Master Guru


Version: HDF 3.1, HDP 2.6.4, PySpark 2.2.0, Python 2.7, Apache NiFi 1.5.

I push my power data from a local Apache NiFi 1.5 server over Site-to-Site HTTP to a cloud hosted HDF 3.1 cluster. This cluster has a Remote Input that passes the information on to a version controlled Process Group called "Spark-Kafka-Streaming". Once inside, I set a schema name and data type then push the data to Kafka 1.0 hosted in HDF 3.1.


The PublishKafkaRecord_1.0 settings are super easy. We use the JsonTreeReader and the supplied schema to read the JSON file into records. I chose to use the JsonRecordSetWriter to push JSON out. I could have easily done Apache Avro or CSV or another format. I chose JSON as it is easy to work with in Apache Spark and good for debug display.


This method and code will work for several versions forward, but I cannot confirm for previous versions.

This article is how to connect Apache NiFi with Apache Spark via Kafka using Spark Streaming. The example code is in PySpark.

I run the streaming Spark code two different ways for testing:

First way is via Apache Zeppelin, you will need to load the Apache Spark Kafka Streaming package to Apache Zeppelin




To add Kafka Streaming Support we just add a dependency to the spark2 interpreter and restart the interpreter with the restart button. No need to restart Apache Zeppelin or a server.

The other way I run this is as a Spark Submit with YARN Master in Cluster mode. As you see here I also include the Spark Streaming Kafka Package.

/usr/hdp/current/spark2-client/bin/spark-submit  --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0

My example PySpark program is really basic but shows you the integration. This is forked from the standard Spark example.

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafkaTest")
ssc = StreamingContext(sc,5)

print "Connected to spark streaming"

def process(time, rdd):
    print("========= %s =========" % str(time))
    if not rdd.isEmpty():

ssc = StreamingContext(sc, 5)
kafkaStream = KafkaUtils.createStream(ssc, "server:2181", "pysparkclient1", {"smartPlug": 1})


This program runs every 5 seconds and grabs the Kafka JSON message as an RDD, if it's not empty, I run a count and get the first row.

You can see the application running in Apache YARN UI.


From Apache Ambari we can monitor the data moving through the Kafka Broker topics.


We can also monitor the Spark job via the URL supplied in the output of the submit.


We can see the STDOUT of the submitted Spark job here in the YARN logs.


Example PySpark Run

Using the data from here:

Example Data

{"day19": 2.035, "day20": 1.191, "day21": 0.637, "day22": 1.497, "day23": 1.151, "day24": 1.227, "day25": 1.387, "day26": 1.138, "day27": 1.204, "day28": 1.401, "day29": 1.288, "day30": 1.439, "day31": 0.126, "day1": 1.204, "day2": 1.006, "day3": 1.257, "day4": 1.053, "day5": 1.597, "day6": 1.642, "day7": 1.439, "day8": 1.178, "day9": 1.259, "day10": 0.995, "day11": 0.569, "day12": 1.287, "day13": 1.371, "day14": 1.404, "day15": 1.588, "day16": 0.474, "day17": 1.438, "day18": 1.056, "sw_ver": "1.1.1 Build 160725 Rel.164033", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "060BFEA28A8CD1E67146EB5B2B599CC8", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim Spann's MiniFi Controller SmartPlug - Desk1", "icon_hash": "", "relay_state": 1, "on_time": 5778, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -35, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "month1": 32.674, "month2": 19.323, "current": 0.664822, "voltage": 121.700245, "power": 77.280039, "total": 0.158, "time": "02/16/2018 12:20:08", "ledon": true, "systemtime": "02/16/2018 12:20:08"}

Expert Contributor

@Timothy Spann Great article, but i am a bit confused between spark-streaming and spark-structured streaming intetgration with Kafka. Which one is advised to use for similar use cases, is spark-streaming planned to be deprecated soon?

@balalaika ok, with no code changes here is the Structured version. Structured Streaming is basically just GA in Spark 2.2 which is HDP 2.6.4 and above. It works fine, a little different from the old style. They will probably keep the old one for until 2.5 or maybe 3.0. Both styles are nice. Another option is to use Apache Beam or Streaming Analytics Manager.