02-10-2019
10:08 PM
Tracking Air Quality with HDP and HDF: Part 2 - Indoor Air Quality

Using a few sensors on a MiniFi node we are able to generate some air quality sensor readings.

Source: https://github.com/tspannhw/minifi-breakoutgarden/blob/master/aqminifi.py

Data:

row['bme680_tempc'] = '{0:.2f}'.format(sensor.data.temperature)
row['bme680_tempf'] = '{0:.2f}'.format((sensor.data.temperature * 1.8) + 32)
row['bme680_pressure'] = '{0:.2f}'.format(sensor.data.pressure)
row['bme680_gas'] = '{0:.2f}'.format(gas)
row['bme680_humidity'] = '{0:.2f}'.format(hum)
row['bme680_air_quality_score'] = '{0:.2f}'.format(air_quality_score)
row['bme680_gas_baseline'] = '{0:.2f}'.format(gas_baseline)
row['bme680_hum_baseline'] = '{0:.2f}'.format(hum_baseline)

See Part 1: https://community.hortonworks.com/articles/189630/tracking-air-quality-with-hdp-and-hdfi-part-1-apac.html

Newark / NYC Hazecam: https://hazecam.net/images/main/newark.jpg

Example records:

{"bme680_air_quality_score": "82.45", "uuid": "20190131191921_59c5441c-47b4-4f6f-a6d6-b3943bc9cf2b", "ipaddress": "192.168.1.166", "bme680_gas_baseline": 367283.28, "bme680_pressure": "1024.51", "bme680_hum_baseline": 40.0, "memory": 11.7, "end": "1548962361.4146328", "cputemp": 47, "host": "piups", "diskusage": "9992.7", "bme680_tempf": "87.53", "te": "761.2184100151062", "starttime": "01/31/2019 14:06:40", "systemtime": "01/31/2019 14:19:21", "bme680_humidity": "13.22", "bme680_tempc": "30.85", "bme680_gas": "363274.92"}

{ "end" : "1548967753.7064438", "host" : "piups", "diskusage" : "9990.4", "cputemp" : 47, "starttime" : "01/31/2019 15:44:11", "bme680_hum_baseline" : "40.00", "bme680_humidity" : "13.23", "ipaddress" : "192.168.1.166", "bme680_tempc" : "30.93", "te" : "301.96490716934204", "bme680_air_quality_score" : "83.27", "systemtime" : "01/31/2019 15:49:13", "bme680_tempf" : "87.67", "bme680_gas_baseline" : "334942.60", "uuid" : "20190131204913_4984a635-8dcd-408a-ba23-c0d225ba2d86", "bme680_pressure" : "1024.69", "memory" : 12.6, "bme680_gas" : "336547.19" }
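For context, here is a minimal sketch of how those row fields can be produced on the MiniFi node, assuming the Pimoroni bme680 Python library and a BME680 breakout on I2C. It is only an illustration; the real aqminifi.py in the repository above also computes the gas/humidity baselines and the air quality score.

# Minimal sketch, not the full aqminifi.py: read a BME680 over I2C and
# build the row dictionary shown above (assumes the Pimoroni bme680 library).
import json
import bme680

sensor = bme680.BME680(bme680.I2C_ADDR_PRIMARY)
sensor.set_humidity_oversample(bme680.OS_2X)
sensor.set_temperature_oversample(bme680.OS_8X)
sensor.set_gas_status(bme680.ENABLE_GAS_MEAS)

row = {}
if sensor.get_sensor_data():
    row['bme680_tempc'] = '{0:.2f}'.format(sensor.data.temperature)
    row['bme680_tempf'] = '{0:.2f}'.format((sensor.data.temperature * 1.8) + 32)
    row['bme680_pressure'] = '{0:.2f}'.format(sensor.data.pressure)
    row['bme680_humidity'] = '{0:.2f}'.format(sensor.data.humidity)
    if sensor.data.heat_stable:
        row['bme680_gas'] = '{0:.2f}'.format(sensor.data.gas_resistance)

print(json.dumps(row))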
02-09-2019
06:30 PM
1 Kudo
Use Case:
We have data stored in a MongoDB from a third party application in Amazon.
Export from MongoDB to Parquet.
Moving data from a single-purpose data silo to your enterprise data lake is a common use case. Using Apache NiFi we can easily pull the data from this remote silo and stream it into your analytics store for machine learning and deep analytics with Impala, Hive and Spark. It doesn't matter which cloud you are coming from or going to, whether you are moving from cloud to on-premises, or in various hybrid situations: Apache NiFi works in all of them, with full data lineage and provenance on what it did and when.
I have created a mock dataset with Mockaroo. It's all about yummy South Jersey sandwiches.
Our easy MongoDB flows: one to ingest Mongo data into our data lake and another to load MongoDB.
In our test, we loaded all the data from our Mock REST API into a MongoDB in the cloud. In the real world an application populated that dataset and now we need to bring it into our central data lake for analytics.
We use Jolt to rename MongoDB's built-in _id field, which is not Hadoop friendly, to the friendlier mongo_id.
Storing to Parquet on HDFS is Easy (Let's compress with Snappy)
Connecting to MongoDB is easy: set up a controller service and specify the database and collection.
Our MongoDB connection service: just enter your URI in the form username:password@server.
GetHTTP URL: https://my.api.mockaroo.com/hoagie.json
GetHTTP Filename: ${filename:append('hoagie.'):append(${now():format('yyyyMMddHHmmSS'):append(${md5}):append('.json')})}
JSON Path Expression: $.*
JOLT Chain: [{ "operation": "shift", "spec": { "_id": "mongo_id", "*": "&" } }]
Mongo URI: mongodb://user:userpassword@server.cloud.com:13916/nifi
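To make the JOLT chain concrete, here is a hypothetical record before and after the shift (the field names and _id value are made up for illustration): only _id is renamed, and the "*": "&" rule passes every other field through unchanged.

Before:

{ "_id" : "5c5f2a7e9d3e1a0001a1b2c3", "sandwich" : "Italian Hoagie", "town" : "Sewell" }

After the Jolt shift:

{ "mongo_id" : "5c5f2a7e9d3e1a0001a1b2c3", "sandwich" : "Italian Hoagie", "town" : "Sewell" }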
Many files stored in HDFS as Parquet
02-09-2019
05:59 PM
3 Kudos
Series: Integration of Apache NiFi and Cloudera Data Science Workbench

Part 2: Using Cloudera Data Science Workbench with Apache NiFi and Apache MXNet for GluonCV YOLO Workloads

Summary

Now that we have shown it's easy to do standard NLP, next up is Deep Learning. As you can see, NLP, Machine Learning, Deep Learning and more are all within our reach for building your own AI as a Service using tools from Cloudera. These can run in public or private clouds at scale. Now you can run and integrate machine learning services, computer vision APIs and anything you have created in house with your own Data Scientists or with the help of Cloudera Fast Forward Labs (https://www.cloudera.com/products/fast-forward-labs-research.html).

The YOLO pretrained model will download the image from the URL to /tmp to process it. The Python 3 script will also download the GluonCV model for YOLOv3.

Using Pre-trained Model: yolo3_darknet53_voc

Image Sources: https://github.com/tspannhw/images and/or https://picsum.photos/400/600

Example Input

{ "url": "https://raw.githubusercontent.com/tspannhw/images/master/89389-nifimountains.jpg" }

Sample Call to Our REST Service

curl -H "Content-Type: application/json" -X POST http://myurliscoolerthanyours.com/api/altus-ds-1/models/call-model -d '{"accessKey":"longkeyandstuff","request":{"url":"https://raw.githubusercontent.com/tspannhw/images/master/89389-nifimountains.jpg"}}'

Sample JSON Result Set

{"class1": "cat", "pct1": "98.15670800000001", "host": "gluoncv-apache-mxnet-29-49-67dfdf4c86-vcpvr", "shape": "(1, 3, 566, 512)", "end": "1549671127.877511", "te": "10.178656578063965", "systemtime": "02/09/2019 00:12:07", "cpu": 17.0, "memory": 12.8}

Example Deployment Model Resources

Replicas: 1
Total CPU: 1 vCPU <-- an extra vCPU wouldn't hurt
Total Memory: 8.00 GiB <-- make sure you have enough RAM

For Deep Learning models I recommend more vCPUs and more memory, as you will be manipulating images and large tensors. I also recommend more replicas for production use cases; you can have up to 9 (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_models.html). I like the idea of 3, 5 or 7 replicas.

How-To

Step 1: Let's Clean Up and Test Some Python 3 Code

First I take an existing example of Python 3 GluonCV Apache MXNet YOLO code that I already have. As you can see, it uses a pretrained model from Apache MXNet's rich model zoo. This started here: https://github.com/tspannhw/nifi-gluoncv-yolo3. I pared down the libraries as I used an interactive Python 3 session to test and refine my code. As before, I set a variable to pass in my data, this time a URL pointing to an image. As you can see in my interactive session, I can run my yolo function and get results. While testing I had a library in there to display the annotated image; I removed that code to save time and memory and to reduce libraries, but it was helpful during testing. The model seems to be working: it identified me as a person and my cat as a cat.

Step 2: Create, Build and Deploy a Model

I go to Models, point to my new file and the function I used (yolo), and put in a sample input and response. I deploy it, and then it is in the list of available models. It goes through a few steps as the Docker container is deployed to Kubernetes and all the required pips are installed during a build process. Once it is built, you can see the build(s) on the Builds screen.

Step 3: Test the Model

Once it is done building and marked deployed, we can use the built-in tester from the Overview page.
We can see the result in JSON, ready to travel over an HTTP REST API.

Step 4: Monitor the Deployed Model

We can see the standard output and error, and see how many times we have been called and with what success. You can see it downloaded the model from the Apache MXNet zoo. If you need to stop, rebuild or replace a model, it's easy.

Step 5: Apache NiFi Flow

As you can see, it takes only a few steps to run the flow. I am using GenerateFlowFile to get us started, but I could have a cron scheduler start us, or react to a Kafka/MQTT/JMS message or another trigger. I then build the JSON needed to call the REST API, for example: {"accessKey":"accesskey","request":{"url":"${url}"}}. Then we call the REST API via an HTTP POST (http://myurliscoolerthanyours.com/api/altus-ds-1/models/call-model). We then parse the JSON it returns to keep just the fields we want; we don't really need status. We name our schema so we can run Apache Calcite SQL queries against it. Let's only save cats and people to our Amazon S3 bucket. At this point I can add more queries and destinations; I can store it everywhere or anywhere.

Example Output

{ "success": true, "response": { "class1": "cat", "cpu": 38.3, "end": "1549672761.1262221", "host": "gluoncv-apache-mxnet-29-50-7fb5cfc5b9-sx6dg", "memory": 14.9, "pct1": "98.15670800000001", "shape": "(1, 3, 566, 512)", "systemtime": "02/09/2019 00:39:21", "te": "3.380652666091919" } }
Build a Schema for the Data and Store It in the Apache NiFi Avro Schema Registry or Cloudera Schema Registry

{ "type" : "record", "name" : "gluon", "fields" : [ { "name" : "class1", "type" : ["string","null"] }, { "name" : "cpu", "type" : ["double","null"] }, { "name" : "end", "type" : ["string","null"]}, { "name" : "host", "type" : ["string","null"]}, { "name" : "memory", "type" : ["double","null"]}, { "name" : "pct1", "type" : ["string","null"] }, { "name" : "shape", "type" : ["string","null"] }, { "name" : "systemtime", "type" : ["string","null"] }, { "name" : "te", "type" : ["string","null"] } ] }

I like to allow nulls in case we have missing data, but that is up to your Data Steward and team. If you want to share a schema and later add a version with a new field, you must allow "null" for that field, since old data won't have it.

Source: https://github.com/tspannhw/nifi-cdsw-gluoncv

Example template: cdswmxnet.xml
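For reference, here is a minimal, hedged sketch of what a yolo() model function along the lines of Step 1 could look like. The real code lives in the nifi-gluoncv-yolo3 repository above; the output field names follow the sample result set, while the rest (variable names, error handling left out) is purely illustrative.

# Minimal sketch of a CDSW-style model function using GluonCV's pretrained
# yolo3_darknet53_voc model; not the exact code from the repository above.
import time
import mxnet as mx
from gluoncv import model_zoo, data

net = model_zoo.get_model('yolo3_darknet53_voc', pretrained=True)

def yolo(args):
    start = time.time()
    # Download the image from the supplied URL to /tmp, as described in the article.
    path = mx.gluon.utils.download(args['url'], path='/tmp/', overwrite=True)
    x, img = data.transforms.presets.yolo.load_test(path, short=512)
    class_ids, scores, bboxes = net(x)          # detections sorted by confidence
    top = int(class_ids[0][0].asscalar())
    return {
        'class1': net.classes[top],                       # top detected class, e.g. "cat"
        'pct1': str(float(scores[0][0].asscalar()) * 100.0),
        'shape': str(x.shape),                            # e.g. "(1, 3, 566, 512)"
        'te': str(time.time() - start)
    }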
02-06-2019
05:49 PM
4 Kudos
Using Deployed Models as a Function as a Service
Using Cloudera Data Science Workbench with Apache NiFi, we can easily call functions within our deployed models from Apache NiFi as part of flows. I am working against CDSW on HDP (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_hdp.html), but this will work for all CDSW installations regardless of install type. In my simple example, I built a Python model that uses TextBlob to run sentiment analysis against a passed-in sentence. It returns sentiment polarity and subjectivity, which we can immediately act upon in our flow. CDSW is extremely easy to work with and I was up and running in a few minutes.

For my model, I created a Python 3 script and a shell script with install details. Both of these artifacts are available here: https://github.com/tspannhw/nifi-cdsw

My Apache NiFi 1.8 flow is here (I use no custom processors): cdsw-twitter-sentiment.xml

Deploying a Machine Learning Model as a REST Service

Log in to CDSW and create a project or choose an existing one (https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_projects.html). From your project, open a workbench where you can install some libraries and test some Python. I am using a Python 3 session to download the TextBlob/NLTK corpora for NLP.

Let's pip install some libraries for testing.

Let's create a new model. You choose your file (mine is sentiment.py, see GitHub) and the function to call, which is sentiment. (Notice the typo in my screenshot; I had to rebuild and redeploy.) You set up an example input (sentence is the input parameter name) and an example output. Input and output will be JSON since this is a REST API.

Let's deploy it (Python 3). The deploy step builds it for deployment. We can see standard output, standard error, status, the number of REST calls received and the successes.

Once a model is deployed we can control it. We can stop it, rebuild it or replace the files if need be; I had to update things a few times. The amount of resources used for hosting the model's REST endpoint is your choice from a drop-down. Since I am doing something small, I picked the smallest option with only 1 virtual CPU and 2 GB of RAM. All of this is running in Docker on Kubernetes!

Once deployed, it's ready to test and use from Apache NiFi. Just click Test, see the JSON results, and we can now call it from an Apache NiFi flow. Once deployed we can also monitor the model. Let's run the test and see the status and response!

Apache NiFi Example Flow

Step 1: Call Twitter.
Step 2: Extract social attributes of interest.
Step 3: Build our web call with our access key and function parameter.
Step 4: Extract our string as a flow file to send to the HTTP POST.
Step 5: Call our Cloudera Data Science Workbench REST API (see the tester).
Step 6: Extract the two result values.
Step 7: Route on the sentiment. We can have negative (<0), neutral (0), positive (>0) and very positive (1) polarity. See TextBlob for more information on how this works.
Step 8: Send bad sentiment to a Slack channel for human analysis. We send all the related information to the Slack channel, including the message.

Example Message Sent to Slack

Step 9: Store all the results (or some) in Phoenix/HBase, Hive LLAP, Impala, Kudu or HDFS.

Results as Attributes

Slack Message Call
${msg:append(" User:"):append(${user_name}):append(${handle}):append(" Geo:"):append(${coordinates}):append(${geo}):append(${location}):append(${place}):append(" Hashtags:"):append(${hashtags}):append(" Polarity:"):append(${polarity}):append(" Subjectivity:"):append(${subjectivity}):append(" Friends Count:"):append(${friends_count}):append(" Followers Count:"):append(${followers_count}):append(" Retweet Count:"):append(${retweet_count}):append(" Source:"):append(${source}):append(" Time:"):append(${time}):append(" Tweet ID:"):append(${tweet_id})}
REST CALL to Model
{"accessKey":"from your workbench","request":{"sentence":"${msg:replaceAll('\"', ''):replaceAll('\n','')}"}}
Resources
https://textblob.readthedocs.io/en/dev/api_reference.html#textblob.blob.TextBlob.sentiment
https://community.hortonworks.com/articles/222605/converting-powerpoint-presentations-into-french-fr.html
https://community.hortonworks.com/articles/76935/using-sentiment-analysis-and-nlp-tools-with-hdp-25.html
02-04-2019
06:56 PM
4 Kudos
Log Log Log

Sudo logs have a lot of useful information on hosts, users and auditable actions that may be useful for cybersecurity, capacity planning, user tracking, data lake population, user management and general security.

Symbol Model 1

Step 1 - Get a file
Step 2 - Split into lines
Step 3 - Set the MIME type to plain text
Step 4 - Extract with Grok
Step 5 - Action! Check for more options (all named elements in the Grok patterns)

Example Sudo Log (could be /var/log/auth.log, /var/log/sudo.log, secure, ...)

Jan 31 19:17:20 princeton0 su: pam_unix(su-l:session): session opened for user ambari-qa by (uid=0)
Jan 31 19:17:20 princeton0 su: pam_unix(su-l:session): session closed for user ambari-qa
Jan 31 19:18:19 princeton0 su: pam_unix(su-l:session): session opened for user zeppelin by (uid=0)
Jan 31 19:18:19 princeton0 su: pam_unix(su-l:session): session closed for user zeppelin
Jan 31 19:18:20 princeton0 su: pam_unix(su-l:session): session opened for user ambari-qa by (uid=0)
Jan 31 19:18:20 princeton0 su: pam_unix(su-l:session): session closed for user ambari-qa

Grok Patterns

SUDO_TTY TTY=%{NOTSPACE:sudo_tty}
SUDO_PWD PWD=%{DATA:sudo_pwd}
SUDO_COMMAND COMMAND=%{DATA:sudo_command}
SUDO_USER %{NOTSPACE:sudo_user}
SUDO_RUNAS USER=%{SUDO_USER:sudo_runas}
SUDO_REMOVE_SESSION %{SYSLOGTIMESTAMP:timestamp8} %{NOTSPACE:hostname8} %{NOTSPACE:appcaller} \[%{NOTSPACE:pid7}\]: %{GREEDYDATA:sessionremoval}
SUDO_INFO_COMMAND_SUCCESSFUL %{SUDO_USER:sudo_user2} : %{SUDO_TTY:sudo_tty2} ; %{SUDO_PWD:sudo_pwd2} ; %{SUDO_RUNAS:sudo_runas2} ; %{SUDO_COMMAND:sudo_command2}
SUDO_INFO_PAM_UNIX_SESSION_OPENED pam_unix\(%{NOTSPACE:user1}:session\): session opened for user %{NOTSPACE:sudo_runas3} by %{SUDO_USER:sudo_user3}\(uid=%{NUMBER:uid3}\)
SUDO_INFO_PAM_UNIX_SESSION_CLOSED pam_unix\(%{NOTSPACE:user4}:session\): session closed for user %{NOTSPACE:sudo_runas4}
SUDO_PAM_OPEN2 %{SYSLOGTIMESTAMP:timestamp8} %{NOTSPACE:hostname8} %{NOTSPACE:appcaller}: pam_unix\(%{NOTSPACE:user1}:session\): session opened for user %{NOTSPACE:sudo_runas81} by \(uid=%{NUMBER:uid81}\)
SUDO_SEAT %{SYSLOGTIMESTAMP:timestamp77} %{NOTSPACE:hostname77} %{NOTSPACE:appcaller77}\[%{NOTSPACE:pid77}\]: %{GREEDYDATA:message77}
SUDO_INFO %{SUDO_INFO_COMMAND_SUCCESSFUL:cmdsuccess}|%{SUDO_INFO_PAM_UNIX_SESSION_OPENED:pam_opened}|%{SUDO_INFO_PAM_UNIX_SESSION_CLOSED:pam_closed}
SUDO_ERROR_INCORRECT_PASSWORD_ATTEMPTS %{SUDO_USER} : %{NUMBER} incorrect password attempts ; %{SUDO_TTY:sudo_tty5} ; %{SUDO_PWD:sudo_pwd5} ; %{SUDO_RUNAS:sudo_runas5} ; %{SUDO_COMMAND:sudo_cmd5}
SUDO_ERROR_FAILED_TO_GET_PASSWORD %{NOTSPACE:person6} failed to get password: %{NOTSPACE:autherror6} authentication error
SUDO_PUBLICKEY %{SYSLOGTIMESTAMP:timestamp7} %{NOTSPACE:hostname7} sshd\[%{NOTSPACE:pid7}\]: Accepted publickey for %{NOTSPACE:username} from %{NOTSPACE:sourceip} port %{NOTSPACE:port} ssh2: RSA %{NOTSPACE:rsakey}
SUDO_OPEN_PAM %{SYSLOGTIMESTAMP:timestamp8} %{NOTSPACE:hostname8} %{NOTSPACE:appcaller}\[%{NOTSPACE:pid8}\]: pam_unix\(%{NOTSPACE:user1}:session\): session opened for user %{NOTSPACE:sudo_runas} by \(uid=%{NUMBER:uid}\)
SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:timestamp9}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}\(%{DATA:pam_caller}\): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{NOTSPACE:pam_by})?
CRON_ACTION [A-Z ]+
CRONLOG %{SYSLOGBASE} \(%{USER:user9}\) %{CRON_ACTION:action9} \(%{DATA:message9}\)
SYSLOGLINE %{SYSLOGBASE2} %{GREEDYDATA:message10}
SUDO_ERROR %{SUDO_ERROR_FAILED_TO_GET_PASSWORD}|%{SUDO_ERROR_INCORRECT_PASSWORD_ATTEMPTS}
GREEDYMULTILINE (.|\n)*
AUTH1 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname11} sshd(?:\[%{POSINT:systemauthpid11}\])?: %{DATA:systemauthsshevent} %{DATA:systemauthsshmethod} for (invalid user )?%{DATA:systemauthuser} from %{IPORHOST:systemauthsship} port %{NUMBER:systemauthsshport} ssh2(: %{GREEDYDATA:systemauthsshsignature})?
AUTH2 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname12} sshd(?:\[%{POSINT:systemauthpid12}\])?: %{DATA:systemauthsshevent} user %{DATA:systemauthuser} from %{IPORHOST:systemauthsship}
AUTH3 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname14} sshd(?:\[%{POSINT:systemauthpid13}\])?: Did not receive identification string from %{IPORHOST:systemauthsshdroppedip}
AUTH4 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname15} sudo(?:\[%{POSINT:systemauthpid14}\])?: \s*%{DATA:systemauthuser} :( %{DATA:systemauthsudoerror} ;)? TTY=%{DATA:systemauthsudotty} ; PWD=%{DATA:systemauthsudopwd} ; USER=%{DATA:systemauthsudouser} ; COMMAND=%{GREEDYDATA:systemauthosudocmd}
AUTH5 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname16} groupadd(?:\[%{POSINT:systemauthpid15}\])?: new group: name=%{DATA:systemauthgroupaddname}, GID=%{NUMBER:systemauthgroupaddgid}
AUTH6 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname17} useradd(?:\[%{POSINT:systemauthpid16}\])?: new user: name=%{DATA:systemauthuseraddname}, UID=%{NUMBER:systemauthuseradduid}, GID=%{NUMBER:systemauthuseraddgid}, home=%{DATA:systemauthuseraddhome}, shell=%{DATA:systemauthuseraddshell}$
AUTH7 %{SYSLOGTIMESTAMP:systemauthtimestamp} %{SYSLOGHOST:systemauthhostname18} %{DATA:systemauthprogram17}(?:\[%{POSINT:systemauthpid17}\])?: %{GREEDYMULTILINE:systemauthmessage}"] }
AUTH_LOG %{AUTH1}|%{AUTH2}|%{AUTH3}|%{AUTH4}|%{AUTH5}|%{AUTH6}|%{AUTH7}
SU \+\s+%{DATA:su_tty19}\s+%{USER:su_user19}:%{USER:su_targetuser19}
SSH_AUTHFAIL_WRONGUSER Failed %{WORD:ssh_authmethod} for invalid user %{USERNAME:ssh_user} from %{IP:ssh_client_ip} port %{NUMBER:ssh_client_port} %{GREEDYDATA:message}
SSH_AUTHFAIL_WRONGCREDS Failed %{WORD:ssh_authmethod} for %{USERNAME:ssh_user} from %{IP:ssh_client_ip} port %{NUMBER:ssh_client_port} %{GREEDYDATA:message}
SSH_AUTH_SUCCESS Accepted %{WORD:ssh_authmethod} for %{USERNAME:ssh_user} from %{IP:ssh_client_ip} port %{NUMBER:ssh_client_port} %{WORD:ssh_x} %{WORD:ssh_pubkey_type} %{GREEDYDATA:ssh_pubkey_fingerprint}
SSH_DISCONNECT Received disconnect from %{IP:ssh_client_ip} port %{INT:ssh_client_port}.*?:\s+%{GREEDYDATA:ssh_disconnect_reason}
SSH %{SSH_DISCONNECT}|%{SSH_AUTH_SUCCESS}|%{SSH_AUTHFAIL_WRONGUSER}|%{SSH_AUTHFAIL_WRONGCREDS}
SUDO %{SUDO_INFO}|%{SUDO_ERROR}|%{SUDO_PUBLICKEY}|%{SSH}|%{SUDO_OPEN_PAM}|%{SUDO_REMOVE_SESSION}|%{SUDO_PAM_OPEN2}|%{SUDO_SEAT}
These patterns came from some experimentation with http://grokdebug.herokuapp.com/ and from known patterns found online. You can easily add more patterns to grab a lot of different log types. All of these can be pulled out in a processor, as seen in the diagram above.

Source Code: https://github.com/tspannhw/nifi-logs

Example Template: sudo.xml
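If you want to sanity-check a pattern outside NiFi before wiring up ExtractGrok, a quick way is to mirror it with a plain Python regular expression. The sketch below covers only the "session opened/closed" lines from the sample log above (a simplification of SUDO_PAM_OPEN2 and SUDO_INFO_PAM_UNIX_SESSION_CLOSED, not the full Grok set).

# Rough Python equivalent of the PAM session open/close patterns above,
# for quick offline testing against sample lines.
import re

PAM_SESSION = re.compile(
    r'(?P<timestamp>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) '
    r'(?P<hostname>\S+) (?P<appcaller>\S+): '
    r'pam_unix\((?P<service>\S+):session\): '
    r'session (?P<state>opened|closed) for user (?P<user>\S+)'
    r'(?: by \(uid=(?P<uid>\d+)\))?'
)

line = ('Jan 31 19:17:20 princeton0 su: pam_unix(su-l:session): '
        'session opened for user ambari-qa by (uid=0)')

match = PAM_SESSION.search(line)
if match:
    print(match.groupdict())
    # {'timestamp': 'Jan 31 19:17:20', 'hostname': 'princeton0', 'appcaller': 'su',
    #  'service': 'su-l', 'state': 'opened', 'user': 'ambari-qa', 'uid': '0'}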
01-25-2019
09:01 PM
2 Kudos
Introduction
SoChain provides a fast set of public freely available APIs (don't abuse them) to access information on various networks.
If you need this for critical work, please donate: https://chain.so/address/devfund.
One of the things you will see in this simple flow is that NiFi excels at ingesting REST APIs and working with JSON. As you can see, we can split it up, shred it, filter it, manipulate it and extract from it. With the resulting usable objects we build a schema that allows us to do record processing. Once we have a set of records with a schema, I can store them to our data stores.
I just hosted a Future of Data Princeton Meetup in Woodbridge, New Jersey with some amazing speakers, sponsored by ChainNinja. While this was all about Blockchain for the enterprise and no cryptocurrency was involved, it made me want to investigate some cryptocurrency data. As you can see, manipulating complex JSON data, filtering, modifying, routing and scripting with its values is trivial in Apache NiFi.
In my next article I will investigate Hyperledger and Ethereum enterprise integration with Apache NiFi, Impala, Hive, Kudu, HBase, Spark, Kafka and other enterprise technologies.

Steps

We read from the URL and send the original file to immutable HDFS storage. In another branch, I use EvaluateJSONPath to pull out one attribute ($.data.blocks) to use to get detail records. I use that attribute to build a deeper REST call to get the details for the latest block: https://chain.so/api/v2/block/BTC/${block_no}. This is done in InvokeHTTP, which is a scriptable HTTP(S) call; it comes in handy often. In the next EvaluateJSONPath I pull out all the high-level attributes of the JSON file. I want these for all the records as master fields, so they are repeated. After that I split the two arrays of data beneath them into two separate branches and break these down into individual records for parsing. I could also apply a schema and handle them as groups of records. This is an example of reading a REST API and creating a unique file name per call. Also notice it's easy to handle HTTPS as well as HTTP.
SoChain Ingest Flow for REST API Calls
Example Unique File Name we can script
${filename:append('btc.'):append(${now():format('yyyymmddHHMMSS'):append(${md5}):append('.json')})}
REST URLs
https://chain.so/api/v2/get_info/BTC
https://chain.so/api/v2/get_price/BTC/USD
https://chain.so/api/v2/get_info/DOGE
https://chain.so/api/v2/get_info/LTC
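For reference, the same two-step pattern the flow uses (get the latest block height from get_info, then fetch that block's detail) can be reproduced outside NiFi with a few lines of Python. This is only an illustration of the API calls, not part of the flow itself; the field names match the $.data.blocks path and the block attributes shown below.

# Quick illustration of the two SoChain calls the NiFi flow makes:
# get_info for the latest block height, then the block detail endpoint.
import requests

info = requests.get('https://chain.so/api/v2/get_info/BTC').json()
block_no = info['data']['blocks']          # same field the flow pulls with $.data.blocks

detail = requests.get('https://chain.so/api/v2/block/BTC/{0}'.format(block_no)).json()
print(block_no, detail['data']['blockhash'], detail['data']['sent_value'])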
Example of the value of Apache NiFi provenance (these are the attributes acquired for one flow file):
Attribute Values
Access-Control-Allow-Headers: Origin,Accept,Content-Type,X-Requested-With,X-CSRF-Token
Access-Control-Allow-Methods: GET,POST
Access-Control-Allow-Origin: *
CF-RAY: 49e564b17e23923c-EWR
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Connection: keep-alive
Content-Type: application/json; charset=utf-8
Date: Thu, 24 Jan 2019 20:54:07 GMT
Expect-CT: max-age=604800, report-uri="https://report-uri.cloudflare.com/cdn-cgi/beacon/expect-ct"
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Pragma: no-cache
Server: cloudflare
Set-Cookie: __cfduid=d6f52ee1552c73223442296ff7230e9fd1548363246; expires=Fri, 24-Jan-20 20:54:06 GMT; path=/; domain=.chain.so; HttpOnly, _mkra_ctxt=1a7dafd219c4972a7562f232dc63f524--200; path=/; max-age=5
Status: 200 OK
Strict-Transport-Security: max-age=31536000;includeSubDomains
Transfer-Encoding: chunked
X-Content-Type-Options: nosniff
X-Download-Options: noopen
X-Frame-Options: SAMEORIGIN
X-Request-Id: 20d3f592-50b6-40cf-a496-a6f915eb463b
X-Runtime: 1.018401
X-XSS-Protection: 1; mode=block
bits: 172fd633
block_no: 559950
blockhash: 0000000000000000001c68f61ddcc30568536a583c843a7d0c9606b9582fd7e5
fee: 0.05142179
filename: btc.201949241501759.json
fragment.count: 1
fragment.identifier: cec10691-82e9-402b-84a9-7901b084f10a
fragment.index: 0
gethttp.remote.source: chain.so
invokehttp.remote.dn: CN=ssl371663.cloudflaressl.com,OU=PositiveSSL Multi-Domain,OU=Domain Control Validated
invokehttp.request.url: https://chain.so/api/v2/block/BTC/559950
invokehttp.status.code: 200
invokehttp.status.message: OK
invokehttp.tx.id: bc8a0a18-0685-4a2c-97fa-34541b9ea929
merkleroot: 41eb6f68477e96c9239ae1bbe4e5d4d02529c6f7faebc4ad801730d09609a0ef
mime.type: application/json; charset=utf-8
mining_difficulty: 5883988430955.408
network: BTC
next_blockhash: Empty string set
nonce: 1358814296
path: ./
previous_blockhash: 0000000000000000001b2b3d3b5741462fe31981a6c0ae9335ed8851e936664b
schema: chainsotxinputinfo
schema.name: chainsotxinputinfo
segment.original.filename: btc.201949241501759.json
sent_value: 3977.10078351
size: 470242
time: 1548362873
uuid: 3c1d72b4-e993-4b32-a679-0741a44aeefb
An example input record:
{
"input_no" : 0,
"address" : "3N7Vid17hE1ofGcWR6bWEmtQBQ8kKQ7iKW",
"value" : "0.20993260",
"received_from" : {
"txid" : "4e0f00cddb8e3d98de7f645684dc7526468d1dc33efbbf0bc173ed19c6556896",
"output_no" : 4
}
}
An Example LiteCoin Record
{
"status" : "success",
"data" : {
"name" : "Litecoin",
"acronym" : "LTC",
"network" : "LTC",
"symbol_htmlcode" : "Ł",
"url" : "http://www.litecoin.com/",
"mining_difficulty" : "6399667.35869154",
"unconfirmed_txs" : 8,
"blocks" : 1567929,
"price" : "0.00000000",
"price_base" : "BTC",
"price_update_time" : 1548451214,
"hashrate" : "178582229079753"
}
}
Example NiFi Flow chainso.xml
01-24-2019
09:34 AM
2 Kudos
See:

https://community.hortonworks.com/questions/107816/unable-to-get-schema-registry-working-in-hdf-30.html
https://github.com/hortonworks/registry/issues/339

You need permissions to your database from SAM. Did you install from HDF Ambari? Did you set up the database for SAM?

https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/getting-started-with-streaming-analytics/content/building_an_end-to-end_stream_application.html

SAM needs HDP:

https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.3.1/installing-hdf-and-hdp/content/deploying_an_hdp_cluster_using_ambari.html
01-22-2019
09:50 PM
4 Kudos
Working with a Proximity Beacon Network, Part 1

Introduction: In a retail environment, we want to be able to interact with people within the store. Our beacons can provide hyperlocalized information and also help us determine what's going on in the store for traffic patterns. Our beacons also give us temperature and other readings, like any sensor we may have in our IoT retail environment. I have set up three Estimote Proximity beacons in an indoor environment broadcasting both Estimote and iBeacon messages. iBeacon is a Bluetooth advertising protocol by Apple that is built into iPhones. In this article we ingest, filter and route the data based on the beacon IDs. In a following article we will stream our data to one or more data stores and run machine learning and analytics on our streaming time series data.

I tested the BLE library from the command line with a Python script.

Cloud Setup: Since we are using our own IoT gateways and networks, I am only using the Estimote Cloud to check the beacons and make sure they are registered. Estimote provides an iPhone application (Estimote) that you can download and use to do some basic programming and range testing of the beacons.

MiniFi Setup: We run a shell script that has the Python BLE scanner run for 40 seconds and append JSON rows to a file. We then continuously tail that file, read new lines and send them to NiFi for processing. As you can see, MiniFi sends a steady stream of data to an Apache NiFi instance via the HTTP S2S API.

NiFi Setup: The flow is pretty simple. We add a schema name and split the text to get one JSON row per line. We calculate record stats, just to check. The main logic is PartitionRecord, which helps us partition records into different categories based on their Estimote beacon IDs. We then route based on those partitions; we can do different filtering and handling after that if we need to. We convert our JSON records into Avro records and pull out the Estimote id as a new attribute to use for routing. In the route we look at the partition key, which is an address for the device.

Software: Apache NiFi 1.8.0, MiniFi 0.5.0, Java JDK 1.8, Ubuntu 16.04, Apple iPhone SE, Python BLE/beacon libraries.
Beacon Types: iBeacon, Estimote
Networks: BLE and WiFi
Source Code: https://github.com/tspannhw/minifi-estimote

Schema:

{ "type" : "record", "name" : "estimote", "fields" : [ { "name" : "battery", "type" : "int", "doc" : "Type inferred from '80'" }, { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"47a038d5eb032640\"'" }, { "name" : "magnetic_fieldz", "type" : "double", "doc" : "Type inferred from '-0.1484375'" }, { "name" : "magnetic_fieldx", "type" : "double", "doc" : "Type inferred from '-0.3125'" }, { "name" : "magnetic_fieldy", "type" : "double", "doc" : "Type inferred from '0.515625'" }, { "name" : "end", "type" : "string", "doc" : "Type inferred from '\"1547679071.5\"'" }, { "name" : "temperature", "type" : "double", "doc" : "Type inferred from '25.5'" }, { "name" : "cputemp1", "type" : "double", "doc" : "Type inferred from '38.0'" }, { "name" : "memory", "type" : "double", "doc" : "Type inferred from '26.1'" }, { "name" : "protocol_version", "type" : "int", "doc" : "Type inferred from '2'" }, { "name" : "current_motion", "type" : "int", "doc" : "Type inferred from '420'" }, { "name" : "te", "type" : "string", "doc" : "Type inferred from '\"0.362270116806\"'" }, { "name" : "systemtime", "type" : "string", "doc" : "Type inferred from '\"01/16/2019 17:51:11\"'" }, { "name" : "cputemp", "type" : "double", "doc" : "Type inferred from '39.0'" }, { "name" : "uptime", "type" : "int", "doc" : "Type inferred from '4870800'" }, { "name" : "host", "type" : "string", "doc" : "Type inferred from '\"Laptop\"'" }, { "name" : "diskusage", "type" : "string", "doc" : "Type inferred from '\"418487.1\"'" }, { "name" : "ipaddress", "type" : "string", "doc" : "Type inferred from '\"192.168.1.241\"'" }, { "name" : "uuid", "type" : "string", "doc" : "Type inferred from '\"20190116225111_2cbbac13-fed0-4d81-a24a-3aa593b5f674\"'" }, { "name" : "is_moving", "type" : "boolean", "doc" : "Type inferred from 'false'" }, { "name" : "accelerationy", "type" : "double", "doc" : "Type inferred from '0.015748031496062992'" }, { "name" : "accelerationx", "type" : "double", "doc" : "Type inferred from '0.0'" }, { "name" : "accelerationz", "type" : "double", "doc" : "Type inferred from '1.0236220472440944'" }, { "name" : "starttime", "type" : "string", "doc" : "Type inferred from '\"01/16/2019 17:51:11\"'" }, { "name" : "rssi", "type" : "int", "doc" : "Type inferred from '-60'" }, { "name" : "bt_addr", "type" : "string", "doc" : "Type inferred from '\"fa:e2:20:6e:d4:a5\"'" } ] }
Python Snippet:

from beacontools import parse_packet
from beacontools import BeaconScanner, EstimoteTelemetryFrameA, EstimoteTelemetryFrameB, EstimoteFilter

telemetry_b_packet = b"\x02\x01\x04\x03\x03\x9a\xfe\x17\x16\x9a\xfe\x22\x47\xa0\x38\xd5" \
                     b"\xeb\x03\x26\x40\x01\xd8\x42\xed\x73\x49\x25\x66\xbc\x2e\x50"
telemetry_b = parse_packet(telemetry_b_packet)

telemetry_a_packet = b"\x02\x01\x04\x03\x03\x9a\xfe\x17\x16\x9a\xfe\x22\x47\xa0\x38\xd5" \
                     b"\xeb\x03\x26\x40\x00\x00\x01\x41\x44\x47\xfa\xff\xff\xff\xff"
telemetry = parse_packet(telemetry_a_packet)

Example Data:

{"battery": 80, "id": "47a038d5eb032640", "magnetic_fieldz": -0.1484375, "magnetic_fieldx": -0.3125, "magnetic_fieldy": 0.515625, "end": "1548194024.99", "temperature": 25.5, "cputemp1": 45.0, "memory": 42.6, "protocol_version": 2, "current_motion": 420, "te": "39.767373085", "systemtime": "01/22/2019 16:53:44", "cputemp": 43.0, "uptime": 4870800, "host": "Laptop", "diskusage": "418124.2", "ipaddress": "192.168.1.241", "uuid": "20190122215344_2a41168e-31da-4ae7-bf62-0b300c69cd5b", "is_moving": false, "accelerationy": 0.015748031496062992, "accelerationx": 0.0, "accelerationz": 1.0236220472440944, "starttime": "01/22/2019 16:53:05", "rssi": -63, "bt_addr": "fa:e2:20:6e:d4:a5"}

We have several values from the Ubuntu MiniFi host machine:
host, diskusage, ipaddress, cputemp, memory

We have important values from the three beacons:

battery, magnetic_field (x, y, z), current_motion, id, bt_addr, rssi, estimoteid, temperature

Reference Articles:

https://community.hortonworks.com/articles/99861/ingesting-ibeacon-data-via-ble-to-mqtt-wifi-gatewa.html
https://community.hortonworks.com/articles/108947/minifi-for-ble-bluetooth-low-energy-beacon-data-in.html
https://community.hortonworks.com/articles/131320/using-partitionrecord-grokreaderjsonwriter-to-pars.html

Resources:

https://en.wikipedia.org/wiki/Bluetooth_low_energy_beacon
https://cloud.estimote.com/#/beacons
https://developer.estimote.com/ibeacon/
https://developer.apple.com/ibeacon/Getting-Started-with-iBeacon.pdf
https://pypi.org/project/beacontools/
https://www.instructables.com/id/iBeacon-Entry-System-with-the-Raspberry-Pi-and-Azu/#step0
https://github.com/switchdoclabs/iBeacon-Scanner-
https://developer.estimote.com/android/tutorial/part-1-setting-up/
https://developer.estimote.com/
https://github.com/flyinactor91/RasPi-iBeacons
https://github.com/GillisWerrebrouck/BeaconScanner
https://github.com/emanuele-falzone/pedestrian-gate-automation
https://github.com/biagiobotticelli/SmartTeamTrackingServer
https://github.com/citruz/beacontools/blob/master/examples/parser_example.py
https://github.com/citruz/beacontools/blob/master/beacontools/packet_types/ibeacon.py
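Beyond parsing canned packets, the same beacontools library can drive a live scan loop like the one the MiniFi shell script wraps. The sketch below is a hedged illustration only: the telemetry frame attribute names are based on the beacontools packet types and should be verified against your library version, and the 40-second window simply mirrors the shell script described earlier.

# Rough sketch of a live Estimote telemetry scan with beacontools,
# emitting one JSON row per packet, similar to the example data above.
import json
import time
from beacontools import BeaconScanner, EstimoteTelemetryFrameA, EstimoteTelemetryFrameB

def callback(bt_addr, rssi, packet, additional_info):
    row = {'bt_addr': bt_addr, 'rssi': rssi,
           'systemtime': time.strftime('%m/%d/%Y %H:%M:%S')}
    if isinstance(packet, EstimoteTelemetryFrameA):
        row.update({'id': packet.identifier, 'is_moving': packet.is_moving})
    elif isinstance(packet, EstimoteTelemetryFrameB):
        row.update({'id': packet.identifier, 'temperature': packet.temperature,
                    'battery': packet.battery_level})
    print(json.dumps(row))   # the shell script appends rows like this to the tailed file

scanner = BeaconScanner(callback,
                        packet_filter=[EstimoteTelemetryFrameA, EstimoteTelemetryFrameB])
scanner.start()
time.sleep(40)               # scan window used by the MiniFi shell script
scanner.stop()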
01-18-2019
09:29 PM
3 Kudos
I need to parse Kerberos KDC log files (including the currently filling file) to find users, with their hosts, that are connecting. Using Grok in NiFi we can parse out a lot of different parts of these files and use them for filtering and alerting with ease.

This is what many of the lines in the log file look like:

Jan 01 03:31:01 somenewserver-310 krb5kdc[28593](info): AS_REQ (4 etypes {18 17 16 23}) 192.168.237.220: ISSUE: authtime 1546278185, etypes {rep=18 tkt=16 ses=18}, nn/somenewserver-310.field.hortonworks.com@HWX.COM for nn/somenewserver-310.field.hortonworks.com@HWX.COM

State of the Tail Processor

Tail a File

We also have the option of using the GrokReader (see the article linked below) to immediately convert matching records to output formats like JSON or Avro and then partition them into groups. We'll do that in a later article. In this one, we can get a line from the file via tailing, read a list of files and fetch one at a time, or generate a flow file for testing. Once we have some data, we start parsing it into different message types. These messages can then be used for alerting, routing, and permanent storage in Hive/Impala/HBase/Kudu/Druid/S3/object storage/etc. In the next step we will do some routing and alerting, followed by some natural language processing (NLP) and machine learning, and then we'll use various tools to search, aggregate, query, catalog, report on and build dashboards from this type of log and others.
Example Output JSON Formatted
PREAUTH
{
"date" : "Jan 07 02:25:15",
"etypes" : "2 etypes {23 16}",
"MONTH" : "Jan",
"HOUR" : "02",
"emailhost" : "cloudera.net",
"TIME" : "02:25:15",
"pid" : "21546",
"loghost" : "KDCHOST1",
"kuser" : "krbtgt",
"message" : "Additional pre-authentication required",
"emailuser" : "user1",
"MINUTE" : "25",
"SECOND" : "15",
"LOGLEVEL" : "info",
"MONTHDAY" : "01",
"apphost" : "APP_HOST1",
"kuserhost" : "cloudera.net@cloudera.net"
}
ISSUE
{
"date" : "Jan 01 03:20:09",
"etypes" : "2 etypes {23 18}",
"MONTH" : "Jan",
"HOUR" : "03",
"BASE10NUM" : "1546330809",
"emailhost" : "cloudera.net",
"TIME" : "03:20:09",
"pid" : "24546",
"loghost" : "KDCHOST1",
"kuser" : "krbtgt",
"message" : "",
"emailuser" : "user1",
"authtime" : "1546330809",
"MINUTE" : "20",
"SECOND" : "09",
"etypes2" : "rep=23 tkt=18 ses=23",
"LOGLEVEL" : "info",
"MONTHDAY" : "01",
"apphost" : "APP_HOST1",
"kuserhost" : "cloudera.net@cloudera.net"
}
Grok Expressions
For Parsing Failure Records
%{SYSLOGTIMESTAMP:date} %{HOSTNAME:loghost} krb5kdc\[%{POSINT:pid}\]\(%{LOGLEVEL}\): %{GREEDYDATA:premessage}failure%{GREEDYDATA:postmessage}
For Parsing PREAUTH Records
%{SYSLOGTIMESTAMP:date} %{HOSTNAME:loghost} krb5kdc\[%{POSINT:pid}\]\(%{LOGLEVEL}\): AS_REQ \(%{GREEDYDATA:etypes}\) %{GREEDYDATA:apphost}: NEEDED_PREAUTH: %{USERNAME:emailuser}@%{HOSTNAME:emailhost} for %{GREEDYDATA:kuser}/%{GREEDYDATA:kuserhost}, %{GREEDYDATA:message}
For Parsing ISSUE Records
%{SYSLOGTIMESTAMP:date} %{HOSTNAME:loghost} krb5kdc\[%{POSINT:pid}\]\(%{LOGLEVEL}\): AS_REQ \(%{GREEDYDATA:etypes}\) %{GREEDYDATA:apphost}: ISSUE: authtime %{NUMBER:authtime}, etypes \{%{GREEDYDATA:etypes2}\}, %{USERNAME:emailuser}@%{HOSTNAME:emailhost} for %{GREEDYDATA:kuser}/%{GREEDYDATA:kuserhost}%{GREEDYDATA:message}
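Since the end goal is "which principals are connecting through which KDC host," here is a hedged Python sketch that mirrors the ISSUE pattern above as a plain regular expression and aggregates counts. It is only for offline testing of the pattern; in the flow itself, NiFi's Grok processing does the real work. The log path and the simplified principal capture (one \S+ instead of USERNAME@HOSTNAME) are assumptions for illustration.

# Offline test of the ISSUE pattern: count how often each principal shows up,
# per KDC host, in a krb5kdc log. Simplified mirror of the Grok expression above.
import re
from collections import Counter

ISSUE = re.compile(
    r'^(?P<date>\w{3}\s+\d+ \d{2}:\d{2}:\d{2}) (?P<loghost>\S+) '
    r'krb5kdc\[(?P<pid>\d+)\]\(info\): AS_REQ \((?P<etypes>[^)]+)\) (?P<apphost>\S+): '
    r'ISSUE: authtime (?P<authtime>\d+), etypes \{(?P<etypes2>[^}]+)\}, '
    r'(?P<client>\S+) for (?P<service>\S+)$'
)

counts = Counter()
with open('/var/log/krb5kdc.log') as log:      # example path, adjust to your KDC
    for line in log:
        match = ISSUE.match(line.strip())
        if match:
            counts[(match.group('loghost'), match.group('client'))] += 1

for (loghost, client), total in counts.most_common(10):
    print(loghost, client, total)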
Resources:
For Testing Grok Against Your Files
http://grokdebug.herokuapp.com/
A Great Article on Using GrokReader for Record Oriented Processing
https://community.hortonworks.com/articles/131320/using-partitionrecord-grokreaderjsonwriter-to-pars.html

More About Grok

https://datahovel.com/2018/07/
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.7.1/org.apache.nifi.grok.GrokReader/additionalDetails.html
http://grokconstructor.appspot.com/do/automatic?example=0
https://gist.github.com/acobaugh/5aecffbaaa593d80022b3534e5363a2d
01-11-2019
07:44 PM
Very nice! I did two articles a few years ago, but things have advanced greatly. https://community.hortonworks.com/articles/88404/adding-and-using-hplsql-and-hivemall-with-hive-mac.html https://community.hortonworks.com/articles/67983/apache-hive-with-apache-hivemall.html