05-12-2018
04:08 AM
4 Kudos
Accessing Feeds from EtherDelta on Trades, Funds, Buys and Sells (Cryptocurrency Analysis)

EtherDelta lets you trade Ether or Ethereum-based tokens. Ethereum (https://www.ethereum.org/) is an open-source blockchain platform for running smart contracts. EtherDelta provides a fast WebSocket feed of all data coming through the system. We can tap this feed with Apache NiFi to examine and ingest all the trades, funds, buys and sells coming through the system as JSON. Once we ingest, clean up, parse and schematize the data, we can run queries on it with Apache Spark SQL and Apache Hive in Apache Zeppelin notebooks. Now the data scientists have a continuing stream of data to play with.

Next we will start adding feeds from other Ethereum exchanges, Bitcoin APIs and other sources of data. Some of the APIs are REST, some are WebSockets and some are SDKs. All of these are easy to ingest with Apache NiFi.

Initial Ingest from EtherDelta via Web Sockets API
Route To The Correct Type

To process this feed with Apache NiFi 1.6+:
1. ConnectWebSocket - wss://socket.etherdelta.com/socket.io/?transport=websocket
2. ReplaceText - three of them, to remove extraneous data outside of the JSON (40, 42, 0 - websocket junk text)
3. RouteOnAttribute - filter out files that are too small
4. RouteOnContent - sell, buy, trades and funds
5. SplitJson - buys or sells arrays ($.*.buys or $.*.sells)
6. SplitJson - buys and sells into individual JSON records ($.[*])
7. UpdateAttribute - add a schema name
8. Send to the remote cluster via HTTP/HTTPS
9. On the remote cluster, process and query to limit and convert data to a Hive-friendly format
10. Store in HDFS
11. NiFi generates DDL for an external Hive table
12. Query with Zeppelin
13. Hand off to another Zeppelin user who is a Data Scientist for machine learning and statistics
14. Profit!

Break apart Sell Records
Setup Our Web Socket Client
Macintosh Default Java Certificate for SSL (default password is changeit)
Connect to the Web Socket
Remove the extra '40'
Throw away small files!
Route for Buys, Funds, Sells and Trades

EtherDelta Exchange
https://etherdelta.com/#PPT-ETH
https://github.com/etherdelta/bots
https://github.com/etherdelta/etherdelta.github.io/blob/master/docs/API.md

EtherDelta provides a WebSocket feed of their data, so I am ingesting that with Apache NiFi and breaking out the different types of data being published. This will let us ingest into different Apache Hive tables and run some queries and analytics in Apache Zeppelin on this data. We can then make it available to Data Scientists.

JsonPath Expressions

JsonPath Expression for trades and funds
$.*.*
Split Json Orders Buys
$.*.buys
Split Json Orders Sells
$.*.sells

Schemas

ethereumfunds
{ "type" : "record", "name" : "ethereumfunds", "fields" : [ { "name" : "txHash", "type" : "string", "doc" : "Type inferred from '\"0xc40e227f3d5c2e125791cd865e8dd36c4a1a86538f13905ce91d65d2ac721742\"'" }, { "name" : "date", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:52:19.000Z\"'" }, { "name" : "tokenAddr", "type" : "string", "doc" : "Type inferred from '\"0xfb1e5f5e984c28ad7e228cdaa1f8a0919bb6a09b\"'" }, { "name" : "kind", "type" : "string", "doc" : "Type inferred from '\"Deposit\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0xb975cf6c40f9cc5ad5cb7a335f16bdaab6cdcf0d\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"5986.826\"'" }, { "name" : "balance", "type" : "string", "doc" : "Type inferred from '\"5986.826\"'" } ] }
ethereumtrades
{ "type" : "record", "name" : "ethereumtrades", "fields" : [ { "name" : "txHash", "type" : "string", "doc" : "Type inferred from '\"0x0350b7b479c9372c07188d69aa642ced7637b05444735653d61316dd852a673c\"'" }, { "name" : "date", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T18:32:27.000Z\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.000049874\"'" }, { "name" : "side", "type" : "string", "doc" : "Type inferred from '\"buy\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"8800\"'" }, { "name" : "amountBase", "type" : "string", "doc" : "Type inferred from '\"0.4388912\"'" }, { "name" : "buyer", "type" : "string", "doc" : "Type inferred from '\"0xd170db528cd2dd6ca67b0b2e3f7cd6e24942dba2\"'" }, { "name" : "seller", "type" : "string", "doc" : "Type inferred from '\"0xecfd625bfc433e8f6c8ce4abb92b9e8f1db3e401\"'" }, { "name" : "tokenAddr", "type" : "string", "doc" : "Type inferred from '\"0x6888a16ea9792c15a4dcf2f6c623d055c8ede792\"'" } ] }
ethereumbuy
{ "type" : "record", "name" : "ethereumbuy", "fields" : [ { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"9c946737c29c807255c3aac7334e182e375cc3a32684c66ccda03e9f5c52e47e_buy\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.0002743\"'" }, { "name" : "tokenGet", "type" : "string", "doc" : "Type inferred from '\"0x99ea4db9ee77acd40b119bd1dc4e33e1c070b80d\"'" }, { "name" : "amountGet", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "tokenGive", "type" : "string", "doc" : "Type inferred from '\"0x0000000000000000000000000000000000000000\"'" }, { "name" : "amountGive", "type" : "string", "doc" : "Type inferred from '\"1880000083000000000\"'" }, { "name" : "expires", "type" : "string", "doc" : "Type inferred from '\"5590043\"'" }, { "name" : "nonce", "type" : "string", "doc" : "Type inferred from '\"7586490717308181\"'" }, { "name" : "v", "type" : "int", "doc" : "Type inferred from '28'" }, { "name" : "r", "type" : "string", "doc" : "Type inferred from '\"0xd04a8f0a1f86fe8e3bdefb717f1bc461cfdb998f705dff3a5fc5567d023ca116\"'" }, { "name" : "s", "type" : "string", "doc" : "Type inferred from '\"0x7a8b687a8b7faea852ac873c02428acd5c26457282a4105fe12b60776fd87d55\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0x0b419BCE1Cb87ADEa84A913Fa903593fB68D33B1\"'" }, { "name" : "updated", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:41:10.058Z\"'" }, { "name" : "availableVolume", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "ethAvailableVolume", "type" : "string", "doc" : "Type inferred from '\"6853.81\"'" }, { "name" : "availableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"1880000083000000000\"'" }, { "name" : "ethAvailableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"1.880000083\"'" }, { "name" : 
"amountFilled", "type" : "null", "doc" : "Type inferred from 'null'" } ] }
ethereumsell
{ "type" : "record", "name" : "ethereumsell", "fields" : [ { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"27b06f4b8caf4aaa6d05841f8daa077f5f2131145331489ae94febc5eddd8c56_sell\"'" }, { "name" : "deleted", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"-1.1627879935162941e+21\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.0004473\"'" }, { "name" : "tokenGet", "type" : "string", "doc" : "Type inferred from '\"0x0000000000000000000000000000000000000000\"'" }, { "name" : "amountGet", "type" : "string", "doc" : "Type inferred from '\"520115069499838335\"'" }, { "name" : "tokenGive", "type" : "string", "doc" : "Type inferred from '\"0xe3818504c1B32bF1557b16C238B2E01Fd3149C17\"'" }, { "name" : "amountGive", "type" : "string", "doc" : "Type inferred from '\"1162787993516294126989\"'" }, { "name" : "expires", "type" : "string", "doc" : "Type inferred from '\"5589988\"'" }, { "name" : "nonce", "type" : "string", "doc" : "Type inferred from '\"57125161\"'" }, { "name" : "v", "type" : "int", "doc" : "Type inferred from '28'" }, { "name" : "r", "type" : "string", "doc" : "Type inferred from '\"0xafd5497f6159ac6589fd1804d27fe05436ed13706e64002f0e82e93b471e1780\"'" }, { "name" : "s", "type" : "string", "doc" : "Type inferred from '\"0x76dc38410d35069d1a62c08a1976548fbb915d4c52d8c9789157149720b04a33\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0x7418b4B9327b2DD18AC90Ef2eF846b36F286adA4\"'" }, { "name" : "updated", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:36:55.000Z\"'" }, { "name" : "availableVolume", "type" : "string", "doc" : "Type inferred from '\"1.16278799351629411637773277515287799587448e+21\"'" }, { "name" : "ethAvailableVolume", "type" : "string", "doc" : "Type inferred from '\"1162.7879935162941\"'" }, { "name" : "availableVolumeBase", "type" : "string", "doc" : "Type inferred from 
'\"520115069499838340\"'" }, { "name" : "ethAvailableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"0.5201150694998383\"'" }, { "name" : "amountFilled", "type" : "null", "doc" : "Type inferred from 'null'" } ] }
Example JSON Data {"id":"237ab7693be71d35783941da9686f340d32b6d1e7332eedd0636b3e7b3725b93_sell","deleted":true,"amount":"-6000000000000000000","price":"0.02","tokenGet":"0x0000000000000000000000000000000000000000","amountGet":"120000000000000000","tokenGive":"0x219218f117dc9348b358b8471c55a073e5e0da0b","amountGive":"6000000000000000000","expires":"5594927","nonce":"4013451455","v":27,"r":"0x3965d8a9b074c6dcf25ebe10d39833f5ec6aa2d892aaec057dffd4368cd39f46","s":"0x01820eb919fe725c142f560c501742a89d41ca86b5594fdedc94f0e6f91bc97f","user":"0x0d4F98cb588c18FCC2695e2341112f066A915f80","updated":"2018-05-11T13:58:48.064Z","availableVolume":"798131841361720650","ethAvailableVolume":"0.7981318413617207","availableVolumeBase":"15962636827234412","ethAvailableVolumeBase":"0.01596263682723441","amountFilled":null}
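The ReplaceText and SplitJson steps described earlier can be sketched outside NiFi in a few lines of Python. This is only an illustration, not the flow itself: the helper names `parse_frame` and `split_orders` are hypothetical, and the message shape (an array holding an event name followed by an object with `buys` and `sells` arrays) is an assumption based on the `$.*.buys` and `$.*.sells` expressions above.

```python
import json
import re

# Socket.IO text frames start with numeric packet-type digits
# (for example "0", "40", "42") ahead of any JSON payload.
_PREFIX = re.compile(r"^\d+")


def parse_frame(frame: str):
    """Strip the leading packet-type digits and parse the JSON that remains.

    Returns None for frames that carry no JSON payload (such as a bare "40").
    """
    body = _PREFIX.sub("", frame.strip(), count=1)
    if not body:
        return None
    return json.loads(body)


def split_orders(message):
    """Yield (side, record) pairs from a message assumed to look like
    ["orders", {"buys": [...], "sells": [...]}]."""
    if not isinstance(message, list):
        return
    for element in message:
        if not isinstance(element, dict):
            continue  # skip the event name string
        for side in ("buys", "sells"):
            for record in element.get(side, []):
                yield side, record


# Hypothetical, heavily trimmed frame for illustration only.
frame = (
    '42["orders",{"buys":[{"id":"a_buy","price":"0.0002743"}],'
    '"sells":[{"id":"b_sell","price":"0.02"}]}]'
)
records = list(split_orders(parse_frame(frame)))
for side, record in records:
    print(side, record["id"], record["price"])
```

Each yielded record corresponds to one flow file after the second SplitJson, ready to have a schema name attached.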
{
"type": "record",
"name": "ethereumfunds",
"fields": [
{
"name": "txHash",
"type": [
"string",
"null"
]
},
{
"name": "date",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"2018-05-10T15:52:19.000Z\"'"
},
{
"name": "tokenAddr",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"0xfb1e5f5e984c28ad7e228cdaa1f8a0919bb6a09b\"'"
},
{
"name": "kind",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"Deposit\"'"
},
{
"name": "user",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"0xb975cf6c40f9cc5ad5cb7a335f16bdaab6cdcf0d\"'"
},
{
"name": "amount",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"5986.826\"'"
},
{
"name": "balance",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"5986.826\"'"
}
]
}
SQL Table DDL
%jdbc(hive)
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumsell (id STRING, deleted BOOLEAN, amount STRING, price STRING, tokenGet STRING, amountGet STRING, tokenGive STRING, amountGive STRING, expires STRING, nonce STRING, v INT, r STRING, s STRING, `user` STRING, updated STRING, availableVolume STRING, ethAvailableVolume STRING, availableVolumeBase STRING, ethAvailableVolumeBase STRING, amountFilled STRING) STORED AS ORC
LOCATION '/etherdelta/sell'
%sql
select * from ethereumsell order by cast(price as double) desc
%jdbc(hive)
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumtrades (txHash STRING, `date` STRING, price STRING, side STRING, amount STRING, amountBase STRING, buyer STRING, seller STRING, tokenAddr STRING) STORED AS ORC
LOCATION '/etherdelta/trade'
%jdbc(hive)
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumbuy (id STRING, amount STRING, price STRING, tokenGet STRING, amountGet STRING, tokenGive STRING, amountGive STRING, expires STRING, nonce STRING, v INT, r STRING, s STRING, `user` STRING, updated STRING, availableVolume STRING, ethAvailableVolume STRING, availableVolumeBase STRING, ethAvailableVolumeBase STRING, amountFilled STRING) STORED AS ORC LOCATION '/etherdelta/buy'
SELECT * FROM ethereumbuy
order by cast(price as double) desc
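The queries above cast price to double because the column is stored as STRING, and strings sort character by character rather than by numeric value. A small Python sketch of the difference, using a few price-like strings (the list is made up for illustration, although several values appear in the samples in this article):

```python
# Price-like values stored as text, as in the Hive tables above.
prices = ["0.02", "0.0004473", "6.85381e+21", "1e-05", "10"]

# Sorting the raw strings compares characters, not magnitudes.
as_text = sorted(prices, reverse=True)

# Converting to a number first gives the intended descending order.
as_number = sorted(prices, key=float, reverse=True)

print("text order:   ", as_text)
print("numeric order:", as_number)
```

In the text ordering "1e-05" lands above "10", which is why the cast matters for any ranking by price.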
We now have four tables full of different Ethereum Trades, Sells, Buys and Funds.

Schemas in Hortonworks Schema Registry
Storing The Data
Run SQL on the Flows

Reference:
https://community.hortonworks.com/articles/68378/nifi-websocket-support.html
https://gist.github.com/ijokarumawak/60b9ab2038ef906731ebf4c0eee97176
http://ijokarumawak.github.io/nifi/2016/11/04/nifi-websocket/
https://www.ethereum.org/cli
http://web3py.readthedocs.io/en/stable/

Source
etherdelta-analysis-1.json
etherdelta-storage.xml
05-08-2018
01:17 PM
Quick tip: When you create a free Twitter app at apps.twitter.com, they will let you read and write, but not a lot. Don't tweet every 5 minutes for hours or you will get that app's write ability blocked. Also be sure you are not reposting spam.
05-07-2018
08:19 PM
2 Kudos
Flow
We can remove Sentiment if you don't want to install my custom processor:
This is what they look like:
I grab some fields I like:
These are fields I want to save:

This is a simple version of the flow to just ingest tweets, run sentiment analysis and store them in a directory as clean JSON. You can drop the sentiment analysis and do it later. You can also run a Python script for that.

We could make this simpler and just have GetTwitter then PutFile. This will store the raw Twitter JSON file, which is a very sparse nested JSON file. If you want the raw data, that is an option. It's a pain to work with that format and it is not ideal for analytics. I flatten it and just grab what I have seen as the core attributes; you can add more or drop some of them easily.

This is a simple version that could be used for art or personal projects, or by anyone who wants to store their own tweets and related items.

Get Your Twitter ID: https://tweeterid.com/
Documentation: https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object
Create Your Application: https://apps.twitter.com/ https://apps.twitter.com/app/new

Application Settings You Need: Consumer Key (API Key) and Consumer Secret (API Secret)
Your Access Token You Need: Access Token and Access Token Secret

Make sure you keep the secrets secure, as you don't want people tweeting in your name or reading your stuff. You will place these in the GetTwitter processor. Click start once you add them. You can filter by language, like en for English and es for Spanish.

We just save these JSON files to a directory for later use. We could also aggregate them and compress them if you like. Or send them to Amazon S3, email them, or whatever. We can also retweet those, but now we are getting fancy and we already wrote that article this morning.

Custom Processor: https://github.com/tspannhw/nifi-corenlp-processor

Example Tweet in JSON Stored:
{
"msg" : "RT @PaasDev Tim said @ApacheNiFi is awesome",
"unixtime" : "1525724645676",
"friends_count" : "5268",
"sentiment" : "POSITIVE",
"hashtags" : "[\"ApacheNiFi\"]",
"listed_count" : "25",
"tweet_id" : "993587294715203584",
"user_name" : "Tim Spann",
"favourites_count" : "5348",
"source" : "NiFiTweetBot",
"placename" : "",
"media_url" : "[]",
"retweet_count" : "0",
"user_mentions_name" : "[]",
"geo" : "",
"urls" : "[]",
"countryCode" : "",
"user_url" : "",
"place" : "",
"timestamp" : "1525724645676",
"coordinates" : "",
"handle" : "PaasDev",
"profile_image_url" : "http://pbs.twimg.com/profile_images/34343/34343.jpg",
"time_zone" : "Eastern Time (US & Canada)",
"ext_media" : "[]",
"statuses_count" : "5994",
"followers_count" : "1963",
"location" : "Princeton, NJ",
"time" : "Mon May 07 20:24:05 +0000 2018",
"user_mentions" : "[]",
"user_description" : "Tim NiFi Guy"
}

Download and Import to Apache NiFi This Template
simplenifitwitter.xml

Setup
Get some Apache NiFi: https://www.apache.org/dyn/closer.lua?path=/nifi/1.6.0/nifi-1.6.0-bin.zip
Unzip it. On some Linux distributions you may need to apt-get install unzip or yum install unzip. You may need to be root, so you can do something like sudo su. You will need Java installed.

For a low cost small Linux server, you can use one of these two services, and they also tell you how to install Java. There are many low cost options. This application is small enough to also run on your laptop, an old desktop PC or a small cloud instance.
https://www.digitalocean.com/community/tutorials/how-to-install-java-on-centos-and-fedora
https://www.linode.com/docs/development/java/install-java-on-centos/
https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-get-on-ubuntu-16-04

Generally something like this:
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
or
sudo yum install java-1.8.0-openjdk-devel

OpenJDK 8 or Oracle JDK 8 are perfect.

You can also run some Docker containers if you like that sort of thing: https://github.com/minyk/nifi-sandbox
You can also download one of the Hortonworks HDF 3.1 Sandboxes to run this as well: https://hortonworks.com/downloads/#sandbox
Those have Apache NiFi and Java preinstalled!

Here are some Docker instructions:
https://hortonworks.com/tutorial/sandbox-deployment-and-install-guide/section/3/
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_installing-nifi/content/ch_nifi-installation.xml.html

Resources:
https://github.com/tspannhw?utf8=%E2%9C%93&tab=repositories&q=&type=source&language=
https://community.hortonworks.com/articles/81270/adding-stanford-corenlp-to-big-data-pipelines-apac-1.html
https://community.hortonworks.com/articles/80418/open-nlp-example-apache-nifi-processor.html
https://community.hortonworks.com/articles/177370/extracting-html-from-pdf-excel-and-word-documents.html
https://community.hortonworks.com/articles/76935/using-sentiment-analysis-and-nlp-tools-with-hdp-25.html
https://community.hortonworks.com/articles/163776/parsing-any-document-with-apache-nifi-15-with-apac.html
https://community.hortonworks.com/content/kbentry/189735/automating-social-media-sending-tweets-with-apache.html
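The article mentions that the flattening could also be done in a Python script. Here is a rough sketch of what that might look like. The input field names (`text`, `id_str`, `created_at`, `timestamp_ms`, `user`, `entities`) come from the Twitter tweet object documentation linked above; the mapping onto the output names and the helper `flatten_tweet` are my own assumptions and are not taken from the NiFi template.

```python
import json


def flatten_tweet(raw: dict) -> dict:
    """Pull a handful of core attributes out of a nested tweet object.

    Values are stored as strings, mirroring the stored example above.
    The field mapping is an assumption made for illustration.
    """
    user = raw.get("user") or {}
    entities = raw.get("entities") or {}
    hashtags = [tag.get("text", "") for tag in entities.get("hashtags", [])]
    flat = {
        "msg": raw.get("text", ""),
        "tweet_id": raw.get("id_str", ""),
        "time": raw.get("created_at", ""),
        "unixtime": raw.get("timestamp_ms", ""),
        "retweet_count": raw.get("retweet_count", 0),
        "user_name": user.get("name", ""),
        "handle": user.get("screen_name", ""),
        "location": user.get("location") or "",
        "followers_count": user.get("followers_count", 0),
        "friends_count": user.get("friends_count", 0),
        "hashtags": json.dumps(hashtags),
    }
    return {key: str(value) for key, value in flat.items()}


# Hypothetical, trimmed raw tweet used only to exercise the function.
raw_tweet = {
    "created_at": "Mon May 07 20:24:05 +0000 2018",
    "id_str": "993587294715203584",
    "text": "RT @PaasDev Tim said @ApacheNiFi is awesome",
    "timestamp_ms": "1525724645676",
    "retweet_count": 0,
    "user": {
        "name": "Tim Spann",
        "screen_name": "PaasDev",
        "location": "Princeton, NJ",
        "followers_count": 1963,
        "friends_count": 5268,
    },
    "entities": {"hashtags": [{"text": "ApacheNiFi"}]},
}

flat_tweet = flatten_tweet(raw_tweet)
print(json.dumps(flat_tweet, indent=2))
```

Adding or dropping attributes is a matter of editing the `flat` dictionary, which is the same trade-off the flow makes when choosing which fields to keep.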
05-05-2018
01:51 PM
3 Kudos
Tracking Air Quality with HDP and HDF: Part 1 - Apache NiFi Ingest
Part 2: Plan Data Storage. Store to Apache Hive, Apache Druid and Apache HBase.
Part 3: Query and Visualize Data with Apache Zeppelin and Superset

There was an Air Quality alert a few days ago near me and I was curious how I could keep track of this important environmental information. So NiFi! This data is different from weather data, but it makes a lot of sense for analytics to add in data from weather, social media and locally captured camera images. It's very easy to ingest these JSON feeds and camera images via Apache NiFi.

In the next section we will analyze the datasets and determine how we can aggregate and accumulate massive quantities of this data for tracking air quality in various areas over time, and use that as a dimension with other relevant data like weather.

We are tracking contaminants and particles in the air. These include:
pm25, pm10 - atmospheric particulate matter
so2 - sulfur dioxide
no2 - nitrogen dioxide
o3 - ozone
co - carbon monoxide

Photos Courtesy of HazeCam - Brigantine, NJ

Example Data
{"location":"ARB OER","city":"CA8 - ARB","country":"US","distance":3848728.319714322,"measurements":[{"parameter":"pm25","value":-4,"lastUpdated":"2016-08-08T16:00:00.000Z","unit":"µg/m³","sourceName":"AirNow","averagingPeriod":{"value":1,"unit":"hours"}}],"coordinates":{"latitude":38.568504,"longitude":-121.493256}}
{
"location" : "MONTG",
"parameter" : "o3",
"date" : {
"utc" : "2018-05-05T12:00:00.000Z",
"local" : "2018-05-05T06:00:00-06:00"
},
"value" : 0.004,
"unit" : "ppm",
"coordinates" : {
"latitude" : 32.4069,
"longitude" : -86.2564
},
"country" : "US",
"city" : "Montgomery"
}

Most of the data is arrays of JSON, so we can easily break that down into individual JSON records, derive an AVRO Schema from that data and then process it as we want. We can join them together and then convert into ORC files or HBase rows.

Data Feed Links

Haze Cam provides web camera images of potential haze
http://hazecam.net/images/main/brigantine_right.jpg

OpenAQ (https://openaq.org/#/?_k=7mfsz6) provides open air quality data
https://api.openaq.org/v1/latest?country=US
https://api.openaq.org/v1/measurements?country=US&date_from=2018-05-04

Air NOW API (provides forecasts and current conditions)
http://www.airnowapi.org/aq/observation/zipCode/current/?format=application/json&zipCode=08520&distance=50&API_KEY=SIGNUPFORANAPIKEY
http://www.airnowapi.org/aq/forecast/zipCode/?format=application/json&zipCode=08520&date=2018-05-02&distance=25&API_KEY=SIGNUPFORANAPIKEY

EPA's Air Quality Notifications
http://feeds.enviroflash.info/
https://www.airnow.gov/index.cfm?action=airnow.national
http://feeds.enviroflash.info/rss/realtime/445.xml

Other Sources
http://feeds.enviroflash.info/cap/aggregate.xml
https://docs.openaq.org/
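Breaking the nested arrays into individual records, as described above, can be pictured with a short Python sketch. It assumes records shaped like the first example in this article (a location with a `measurements` array); the function name `flatten_latest` and the output column names are hypothetical.

```python
import json


def flatten_latest(record: dict):
    """Yield one flat row per entry in the record's measurements array."""
    coordinates = record.get("coordinates") or {}
    base = {
        "location": record.get("location"),
        "city": record.get("city"),
        "country": record.get("country"),
        "latitude": coordinates.get("latitude"),
        "longitude": coordinates.get("longitude"),
    }
    for measurement in record.get("measurements", []):
        row = dict(base)  # copy so each row is independent
        row["parameter"] = measurement.get("parameter")
        row["value"] = measurement.get("value")
        row["unit"] = measurement.get("unit")
        row["last_updated"] = measurement.get("lastUpdated")
        yield row


# The first example record from the article, trimmed to the fields used here.
sample = json.loads(
    '{"location":"ARB OER","city":"CA8 - ARB","country":"US",'
    '"measurements":[{"parameter":"pm25","value":-4,'
    '"lastUpdated":"2016-08-08T16:00:00.000Z","unit":"µg/m³"}],'
    '"coordinates":{"latitude":38.568504,"longitude":-121.493256}}'
)

rows = list(flatten_latest(sample))
for row in rows:
    print(row)
```

Flat rows like these map directly onto columns, which is what makes the later conversion to ORC files or HBase rows straightforward.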
05-03-2018
04:00 PM
3 Kudos
Converting CSV Files to Apache Hive Tables with Apache ORC Files

I received some CSV files of data to load into Apache Hive. There are many ways to do this, but I wanted to see how easy it was to do in Apache NiFi with zero code. I read CSV files from a directory of files. Then I can convert the CSV to AVRO directly with ConvertRecord. I will need a schema, so I use the below settings for InferAvroSchema. If every file is different, you will need to do this every time.

CSV Reader
I use the Jackson CSV parser, which works very well. The first line of the CSV is a header, and the reader can figure out the fields from the header. Once I have an Apache AVRO file, it's easy to convert it to Apache ORC and then store it in HDFS.

Template: csvprocess.xml
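To make the idea of deriving a schema from a CSV header concrete, here is a deliberately simplified Python sketch. It is not how InferAvroSchema works internally (that processor also inspects values to pick types); this version only reads the header row and declares every column as a nullable string. The function name, record name and sample data are all made up for illustration.

```python
import csv
import io
import json


def schema_from_header(csv_text: str, record_name: str) -> dict:
    """Build an Avro-style record schema from the first line of a CSV.

    Every field is typed as a nullable string; no value inspection is done.
    """
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    return {
        "type": "record",
        "name": record_name,
        "fields": [
            {"name": column.strip(), "type": ["string", "null"]}
            for column in header
        ],
    }


# Hypothetical CSV content.
sample_csv = "id,name,amount\n1,widget,9.99\n2,gadget,19.50\n"
schema = schema_from_header(sample_csv, "csvdata")
print(json.dumps(schema, indent=2))
```

Starting from all strings and casting later in Hive is a common fallback when the files vary and type inference cannot be trusted.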
04-27-2018
04:39 PM
2 Kudos
ETL With Lookups with Apache HBase and Apache NiFi (Microservices Style ETL)

When we are ingesting tabular / record-oriented data, we often want to enrich the data by replacing ids with descriptions or vice versa. There are many transformations that may need to happen before the data is in a happy state. When you are denormalizing your data in Hadoop and building very wide tables, you often want descriptions or other data to enhance its usability. Only one call to get everything you need is nice, especially when you have 100 trillion records.

We are utilizing a lot of things built already (https://community.hortonworks.com/articles/146198/data-flow-enrichment-with-nifi-part-3-lookuprecord.html). Make sure you read Abdelkrim's first 3 lookup articles. I added some fields to his generated data for testing.

I want to do my lookups against HBase, which is a great NoSQL store for lookup tables and generated datasets. First I created an HBase table to use for lookups.

Create HBase Table For Lookups
create 'lookup_', 'family'

Table With Data
Most people would have a pre-populated table for lookups. I don't, and since we are using a generator to build the lookup ids, I am building the lookup descriptions with a REST call at the same time. We could also have a flow that adds the lookup when it is not found, and another flow that ingests the lookup values and adds or updates them when needed.

REST API To Generate Product Descriptions
https://baconipsum.com/api/?type=meat&sentences=1&format=text
I found this cool API that returns a sentence of meat words. I use this as our description, because MEAT!

Call the Bacon API!!!
Let's turn our plain text into a clean JSON document
Then I store it in HBase as my lookup table. You probably already have a lookup table. This is a demo and I am filling it with my generator. This is not a best practice or a good design pattern. This is a lazy way to populate a table.

Example Apache NiFi Flow (Using Apache NiFi 1.5)

Generate Some Test Data (https://community.hortonworks.com/articles/146198/data-flow-enrichment-with-nifi-part-3-lookuprecord.html)

Generate A Json Document (Note the Empty prod_desc)
{
"ts" : "${now():format('yyyymmddHHMMSS')}",
"updated_dt" : "${now()}",
"id_store" : ${random():mod(5):toNumber():plus(1)},
"event_type" : "generated",
"uuid" : "${UUID()}",
"hostname" : "${hostname()}",
"ip" : "${ip()}",
"counter" : "${nextInt()}",
"id_transaction" : "${random():toString()}",
"id_product" : ${random():mod(500000):toNumber()},
"value_product" : ${now():toNumber()},
"prod_desc": ""
}
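For readers who want similar test records without NiFi, the following Python sketch produces a comparable document. It is only an analogue of the template above, not a translation of NiFi Expression Language: the `ts` field uses a plain year-month-day-hour-minute-second layout of my choosing, the `ip` field is omitted, and `generate_record` is a hypothetical helper.

```python
import random
import socket
import uuid
from datetime import datetime, timezone


def generate_record(counter: int) -> dict:
    """Build one test record with an empty prod_desc, ready for enrichment."""
    now = datetime.now(timezone.utc)
    return {
        "ts": now.strftime("%Y%m%d%H%M%S"),
        "updated_dt": now.isoformat(),
        "id_store": random.randrange(5) + 1,       # 1 through 5
        "event_type": "generated",
        "uuid": str(uuid.uuid4()),
        "hostname": socket.gethostname(),
        "counter": str(counter),
        "id_transaction": str(random.getrandbits(63)),
        "id_product": random.randrange(500000),    # 0 through 499999
        "value_product": int(now.timestamp() * 1000),
        "prod_desc": "",
    }


record = generate_record(1)
print(record)
```

The id ranges match the mod(5) plus 1 and mod(500000) expressions in the template, so the same lookup table can serve both generators.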
Lookup Your Record
This is the magic. We take in our records; in this case we are reading JSON records and writing JSON records, but we could choose CSV, AVRO or others. We connect to the HBase Record Lookup Service. We replace the current prod_desc field in the record with what is returned by the lookup. We use the id_product field as the lookup key. Nothing else is needed to change records in stream.

HBase Record Lookup Service
HBase Client Service Used by HBase Record Lookup Service

We can use UpdateRecord to clean up, transform or modify any field in the records in stream.

Original File
{
"ts" : "201856271804499",
"updated_dt" : "Fri Apr 27 18:56:15 UTC 2018",
"id_store" : 1,
"event_type" : "generated",
"uuid" : "0d16967d-102d-4864-b55a-3f1cb224a0a6",
"hostname" : "princeton1",
"ip" : "172.26.217.170",
"counter" : "7463",
"id_transaction" : "5307056748245491959",
"id_product" : 430672,
"value_product" : 1524855375500,
"prod_desc": ""
}
Final File (Note we have populated prod_desc with MEAT!) [ {
"ts" : "201856271804499",
"prod_desc" : "Pork chop leberkas brisket chuck, filet mignon turducken hamburger.",
"updated_dt" : "Fri Apr 27 18:56:15 UTC 2018",
"id_store" : 1,
"event_type" : "generated",
"uuid" : "0d16967d-102d-4864-b55a-3f1cb224a0a6",
"hostname" : "princeton1",
"ip" : "172.26.217.170",
"counter" : "7463",
"id_transaction" : "5307056748245491959",
"id_product" : 430672,
"value_product" : 1524855375500
} ]
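The before and after files above show the effect of the lookup. As a rough mental model only, the same enrichment can be written in Python with a dictionary standing in for the HBase lookup table. The `enrich` function and the in-memory `lookup_table` are assumptions for illustration; they do not reflect how LookupRecord or the HBase lookup service are implemented.

```python
def enrich(records, lookup, key_field="id_product", target_field="prod_desc"):
    """Return copies of the records with target_field filled from the lookup.

    Records whose key is missing from the lookup are passed through unchanged.
    """
    enriched = []
    for record in records:
        updated = dict(record)  # do not mutate the caller's record
        description = lookup.get(str(record.get(key_field)))
        if description is not None:
            updated[target_field] = description
        enriched.append(updated)
    return enriched


# Stand-in for the HBase table: row key -> description.
lookup_table = {
    "430672": "Pork chop leberkas brisket chuck, filet mignon turducken hamburger.",
}

original = [
    {"id_store": 1, "id_product": 430672, "prod_desc": ""},
    {"id_store": 2, "id_product": 999, "prod_desc": ""},
]

final = enrich(original, lookup_table)
for row in final:
    print(row)
```

Keeping the key field and target field as parameters mirrors the way the processor is configured rather than coded, so the same function covers other id-to-description lookups.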
References:
https://community.hortonworks.com/articles/171787/hdf-31-executing-apache-spark-via-executesparkinte.html
https://community.hortonworks.com/articles/155527/ingesting-golden-gate-records-from-apache-kafka-an.html
https://community.hortonworks.com/questions/174144/lookuprecord-and-simplecsvfilelookupservice-in-nif.html
https://community.hortonworks.com/articles/138632/data-flow-enrichment-with-nifi-lookuprecord-proces.html
https://community.hortonworks.com/articles/64122/incrementally-streaming-rdbms-data-to-your-hadoop.html

For those wishing to not include meat in their data, there are alternatives: https://www.vegguide.org/site/api-docs

Example Flow
etlv2.xml
04-24-2018
06:44 PM
Vision Thing Part 3: Image Analytics
Open Source Computer Vision with TensorFlow, Apache MiniFi, Apache NiFi, OpenCV, Apache Tika and Python

In preparation for this talk, I am releasing some articles detailing how to work with images. In this one, for Linux machines, I recommend building OpenCV yourself and installing the Python connector.

sudo yum install -y https://centos7.iuscommunity.org/ius-release.rpm
sudo yum update -y
sudo yum groupinstall 'Development Tools' -y
sudo yum install cmake git pkgconfig -y
sudo yum install libpng-devel libjpeg-turbo-devel jasper-devel openexr-devel libtiff-devel libwebp-devel -y
sudo yum install libdc1394-devel libv4l-devel gstreamer-plugins-base-devel -y
sudo yum install gtk2-devel -y
sudo yum install tbb-devel eigen3-devel -y
sudo yum install -y python36u python36u-libs python36u-devel python36u-pip
pip3.6 install numpy
cd ~
git clone https://github.com/Itseez/opencv.git
cd opencv
git checkout 3.1.0
cd ~
git clone https://github.com/Itseez/opencv_contrib.git
cd opencv_contrib
git checkout 3.1.0
cd ~/opencv
mkdir build
cd build
cmake -D CMAKE_BUILD_TYPE=RELEASE \
-D CMAKE_INSTALL_PREFIX=/usr/local \
-D OPENCV_EXTRA_MODULES_PATH=~/opencv_contrib/modules \
-D INSTALL_C_EXAMPLES=OFF \
-D INSTALL_PYTHON_EXAMPLES=ON \
-D BUILD_EXAMPLES=ON \
-D BUILD_OPENCV_PYTHON2=ON -D BUILD_OPENCV_PYTHON3=ON ..
sudo make
sudo make install
sudo ldconfig
pip3.6 install opencv-python
04-23-2018
09:33 PM
Thanks, I did see that but it looked a bit hard to follow how to do it from scratch
04-15-2018
09:55 AM
3 Kudos
TIBCO Enterprise Message Service
https://www.tibco.com/products/tibco-enterprise-message-service

I tested this against the most recent release of TIBCO Enterprise Message Service and their JMS driver, available via trial download. I followed the very easy install directions. I downloaded it to a CentOS 7 server and expanded my download to TIB_ems-dev_8.4.0_linux_x86_64. Then I made TIBCOUniversalInstaller-lnx-x86-64.bin executable and ran it with --console. I used all the defaults (I picked server and client) and then ran the freshly installed server.

Running TIBCO on CentOS 7
cd /opt/tibco/ems/8.4/bin/
./tibemsd64 -config ~/TIBCO_HOME/tibco/cfgmgmt/ems/data/tibemsd.conf

Example JMS Queue Settings
URL: tcp://servername:7222
class: com.tibco.tibjms.TibjmsQueueConnectionFactory
Directory: /opt/tibco/ems/8.4/lib/

I believe it just uses these files from that directory:
tibjms.jar
jms-2.0.jar

Once I have my server and port shown, it's easy to add those settings to Apache NiFi.

The settings I need to publish messages are below. After you enter your username and queue, you need to create (or use) a controller service. Then we use our settings for our server; mine are the default ones. Make sure you enter the lib directory containing your jars, that it is on the Apache NiFi server, and that the Apache NiFi user has permission to read the jars.

You can also use this same controller to consume JMS messages from TIBCO EMS. These are example metadata attributes that Apache NiFi provides to you on message receipt.

Example Run Log of my TIBCO EMS v8.4.0 Server running on Linux.

Example Flow
tibco-jms.xml

Example Data
{
"top1pct" : "43.1",
"top5" : "n09428293 seashore, coast, seacoast, sea-coast",
"top4" : "n04371774 swing",
"top3" : "n02894605 breakwater, groin, groyne, mole, bulwark, seawall, jetty",
"top2" : "n03933933 pier",
"top1" : "n03216828 dock, dockage, docking facility",
"top2pct" : "34.3",
"imagefilename" : "/opt/demo/images/201817121004997.jpg",
"top3pct" : "3.8",
"uuid" : "mxnet_uuid_img_20180413140808",
"top4pct" : "2.7",
"top5pct" : "2.4",
"runtime" : "1.0"
}

This is example JSON data; we could use any text.

References
https://docs.tibco.com/pub/sb-lv/2.2.1/doc/html/authoring/jmsoperator.html
http://www.sourcefreak.com/2013/06/tibco-ems-sender-and-receiver-in-java/
https://docs.tibco.com/pub/adr3bs/1.2.0/doc/html/GUID-7111BD21-86C6-4F3A-89B3-B03BFCE15E0D.html
http://tutorialspedia.com/tibco-ems-how-to-send-and-receive-jms-messages-in-queues/
https://github.com/SolaceLabs/solace-integration-guides/tree/master/src/nifi-jms-jndi
https://docs.tibco.com/pub/activematrix_businessworks/6.2.0/doc/html/GUID-624942EB-89A3-400F-A9D1-B906107E6985.html
https://github.com/mcqueary/spring-jms-tibco/blob/master/README.md
04-06-2018
09:54 PM
These setup steps may help for your particular machine.
apt-get install curl wget -y
wget https://github.com/bazelbuild/bazel/releases/download/0.11.1/bazel-0.11.1-installer-linux-x86_64.sh
chmod +x bazel-0.11.1-installer-linux-x86_64.sh
./bazel-0.11.1-installer-linux-x86_64.sh
apt-get install libblas-dev liblapack-dev python-dev libatlas-base-dev gfortran python-setuptools python-h5py -y
pip3 install six numpy wheel
pip3 install --user numpy scipy matplotlib pandas sympy nose
pip3 install --upgrade tensorflow
git clone --recurse-submodules https://github.com/tensorflow/tensorflow
wget http://mirror.jax.hugeserver.com/apache/nifi/minifi/0.4.0/minifi-0.4.0-bin.zip
wget https://storage.googleapis.com/download.tensorflow.org/models/inception5h.zip
wget http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz