1973
Posts
1225
Kudos Received
124
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 1914 | 04-03-2024 06:39 AM | |
| 3011 | 01-12-2024 08:19 AM | |
| 1642 | 12-07-2023 01:49 PM | |
| 2420 | 08-02-2023 07:30 AM | |
| 3361 | 03-29-2023 01:22 PM |
02-22-2018
03:05 PM
@balalaika ok, with no code changes here is the Structured version. Structured Streaming is basically just GA in Spark 2.2 which is HDP 2.6.4 and above. It works fine, a little different from the old style. They will probably keep the old one for until 2.5 or maybe 3.0. Both styles are nice. Another option is to use Apache Beam or Streaming Analytics Manager. https://community.hortonworks.com/content/kbentry/174105/hdp-264-hdf-31-apache-spark-structured-streaming-i.html
... View more
02-21-2018
09:34 PM
2 Kudos
Apache Spark 2.2.0 with Scala 2.11.8 with Java 1.8.0_112 on HDP 2.6.4 called from HDF 3.1 with Apache NiFi 1.5. This is a follow up to: https://community.hortonworks.com/articles/173818/hdp-264-hdf-31-apache-spark-streaming-integration.html and https://community.hortonworks.com/articles/155326/monitoring-energy-usage-utilizing-apache-nifi-pyth.html We are using the same Apache NiFi flow to send messages to Apache Kafka. What is nice you could have the Structured Streaming version, non-structured version and others listening to the same Topic and same messages sent by Apache NiFi. When we start, no data yet. We quickly get a ton of data By Default A Kafka Cluster is 3 Nodes. Replication Factor of 3 is good then. I have one node. I had to change this. Tons of warnings in the /usr/hdf/current/kafka-broker/logs directory. The simplest Apache Spark client is one run in the shell: /usr/hdp/current/spark2-client/bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 The code is a simple fork of code from this excellent highly recommended tutorial: https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-sql-streaming-KafkaSource.adoc If you are submitting this job and not running in a shell, add: // In the end, stop the streaming query
sq.awaitTermination val records = spark.
readStream.
format("kafka").
option("subscribe", "smartPlug2").
option("kafka.bootstrap.servers", "mykafkabroker:6667").load
records.printSchema
val result = records.
select(
$"key" cast "string",
$"value" cast "string",
$"topic",
$"partition",
$"offset")
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = result.
writeStream.
format("console").
option("truncate", false).
trigger(Trigger.ProcessingTime(10.seconds)).
outputMode(OutputMode.Append).
queryName("scalastrstrclient").
start
sq.status Example Run Spark context Web UI available at http://myipiscool:4045
Spark context available as 'sc' (master = local[*], app id = local-1519248053841).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.2.0.2.6.4.0-91
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val records = spark.
| readStream.
| format("kafka").
| option("subscribe", "smartPlug2").
| option("kafka.bootstrap.servers", "server:6667").load
records: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
scala> records.printSchema
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
scala> val result = records.
| select(
| $"key" cast "string",
| $"value" cast "string",
| $"topic",
| $"partition",
| $"offset")
result: org.apache.spark.sql.DataFrame = [key: string, value: string ... 3 more fields]
scala> import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
scala> import scala.concurrent.duration._
import scala.concurrent.duration._
scala> val sq = result.
| writeStream.
| format("console").
| option("truncate", false).
| trigger(Trigger.ProcessingTime(10.seconds)).
| outputMode(OutputMode.Append).
| queryName("scalastrstrclient").
| start
sq: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3638a852
scala> sq.status
res1: org.apache.spark.sql.streaming.StreamingQueryStatus =
{
"message" : "Getting offsets from KafkaSource[Subscribe[smartPlug2]]",
"isDataAvailable" : false,
"isTriggerActive" : true
}
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+-----+---------+------+
|key|value|topic|partition|offset|
+---+-----+-----+---------+------+
+---+-----+-----+---------+------+
-------------------------------------------
Batch: 1
-------------------------------------------
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------+------+
|key |value |topic |partition|offset|

