Created on 02-07-2018 04:26 PM - edited 08-17-2019 09:03 AM
Use Case: Ingesting energy data and running an Apache Spark job as part of the flow.
We will be using the new (in Apache NiFi 1.5 / HDF 3.1) ExecuteSparkInteractive processor with the LivyController to accomplish that integration. As we mentioned in the first part of the article, it's pretty easy to set this up.
Since this is a modern Apache NiFi project, we use version control on our code:
On a local machine, a Python script talks to an electricity sensor over WiFi. The data is processed, cleaned and sent to a cloud-hosted Apache NiFi instance via Site-to-Site (S2S) over HTTP.
In the cloud we receive the pushed messages.
Once we open the Spark It Up process group, we have a flow to process the data.
Flow Overview
Path for All Files
Path For Large Voltage
We could take all the metadata attributes and send them somewhere or store them as a JSON file.
We tested our PySpark program in Apache Zeppelin and then copied it to our processor.
Our ExecuteSparkInteractive Processor:
In our QueryProcessor we send messages with large voltages to the Apache Spark executor to run a PySpark job to do some more processing.
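The routing decision described above can be sketched in plain Python. This is only an illustration of the idea, not the NiFi configuration used in the flow: the 120 volt cutoff and the function names `is_large_voltage` and `route` are assumptions, since the article does not state what counts as a large voltage. The sketch also splits messages into two lists, whereas the flow has a path for all files plus a separate path for large voltages.

```python
import json

# Hypothetical cutoff: the article does not state what counts as a "large" voltage.
VOLTAGE_THRESHOLD = 120.0


def is_large_voltage(record, threshold=VOLTAGE_THRESHOLD):
    """Return True when the record carries a numeric voltage above the threshold."""
    voltage = record.get("voltage")
    return isinstance(voltage, (int, float)) and voltage > threshold


def route(json_lines, threshold=VOLTAGE_THRESHOLD):
    """Split JSON messages into (large_voltage_records, other_records)."""
    large, rest = [], []
    for line in json_lines:
        record = json.loads(line)
        (large if is_large_voltage(record, threshold) else rest).append(record)
    return large, rest


if __name__ == "__main__":
    messages = [
        '{"voltage": 121.740428, "power": 91.380606}',
        '{"voltage": 118.2, "power": 12.0}',
    ]
    large, rest = route(messages)
    print(len(large), "large-voltage message(s),", len(rest), "other message(s)")
```

Only the records in the first list would be handed on to the Spark job; records without a `voltage` field fall into the second list.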
Once we have submitted a job via Apache Livy, we can see the job during and after execution with detailed Apache Livy UI screens and Spark screens. In the Apache Livy UI screen below we can see the PySpark code executed and its output.
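For readers curious what a submission to Apache Livy looks like outside of NiFi, here is a minimal sketch that builds (but does not send) the two REST requests involved: one to create an interactive PySpark session and one to run a statement in it. The host name is a placeholder and the helper names are hypothetical; 8998 is Livy's default port. A Kerberos or SSL secured cluster would need authentication that this sketch leaves out, and this is not a description of how the NiFi processor is implemented internally.

```python
import json
import urllib.request

# Hypothetical Livy endpoint: the host name is a placeholder, 8998 is Livy's default port.
LIVY_URL = "http://livy-host.example.com:8998"


def build_request(path, payload):
    """Build (but do not send) a JSON POST request for the Livy REST API."""
    return urllib.request.Request(
        url=LIVY_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_session_request():
    """Request asking Livy to start an interactive PySpark session."""
    return build_request("/sessions", {"kind": "pyspark"})


def submit_statement_request(session_id, code):
    """Request that runs a snippet of code inside an existing session."""
    return build_request("/sessions/%d/statements" % session_id, {"code": code})


if __name__ == "__main__":
    req = submit_statement_request(0, "spark.range(10).count()")
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
    # To actually send it, pass req to urllib.request.urlopen() against a reachable Livy server.
```

The session id used for the statement request comes from the JSON response to the session request; the statement's state and output can then be polled with a GET on the statement URL.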
Apache Livy UI
Apache Spark Jobs UI - Jobs
Apache Spark Jobs UI - SQL
Apache Spark Jobs UI - Executors
Apache Zeppelin SQL Search of the Data
Hive / Spark SQL Table DDL Generated Automagically by Apache NiFi
Below is the source code related to this article:
Source Code: https://github.com/tspannhw/nifi-spark-livy
PySpark Code
shdf = spark.read.json("hdfs://yourhdp264server:8020/spark2-history")
shdf.printSchema()
shdf.createOrReplaceTempView("sparklogs")
stuffdf = spark.sql("SELECT * FROM sparklogs")
stuffdf.count()
This is a pretty simple PySpark application that reads the JSON results of the Spark2 History, prints the schema inferred from it, and then does a simple SELECT and count. We could do Spark machine learning or other processing in there very easily. You can run Python 2.x or 3.x for this with PySpark. I am running this in Apache Spark 2.2.0 hosted on an HDP 2.6.4 cluster running CentOS 7. The fun part is that every time I run this Spark job, it produces more results for it to read. I should probably just read that log in Apache NiFi, but it was a fun little example. Clearly you can run any kind of job in here. My next article will be about running Apache MXNet and Spark MLlib jobs through Apache Livy and Apache NiFi.
For a quick side note, you have a lot of options for working with schemas now:
Schema For Energy Data
inferred.avro.schema
{
  "type" : "record",
  "name" : "smartPlug",
  "fields" : [
    { "name" : "day19", "type" : "double", "doc" : "Type inferred from '2.035'" },
    { "name" : "day20", "type" : "double", "doc" : "Type inferred from '1.191'" },
    { "name" : "day21", "type" : "double", "doc" : "Type inferred from '0.637'" },
    { "name" : "day22", "type" : "double", "doc" : "Type inferred from '1.497'" },
    { "name" : "day23", "type" : "double", "doc" : "Type inferred from '1.151'" },
    { "name" : "day24", "type" : "double", "doc" : "Type inferred from '1.227'" },
    { "name" : "day25", "type" : "double", "doc" : "Type inferred from '1.387'" },
    { "name" : "day26", "type" : "double", "doc" : "Type inferred from '1.138'" },
    { "name" : "day27", "type" : "double", "doc" : "Type inferred from '1.204'" },
    { "name" : "day28", "type" : "double", "doc" : "Type inferred from '1.401'" },
    { "name" : "day29", "type" : "double", "doc" : "Type inferred from '1.288'" },
    { "name" : "day30", "type" : "double", "doc" : "Type inferred from '1.439'" },
    { "name" : "day31", "type" : "double", "doc" : "Type inferred from '0.126'" },
    { "name" : "day1", "type" : "double", "doc" : "Type inferred from '1.204'" },
    { "name" : "day2", "type" : "double", "doc" : "Type inferred from '1.006'" },
    { "name" : "day3", "type" : "double", "doc" : "Type inferred from '1.257'" },
    { "name" : "day4", "type" : "double", "doc" : "Type inferred from '1.053'" },
    { "name" : "day5", "type" : "double", "doc" : "Type inferred from '1.597'" },
    { "name" : "day6", "type" : "double", "doc" : "Type inferred from '1.642'" },
    { "name" : "day7", "type" : "double", "doc" : "Type inferred from '0.443'" },
    { "name" : "day8", "type" : "double", "doc" : "Type inferred from '0.01'" },
    { "name" : "day9", "type" : "double", "doc" : "Type inferred from '0.009'" },
    { "name" : "day10", "type" : "double", "doc" : "Type inferred from '0.009'" },
    { "name" : "day11", "type" : "double", "doc" : "Type inferred from '0.075'" },
    { "name" : "day12", "type" : "double", "doc" : "Type inferred from '1.149'" },
    { "name" : "day13", "type" : "double", "doc" : "Type inferred from '1.014'" },
    { "name" : "day14", "type" : "double", "doc" : "Type inferred from '0.851'" },
    { "name" : "day15", "type" : "double", "doc" : "Type inferred from '1.134'" },
    { "name" : "day16", "type" : "double", "doc" : "Type inferred from '1.54'" },
    { "name" : "day17", "type" : "double", "doc" : "Type inferred from '1.438'" },
    { "name" : "day18", "type" : "double", "doc" : "Type inferred from '1.056'" },
    { "name" : "sw_ver", "type" : "string", "doc" : "Type inferred from '\"1.1.1 Build 160725 Rel.164033\"'" },
    { "name" : "hw_ver", "type" : "string", "doc" : "Type inferred from '\"1.0\"'" },
    { "name" : "mac", "type" : "string", "doc" : "Type inferred from '\"50:C7:BF:B1:95:D5\"'" },
    { "name" : "type", "type" : "string", "doc" : "Type inferred from '\"IOT.SMARTPLUGSWITCH\"'" },
    { "name" : "hwId", "type" : "string", "doc" : "Type inferred from '\"60FF6B258734EA6880E186F8C96DDC61\"'" },
    { "name" : "fwId", "type" : "string", "doc" : "Type inferred from '\"060BFEA28A8CD1E67146EB5B2B599CC8\"'" },
    { "name" : "oemId", "type" : "string", "doc" : "Type inferred from '\"FFF22CFF774A0B89F7624BFC6F50D5DE\"'" },
    { "name" : "dev_name", "type" : "string", "doc" : "Type inferred from '\"Wi-Fi Smart Plug With Energy Monitoring\"'" },
    { "name" : "model", "type" : "string", "doc" : "Type inferred from '\"HS110(US)\"'" },
    { "name" : "deviceId", "type" : "string", "doc" : "Type inferred from '\"8006ECB1D454C4428953CB2B34D9292D18A6DB0E\"'" },
    { "name" : "alias", "type" : "string", "doc" : "Type inferred from '\"Tim Spann's MiniFi Controller SmartPlug - Desk1\"'" },
    { "name" : "icon_hash", "type" : "string", "doc" : "Type inferred from '\"\"'" },
    { "name" : "relay_state", "type" : "int", "doc" : "Type inferred from '1'" },
    { "name" : "on_time", "type" : "int", "doc" : "Type inferred from '1995745'" },
    { "name" : "active_mode", "type" : "string", "doc" : "Type inferred from '\"schedule\"'" },
    { "name" : "feature", "type" : "string", "doc" : "Type inferred from '\"TIM:ENE\"'" },
    { "name" : "updating", "type" : "int", "doc" : "Type inferred from '0'" },
    { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-34'" },
    { "name" : "led_off", "type" : "int", "doc" : "Type inferred from '0'" },
    { "name" : "latitude", "type" : "double", "doc" : "Type inferred from '40.268216'" },
    { "name" : "longitude", "type" : "double", "doc" : "Type inferred from '-74.529088'" },
    { "name" : "index", "type" : "int", "doc" : "Type inferred from '18'" },
    { "name" : "zone_str", "type" : "string", "doc" : "Type inferred from '\"(UTC-05:00) Eastern Daylight Time (US & Canada)\"'" },
    { "name" : "tz_str", "type" : "string", "doc" : "Type inferred from '\"EST5EDT,M3.2.0,M11.1.0\"'" },
    { "name" : "dst_offset", "type" : "int", "doc" : "Type inferred from '60'" },
    { "name" : "month1", "type" : "double", "doc" : "Type inferred from '32.674'" },
    { "name" : "month2", "type" : "double", "doc" : "Type inferred from '8.202'" },
    { "name" : "current", "type" : "double", "doc" : "Type inferred from '0.772548'" },
    { "name" : "voltage", "type" : "double", "doc" : "Type inferred from '121.740428'" },
    { "name" : "power", "type" : "double", "doc" : "Type inferred from '91.380606'" },
    { "name" : "total", "type" : "double", "doc" : "Type inferred from '48.264'" },
    { "name" : "time", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" },
    { "name" : "ledon", "type" : "boolean", "doc" : "Type inferred from 'true'" },
    { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"02/07/2018 11:17:30\"'" }
  ]
}
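To make the shape of that inferred schema less mysterious, here is a rough Python sketch that derives a similar record schema from one flat JSON message. It is not NiFi's inference code; the function names `avro_type` and `infer_schema` are hypothetical, and the sketch only handles the primitive types that appear in the schema above (no nulls, nested records, or long integers).

```python
import json


def avro_type(value):
    """Map a parsed JSON value to a simple Avro primitive type name."""
    # bool must be checked before int because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError("unsupported value type: %r" % type(value))


def infer_schema(record, name):
    """Build an Avro-style record schema from one flat JSON record."""
    fields = [
        {
            "name": key,
            "type": avro_type(value),
            "doc": "Type inferred from '%s'" % json.dumps(value),
        }
        for key, value in record.items()
    ]
    return {"type": "record", "name": name, "fields": fields}


if __name__ == "__main__":
    sample = json.loads(
        '{"voltage": 121.740428, "relay_state": 1, "model": "HS110(US)", "ledon": true}'
    )
    print(json.dumps(infer_schema(sample, "smartPlug"), indent=2))
```

Because the types come from a single sample message, a field that happens to hold a whole number in that sample would be typed as `int` even if later messages carry decimals, which is one reason to review an inferred schema before relying on it.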
Python Source (Updated to include 31 days)
from pyHS100 import SmartPlug, SmartBulb
#from pprint import pformat as pf
import json
import datetime

plug = SmartPlug("192.168.1.203")
row = { }

emeterdaily = plug.get_emeter_daily(year=2017, month=12)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

emeterdaily = plug.get_emeter_daily(year=2018, month=1)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

emeterdaily = plug.get_emeter_daily(year=2018, month=2)
for k, v in emeterdaily.items():
    row["day%s" % k] = v

hwinfo = plug.hw_info
for k, v in hwinfo.items():
    row["%s" % k] = v

sysinfo = plug.get_sysinfo()
for k, v in sysinfo.items():
    row["%s" % k] = v

timezone = plug.timezone
for k, v in timezone.items():
    row["%s" % k] = v

emetermonthly = plug.get_emeter_monthly(year=2018)
for k, v in emetermonthly.items():
    row["month%s" % k] = v

realtime = plug.get_emeter_realtime()
for k, v in realtime.items():
    row["%s" % k] = v

row['alias'] = plug.alias
row['time'] = plug.time.strftime('%m/%d/%Y %H:%M:%S')
row['ledon'] = plug.led
row['systemtime'] = datetime.datetime.now().strftime('%m/%d/%Y %H:%M:%S')

json_string = json.dumps(row)
print(json_string)
Example Output
{"text\/plain":"root\n |-- App Attempt ID: string (nullable = true)\n |-- App ID: string (nullable = true)\n |-- App Name: string (nullable = true)\n |-- Block Manager ID: struct (nullable = true)\n | |-- Executor ID: string (nullable = true)\n | |-- Host: string (nullable = true)\n | |-- Port: long (nullable = true)\n |-- Classpath Entries: struct (nullable = true)\n | |-- \/etc\/hadoop\/conf\/: string (nullable = true)\n | |-- \/etc\/hadoop\/conf\/secure: string (nullable = true)\n | |-- \/etc\/zeppelin\/conf\/external-dependency-conf\/: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_conf__: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/JavaEWAH-0.3.2.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/RoaringBitmap-0.5.11.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/ST4-4.0.4.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/activation-1.1.1.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/aircompressor-0.8.jar: string (nullable = true)\n | |-- 
\/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-2.7.7.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr-runtime-3.4.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/application_1517883514475_0002\/container_e01_1517883514475_0002_01_000001\/__spark_libs__\/antlr4-runtime-4.5.3.jar: string (nullable = true)\n | |-- \/hadoop\/yarn\/local\/usercache\/livy\/appcache\/
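The output above is a JSON object whose `text/plain` value holds the `printSchema()` text with escaped newlines. If you want to work with it downstream, a small parser along these lines may help. This is a sketch under assumptions: the function name `parse_schema_output` is hypothetical, it expects a complete (not truncated) JSON payload, and it only recognizes lines that end in `(nullable = ...)`, so array element lines are skipped.

```python
import json
import re

# One schema line looks like " |-- App ID: string (nullable = true)";
# nested fields carry an extra "|" in front for each level of nesting.
FIELD_LINE = re.compile(
    r"^(?P<indent>[ |]*)\|-- (?P<name>.+): (?P<type>\w+) \(nullable = (?:true|false)\)$"
)


def parse_schema_output(payload):
    """Return (depth, field_name, field_type) tuples from a printSchema() payload."""
    text = json.loads(payload)["text/plain"]
    fields = []
    for line in text.split("\n"):
        match = FIELD_LINE.match(line)
        if match:
            depth = match.group("indent").count("|")
            fields.append((depth, match.group("name"), match.group("type")))
    return fields


if __name__ == "__main__":
    sample = (
        r'{"text\/plain":"root\n |-- Block Manager ID: struct (nullable = true)'
        r'\n | |-- Port: long (nullable = true)"}'
    )
    for depth, name, field_type in parse_schema_output(sample):
        print("  " * depth + name + " -> " + field_type)
```

Depth 0 entries are top-level columns of the DataFrame; deeper entries are members of the struct listed just before them.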
Shell Tip:
Apache MXNet may send some warnings to STDERR. I don't want these, so I send them to /dev/null:
python3 -W ignore analyze.py 2>/dev/null
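If the script is launched from another Python program rather than from a shell, the same effect can be had with `subprocess`. The sketch below is an assumption about how one might do that, with a hypothetical helper `run_quietly` and an inline child command standing in for `analyze.py`:

```python
import subprocess
import sys


def run_quietly(args):
    """Run a command, keep its standard output, and throw away standard error."""
    completed = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        universal_newlines=True,
    )
    return completed.returncode, completed.stdout


if __name__ == "__main__":
    # Stand-in for analyze.py: a child process that writes to both streams.
    child = "import sys; print('result'); sys.stderr.write('noisy warning\\n')"
    code, out = run_quietly([sys.executable, "-W", "ignore", "-c", child])
    print(code, out.strip())
```

Discarding STDERR hides real errors as well as warnings, so checking the return code is worthwhile.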
Created on 07-21-2021 01:56 PM
Hello @TimothySpann, I am currently working on a NiFi 1.11 flow in which I intend to apply quality rules to data that I am extracting from different relational database sources such as Oracle. Once I ingest the data into the Cloudera cluster, I run a PutHDFS and save the extracted data in CSV format. I then intend to take that file and apply quality rules to it. That logic is developed in Python and Spark code, so I want to use the ExecuteSparkInteractive processor to execute it. I already have the processor configured without errors, along with the Livy controller service and the drivers for Kerberos and SSL, but I can't find a way to execute the Python script through the ExecuteSparkInteractive processor. I tried putting Python code in the processor, but it doesn't generate any flowfile. Could you tell me what I'm doing wrong?
Created on 07-21-2021 01:59 PM
Livy and the Spark interactive connector aren't stable at this point.
It only works with Scala code and a JAR. It's hacky. I recommend you call Cloudera's CDE environment.
Created on 07-21-2021 02:24 PM
@TimothySpann thank you for your prompt reply