05-08-2017
05:40 PM
The complete code is in the Zeppelin Spark Python notebook referenced here: https://github.com/zaratsian/PySpark/blob/master/text_analytics_datadriven_topics.json
02-23-2017
07:54 AM
Thanks @Bryan Bende, @Timothy Spann, @ozhurakousky for your replies. It was a configuration issue. While trying to put a file into Splunk, I was pointing PutSplunk at Splunk's web port (8081 in my case). When I pointed my PutSplunk configuration at a TCP input port of Splunk (in Splunk settings, go to Data Inputs, click on TCP, and enter the details as instructed to create a new TCP input port), it started working properly.
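For anyone scripting this instead of using the Splunk UI, the same TCP input can be declared in Splunk's inputs.conf; a minimal sketch, with the port number and sourcetype chosen here only as examples:
[tcp://:9999]
sourcetype = nifi_logs
connection_host = ip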
05-05-2017
10:00 PM
@Pradhuman Gupta You cannot set up logging for a specific processor instance, but you can set up a new logger for a specific processor class. First you would create a new appender in the NiFi logback.xml file: <appender name="PROCESSOR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${org.apache.nifi.bootstrap.config.log.dir}/nifi-processor.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!--
For daily rollover, use 'user_%d.log'.
For hourly rollover, use 'user_%d{yyyy-MM-dd_HH}.log'.
To GZIP rolled files, replace '.log' with '.log.gz'.
To ZIP rolled files, replace '.log' with '.log.zip'.
-->
<fileNamePattern>${org.apache.nifi.bootstrap.config.log.dir}/nifi-processor_%d.log</fileNamePattern>
<!-- keep 5 log files worth of history -->
<maxHistory>5</maxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%date %level [%thread] %logger{120} %msg%n</pattern>
<immediateFlush>true</immediateFlush>
</encoder>
</appender>
Then you create a new logger that writes to that appender's log file: <logger name="org.apache.nifi.processors.attributes.UpdateAttribute" level="WARN" additivity="false">
<appender-ref ref="PROCESSOR_FILE"/>
</logger> In the above example I am creating a logger for the UpdateAttribute processor. Now any WARN or ERROR log messages produced by this specific processor will be written to this new log. You can expand on this by configuring a logger for each processor class you want to monitor and pointing them all at the same appender, as in the sketch below. Then use a SplitText processor to split the content of the FlowFile produced by the TailFile, and a RouteOnContent processor to route the log lines produced by each processor class to a different PutEmail processor, or simply create a different message body attribute for each. Thanks, Matt
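For example, to also capture WARN and ERROR messages from the standard RouteOnAttribute processor in the same file, you would add another logger block pointing at the same appender (the class name here is illustrative; substitute the fully qualified class of whichever processor you want to monitor):
<logger name="org.apache.nifi.processors.standard.RouteOnAttribute" level="WARN" additivity="false">
    <appender-ref ref="PROCESSOR_FILE"/>
</logger>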
02-17-2017
04:13 PM
Installed the Hadoop client on Windows! See here: https://github.com/steveloughran/winutils
https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-tips-and-tricks-running-spark-windows.adoc
02-16-2017
02:07 PM
Thanks for the config tip, really helpful. It eventually came up by the morning. I think the issue was that my Hadoop cluster was down and I have at least 50 connections to it. With a timeout of a few minutes per connection, and 3-6 connections each, that adds up to a big chunk of time.
02-13-2017
03:33 AM
3 Kudos
Overview
I have been running a similar program on Raspberry Pi devices with TensorFlow. Now that MXNet has entered Apache incubation, it has become incredibly interesting to me. With the backing of Apache and Amazon, this library cannot be ignored. So I tried it on the same Raspberry Pi 3B that I was using for TensorFlow. For this example, we grab images from the standard Raspberry Pi camera and run live image analysis on them with MXNet, using the pre-built Inception model from the MXNet Model Zoo. This is nearly the same as the TensorFlow example. What I noticed is somewhat faster execution and a smoother process. For accuracy, I have not run enough tests to weigh the two libraries against each other, but that is something I will look at doing for a large number of images. Training both with my camera and with images I am interested in would be very helpful. Some use cases I am thinking of are: security camera, water leak detection, evil cat sensing, engine vibration and a self-driving model car.
Raspberry Pi v3 B with Pi Camera
Set Up Your Device for Running MXNet
sudo apt-get -y install git cmake build-essential g++-4.8 c++-4.8 liblapack* libblas* libopencv*
git clone https://github.com/dmlc/mxnet.git --recursive
cd mxnet
make
cd python
sudo python setup.py install
curl -L 'http://data.mxnet.io/models/imagenet/inception-bn.tar.gz' -o 'inception-bn.tar.gz'
tar -xvzf inception-bn.tar.gz
mv Inception_BN-0126.params Inception_BN-0000.params
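Before wiring up the camera, it is worth a quick sanity check that the Python bindings built and installed correctly:
python -c "import mxnet" && echo "MXNet import OK"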
The primary code is Python, taken from some examples from MXNet, OpenCV and PiCamera:
topn = inception_predict.predict_from_local_file(filename, N=5)
This calls inception_predict from the MXNet example; the inception_predict code is referenced in the links below.
Main Python Code
#!/usr/bin/python
# 2017 load pictures and analyze
import time
import sys
import datetime
import subprocess
import urllib2
import os
import ftplib
import traceback
import math
import random, string
import base64
import json
import paho.mqtt.client as mqtt
import picamera
from time import sleep
from time import gmtime, strftime
import inception_predict
packet_size=3000
# Build a short random suffix for unique message names
def randomword(length):
    return ''.join(random.choice(string.lowercase) for i in range(length))
# Create camera interface
camera = picamera.PiCamera()
while True:
    # Create a unique image name
    uniqueid = 'mxnet_uuid_{0}_{1}'.format(randomword(3), strftime("%Y%m%d%H%M%S", gmtime()))
    # Take the jpg image from the camera
    filename = '/home/pi/cap.jpg'
    # Capture image from the RPi camera
    camera.capture(filename)
    # Run Inception prediction on the image
    topn = inception_predict.predict_from_local_file(filename, N=5)
    # Read the CPU temperature
    p = subprocess.Popen(['/opt/vc/bin/vcgencmd', 'measure_temp'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    # Connect to the MQTT broker
    client = mqtt.Client()
    client.username_pw_set("username", "password")
    client.connect("mqttcloudprovider", 14162, 60)
    # Clean up the CPU temperature output, e.g. "temp=75.2'C"
    out = out.replace('\n', '')
    out = out.replace('temp=', '')
    # Top 5 MXNet predictions and their probabilities
    top1 = str(topn[0][1])
    top1pct = str(round(topn[0][0], 3) * 100)
    top2 = str(topn[1][1])
    top2pct = str(round(topn[1][0], 3) * 100)
    top3 = str(topn[2][1])
    top3pct = str(round(topn[2][0], 3) * 100)
    top4 = str(topn[3][1])
    top4pct = str(round(topn[3][0], 3) * 100)
    top5 = str(topn[4][1])
    top5pct = str(round(topn[4][0], 3) * 100)
    # Assemble the record and publish it as JSON
    row = [ { 'uuid': uniqueid, 'top1pct': top1pct, 'top1': top1, 'top2pct': top2pct, 'top2': top2, 'top3pct': top3pct, 'top3': top3, 'top4pct': top4pct, 'top4': top4, 'top5pct': top5pct, 'top5': top5, 'cputemp': out } ]
    json_string = json.dumps(row)
    client.publish("mxnet", payload=json_string, qos=1, retain=False)
    client.disconnect()
We grab an image from the camera, run it through MXNet, convert the results to JSON, and then send the message to a cloud-hosted MQTT broker. I also grab the CPU temperature to show that we can add more sensors.
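To verify what the broker is receiving before wiring up NiFi, a minimal paho-mqtt subscriber sketch can be used; it assumes the same placeholder broker host, port and credentials as the publisher above:
import json
import paho.mqtt.client as mqtt

# Subscribe to the "mxnet" topic once connected
def on_connect(client, userdata, flags, rc):
    client.subscribe("mxnet", qos=1)

# Each payload is a one-element JSON array; print a few fields per record
def on_message(client, userdata, msg):
    for record in json.loads(msg.payload):
        print("%s %s %s%%" % (record['uuid'], record['top1'], record['top1pct']))

client = mqtt.Client()
client.username_pw_set("username", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect("mqttcloudprovider", 14162, 60)
client.loop_forever()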
Example JSON Sent via MQTT
[{"top1pct": "54.5", "top5": "n04590129 window shade", "top4": "n03452741 grand piano, grand", "top3": "n03018349 china cabinet, china closet", "top2": "n03201208 dining table, board", "top1": "n04099969 rocking chair, rocker", "top2pct": "9.1", "top3pct": "8.0", "uuid": "mxnet_uuid_oqy_20170211203727", "top4pct": "2.8", "top5pct": "2.2", "cputemp": "75.2'C"}] Our schema is pretty consistent as above, so we can create a Hive or Phoenix table and insert into that.
HDF / NiFi Flow
Consume MQTT: this processor receives the messages sent by a few Raspberry Pis I have set up, via the cloud-based MQTT broker.
Extract Fields from MXNet (EvaluateJSONPath): pulls each JSON field into a FlowFile attribute (an example configuration follows this list).
Build a Message (UpdateAttribute):
Category 1 ${top1} at ${top1pct}%
Category 2 ${top2} at ${top2pct}%
Category 3 ${top3} at ${top3pct}%
Category 4 ${top4} at ${top4pct}%
Category 5 ${top5} at ${top5pct}%
UUID ${uuid}
CPU Temp ${cputemp}
Send Msg to Slack Channel (PutSlack)
Channel is mxnet.
Store Files (PutFile)
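Since the published payload is a one-element JSON array, the EvaluateJSONPath configuration maps each attribute to a path roughly as follows (these attribute names and paths are an assumed configuration for illustration, not taken from the original template):
top1 = $[0].top1
top1pct = $[0].top1pct
uuid = $[0].uuid
cputemp = $[0].cputemp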
We take the JSON and turn it into a text message sent to a Slack channel. That's all it takes to run deep learning on a tiny edge device with a camera and send the data asynchronously to a cloud-hosted broker, which can then distribute it to cloud- and on-premise-hosted Apache NiFi servers. We could also use Site-to-Site, HTTP or TCP/IP. MQTT is very lightweight, works over the Internet, has an easy Python library and works well with Apache NiFi.
References
This sample program was critical and gave me most of the code needed to run: http://mxnet.io/tutorials/embedded/wine_detector.html
MXNet ImageNet models: http://data.mxnet.io/models/imagenet/
https://community.hortonworks.com/content/repo/77987/rpi-picamera-mqtt-nifi.html
https://github.com/tspannhw/mxnet_rpi/blob/master/analyze.py
https://community.hortonworks.com/content/kbentry/80339/iot-capturing-photos-and-analyzing-the-image-with.html
CloudMQTT has proven to be awesome: instant setup and a free instance for testing. This is great for getting data from my remote Raspberry Pis to the cloud and back into HDF 2.1 servers behind firewalls. http://cloudmqtt.com
JSONPath tester: http://www.jsonpath.com/
GitHub repo: https://github.com/tspannhw/mxnet_rpi
https://community.hortonworks.com/repos/83001/python-mxnet-raspberry-pi-example.html?shortDescriptionMaxLength=140
Pushing to a Slack channel: https://nifi-se.slack.com/messages/mxnet/details/
Apache MXNet incubation: https://wiki.apache.org/incubator/MXNetProposal
Awesome MXNet: https://github.com/dmlc/mxnet/tree/master/example
Install MXNet on Raspbian: http://mxnet.io/get_started/raspbian_setup.html
Example program for MXNet on Raspberry Pi 3: http://mxnet.io/tutorials/embedded/wine_detector.html
MQTT upload example: https://github.com/tspannhw/rpi-picamera-mqtt-nifi/blob/master/upload.py
Real image with a pretrained model: http://mxnet.io/tutorials/r/classifyRealImageWithPretrainedModel.html
MXNet GTC tutorial: https://github.com/dmlc/mxnet-gtc-tutorial
MXNet for facial identification: https://github.com/tornadomeet/mxnet-face
http://vis-www.cs.umass.edu/fddb/results.html
http://www.cbsr.ia.ac.cn/english/CASIA-WebFace-Database.html
MXNet models for ImageNet 1K Inception BN: https://github.com/dmlc/mxnet-model-gallery/blob/master/imagenet-1k-inception-bn.md
MXNet example image classification: https://github.com/dmlc/mxnet/tree/master/example/image-classification
To inspect the captured image:
sudo apt-get install imagemagick
identify -verbose /home/pi/cap.jpg
02-03-2017
04:48 AM
5 Kudos
Sentiment CoreNLP Processor
Startup log from the processor loading the CoreNLP pipeline:
[pool-1-thread-1] INFO edu.stanford.nlp.pipeline.StanfordCoreNLP - Adding annotator tokenize
[pool-1-thread-1] INFO edu.stanford.nlp.pipeline.TokenizerAnnotator - No tokenizer type provided. Defaulting to PTBTokenizer.
[pool-1-thread-1] INFO edu.stanford.nlp.pipeline.StanfordCoreNLP - Adding annotator ssplit
[pool-1-thread-1] INFO edu.stanford.nlp.pipeline.StanfordCoreNLP - Adding annotator parse
[pool-1-thread-1] INFO edu.stanford.nlp.parser.common.ParserGrammar - Loading parser from serialized file edu/stanford/nlp/models/lexparser/englishPCFG.ser.gz ... done [0.4 sec].
[pool-1-thread-1] INFO edu.stanford.nlp.pipeline.StanfordCoreNLP - Adding annotator sentiment
Example input file and resulting attribute:
FILE: Header,Header2,Header3 Value,Value2,Value3 Value4,Value5,Value6
Attribute: {"names":"NEGATIVE"}
Service Source Code
JUnit Test for Processor
To add sentiment analysis to your NiFi data flow, just add the custom processor, CoreNLPProcessor. You can download a pre-built NAR from the GitHub repo listed below, add it to your NiFi/lib directory, and restart each node. The result of the run will be an attribute named sentiment. You can see how easy it is to add to your dataflows. If you would like to add more features to this processor, please fork the GitHub repo below. This is not an official NiFi processor, just one I wrote in a couple of hours for my own use and for testing. There are five easy ways to add sentiment analysis to your big data pipelines: use ExecuteScript with Python NLP scripts, call my custom processor, make a REST call to a Stanford CoreNLP sentiment server, make a REST call to a public sentiment-as-a-service API, or send a message via Kafka (or JMS) to Spark or Storm to run other JVM sentiment analysis tools.
Download a release: https://github.com/tspannhw/nifi-corenlp-processor/releases/tag/v1.0
Template: sentimentanalysiscustomprocessor.xml
http://stanfordnlp.github.io/CoreNLP
https://github.com/tspannhw/neural-sentiment
https://github.com/tspannhw/nlp-utilities
https://community.hortonworks.com/content/kbentry/81222/adding-stanford-corenlp-to-big-data-pipelines-apac.html
https://community.hortonworks.com/content/repo/81187/nifi-corenlp-processor-example-processor-for-doing.html
https://community.hortonworks.com/repos/79537/various-utilities-and-examples-for-working-with-va.html
https://community.hortonworks.com/articles/76935/using-sentiment-analysis-and-nlp-tools-with-hdp-25.html
https://community.hortonworks.com/questions/20791/sentiment-analysis-with-hdp.html
https://community.hortonworks.com/articles/30213/us-presidential-election-tweet-analysis-using-hdfn.html
https://community.hortonworks.com/articles/52415/processing-social-media-feeds-in-stream-with-apach.html
https://community.hortonworks.com/articles/81222/adding-stanford-corenlp-to-big-data-pipelines-apac.html
https://community.hortonworks.com/content/kbentry/67983/apache-hive-with-apache-hivemall.html
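For the "REST call to a Stanford CoreNLP sentiment server" option mentioned above, a minimal Python sketch looks like this; it assumes a CoreNLP server is already running on localhost:9000 with the sentiment annotator available:
import json
import requests

text = "This is a terrible, terrible mistake."
# Ask the server to run the sentiment annotator and return JSON
props = {"annotators": "sentiment", "outputFormat": "json"}
resp = requests.post("http://localhost:9000/",
                     params={"properties": json.dumps(props)},
                     data=text.encode("utf-8"))
# Each sentence comes back with a sentiment label, e.g. "Negative"
for sentence in resp.json()["sentences"]:
    print(sentence["sentiment"])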
02-10-2017
05:21 AM
This was awesome, Tim.
... View more
02-01-2017
10:46 PM
1 Kudo
@Timothy Spann, I don't have a solution yet, but I think that ReplaceText is doing some sort of recursive replacement. It replaces the value with the requested attribute, but it then evaluates that replacement again, including a new $2, even though there is no group 2 captured in the search. I find that this happens when the attribute I'm replacing with contains a dollar sign followed by a number. For instance, the following ReplaceText configuration puts the user.name attribute into the flowfile. However, when the attribute contains a $2, I get an IndexOutOfBounds error and the offending flowfile stays in the upstream queue. The flowfile attributes show a $2 in the middle of the user.name attribute. I get the same issue with "no group 6" if the name contains $6. I'm going to try some of the quoting functions to see if that changes the behavior. John
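A small demonstration of the underlying problem, in Python for illustration (NiFi's ReplaceText uses Java regex, where "$2" in the replacement text is a group backreference; Python's equivalent syntax is "\2", and the value below is hypothetical):
import re

text = "name=PLACEHOLDER"
attr_value = r"john\2doe"  # an attribute value that happens to contain a group reference

# Interpreted replacement: fails because the pattern captures no group 2,
# analogous to the IndexOutOfBounds error Java throws for a missing "$2".
try:
    re.sub("PLACEHOLDER", attr_value, text)
except re.error as e:
    print("invalid group reference: %s" % e)

# Passing a function as the replacement sidesteps interpretation entirely,
# so the value is inserted literally:
print(re.sub("PLACEHOLDER", lambda m: attr_value, text))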
... View more
02-14-2017
02:59 PM
The processor takes a property to run against; you just need to pass something in the sentence parameter. You can concatenate a few fields there (e.g., ${field1} ${field2}). The source is open; it would be easy to ingest a flowfile and process its content instead of an input attribute. It's a matter of changing 2-3 lines and rebuilding.