|02/21/2018 16:22:00|{"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.333,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:BF:B1:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"7777","fwId":"777","oemId":"FFF22CFF774A0B89F7624BFC6F50D5DE","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"777","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452287,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41,"longitude":-77,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.888908,"voltage":118.880856,"power":103.141828,"total":8.19,"time":"02/21/2018 16:22:00","ledon":true,"systemtime":"02/21/2018 16:22:00"}|smartPlug2|0 |14 |

Example JSON Data |02/21/2018 16:23:58|{"day1":1.204,"day2":1.006,"day3":1.257,"day4":1.053,"day5":1.597,"day6":1.642,"day7":1.439,"day8":1.178,"day9":1.259,"day10":0.995,"day11":0.569,"day12":1.287,"day13":1.371,"day14":1.404,"day15":1.588,"day16":1.426,"day17":1.707,"day18":1.153,"day19":1.155,"day20":1.732,"day21":1.337,"day22":1.497,"day23":1.151,"day24":1.227,"day25":1.387,"day26":1.138,"day27":1.204,"day28":1.401,"day29":1.288,"day30":1.439,"day31":0.126,"sw_ver":"1.1.1 Build 160725 Rel.164033","hw_ver":"1.0","mac":"50:C7:88:95:D5","type":"IOT.SMARTPLUGSWITCH","hwId":"8888","fwId":"6767","oemId":"6767","dev_name":"Wi-Fi Smart Plug With Energy Monitoring","model":"HS110(US)","deviceId":"7676","alias":"Tim Spann's MiniFi Controller SmartPlug - Desk1","icon_hash":"","relay_state":1,"on_time":452404,"active_mode":"schedule","feature":"TIM:ENE","updating":0,"rssi":-33,"led_off":0,"latitude":41.3241234,"longitude":-74.1234234,"index":18,"zone_str":"(UTC-05:00) Eastern Daylight Time (US & Canada)","tz_str":"EST5EDT,M3.2.0,M11.1.0","dst_offset":60,"month12":null,"current":0.932932,"voltage":118.890282,"power":107.826982,"total":8.194,"time":"02/21/2018 16:23:58","ledon":true,"systemtime":"02/21/2018 16:23:58"}|smartPlug2|0 |24 Reference: https://www.gitbook.com/book/jaceklaskowski/spark-structured-streaming/details https://github.com/jaceklaskowski/spark-structured-streaming-book/blob/master/spark-sql-streaming-KafkaSource.adoc https://community.hortonworks.com/articles/91379/spark-structured-streaming-with-nifi-and-kafka-usi.html https://community.hortonworks.com/articles/173818/hdp-264-hdf-31-apache-spark-streaming-integration.html https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html https://github.com/zaratsian/Spark/blob/master/pyspark_structured_stream_kafka.py https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html https://mtpatter.github.io/bilao/notebooks/html/01-spark-struct-stream-kafka.html
... View more
02-20-2018
04:20 PM
Structured Spark Streaming is uses Spark SQL https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-structured-streaming.html
... View more
02-17-2018
04:21 PM
2 Kudos
Version: HDF 3.1, HDP 2.6.4, PySpark 2.2.0, Python 2.7, Apache NiFi 1.5. I push my power data from a local Apache NiFi 1.5 server over Site-to-Site HTTP to a cloud hosted HDF 3.1 cluster. This cluster has a Remote Input that passes the information on to a version controlled Process Group called "Spark-Kafka-Streaming". Once inside, I set a schema name and data type then push the data to Kafka 1.0 hosted in HDF 3.1. The PublishKafkaRecord_1.0 settings are super easy. We use the JsonTreeReader and the supplied schema to read the JSON file into records. I chose to use the JsonRecordSetWriter to push JSON out. I could have easily done Apache Avro or CSV or another format. I chose JSON as it is easy to work with in Apache Spark and good for debug display. This method and code will work for several versions forward, but I cannot confirm for previous versions. This article is how to connect Apache NiFi with Apache Spark via Kafka using Spark Streaming. The example code is in PySpark. I run the streaming Spark code two different ways for testing: First way is via Apache Zeppelin, you will need to load the Apache Spark Kafka Streaming package to Apache Zeppelin To add Kafka Streaming Support we just add a dependency to the spark2 interpreter and restart the interpreter with the restart button. No need to restart Apache Zeppelin or a server. The other way I run this is as a Spark Submit with YARN Master in Cluster mode. As you see here I also include the Spark Streaming Kafka Package. /usr/hdp/current/spark2-client/bin/spark-submit --master yarn --deploy-mode cluster --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 kafka_test.py
My example PySpark program is really basic but shows you the integration. This is forked from the standard Spark example. import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="kafkaTest")
ssc = StreamingContext(sc,5)
print "Connected to spark streaming"
def process(time, rdd):
print("========= %s =========" % str(time))
if not rdd.isEmpty():
rdd.count()
rdd.first()
ssc = StreamingContext(sc, 5)
kafkaStream = KafkaUtils.createStream(ssc, "server:2181", "pysparkclient1", {"smartPlug": 1})
kafkaStream.pprint()
kafkaStream.foreachRDD(process)
ssc.start()
ssc.awaitTermination()
This program runs every 5 seconds and grabs the Kafka JSON message as an RDD, if it's not empty, I run a count and get the first row. You can see the application running in Apache YARN UI. From Apache Ambari we can monitor the data moving through the Kafka Broker topics. We can also monitor the Spark job via the URL supplied in the output of the submit. We can see the STDOUT of the submitted Spark job here in the YARN logs. Example PySpark Run root@princeton0 demo]# ./submit.sh
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.4.0-91/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.2.0 in central
found org.apache.kafka#kafka_2.11;0.8.2.1 in central
found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
found com.yammer.metrics#metrics-core;2.2.0 in central
found org.slf4j#slf4j-api;1.7.16 in central
found org.scala-lang.modules#scala-parser-combinators_2.11;1.0.2 in central
found com.101tec#zkclient;0.3 in central
found log4j#log4j;1.2.17 in central
found org.apache.kafka#kafka-clients;0.8.2.1 in central
found net.jpountz.lz4#lz4;1.3.0 in central
found org.xerial.snappy#snappy-java;1.1.2.6 in central
found org.apache.spark#spark-tags_2.11;2.2.0 in central
found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 3452ms :: artifacts dl 21ms
:: modules in use:
com.101tec#zkclient;0.3 from central in [default]
com.yammer.metrics#metrics-core;2.2.0 from central in [default]
log4j#log4j;1.2.17 from central in [default]
net.jpountz.lz4#lz4;1.3.0 from central in [default]
org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
org.apache.kafka#kafka_2.11;0.8.2.1 from central in [default]
org.apache.spark#spark-streaming-kafka-0-8_2.11;2.2.0 from central in [default]
org.apache.spark#spark-tags_2.11;2.2.0 from central in [default]
org.scala-lang.modules#scala-parser-combinators_2.11;1.0.2 from central in [default]
org.scala-lang.modules#scala-xml_2.11;1.0.2 from central in [default]
org.slf4j#slf4j-api;1.7.16 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 13 | 2 | 2 | 0 || 13 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 13 already retrieved (0kB/23ms)
18/02/17 01:03:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/02/17 01:03:01 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
18/02/17 01:03:01 INFO RMProxy: Connecting to ResourceManager at princeton0.field.hortonworks.com/172.26.200.216:8050
18/02/17 01:03:01 INFO Client: Requesting a new application from cluster with 1 NodeManagers
18/02/17 01:03:02 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (43008 MB per container)
18/02/17 01:03:02 INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
18/02/17 01:03:02 INFO Client: Setting up container launch context for our AM
18/02/17 01:03:02 INFO Client: Setting up the launch environment for our AM container
18/02/17 01:03:02 INFO Client: Preparing resources for our AM container
18/02/17 01:03:04 INFO Client: Use hdfs cache file as spark.yarn.archive for HDP, hdfsCacheFile:hdfs://princeton0.field.hortonworks.com:8020/hdp/apps/2.6.4.0-91/spark2/spark2-hdp-yarn-archive.tar.gz
18/02/17 01:03:04 INFO Client: Source and destination file systems are the same. Not copying hdfs://princeton0.field.hortonworks.com:8020/hdp/apps/2.6.4.0-91/spark2/spark2-hdp-yarn-archive.tar.gz
18/02/17 01:03:04 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.kafka_kafka_2.11-0.8.2.1.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.spark_spark-tags_2.11-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.spark-project.spark_unused-1.0.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/com.yammer.metrics_metrics-core-2.2.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/com.101tec_zkclient-0.3.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/com.101tec_zkclient-0.3.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.8.2.1.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.apache.kafka_kafka-clients-0.8.2.1.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.slf4j_slf4j-api-1.7.16.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/log4j_log4j-1.2.17.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/log4j_log4j-1.2.17.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/net.jpountz.lz4_lz4-1.3.0.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/org.xerial.snappy_snappy-java-1.1.2.6.jar
18/02/17 01:03:05 INFO Client: Uploading resource file:/opt/demo/kafka_test.py -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/kafka_test.py
18/02/17 01:03:05 INFO Client: Uploading resource file:/usr/hdp/current/spark2-client/python/lib/pyspark.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/pyspark.zip
18/02/17 01:03:06 INFO Client: Uploading resource file:/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/py4j-0.10.4-src.zip
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka-0-8_2.11-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.scala-lang.modules_scala-parser-combinators_2.11-1.0.2.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/com.101tec_zkclient-0.3.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.apache.kafka_kafka-clients-0.8.2.1.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/log4j_log4j-1.2.17.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar added multiple times to distributed cache.
18/02/17 01:03:06 WARN Client: Same path resource file:/root/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar added multiple times to distributed cache.
18/02/17 01:03:06 INFO Client: Uploading resource file:/tmp/spark-bc1bedca-6201-4715-812e-cd06f8e6efac/__spark_conf__9099337700911844616.zip -> hdfs://princeton0.field.hortonworks.com:8020/user/root/.sparkStaging/application_1517883514475_0424/__spark_conf__.zip
18/02/17 01:03:06 INFO SecurityManager: Changing view acls to: root
18/02/17 01:03:06 INFO SecurityManager: Changing modify acls to: root
18/02/17 01:03:06 INFO SecurityManager: Changing view acls groups to:
18/02/17 01:03:06 INFO SecurityManager: Changing modify acls groups to:
18/02/17 01:03:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
18/02/17 01:03:06 INFO Client: Submitting application application_1517883514475_0424 to ResourceManager
18/02/17 01:03:06 INFO YarnClientImpl: Submitted application application_1517883514475_0424
18/02/17 01:03:07 INFO Client: Application report for application_1517883514475_0424 (state: ACCEPTED)
18/02/17 01:03:07 INFO Client:
client token: N/A
diagnostics: AM container is launched, waiting for AM container to Register with RM
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1518829386408
final status: UNDEFINED
tracking URL: http://princeton0.field.hortonworks.com:8088/proxy/application_1517883514475_0424/
user: root Source https://github.com/tspannhw/nifi-sparkstreaming-kafka spark-kafka-streaming.xml Reference Using the data from here: https://community.hortonworks.com/articles/155326/monitoring-energy-usage-utilizing-apache-nifi-pyth.html Example Data {"day19": 2.035, "day20": 1.191, "day21": 0.637, "day22": 1.497, "day23": 1.151, "day24": 1.227, "day25": 1.387, "day26": 1.138, "day27": 1.204, "day28": 1.401, "day29": 1.288, "day30": 1.439, "day31": 0.126, "day1": 1.204, "day2": 1.006, "day3": 1.257, "day4": 1.053, "day5": 1.597, "day6": 1.642, "day7": 1.439, "day8": 1.178, "day9": 1.259, "day10": 0.995, "day11": 0.569, "day12": 1.287, "day13": 1.371, "day14": 1.404, "day15": 1.588, "day16": 0.474, "day17": 1.438, "day18": 1.056, "sw_ver": "1.1.1 Build 160725 Rel.164033", "hw_ver": "1.0", "mac": "50:C7:BF:B1:95:D5", "type": "IOT.SMARTPLUGSWITCH", "hwId": "60FF6B258734EA6880E186F8C96DDC61", "fwId": "060BFEA28A8CD1E67146EB5B2B599CC8", "oemId": "FFF22CFF774A0B89F7624BFC6F50D5DE", "dev_name": "Wi-Fi Smart Plug With Energy Monitoring", "model": "HS110(US)", "deviceId": "8006ECB1D454C4428953CB2B34D9292D18A6DB0E", "alias": "Tim Spann's MiniFi Controller SmartPlug - Desk1", "icon_hash": "", "relay_state": 1, "on_time": 5778, "active_mode": "schedule", "feature": "TIM:ENE", "updating": 0, "rssi": -35, "led_off": 0, "latitude": 40.268216, "longitude": -74.529088, "index": 18, "zone_str": "(UTC-05:00) Eastern Daylight Time (US & Canada)", "tz_str": "EST5EDT,M3.2.0,M11.1.0", "dst_offset": 60, "month1": 32.674, "month2": 19.323, "current": 0.664822, "voltage": 121.700245, "power": 77.280039, "total": 0.158, "time": "02/16/2018 12:20:08", "ledon": true, "systemtime": "02/16/2018 12:20:08"}
... View more
02-13-2018
06:53 PM
I have posted an ExecuteSparkInteractive article
... View more
02-10-2018
07:51 PM
2 Kudos
Apache Deep Learning 101 Series This is for people preparing to attend my talk on Deep Learning at DataWorks Summit Berling 2018 (https://dataworkssummit.com/berlin-2018/#agenda) on Thursday April 19, 2018 at 11:50AM Berlin time. You can easily run Apache MXNet on an OSX machine or a Linux workstation utilizing a Python script. I have forked the standard Apache MXNet Wine Detector Tutorial (http://mxnet.incubator.apache.org/tutorials/embedded/wine_detector.html) to read our local OSX webcam (you may need to change your OpenCV WebCam port from 0 to 1 or to 2, depending on your number of webcams and which one you want to use. I am running this on an OSX laptop connected to a monitor that has a built in webcam, so I use that one which is 1. The webcam numbering starts at 0. If you only have one, then use 0. Let's get this installed! git clone https://github.com/apache/incubator-mxnet.git The installation instructions at Apache MXNet's website (http://mxnet.incubator.apache.org/install/index.html) are amazing. Pick your platform and your style. I am doing this the simplest way on a Mac, but you can use Virtual Python Environment which may be best for you. git clone https://github.com/tspannhw/ApacheBigData101.git You will want to copy my shell script osxlocalrun.sh, inception copy and analyze.py script to your machine. If you don't have a webcam you will want to use the Centos version of the shell and Python. That one works with a static image that you supply. I am assuming you are running a recently updated Mac with 16GB of RAM or more, PIP, Brew and Python 3 installed already. If not, do that. If you have a pre-1.0 Apache MXNet, please upgrade. You will need curl and tar installed which they should be. cd incubator-mxnet
mkdir images
curl --header 'Host: data.mxnet.io' --header 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:45.0) Gecko/20100101 Firefox/45.0' --header 'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' --header 'Accept-Language: en-US,en;q=0.5' --header 'Referer: http://data.mxnet.io/models/imagenet/' --header 'Connection: keep-alive' 'http://data.mxnet.io/models/imagenet/inception-bn.tar.gz' -o 'inception-bn.tar.gz' -L
tar -xvzf inception-bn.tar.gz
cp Inception-BN-0126.params Inception-BN-0000.params Then brew update
pip install --upgrade pip
pip install --upgrade setuptools
pip install mxnet==1.0.0
brew install graphviz
pip install graphviz For your machine if you have two versions of Python, you may need to do pip3 and you may need to run via sudo. It depends on how your machine is setup and how locked down it is. We are creating a directory called images that will fill with OpenCV capture images. You probably want to delete them or ingest them. It's very easy to ingest with Apache NiFi or MiniFi both of which run on OSX with ease. See: https://community.hortonworks.com/articles/107379/minifi-for-image-capture-and-ingestion-from-raspbe.html So we call a simple shell script (osxlocalrun.sh), which calls our custom Python 3 script (you can easily convert this to Python 2 if you need to, in a future article I have this running on Python 2.7 on a Centos 7 HDP 2.6.4 cluster node). I send warnings to /dev/null to get rid of them since they are related to OSX configuration that you may or may not have and cannot easily change. Nothing to see here. You will probably need to chmod 755 your osxlocalrun.sh. If you are running on a Linux variant, follow this directions on the Apache MXNet site or wait for my next article on installing and using Apache MXNet in Centos-based HDP 2.6.4 and HDF 3.1 clusters. python3 -W ignore analyze.py 2>/dev/null For Apache NiFi Flow Templates You can download my Apache NiFi flows from github or this article. Architecture Local Apache NiFi 1.5 with NiFi Registry running with JDK 8 on OSX Local Apache MXNet installation with Python 3 Remote HDF 3.1 Cluster Running on Centos 7 on OpenStack with Apache Ambari, Apache NiFi, NiFi Registry, Hortonworks Schema Registry. Remote HDP 2.6.4 Cluster Runniong on Centos 7 on OpenStack with Apache Hive, Apache Ambari The flow is easy: ExecuteProcess: Execute that shell script UpdateAttribute: Add the schema name InferAvroSchema: Really need this one only once if you don't want to hand create your schema, push the results to an attribute Remote Process Group: Send via HTTP Site-to-Site to an HDF 3.1 cluster. Local OSX Processing Cluster based Record Processing On the cloud we use ConvertRecord to convert the Apache MXNet Python script generated JSON into AVRO. We merge a bunch of those together then convert that larger AVRO record to ORC. This ORC file is stored in HDFS. Apache NiFi will automatically generate Hive DDL that we can instantly execute via Apache NiFi or do manually. I do this manually in Apache Zeppelin. I could easily augment this data with weather, twitter and other REST feeds. Those have been covered in other articles I have written. I could also push the results to Kafka 1.0 for additional processing in Hortonworks Streaming Analytics Manager. I will do that a future time. Apache Hive SQL DDL CREATE EXTERNAL TABLE IF NOT EXISTS inception3 (uuid STRING, top1pct STRING, top1 STRING, top2pct STRING, top2 STRING, top3pct STRING, top3 STRING, top4pct STRING, top4 STRING, top5pct STRING, top5 STRING, imagefilename STRING, runtime STRING) STORED AS ORC
LOCATION '/mxnet/local' Example Output {"uuid":
"mxnet_uuid_img_20180208204131", "top1pct":
"30.0999999046", "top1": "n02871525 bookshop,
bookstore, bookstall", "top2pct": "23.7000003457",
"top2": "n04200800 shoe shop, shoe-shop, shoe store",
"top3pct": "4.80000004172", "top3":
"n03141823 crutch", "top4pct": "2.89999991655",
"top4": "n04370456 sweatshirt", "top5pct":
"2.80000008643", "top5": "n02834397 bib", "imagefilename":
"images/tx1_image_img_20180208204131.jpg", "runtime":
"2"} Query Results Example OpenCV Captured Image {"top1pct": "67.6", "top5": "n03485794 handkerchief, hankie, hanky, hankey", "top4": "n04590129 window shade", "top3": "n03938244 pillow", "top2": "n04589890 window screen", "top1": "n02883205 bow tie, bow-tie, bowtie", "top2pct": "11.5", "imagefilename": "nanotie7.png", "top3pct": "4.5", "uuid": "mxnet_uuid_img_20180211161220", "top4pct": "2.8", "top5pct": "2.8", "runtime": "3.0"} My cat assists me in some Deep Learning work, so I use Apache NiFi to track him to make sure he's working and hasn't taken his tie off during office hours. I run a strict office here in the Princeton lab. Source Code https://github.com/tspannhw/ApacheBigData101/tree/master apache-mxnet-local.xml apachemxnet-local-processing.xml References: https://community.hortonworks.com/articles/155435/using-the-new-mxnet-model-server.html https://community.hortonworks.com/articles/83100/deep-learning-iot-workflows-with-raspberry-pi-mqtt.html https://community.hortonworks.com/articles/146704/edge-analytics-with-nvidia-jetson-tx1-running-apac.html http://mxnet.incubator.apache.org/ In the Series: Interfacing with MXNet Model Server Using Apache MXNet with HDF 3.1 Clusters Using Apache MXNet with HDP 2.6.4 Clusters Using Apache MXNet with Hadoop 3.0 YARN 3.0 HDP 3.0 Dockerized GPU Aware Clusters
... View more
Labels:
02-09-2018
01:16 PM
1. Check logs after restart. 2. Make sure the nar is in the lib (and it's the correct nar) 3. Check file ownership and permission 4. Perhaps the name has a collision. 5. 1.6 Snapshot? Can you recompile in 1.5? What's in the pom.xml can you share that. 6. Can you send some screenshots? 7. Are you running same JDK? JDK 8? JDK 9? 8. Does it require any files, permissions, kerberos, etc...?
... View more
02-09-2018
12:05 AM
5 Kudos
I want to easily integrate Apache Spark jobs with my Apache NiFi flows. Fortunately with the release of HDF 3.1, I can do that via Apache NiFi's ExecuteSparkInteractive processor. First step, let me set up a Centos 7 cluster with HDF 3.1, follow the well-written guide here. With the magic of time lapse photography, instantly we have a new cluster of goodness: It is important to note the new NiFi Registry for doing version control and more. We also get the new Kafka 1.0, updated SAM and the ever important updated Schema Registry. The star of the show today tis Apache NiFi 1.5 here. My first step is to Add a Controller Service (LivySessionController). Then we add the Apache Livy Server, you can find this in your Ambari UI. It is by default port 8999. For my session, I am doing Python, so I picked pyspark. You can also pick pyspark3 for Python 3 code, spark for Scala, and sparkr for R. To execute my Python job, you can pass the code in from a previous processor to the ExecuteSparkInteractive processor or put the code inline. I put the code inline. Two new features of Schema Registry I have to mention are the version comparison: You click the COMPARE VERSIONS link and now you have a nice comparison UI. And the amazing new Swagger documentation for interactive documentation and testing of the schema registry APIs. Not only do you get all the parameters for input and output, the full URL and a Curl example, you get to run the code live against your server. I will be adding an article on how to use Apache NiFi to grab schemas from data using InferAvroSchema and publish these new schemas to the Schema Registry vai REST API automagically. Part two of this article will focus on the details of using Apache Livy + Apache NiFi + Apache Spark with the new processor to call jobs. Part 2 -> https://community.hortonworks.com/articles/171787/hdf-31-executing-apache-spark-via-executesparkinte.html References https://community.hortonworks.com/articles/148730/integrating-apache-spark-2x-jobs-with-apache-nifi.html https://community.hortonworks.com/articles/73828/submitting-spark-jobs-from-apache-nifi-using-livy.html
... View more
02-08-2018
02:25 PM
Nifi registry does not work on Windows as read me says
... View more
02-07-2018
05:58 PM
https://issues.apache.org/jira/browse/NIFIREG-140 - Windows is not supported as per requirements here: https://nifi.apache.org/docs/nifi-registry-docs/html/administration-guide.html#system-requirements Supported Operating Systems:
Linux Unix Mac OS X
... View more