
Use Case: Ingesting energy data and running an Apache Spark job as part of the flow.

We will be using the ExecuteSparkInteractive processor (new in Apache NiFi 1.5 / HDF 3.1) together with the LivyController to accomplish that integration. As mentioned in the first part of this article, it is pretty easy to set up.
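Under the hood, the LivyController and ExecuteSparkInteractive simply talk to the Apache Livy REST API. The sketch below is not the processor configuration itself, just a minimal illustration of that API using the requests library; the host name is an assumption and 8998 is Livy's default port.

import time
import requests

livy = "http://yourlivyserver:8998"   # assumed Livy host; 8998 is the default port

# Create an interactive PySpark session
session = requests.post(livy + "/sessions", json={"kind": "pyspark"}).json()
session_url = "{0}/sessions/{1}".format(livy, session["id"])

# Wait until the session is ready to accept statements
while requests.get(session_url).json()["state"] != "idle":
    time.sleep(2)

# Submit a statement -- the same role ExecuteSparkInteractive plays with our PySpark code
statement = requests.post(session_url + "/statements",
                          json={"code": "spark.range(10).count()"}).json()
statement_url = "{0}/statements/{1}".format(session_url, statement["id"])

# Poll until the result is available, then print the output
while True:
    result = requests.get(statement_url).json()
    if result["state"] == "available":
        print(result["output"])
        break
    time.sleep(2)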

Since this is a modern Apache NiFi project, we use version control on our code:

56679-versioning.png

On a local machine, I am talking to an electricity sensor over WiFi in a Python script. The data is processed, cleaned, and sent to a cloud-hosted Apache NiFi instance via site-to-site (S2S) over HTTP.

56675-nifilocalingest.png
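The production flow pushes this data with NiFi site-to-site, which is not shown here. As a minimal stand-in for illustration only, the sketch below posts one sensor reading as JSON to a hypothetical ListenHTTP endpoint on the cloud NiFi instance; the URL, port, and sample values are assumptions.

import json
import requests

nifi_url = "http://yourcloudnifi:9999/contentListener"   # hypothetical ListenHTTP endpoint

# One sample reading shaped like the smart plug JSON built by the script below
row = {"voltage": 121.74, "power": 91.38, "current": 0.77,
       "systemtime": "02/07/2018 11:17:30"}

requests.post(nifi_url, data=json.dumps(row),
              headers={"Content-Type": "application/json"})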

In the cloud we receive the pushed messages.

56674-nifiingest.png

Once we open the Spark It Up processor group, we have a flow to process the data.

56673-nififlowsparkcall.png

Flow Overview

  • QueryRecord: Determines how to route records based on a SQL query against the streaming data, and converts the JSON to Apache Avro.

Path for All Files

  • UpdateAttribute: Set the schema for the records.
  • MergeContent: Do an Apache Avro merge on our data to build bigger files.
  • ConvertAvroToORC: Build an Apache ORC file from the merged Apache Avro file.
  • PutHDFS: Store our Apache ORC file in an HDFS directory on our HDP 2.6.4 cluster.

Path For Large Voltage

  • ExecuteSparkInteractive: Call our PySpark job via Apache Livy.
  • PutHDFS: Store the results in HDFS.

We could take all the metadata attributes and send them somewhere or store them as a JSON file.

We tested our PySpark program in Apache Zeppelin and then copied it to our processor.

56678-testingpyspark.png

Our ExecuteSparkInteractive Processor:

56681-executesparkinteractive.png

In our QueryRecord processor, we route messages with large voltages to Apache Spark to run a PySpark job that does some more processing. For example, a routing query for this path could be as simple as SELECT * FROM FLOWFILE WHERE voltage > 120 (the threshold here is illustrative; the actual query is shown in the processor screenshot below).

56677-queryrecord.png

Once we have submitted a job via Apache Livy, we can see it during and after execution in detailed Apache Livy UI and Spark UI screens. In the Apache Livy UI screen below we can see the PySpark code that was executed and its output.

Apache Livy UI

56617-livysessionsui.png

56672-livyuidisplay.png
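The same information shown in the Livy UI screens above can also be pulled from Livy's REST API if you want to monitor sessions programmatically. A minimal sketch, with the host name and session id assumed:

import requests

livy = "http://yourlivyserver:8998"   # assumed Livy host

# List the current Livy sessions and their states
for s in requests.get(livy + "/sessions").json()["sessions"]:
    print(s["id"], s["kind"], s["state"])

# Tail the log of one session (session id 0 assumed)
log = requests.get(livy + "/sessions/0/log", params={"from": 0, "size": 50}).json()
print("\n".join(log["log"]))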

Apache Spark Jobs UI - Jobs

56616-sparkjobsthrulivy.png


Apache Spark Jobs UI - SQL

56618-sparksqlquery.png

Apache Spark Jobs UI - Executors

56619-sparkexecutors.png


Apache Zeppelin SQL Search of the Data

56680-zeppsmartplugquery.png


Hive / Spark SQL Table DDL Generated Automagically by Apache NiFi

56676-sparkdatazepp.png
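The Zeppelin query and the generated DDL appear only in the screenshots above. As a hedged sketch, the same data can also be queried straight from the ORC files that PutHDFS wrote; the HDFS path here is an assumption, so substitute the directory your PutHDFS processor is configured with.

# Read the ORC files written by PutHDFS and query them with Spark SQL
plugdf = spark.read.orc("hdfs://yourhdp264server:8020/smartPlug")
plugdf.createOrReplaceTempView("smartplug")

spark.sql("SELECT alias, voltage, power, systemtime "
          "FROM smartplug ORDER BY voltage DESC").show(10, False)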

Below is the source code related to this article:


Source Code: https://github.com/tspannhw/nifi-spark-livy


PySpark Code

# Read the Spark2 history event logs from HDFS as JSON
shdf = spark.read.json("hdfs://yourhdp264server:8020/spark2-history")

# Print the schema that Spark inferred from the JSON
shdf.printSchema()

# Register the DataFrame as a temporary view so we can query it with Spark SQL
shdf.createOrReplaceTempView("sparklogs")

# Run a simple SELECT over the view and count the rows
stuffdf = spark.sql("SELECT * FROM sparklogs")
stuffdf.count()

This is a pretty simple PySpark application: it reads the JSON results of the Spark2 history, prints a schema inferred from it, and then does a simple SELECT and count. We could do Spark machine learning or other processing in there very easily. You can run Python 2.x or 3.x for this with PySpark. I am running this in Apache Spark 2.2.0 hosted on an HDP 2.6.4 cluster running CentOS 7. The fun part is that every time I run this Spark job, it produces more results for it to read. I should probably just read that log in Apache NiFi, but it was a fun little example. Clearly you can run any kind of job in here; my next article will cover running Apache MXNet and Spark MLlib jobs through Apache Livy and Apache NiFi.
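As a hedged extension of the same idea (not part of the original flow), once the history JSON is registered as a view you can aggregate it with Spark SQL. The HDFS path is the same assumption as above, and the Event column is assumed from the Spark event-log format:

# Read the Spark2 event logs again and group the entries by event type
shdf = spark.read.json("hdfs://yourhdp264server:8020/spark2-history")
shdf.createOrReplaceTempView("sparklogs")

eventdf = spark.sql("SELECT Event, COUNT(*) AS cnt "
                    "FROM sparklogs GROUP BY Event ORDER BY cnt DESC")
eventdf.show(20, False)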

As a quick side note, you now have a lot of options for working with schemas:

56670-jsontreereaderschemaoptions.png

Schema For Energy Data

inferred.avro.schema
{ "type" : "record", "name" : "smartPlug", "fields" : [ { "name" : "day19", "type" : "double", "doc" : "Type inferred from '2.035'" }, { "name" : "day20", "type" : "double", "doc" : "Type inferred from '1.191'" }, { "name" : "day21", "type" : "double", "doc" : "Type inferred from '0.637'" }, { "name" : "day22", "type" : "double", "doc" : "Type inferred from '1.497'" }, { "name" : "day23", "type" : "double", "doc" : "Type inferred from '1.151'" }, { "name" : "day24", "type" : "double", "doc" : "Type inferred from '1.227'" }, { "name" : "day25", "type" : "double", "doc" : "Type inferred from '1.387'" }, { "name" : "day26", "type" : "double", "doc" : "Type inferred from '1.138'" }, { "name" : "day27", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day28", "type" : "double", "doc" : "Type inferred from '1.401'" }, { "name" : "day29", "type" : "double", "doc" : "Type inferred from '1.288'" }, { "name" : "day30", "type" : "double", "doc" : "Type inferred from '1.439'" }, { "name" : "day31", "type" : "double", "doc" : "Type inferred from '0.126'" }, { "name" : "day1", "type" : "double", "doc" : "Type inferred from '1.204'" }, { "name" : "day2", "type" : "double", "doc" : "Type inferred from '1.006'" }, { "name" : "day3", "type" : "double", "doc" : "Type inferred from '1.257'" }, { "name" : "day4", "type" : "double", "doc" : "Type inferred from '1.053'" }, { "name" : "day5", "type" : "double", "doc" : "Type inferred from '1.597'" }, { "name" : "day6", "type" : "double", "doc" : "Type inferred from '1.642'" }, { "name" : "day7", "type" : "double", "doc" : "Type inferred from '0.443'" }, { "name" : "day8", "type" : "double", "doc" : "Type inferred from '0.01'" }, { "name" : "day9", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day10", "type" : "double", "doc" : "Type inferred from '0.009'" }, { "name" : "day11", "type" : "double", "doc" : "Type inferred from '0.075'" }, { "name" : "day12", "type" : "double", "doc" : "Type inferred from '1.149'" }, { "name" : "day13", "type" : "double", "doc" : "Type inferred from '1.014'" }, { "name" : "day14", "type" : "double", "doc" : "Type inferred from '0.851'" }, { "name" : "day15", "type" : "double", "doc" : "Type inferred from '1.134'" }, { "name" : "day16", "type" : "double", "doc" : "Type inferred from '1.54'" }, { "name" : "day17", "type" : "double", "doc" : "Type inferred from '1.438'" }, { "name" : "day18", "type" : "double", "doc" : "Type inferred from '1.056'" }, { "name" : "sw_ver", "type" : "string", "doc" : "Type inferred from '\"1.1.1 Build 160725 Rel.164033\"'" }, { "name" : "hw_ver", "type" : "string", "doc" : "Type inferred from '\"1.0\"'" }, { "name" : "mac", "type" : "string", "doc" : "Type inferred from '\"50:C7:BF:B1:95:D5\"'" }, { "name" : "type", "type" : "string", "doc" : "Type inferred from '\"IOT.SMARTPLUGSWITCH\"'" }, { "name" : "hwId", "type" : "string", "doc" : "Type inferred from '\"60FF6B258734EA6880E186F8C96DDC61\"'" }, { "name" : "fwId", "type" : "string", "doc" : "Type inferred from '\"060BFEA28A8CD1E67146EB5B2B599CC8\"'" }, { "name" : "oemId", "type" : "string", "doc" : "Type inferred from '\"FFF22CFF774A0B89F7624BFC6F50D5DE\"'" }, { "name" : "dev_name", "type" : "string", "doc" : "Type inferred from '\"Wi-Fi Smart Plug With Energy Monitoring\"'" }, { "name" : "model", "type" : "string", "doc" : "Type inferred from '\"HS110(US)\"'" }, { "name" : "deviceId", "type" : "string", "doc" : "Type inferred from '\"8006ECB1D454C4428953CB2B34D9292D18A6DB0E\"'" }, { "name" : "alias", 
"type" : "string", "doc" : "Type inferred from '\"Tim Spann's MiniFi Controller SmartPlug - Desk1\"'" }, { "name" : "icon_hash", "type" : "string", "doc" : "Type inferred from '\"\"'" }, { "name" : "relay_state", "type" : "int", "doc" : "Type inferred from '1'" }, { "name" : "on_time", "type" : "int", "doc" : "Type inferred from '1995745'" }, { "name" : "active_mode", "type" : "string", "doc" : "Type inferred from '\"schedule\"'" }, { "name" : "feature", "type" : "string", "doc" : "Type inferred from '\"TIM:ENE\"'" }, { "name" : "updating", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-34'" }, { "name" : "led_off", "type" : "int", "doc" : "Type inferred from '0'" }, { "name" : "latitude", "type" : "double", "doc" : "Type inferred from '40.268216'" }, { "name" : "longitude", "type" : "double", "doc" : "Type inferred from '-74.529088'" }, { "name" : "index", "type" : "int", "doc" : "Type inferred from '18'" }, { "name" : "zone_str", "type" : "string", "doc" : "Type inferred from '\"(UTC-05:00) Eastern Daylight Time (US & Canada)\"'" }, { "name" : "tz_str", "type" : "string", "doc" : "Type inferred from '\"EST5EDT,M3.2.0,M11.1.0\"'" }, { "name" : "dst_offset", "type" : "int", "doc" : "Type inferred from '60'" }, { "name" : "month1", "type" : "double", "doc" : "Type inferred from '32.674'" }, { "name" : "month2", "type" : "double", "doc" : "Type inferred from '8.202'" }, { "name" : "current", "type" : "double", "doc" : "Type inferred from '0.772548'" }, { "name" : "voltage", "type" : "double", "doc" : "Type inferred from '121.740428'" }, { "name" : "power", "type" : "double", "doc" : "Type inferred from '91.380606'" }, { "name" : "total", "type" : "double", "doc" : "Type inferred from '48.264'" }, { "name" : "time", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" }, { "name" : "ledon", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" } ] }

Python Source (Updated to include 31 days)

from pyHS100 import SmartPlug
import json
import datetime

# Connect to the TP-Link smart plug on the local network
plug = SmartPlug("192.168.1.203")

row = {}

# Daily energy usage for December 2017, January 2018 and February 2018
emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

emeterdaily = plug.get_emeter_daily(year=2018, month=1)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

emeterdaily = plug.get_emeter_daily(year=2018, month=2)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

# Hardware information (model, MAC address, firmware, ...)
hwinfo = plug.hw_info
for k, v in hwinfo.items():
    row["%s" % k] = v

# System information (alias, relay state, RSSI, ...)
sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
    row["%s" % k] = v

# Time zone settings
timezone = plug.timezone
for k, v in timezone.items():
    row["%s" % k] = v

# Monthly energy usage for 2018
emetermonthly = plug.get_emeter_monthly(year=2018)
for k, v in emetermonthly.items():
    row["month%s" % k] = v

# Real-time voltage, current and power readings
realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v

# Add a few convenience fields and emit the record as one JSON document
row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')
json_string = json.dumps(row)
print(json_string)


Example Output

{"text\/plain":"root\n |-- App Attempt ID: string (nullable = true)\n |-- App ID: string (nullable = true)\n |-- App Name: string (nullable = true)\n |-- Block Manager ID: struct (nullable = true)\n |    |-- Executor ID: string (nullable = true)\n |    |-- Host: string (nullable = true)\n |    |-- Port: long (nullable = true)\n |-- Classpath Entries: struct (nullable = true)\n |    |-- \/etc\/hadoop\/conf\/: string (nullable = true)\n |    |-- \/etc\/hadoop\/conf\/secure: string (nullable = true)\n |    |-- \/etc\/zeppelin\/conf\/external-dependency-conf\/: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_conf__: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/JavaEWAH-0.3.2.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/RoaringBitmap-0.5.11.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/ST4-4.0.4.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/activation-1.1.1.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/aircompressor-0.8.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-2.7.7.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-runtime-3.4.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr4-runtime-4.5.3.jar: string (nullable = true)\n |    |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/


Shell Tip:

Apache MXNet may send some warnings to STDERR. I don't want these, so I send them to /dev/null:

python3 -W ignore analyze.py 2>/dev/null

Software:

  • PySpark
  • Python
  • Apache NiFi
  • Apache Spark
  • HDF 3.1
  • HDP 2.6.4
  • Apache Hive
  • Apache Avro
  • Apache ORC
  • Apache Ambari
  • Apache Zeppelin

Comments

Hello @TimothySpann, I am currently working on a NiFi 1.11 flow in which I intend to apply quality rules to data that I am extracting from different relational database sources such as Oracle. Once I ingest the information into the Cloudera cluster, I use PutHDFS to save the extracted information in CSV format. I then intend to take that file and apply the quality rules to it; that logic is developed in Python and Spark code, so I want to use the ExecuteSparkInteractive processor to execute it. I already have the processor configured without errors, including the LivyController controller service and the Kerberos and SSL services, but I can't find a way to execute the Python script through ExecuteSparkInteractive. I tried putting Python code in the processor, but it doesn't generate any FlowFile. Could you tell me what I'm doing wrong?

Livy and the ExecuteSparkInteractive processor aren't stable at this point.

It only works with Scala code and a JAR; it's hacky. I recommend you call Cloudera's CDE environment.

@TimothySpann thank you for your prompt reply