Community Articles

Find and share helpful community-sourced technical articles.
avatar
Master Guru

62466-hdfflow.png

Apache Spark 2.2.0 with Scala 2.11.8 with Java 1.8.0_112 on HDP 2.6.4 called from HDF 3.1 with Apache NiFi 1.5.

This is a follow up to: https://community.hortonworks.com/articles/173818/hdp-264-hdf-31-apache-spark-streaming-integration.... and https://community.hortonworks.com/articles/155326/monitoring-energy-usage-utilizing-apache-nifi-pyth...

We are using the same Apache NiFi flow to send messages to Apache Kafka. What is nice you could have the Structured Streaming version, non-structured version and others listening to the same Topic and same messages sent by Apache NiFi.

When we start, no data yet.

62468-scalalogbatch0.png

We quickly get a ton of data

62467-scalalogs.png

By Default A Kafka Cluster is 3 Nodes. Replication Factor of 3 is good then. I have one node. I had to change this. Tons of warnings in the /usr/hdf/current/kafka-broker/logs directory.

62469-kafkatopicreplicationfactor.png

The simplest Apache Spark client is one run in the shell:

/usr/hdp/current/spark2-client/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0

The code is a simple fork of code from this excellent highly recommended tutorial: https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-sql-streaming-Ka... If you are submitting this job and not running in a shell, add: // In the end, stop the streaming query sq.awaitTermination

val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "smartPlug2").
  option("kafka.bootstrap.servers", "mykafkabroker:6667").load

records.printSchema

val result = records.
  select(
    $"key" cast "string",   
    $"value" cast "string",
    $"topic",
    $"partition",
    $"offset")

import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = result.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  queryName("scalastrstrclient").
  start

sq.status

Example Run

Spark context Web UI available at http://myipiscool:4045
Spark context available as 'sc' (master = local[*], app id = local-1519248053841).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0.2.6.4.0-91
      /_/


Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val records = spark.
     |   readStream.
     |   format("kafka"). 
     |   option("subscribe", "smartPlug2").
     |   option("kafka.bootstrap.servers", "server:6667").load
records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> records.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

scala> val result = records.
     |   select(
     |     $"key" cast "string",  
     |     $"value" cast "string", 
     |     $"topic",
     |     $"partition",
     |     $"offset")
result: org.apache.spark.sql.DataFrame = [key: string, value: string ... 3 more fields]

scala> import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}


scala> import scala.concurrent.duration._
import scala.concurrent.duration._

scala> val sq = result.
     |   writeStream.
     |   format("console").
     |   option("truncate", false).
     |   trigger(Trigger.ProcessingTime(10.seconds)).
     |   outputMode(OutputMode.Append).
     |   queryName("scalastrstrclient").
     |   start
sq: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3638a852

scala> sq.status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
  "message" : "Getting offsets from KafkaSource[Subscribe[smartPlug2]]",
  "isDataAvailable" : false,
  "isTriggerActive" : true
}

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+
|key|value|topic|partition|offset|
+---+-----+-----+---------+------+
+---+-----+-----+---------+------+


-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+------+
|key                |value                                                                                                                                                                          |topic     |partition|offset|
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+------+
|02/21/2018 16:22:00|{"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.333,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:BF:B1:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"7777","fwId":"777","oemId":"FFF22CFF774A0B89F7624BFC6F50D5DE","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"777","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452287,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41,"longitude":-77,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.888908,"voltage":118.880856,"power":103.141828,"total":8.19,"time":"02/21/2018 16:22:00","ledon":true,"systemtime":"02/21/2018 16:22:00"}|smartPlug2|0        |14    |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+------+


Example JSON Data

|02/21/2018 16:23:58|{"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.337,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:88:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"8888","fwId":"6767","oemId":"6767","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"7676","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452404,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41.3241234,"longitude":-74.1234234,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.932932,"voltage":118.890282,"power":107.826982,"total":8.194,"time":"02/21/2018 16:23:58","ledon":true,"systemtime":"02/21/2018 16:23:58"}|smartPlug2|0        |24

Reference:

4,896 Views
Comments

@Timothy Spann What would this look like in HDP 2.6.5 (kerberized), with Kafka 1.0.0? I tried your above scala code (without Nifi, with an existing Kafka setup) with spark 2.3.0, scala 2.11.8, and the spark-sql-kafka-0-10_2.11:2.3.0 package, and when the stream starts, I get no data and four "<timestamp> WARN NetworkClient: Bootstrap broker <hostname>:6667 disconnected" messages every second.

I was able to answer my own question.

To make spark work in my kerberized HDP 2.6.5 cluster, I had to pass the jaas config file to the driver, and set the correct security protocol:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 --driver-java-options "-Dhttp.proxyHost=<my proxy host> -Dhttp.proxyPort=80 -Djava.security.auth.login.config=<kafka client jaas config filename>"

val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "<hostname>:6667").option("kafka.security.protocol", "SASL_PLAINTEXT").option("subscribe", "<mytopic>").load