Created on 02-21-2018 09:34 PM - edited 08-17-2019 08:47 AM
Apache Spark 2.2.0 with Scala 2.11.8 and Java 1.8.0_112 on HDP 2.6.4, called from HDF 3.1 with Apache NiFi 1.5.
This is a follow-up to: https://community.hortonworks.com/articles/173818/hdp-264-hdf-31-apache-spark-streaming-integration.... and https://community.hortonworks.com/articles/155326/monitoring-energy-usage-utilizing-apache-nifi-pyth...
We are using the same Apache NiFi flow to send messages to Apache Kafka. What is nice is that the Structured Streaming version, the non-structured (DStream) version, and other consumers can all listen to the same topic and receive the same messages sent by Apache NiFi.
When we start, there is no data yet. We quickly get a ton of data.
By default, a Kafka cluster has 3 nodes, so a replication factor of 3 is a good choice. I have only one node, so I had to change this; otherwise you get tons of warnings in the /usr/hdf/current/kafka-broker/logs directory.
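One way to avoid those warnings on a single-node setup is to create the topic explicitly with a replication factor of 1. This is a sketch, not from the original article: the broker path matches the HDF layout mentioned above, and the ZooKeeper address is an assumption you should adjust for your cluster.

```shell
# Create the topic with a single replica, matching a one-node cluster.
# Assumes the HDF 3.1 kafka-broker layout and a local ZooKeeper on 2181.
/usr/hdf/current/kafka-broker/bin/kafka-topics.sh \
  --create \
  --zookeeper localhost:2181 \
  --topic smartPlug2 \
  --partitions 1 \
  --replication-factor 1
```

You can also set default.replication.factor=1 in the broker configuration so newly auto-created topics get a single replica.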
The simplest Apache Spark client is one run in the shell:
/usr/hdp/current/spark2-client/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
The code is a simple fork of code from this excellent, highly recommended tutorial: https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-sql-streaming-Ka... If you are submitting this job rather than running it in a shell, add sq.awaitTermination at the end so the driver blocks until the streaming query stops.
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "smartPlug2").
  option("kafka.bootstrap.servers", "mykafkabroker:6667").load

records.printSchema

val result = records.
  select(
    $"key" cast "string",
    $"value" cast "string",
    $"topic",
    $"partition",
    $"offset")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

val sq = result.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  queryName("scalastrstrclient").
  start

sq.status
Example Run
Spark context Web UI available at http://myipiscool:4045
Spark context available as 'sc' (master = local[*], app id = local-1519248053841).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.2.6.4.0-91
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val records = spark.
     |   readStream.
     |   format("kafka").
     |   option("subscribe", "smartPlug2").
     |   option("kafka.bootstrap.servers", "server:6667").load
records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> records.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

scala> val result = records.
     |   select(
     |     $"key" cast "string",
     |     $"value" cast "string",
     |     $"topic",
     |     $"partition",
     |     $"offset")
result: org.apache.spark.sql.DataFrame = [key: string, value: string ... 3 more fields]

scala> import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

scala> import scala.concurrent.duration._
import scala.concurrent.duration._

scala> val sq = result.
     |   writeStream.
     |   format("console").
     |   option("truncate", false).
     |   trigger(Trigger.ProcessingTime(10.seconds)).
     |   outputMode(OutputMode.Append).
     |   queryName("scalastrstrclient").
     |   start
sq: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3638a852

scala> sq.status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus = {
  "message" : "Getting offsets from KafkaSource[Subscribe[smartPlug2]]",
  "isDataAvailable" : false,
  "isTriggerActive" : true
}

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+
|key|value|topic|partition|offset|
+---+-----+-----+---------+------+
+---+-----+-----+---------+------+

-------------------------------------------
Batch: 1
-------------------------------------------
key:       02/21/2018 16:22:00
value:     {"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.333,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:BF:B1:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"7777","fwId":"777","oemId":"FFF22CFF774A0B89F7624BFC6F50D5DE","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"777","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452287,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41,"longitude":-77,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.888908,"voltage":118.880856,"power":103.141828,"total":8.19,"time":"02/21/2018 16:22:00","ledon":true,"systemtime":"02/21/2018 16:22:00"}
topic:     smartPlug2
partition: 0
offset:    14
Example JSON Data
key:       02/21/2018 16:23:58
value:     {"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.337,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:88:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"8888","fwId":"6767","oemId":"6767","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"7676","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452404,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41.3241234,"longitude":-74.1234234,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.932932,"voltage":118.890282,"power":107.826982,"total":8.194,"time":"02/21/2018 16:23:58","ledon":true,"systemtime":"02/21/2018 16:23:58"}
topic:     smartPlug2
partition: 0
offset:    24
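Since the Kafka value column arrives as a JSON string, a natural next step is to parse it into typed columns with from_json. This is a sketch, not part of the original article: it assumes the spark session and the result DataFrame from the shell session above, and the schema covers only a few fields, with names taken from the sample record.

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Partial schema: just the energy-monitoring fields we care about.
// Field names come from the sample JSON record above.
val plugSchema = StructType(Seq(
  StructField("current", DoubleType),
  StructField("voltage", DoubleType),
  StructField("power", DoubleType),
  StructField("total", DoubleType),
  StructField("systemtime", StringType)
))

// Parse the JSON string in `value` into a struct, then flatten it.
val parsed = result.
  select(from_json($"value", plugSchema) as "plug").
  select($"plug.current", $"plug.voltage", $"plug.power",
         $"plug.total", $"plug.systemtime")
```

The parsed stream can then be written out with the same writeStream/console sink as before, or aggregated (for example, average power per minute) before sinking.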
Created on 09-25-2018 08:24 PM
@Timothy Spann What would this look like in HDP 2.6.5 (kerberized), with Kafka 1.0.0? I tried your Scala code above (without NiFi, against an existing Kafka setup) with Spark 2.3.0, Scala 2.11.8, and the spark-sql-kafka-0-10_2.11:2.3.0 package. When the stream starts, I get no data and four "<timestamp> WARN NetworkClient: Bootstrap broker <hostname>:6667 disconnected" messages every second.
Created on 09-26-2018 10:17 PM
I was able to answer my own question.
To make Spark work in my kerberized HDP 2.6.5 cluster, I had to pass the JAAS config file to the driver and set the correct security protocol:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
  --driver-java-options "-Dhttp.proxyHost=<my proxy host> -Dhttp.proxyPort=80 -Djava.security.auth.login.config=<kafka client jaas config filename>"

val df = spark.readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "<hostname>:6667").
  option("kafka.security.protocol", "SASL_PLAINTEXT").
  option("subscribe", "<mytopic>").load
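For reference, a minimal sketch of what such a Kafka client JAAS file might contain. The keytab path and principal below are placeholders (not from the original post); substitute your own.

```
// Kerberos login section used by the Kafka client inside the Spark driver.
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/kafka_client.keytab"
  principal="user@EXAMPLE.COM";
};
```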
Created on 12-07-2018 04:59 PM
Thanks for the update!