Created on 02-17-2018 04:21 PM - edited 08-17-2019 08:49 AM
Version: HDF 3.1, HDP 2.6.4, PySpark 2.2.0, Python 2.7, Apache NiFi 1.5.
I push my power data from a local Apache NiFi 1.5 server over Site-to-Site HTTP to a cloud-hosted HDF 3.1 cluster. This cluster has a Remote Input Port that passes the data on to a version-controlled Process Group called "Spark-Kafka-Streaming". Once inside, I set a schema name and data type, then push the data to Kafka 1.0 hosted in HDF 3.1.
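As a rough illustration of the schema that the record readers and writers use, a fragment of an Avro schema for this record might look like the sketch below. This is hypothetical: the field names come from the example data at the end of this article, the schema name is assumed to match the topic, and the real schema would cover every field.

{
  "type": "record",
  "name": "smartPlug",
  "fields": [
    {"name": "power",      "type": "double"},
    {"name": "voltage",    "type": "double"},
    {"name": "current",    "type": "double"},
    {"name": "systemtime", "type": "string"}
  ]
}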
The PublishKafkaRecord_1.0 settings are super easy. We use the JsonTreeReader and the supplied schema to read the JSON flow file into records. I chose the JsonRecordSetWriter to push JSON out; I could just as easily have used Apache Avro, CSV, or another format. I chose JSON because it is easy to work with in Apache Spark and easy to display for debugging.
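For reference, the handful of PublishKafkaRecord_1.0 properties that matter here look roughly like this; the broker hostname and port are placeholders, and the topic matches the one the Spark code subscribes to:

    Kafka Brokers:  server:6667          (placeholder broker address)
    Topic Name:     smartPlug
    Record Reader:  JsonTreeReader
    Record Writer:  JsonRecordSetWriter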
This method and code will work for several versions forward, but I cannot confirm for previous versions.
This article shows how to connect Apache NiFi with Apache Spark via Kafka, using Spark Streaming. The example code is in PySpark.
I run the streaming Spark code two different ways for testing:
The first way is via Apache Zeppelin; you will need to load the Apache Spark Kafka streaming package into Apache Zeppelin.
To add Kafka streaming support, we just add a dependency to the spark2 interpreter and restart the interpreter with the restart button. There is no need to restart Apache Zeppelin or the server.
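The dependency to add in the spark2 interpreter settings is the same Maven coordinate the spark-submit below uses:

    org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0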
The other way I run this is as a spark-submit with the YARN master in cluster mode. As you can see below, I also include the Spark Streaming Kafka package.
/usr/hdp/current/spark2-client/bin/spark-submit --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kafka_test.py
My example PySpark program is really basic but shows you the integration. This is forked from the standard Spark example.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Connect to Spark and create a streaming context with a 5-second batch interval
sc = SparkContext(appName="kafkaTest")
ssc = StreamingContext(sc, 5)
print "Connected to spark streaming"

# Called for each micro-batch RDD: count the records and grab the first row
def process(time, rdd):
    print("========= %s =========" % str(time))
    if not rdd.isEmpty():
        print(rdd.count())
        print(rdd.first())

# Receiver-based stream: ZooKeeper quorum, consumer group id, and a map of
# topic name to number of consumer threads
kafkaStream = KafkaUtils.createStream(ssc, "server:2181", "pysparkclient1", {"smartPlug": 1})
kafkaStream.pprint()
kafkaStream.foreachRDD(process)

ssc.start()
ssc.awaitTermination()
This program runs a batch every 5 seconds and grabs the Kafka JSON messages as an RDD; if the RDD is not empty, I run a count and get the first row.
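The example only counts and prints, but since the payload is JSON, parsing it is only one step more. A minimal sketch, not part of the original program: each element of the stream is a (key, value) tuple, so you can parse the value and pull out a field such as power from the example data below.

import json

# Each Kafka message is a (key, value) tuple; parse the JSON value into a dict
parsed = kafkaStream.map(lambda kv: json.loads(kv[1]))

# Pull a single field, such as the current power reading, out of each record
parsed.map(lambda record: record["power"]).pprint()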
You can see the application running in the Apache YARN UI.
From Apache Ambari we can monitor the data moving through the Kafka Broker topics.
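You can also spot-check the topic from the command line with the Kafka console consumer; here server:6667 is a placeholder for your broker (6667 is the HDP default Kafka port):

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server server:6667 --topic smartPlug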
We can also monitor the Spark job via the tracking URL supplied in the output of the spark-submit.
We can see the STDOUT of the submitted Spark job here in the YARN logs.
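The same output can also be fetched with the YARN CLI, using the application ID from the submit output, assuming YARN log aggregation is enabled on the cluster:

    yarn logs -applicationId application_1517883514475_0424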
Example PySpark Run
[root@princeton0 demo]# ./submit.sh
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.4.0-91/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.2.0 in central
        found org.apache.kafka#kafka_2.11;0.8.2.1 in central
        found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
        found com.yammer.metrics#metrics-core;2.2.0 in central
        found org.slf4j#slf4j-api;1.7.16 in central
        found org.scala-lang.modules#scala-parser-combinators_2.11;1.0.2 in central
        found com.101tec#zkclient;0.3 in central
        found log4j#log4j;1.2.17 in central
        found org.apache.kafka#kafka-clients;0.8.2.1 in central
        found net.jpountz.lz4#lz4;1.3.0 in central
        found org.xerial.snappy#snappy-java;1.1.2.6 in central
        found org.apache.spark#spark-tags_2.11;2.2.0 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 3452ms :: artifacts dl 21ms
        :: modules in use:
        com.101tec#zkclient;0.3 from central in [default]
        com.yammer.metrics#metrics-core;2.2.0 from central in [default]
        log4j#log4j;1.2.17 from central in [default]
        net.jpountz.lz4#lz4;1.3.0 from central in [default]
        org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
        org.apache.kafka#kafka_2.11;0.8.2.1 from central in [default]
        org.apache.spark#spark-streaming-kafka-0-8_2.11;2.2.0 from central in [default]
        org.apache.spark#spark-tags_2.11;2.2.0 from central in [default]
        org.scala-lang.modules#scala-parser-combinators_2.11;1.0.2 from central in [default]
        org.scala-lang.modules#scala-xml_2.11;1.0.2 from central in [default]
        org.slf4j#slf4j-api;1.7.16 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   13  |   2   |   2   |   0   ||   13  |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 13 already retrieved (0kB/23ms)
18/02/17 01:03:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/02/17 01:03:01 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
18/02/17 01:03:01 INFO RMProxy: Connecting to ResourceManager at princeton0.field.hortonworks.com/172.26.200.216:8050
18/02/17 01:03:01 INFO Client: Requesting a new application from cluster with 1 NodeManagers
18/02/17 01:03:02 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (43008 MB per container)
18/02/17 01:03:02 INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/02/17 01:03:02 INFO Client: Setting up container launch context for our AM
18/02/17 01:03:02 INFO Client: Setting up the launch environment for our AM container
18/02/17 01:03:02 INFO Client: Preparing resources for our AM container
18/02/17 01:03:04 INFO Client: Use hdfs cache file as spark.yarn.archive for HDP, hdfsCacheFile:hdfs://princeton0.field.hortonworks.com:8020/hdp/apps/2.6.4.0-91/spark2/spark2-hdp-yarn-archive.tar.gz
18/02/17 01:03:04 INFO Client: Source and destination file systems are the same. Not copying hdfs://princeton0.field.hortonworks.com:8020/hdp/apps/2.6.4.0-91/spark2/spark2-hdp-yarn-archive.tar.gz
18/02/17 01:03:04 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.kafka_kafka_2.11-0.8.2.1.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.spark_spark-tags_2.11-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.spark-project.spark_unused-1.0.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/com.yammer.metrics_metrics-core-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/com.101tec_zkclient-0.3.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/com.101tec_zkclient-0.3.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.8.2.1.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.kafka_kafka-clients-0.8.2.1.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.slf4j_slf4j-api-1.7.16.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/log4j_log4j-1.2.17.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/log4j_log4j-1.2.17.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/net.jpountz.lz4_lz4-1.3.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.xerial.snappy_snappy-java-1.1.2.6.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/opt/demo/kafka_test.py -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/kafka_test.py
18/02/17 01:03:05 INFO Client: Uploading resource file:/usr/hdp/current/spark2-client/python/lib/pyspark.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/pyspark.zip
18/02/17 01:03:06 INFO Client: Uploading resource file:/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/py4j-0.10.4-src.zip
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/com.101tec_zkclient-0.3.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.8.2.1.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/log4j_log4j-1.2.17.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar added multiple times to distributed cache.
18/02/17 01:03:06 INFO Client: Uploading resource file:/tmp/spark-bc1bedca-6201-4715-812e-cd06f8e6efac/__spark_conf__9099337700911844616.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/__spark_conf__.zip
18/02/17 01:03:06 INFO SecurityManager: Changing view acls to: root
18/02/17 01:03:06 INFO SecurityManager: Changing modify acls to: root
18/02/17 01:03:06 INFO SecurityManager: Changing view acls groups to:
18/02/17 01:03:06 INFO SecurityManager: Changing modify acls groups to:
18/02/17 01:03:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
18/02/17 01:03:06 INFO Client: Submitting application application_1517883514475_0424 to ResourceManager
18/02/17 01:03:06 INFO YarnClientImpl: Submitted application application_1517883514475_0424
18/02/17 01:03:07 INFO Client: Application report for application_1517883514475_0424 (state: ACCEPTED)
18/02/17 01:03:07 INFO Client:
         client token: N/A
         diagnostics: AM container is launched, waiting for AM container to Register with RM
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: default
         start time: 1518829386408
         final status: UNDEFINED
         tracking URL: http://princeton0.field.hortonworks.com:8088/proxy/application_1517883514475_0424/
         user: root
Source
https://github.com/tspannhw/nifi-sparkstreaming-kafka
Reference
Using the data from here: https://community.hortonworks.com/articles/155326/monitoring-energy-usage-utilizing-apache-nifi-pyth...
Example Data
{"day19": 2.035, "day20": 1.191, "day21": 0.637, "day22": 1.497, "day23": 1.151, "day24": 1.227, "day25": 1.387, "day26": 1.138, "day27": 1.204, "day28": 1.401, "day29": 1.288, "day30": 1.439, "day31": 0.126, "day1": 1.204, "day2": 1.006, "day3": 1.257, "day4": 1.053, "day5": 1.597, "day6": 1.642, "day7": 1.439, "day8": 1.178, "day9": 1.259, "day10": 0.995, "day11": 0.569, "day12": 1.287, "day13": 1.371, "day14": 1.404, "day15": 1.588, "day16": 0.474, "day17": 1.438, "day18": 1.056, "sw_ver": "1.1.1 Build 160725 Rel.164033", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "060BFEA28A8CD1E67146EB5B2B599CC8", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim Spann's MiniFi Controller SmartPlug - Desk1", "icon_hash": "", "relay_state": 1, "on_time": 5778, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -35, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "month1": 32.674, "month2": 19.323, "current": 0.664822, "voltage": 121.700245, "power": 77.280039, "total": 0.158, "time": "02/16/2018 12:20:08", "ledon": true, "systemtime": "02/16/2018 12:20:08"}
Created on 02-19-2018 02:14 PM
@Timothy Spann Great article, but I am a bit confused between Spark Streaming and Spark Structured Streaming integration with Kafka. Which one is advised for similar use cases? Is Spark Streaming planned to be deprecated soon?
Created on 02-20-2018 04:20 PM
Structured Streaming uses Spark SQL: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-structured-streaming.html
Created on 02-22-2018 03:05 PM
@balalaika OK, here is the Structured version; nothing needs to change on the NiFi side. Structured Streaming only became GA in Spark 2.2, which is what HDP 2.6.4 and above ship. It works fine, though it is a little different from the old style. They will probably keep the old API until 2.5 or maybe 3.0. Both styles are nice. Another option is to use Apache Beam or Streaming Analytics Manager.
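A minimal sketch of that Structured version, assuming the same smartPlug topic, a placeholder broker address of server:6667, and the org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 package passed to spark-submit instead of the 0-8 streaming one:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafkaStructuredTest").getOrCreate()

# Subscribe to the smartPlug topic as a streaming DataFrame
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "server:6667") \
    .option("subscribe", "smartPlug") \
    .load()

# Kafka values arrive as binary; cast to string to see the JSON payload
messages = df.selectExpr("CAST(value AS STRING)")

# Dump each micro-batch to the console
query = messages.writeStream.format("console").start()
query.awaitTermination()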