Posts: 1973
Kudos Received: 1225
Solutions: 124

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1944 | 04-03-2024 06:39 AM |
| | 3054 | 01-12-2024 08:19 AM |
| | 1670 | 12-07-2023 01:49 PM |
| | 2447 | 08-02-2023 07:30 AM |
| | 3406 | 03-29-2023 01:22 PM |
12-19-2016
04:46 AM
How big of a YARN app? These guides cover Hive performance tuning and determining HDP memory configuration: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_hive-performance-tuning/bk_hive-performance-tuning-20160829.pdf https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_command-line-installation/content/determine-hdp-memory-config.html
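The memory-config worksheet in the second link boils down to a small formula. A sketch of that calculation in Python; the hardware numbers below are assumptions for illustration:

# Rough YARN container sizing per the HDP memory-config worksheet
cores, disks, total_ram_gb = 16, 8, 64   # example hardware (assumptions)
reserved_gb = 8                          # reserved for the OS and other services
min_container_gb = 2                     # minimum container size for this much RAM

available_gb = total_ram_gb - reserved_gb
containers = int(min(2 * cores, 1.8 * disks, available_gb / min_container_gb))
ram_per_container_gb = max(min_container_gb, available_gb / containers)
print(containers, "containers,", ram_per_container_gb, "GB each")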
12-18-2016
08:25 PM
Sweet. You should write an article; that's good to know.
12-18-2016
05:51 AM
5 Kudos
To send data to my firewall-protected internal Hadoop cluster, my remote Raspberry Pi 3 with an attached Sense HAT uses a cron job to publish MQTT messages to a cloud-hosted MQTT broker. NiFi then subscribes to that topic and pulls down the messages asynchronously. I use JSON as my packaging format because it is very easy to work with and process in NiFi and elsewhere.

The Sense HAT attaches to the Pi's GPIO header (any full-size Pi, not the Pi Zero) and provides a number of easy-to-access sensors, including temperature, humidity, pressure, orientation (pitch, roll, yaw), and acceleration (X, Y, Z). It also has a nice LED matrix that can display graphical messages. For data capture we can also grab standard Linux readings such as CPU temperature, and you could add memory usage and disk space as well. Since the Sense HAT API is Python, it makes sense to keep my access program in Python: a small program reads and formats the Sense HAT sensor values, puts them into a JSON document, and sends them up to my MQTT cloud broker.

It's a very simple NiFi flow to ingest these values as they arrive. I had previously read these values via a REST API I wrote on the Pi using Flask, but that interface requires direct synchronous access from the cluster to the Pi, which is not usually possible. I would love to push from the Pi to NiFi, but again that would require a direct network connection. Having the asynchronous break is great for performance and also allows either party to be offline; the broker queue holds the messages until we return. My MQTT cloud provider has a free plan, which I am using for this demo, and a web UI where you can see statistics and information on the broker, messages, topics, and queue.

Once ingested, I pull out the fields I want from the JSON received over MQTT, format them into a SQL insert, and call the PutSQL processor to upsert those values into HBase through the Phoenix layer (see the sample upsert after the raw JSON below). I add NiFi's UUID() as a primary key so every ingested row is unique. Once the data is in Apache Phoenix, it is very easy to explore and graph it in Apache Zeppelin notebooks via the jdbc(phoenix) interpreter.

Phoenix DDL:

CREATE TABLE sensor (sensorpk varchar not null primary key, cputemp varchar, humidity decimal(10,1), pressure decimal(10,1), temp decimal(5,1), tempf decimal(5,1), temph decimal(5,1), tempp decimal(5,1), pitch decimal(10,1), roll decimal(10,1), yaw decimal(10,1), x decimal(10,1), y decimal(10,1), z decimal(10,1));

HDFS raw copy of the JSON:

[root@tspanndev13 bin]# hdfs dfs -ls /sensors/raw
Found 44 items
-rw-r--r-- 3 root hdfs 232 2016-12-14 21:33 /sensors/raw/6837549415709401
-rw-r--r-- 3 root hdfs 231 2016-12-14 21:33 /sensors/raw/6837551583239075
hdfs dfs -cat /sensors/raw/6837549415709401
[{"tempf": 76.26, "temph": 35.7, "pressure": 1016.5, "tempp": 34.3, "pitch": 5.391706934578461, "temp": 35.7, "yaw": 23.348624309736945, "humidity": 20.3, "cputemp": "51.5", "y": 0.0, "x": 0.0, "z": 1.0, "roll": 2.1213289036024343}
Python Code:

# Read the Sense HAT sensors, package the readings as JSON,
# and publish them to the cloud MQTT broker.
from sense_hat import SenseHat
import json
import paho.mqtt.client as paho

sense = SenseHat()
sense.clear()

# Environmental readings, rounded to one decimal place
temp = round(sense.get_temperature(), 1)
humidity = round(sense.get_humidity(), 1)
pressure = round(sense.get_pressure(), 1)

# Orientation in degrees
orientation = sense.get_orientation()
pitch = orientation['pitch']
roll = orientation['roll']
yaw = orientation['yaw']

# Raw accelerometer values, rounded to whole numbers
acceleration = sense.get_accelerometer_raw()
x = round(acceleration['x'], 0)
y = round(acceleration['y'], 0)
z = round(acceleration['z'], 0)

# Package the readings as a JSON document
# (Celsius to Fahrenheit is * 1.8 + 32)
row = [{'temp': temp, 'tempf': ((temp * 1.8) + 32), 'humidity': humidity,
        'pressure': pressure, 'pitch': pitch, 'roll': roll, 'yaw': yaw,
        'x': x, 'y': y, 'z': z}]
json_string = json.dumps(row)

# Publish to the cloud broker; substitute your own host, port and
# credentials (1883 is the standard unencrypted MQTT port)
client = paho.Client()
client.username_pw_set("myuser", "mypassword")
client.connect("cloudserver", 1883, 60)
client.publish("sensor", payload=json_string, qos=0, retain=True)
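Before pointing ConsumeMQTT at the broker, it can help to confirm messages are actually arriving. A minimal sketch of a paho subscriber, assuming the same broker host and credentials as the publisher above:

import paho.mqtt.client as paho

# Print each sensor reading as it arrives on the topic
def on_message(client, userdata, msg):
    print(msg.topic, msg.payload)

client = paho.Client()
client.username_pw_set("myuser", "mypassword")
client.on_message = on_message
client.connect("cloudserver", 1883, 60)
client.subscribe("sensor", qos=0)
client.loop_forever()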
Please note that the sensors on the Sense HAT are not industrial grade or extremely accurate. For commercial purposes you will want more precise industrial sensors, battery backups, special casing, and higher-end devices.

Install the Paho MQTT client:

pip install paho-mqtt

Sense HAT features:
- Gyroscope
- Accelerometer
- Magnetometer
- Barometer
- Temperature sensor
- Relative humidity sensor
- 8x8 LED matrix display
- 5-button joystick

References:
- https://github.com/PixelNoob/sensehat
- https://www.raspberrypi.org/products/raspberry-pi-3-model-b/
- https://eclipse.org/paho/clients/python/
- https://www.cloudmqtt.com/
- https://www.raspberrypi.org/products/sense-hat/
- https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.mqtt.ConsumeMQTT/
- https://www.infoq.com/articles/practical-mqtt-with-paho
- https://github.com/CloudMQTT/python-mqtt-example
- https://pythonhosted.org/sense-hat/api/
- https://www.raspberrypi.org/learning/sense-hat-data-logger/worksheet/
- https://www.raspberrypi.org/learning/astro-pi-flight-data-analysis/worksheet/
- https://www.raspberrypi.org/learning/astro-pi-guide/sensors/temperature.md
- https://pypi.python.org/pypi/paho-mqtt/1.1
12-18-2016
05:17 AM
1 Kudo
Here is your answer:
The per-component limit is hard-coded in VolatileBulletinRepository:

public class VolatileBulletinRepository implements BulletinRepository {
    private static final int CONTROLLER_BUFFER_SIZE = 10;
    private static final int COMPONENT_BUFFER_SIZE = 5;

The limit for bulletins returned per component is set by the component buffer, which is hard-coded to 5.

https://github.com/apache/nifi/blob/d838f61291d2582592754a37314911b701c6891b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
https://github.com/apache/nifi/blob/7f5eabd603bfc326dadc35590bbe69304e8c90fa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java

I am not sure what is setting the query limit you are seeing, but it may be capped at 5 somewhere. The query itself is built like this:

final BulletinQuery.Builder queryBuilder = new BulletinQuery.Builder()
    .groupIdMatches(query.getGroupId())
    .sourceIdMatches(query.getSourceId())
    .nameMatches(query.getName())
    .messageMatches(query.getMessage())
    .after(query.getAfter())
    .limit(query.getLimit());

// perform the query
final List<Bulletin> results = bulletinRepository.findBulletins(queryBuilder.build());

https://github.com/apache/nifi/blob/2d6bba080f90a45a9f4149f6844f452150ed6bc1/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java

@QueryParam("limit") IntegerParameter limit) throws InterruptedException {

Have you traced it? Maybe 500 is too big.
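If you want to see what the REST API returns for a given limit, you can query the bulletin board endpoint directly. A quick sketch with the Python requests library; the host, port, and limit below are placeholders:

import requests

# Ask the NiFi bulletin board for up to 10 bulletins (adjust host/port)
resp = requests.get("http://localhost:8080/nifi-api/flow/bulletin-board",
                    params={"limit": 10})
print(resp.json())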
12-17-2016
10:25 PM
1 Kudo
1. Check the logs.
2. Check the Summary.
3. Make sure the processor is started.
4. List the queue and inspect its contents.

Post as many details and logs here as you can. What is the prior processor, and what is the flowfile you are sending to PutSQL? Do you have a valid connection waiting to run? https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.PutSQL/ Also try changing the Support Fragmented Transactions setting from false to true.
12-17-2016
08:36 PM
Is that database available from that machine? Firewall? Ports? Can you get to it from elsewhere? Did you check the connection pool, and are there any errors listed there? Try another restart. It could be the wrong version of the driver, or the wrong version of the JDK. http://stackoverflow.com/questions/9257537/cannot-create-poolableconnectionfactory-io-exception-the-network-adapter-could Check sudo service nifi status (or ./nifi.sh status) and see what JAVA_HOME it is using.
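A quick way to rule out firewall and port problems from the NiFi machine is a bare socket test. A sketch; the host and port below are placeholders for your own database:

import socket

# Raises an exception if the database host/port is unreachable
# ("dbhost.example.com" and 1521 are placeholders for your own values)
socket.create_connection(("dbhost.example.com", 1521), timeout=5).close()
print("port is reachable")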
12-17-2016
04:55 PM
Is normal Ambari Hive working? That's a weird error, as if Hadoop is not running. https://community.hortonworks.com/questions/9790/orgapachehadoopipcstandbyexception.html Is HDFS in read-only mode (hdfs dfsadmin -safemode get will tell you)? Drive issues? Is everything else working? Are there any other logs?
12-16-2016
09:54 PM
SPARK_HOME must be set and on your path (for example, SPARK_HOME=/usr/hdp/current/spark-client on HDP). Check your classpath and make sure Java is working.
12-16-2016
02:47 PM
http://stackoverflow.com/questions/38083252/apache-nifi-convert-a-soap-file-into-json I have done it with InvokeHTTP and it works fine. http://apache-nifi-developer-list.39713.n7.nabble.com/SOAP-Service-through-InvokeHTTP-td13129.html The NiFi-SOAP processor is the way to go.
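For anyone debugging outside NiFi first: InvokeHTTP is just POSTing a SOAP envelope over HTTP, so you can reproduce the call with a few lines of Python. The endpoint, SOAPAction, and envelope below are placeholders:

import requests

# Placeholder SOAP endpoint, action and envelope, for illustration only
url = "http://example.com/service"
headers = {"Content-Type": "text/xml; charset=utf-8",
           "SOAPAction": "urn:example#GetData"}
envelope = """<?xml version="1.0"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
  <soapenv:Body><GetData/></soapenv:Body>
</soapenv:Envelope>"""

resp = requests.post(url, data=envelope, headers=headers)
print(resp.status_code, resp.text)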
12-15-2016
11:20 PM
2016-12-15 16:51:02.642 WARN 11624 --- [ main] o.a.h.h.zookeeper.RecoverableZooKeeper : Possibly transient ZooKeeper, quorum=tspann:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/hbaseid
2016-12-15 16:51:03.644 INFO 11624 --- [42.140.21:2181)] org.apache.zookeeper.ClientCnxn : Opening socket connection to server 92.242.140.21/92.242.140.21:2181. Will not attempt to authenticate using SASL (unknown error)
2016-12-15 16:52:19.044 WARN 11624 --- [42.140.21:2181)] org.apache.zookeeper.ClientCnxn : Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Operation timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_91]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_91]
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) ~[zookeeper-3.4.6.jar!/:3.4.6-1569965]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) ~[zookeeper-3.4.6.jar!/:3.4.6-1569965]