1973 Posts
1225 Kudos Received
124 Solutions

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2486 | 04-03-2024 06:39 AM |
| | 3836 | 01-12-2024 08:19 AM |
| | 2076 | 12-07-2023 01:49 PM |
| | 3062 | 08-02-2023 07:30 AM |
| | 4195 | 03-29-2023 01:22 PM |
05-17-2018
01:19 PM
https://www.rosehosting.com/blog/how-to-install-python-3-6-4-on-centos-7/
05-17-2018
12:55 PM
sudo apt-get update
sudo apt-get install -y wget python gcc
wget https://bootstrap.pypa.io/get-pip.py && sudo python get-pip.py
sudo apt-get install graphviz
pip install graphviz
pip install mxnet --pre
05-16-2018
08:15 PM
To Build YourApiKeyToken:

1. Create an account at Etherscan.io and confirm it.
2. Log in.
3. Go to My Account.
4. Click Developers.
5. Click Create Api Key.
6. Add an app name.
7. Copy the API key to the end of the URL in NiFi.
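As a sketch of how the key ends up in the request (the helper name here is hypothetical, not part of the NiFi flow), the Etherscan URL with the apikey query parameter can be built like this in Python:

```python
from urllib.parse import urlencode

# Hypothetical helper for illustration: build an Etherscan REST URL with the
# API key appended as the final query parameter, the same way the key is
# tacked onto the end of the URL in NiFi.
def build_etherscan_url(module, action, api_key, **params):
    query = {"module": module, "action": action, **params, "apikey": api_key}
    return "https://api.etherscan.io/api?" + urlencode(query)

url = build_etherscan_url("stats", "ethprice", "YourApiKeyToken")
print(url)
```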
05-15-2018
02:40 AM
3 Kudos
Integrating Darknet YOLOv3 Into Apache NiFi Workflows

Darknet has released a new version of YOLO, version 3. This one is faster and perhaps more accurate. It's new and shiny and I had to try it. I am liking the results.

Flow to Execute Script

We call the shell script, then I route out the empty results. I use SplitText to split the output into individual lines, then ExtractText with ([^:]+):(.*) to split each line into name/value pairs.

We also want to process the images produced by YOLOv3, so we grab the newest ones from the output directory. I also add metadata extraction and TensorFlow analysis. This data is stored in attributes and can be saved independently: AttributesToJSON builds a new flow file that we save off separately, probably converting it to Apache ORC and storing it in HDFS for Apache Hive querying. The image file we can store in the cloud, in another file system, send to a front end, or save in HDFS. Or even email it to someone.

The parsed YOLOv3 results land in Apache NiFi attributes. As you can see, we would grab labelvalue.1 and labelvalue.2 to do our processing. We may want to send this to JMS, MQTT or Apache Kafka for further display in an application or dashboard.

This is an example of the result of our ExtractText. This is the output that we parse with Apache NiFi. YOLOv3 also generates an image with rectangles and labels, and it does some great classification on multiple items in a picture.

I use Python to capture an image from my webcam via OpenCV2. I wrap my call in a shell script that captures the image, sends it to Darknet's build of YOLOv3 and sends errors to /dev/null. If you have a good GPU, you can compile with CUDA and OpenCV to do real-time detection off a webcam.

Example Output:

/Volumes/seagate/models/darknet-master/images/yolo_image_img_20180514183707.jpg: Predicted in 26.351510 seconds.
cell phone: 72%
chair: 78%
chair: 72%
chair: 59%
person: 100%
chair: 83%

Example Run:

./darknet detect cfg/yolov3.cfg cfg/yolov3.weights /Volumes/seagate/StrataNYC2018/kafka.jpg

Source: https://github.com/tspannhw/nifi-yolo3/tree/master

References:

See: https://github.com/pjreddie/darknet
See: https://pjreddie.com/darknet/yolo/
Download the training weights and data (https://pjreddie.com/media/files/yolov3.weights)
See: https://pjreddie.com/media/files/papers/YOLOv3.pdf

@article{yolov3,
title={YOLOv3: An Incremental Improvement},
author={Redmon, Joseph and Farhadi, Ali},
journal = {arXiv},
year={2018}
}
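The ExtractText pattern mentioned above, ([^:]+):(.*), can be sketched in Python to show how the Darknet console output becomes name/value pairs (a standalone illustration, not the NiFi processor itself):

```python
import re

# Same pattern the ExtractText processor uses to split each Darknet
# output line into a label/confidence pair.
LINE_PATTERN = re.compile(r"([^:]+):(.*)")

def parse_yolo_output(text):
    pairs = []
    for line in text.splitlines():
        match = LINE_PATTERN.match(line)
        if match:
            pairs.append((match.group(1).strip(), match.group(2).strip()))
    return pairs

sample = "cell phone: 72%\nchair: 78%\nperson: 100%"
print(parse_yolo_output(sample))
# → [('cell phone', '72%'), ('chair', '78%'), ('person', '100%')]
```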
05-15-2018
12:18 AM
4 Kudos
Ethereum: Accessing Feeds from Etherscan on Volume, Price and Transaction (Apache NiFi) - Cryptocurrency Part 2

As opposed to EtherDelta, which we accessed in our first post (https://community.hortonworks.com/content/kbentry/191146/accessing-feeds-from-etherdelta-on-trades-funds-bu.html), Etherscan has a familiar SSL HTTP REST call returning clean JSON. We are reading three REST APIs to acquire three sets of interesting Ethereum data: logs, current supply and current price.

Overview of Flow

REST API ingest feed for getLogs: we split the response into individual JSON records for processing.

Ether Supply and Ether Last Price Flows

These two flows return very minimal amounts of data, with some flags I don't care about (status, message). So I use EvaluateJsonPath to extract just the good fields. Then with UpdateAttribute I add a just-created timestamp (now()). Finally I use AttributesToJSON to replace the existing flow file with a new JSON file built from only the fields I want. You can choose to include the system-generated metadata attributes automagically, but I turned that off. Sometimes it's nice to have the uuid, filename and other system metadata for tracking. You can also send that data somewhere else via built-in Apache NiFi reporting tools.

Our Save Everything Flow

I added three new feeds to our save-everything-as-Apache-ORC-files-in-HDFS flow from the EtherDelta ingest. You can see them on the right in Orange, Indigo and Violet. A rainbow of cryptocurrency storage; I am not sure what colors I will add next for additional feeds.

The process works the same as it did for the other data: set a schema name and storage location in HDFS, use a QueryRecord processor to limit the data and convert it to Apache Avro, and then merge these smaller Apache Avro files into one bigger file (HDFS likes fewer, bigger files; in Hadoop 3.1, this is not as big a deal). The final step is to save in HDFS at the location passed in.
This last step in the Data Provenance will have the hive.ddl and HDFS location to build an external Apache Hive table. We can copy that and run it in Apache Zeppelin, or you can hook up a PutHiveQL processor and have Apache NiFi build the table for you. I do this in Apache Zeppelin to control table builds, and I also add mine to version control.

To build this schema, I used InferAvroSchema earlier, then removed it from the flow. I pasted the generated schema, grabbed from the schema attribute in Data Provenance, into a new schema in Hortonworks Schema Registry, seen below. The schema registry allows for versioning and comparing versions. It's very useful, and required for doing record processing in Apache NiFi, Hortonworks Streaming Analytics Manager and other tools. It's also very helpful for Apache Kafka.

An example of our newly ingested supply data. An example of a QueryRecord Apache Calcite query. To learn more about querying with this awesome tool, see: http://calcite.apache.org/docs/

Apache Zeppelin Used to Create Our Tables

Apache Zeppelin Queries of the Data

Before we store data in HDFS, we have to build tables and possibly set up security, either with hdfs dfs -chmod or with Apache Ranger.

Build Our Directories for Apache ORC File Storage

hdfs dfs -mkdir -p /ethereum/tx
hdfs dfs -mkdir -p /ethereum/supply
hdfs dfs -mkdir -p /ethereum/price

Create Our Tables in Hive View 2 or Beeline or Apache Zeppelin

CREATE EXTERNAL TABLE IF NOT EXISTS ethereumprice (ethusd STRING, ethbtc STRING, ethusd_timestamp STRING, ethbtc_timestamp STRING, captureDate STRING)
STORED AS ORC
LOCATION '/ethereum/price';
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumtx (address STRING, topics ARRAY<STRING>, data STRING, blockNumber STRING, `timeStamp` STRING, gasPrice STRING, gasUsed STRING, logIndex STRING, transactionHash STRING, transactionIndex STRING)
STORED AS ORC
LOCATION '/ethereum/tx';
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumsupply (totalEther STRING, captureDate STRING)
STORED AS ORC
LOCATION '/ethereum/supply';

SQL Queries to Check out the Data from Apache Zeppelin

select * from ethereumsupply order by capturedate desc;
select max(ethusd) from ethereumprice p;
select ethusd, capturedate from ethereumprice order by ethusd_timestamp desc limit 500;
select * from ethereumtx order by `timeStamp` desc;
select * from ethereumsupply order by capturedate desc;
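The supply feed reports totalEther in wei; the QueryRecord SQL later in this post divides by 10^18 to convert it to ether. As a quick sanity check outside NiFi, the same conversion can be sketched in Python:

```python
# Mirror of the CAST(totalEther AS DOUBLE)/1000000000000000000 conversion
# performed in the QueryRecord processor's SQL.
WEI_PER_ETHER = 10 ** 18

def wei_to_ether(total_wei: str) -> float:
    # The feed delivers numbers as strings, so parse before dividing.
    return float(total_wei) / WEI_PER_ETHER

print(wei_to_ether("2000000000000000000"))  # → 2.0
```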
getLogs API (Transactions) from EtherScan

https://api.etherscan.io/api?module=logs&action=getLogs
&fromBlock=379224
&toBlock=latest
&address=0x33990122638b9132ca29c723bdf037f1a891a70c
&topic0=0xf63780e752c6a54a94fc52715dbc5518a3b4c3c2833d301a204226548a2a8545
&apikey=YourApiKeyToken

Ethereum Price from EtherScan

https://api.etherscan.io/api?module=stats&action=ethprice&apikey=YourAPIKeyToken

Ethereum Volume / Supply from EtherScan

https://api.etherscan.io/api?module=stats&action=ethsupply&apikey=YourApiKeyToken

Raw Data Parsed Into Fields By JSON Path
$.status
$.message
$.result
$.result.ethbtc
$.result.ethbtc_timestamp
$.result.ethusd
$.result.ethusd_timestamp

Apache Calcite / Apache NiFi - QueryRecord SQL

SELECT captureDate, (CAST(totalEther AS DOUBLE)/1000000000000000000) as totalEther
FROM FLOWFILE
WHERE CAST(totalEther AS DOUBLE) > 0

Build a File Name

${filename:append('ethprice.'):append(${now():format('yyyyMMddHHmmss')}):append(${md5}):append('.json')}

References
https://etherscan.io/apis#logs
https://etherscan.io/
https://www.ethereum.org/
https://etherscan.io/apis#stats
https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
https://calcite.apache.org/docs/reference.html

NiFi Flow Template: allet.xml
05-12-2018
04:08 AM
4 Kudos
Accessing Feeds from EtherDelta on Trades, Funds, Buys and Sells (Cryptocurrency Analysis)

EtherDelta lets you trade Ether and Ethereum-based tokens. Ethereum (https://www.ethereum.org/) is an open-source blockchain platform for running smart contracts. EtherDelta provides a fast WebSocket feed of all data coming through the system. We can tap this feed with Apache NiFi to examine and ingest all the trades, funds, buys and sells coming through the system as JSON. Once we ingest, clean up, parse and schematize the data, we can run queries on it with Apache Spark SQL and Apache Hive in Apache Zeppelin notebooks. Now the data scientists have a continuing stream of data to play with.

Next we will start adding additional feeds from other Ethereum exchanges, Bitcoin APIs and other sources of data. Some of the APIs are REST, some are WebSockets and some are SDKs. All of these are easy to ingest with Apache NiFi.

Initial Ingest from EtherDelta via WebSockets API

Route To The Correct Type

To process this feed with Apache NiFi 1.6+:

1. ConnectWebSocket - wss://socket.etherdelta.com/socket.io/?transport=websocket
2. ReplaceText - three of them, to remove extraneous data outside the JSON (40, 42, 0 - websocket junk text)
3. RouteOnAttribute - filter out too-small files
4. RouteOnContent - sell, buy, trades and funds
5. SplitJson - buys arrays ($.*.buys) or sells arrays ($.*.sells)
6. SplitJson - buys and sells into individual JSON records ($.[*])
7. UpdateAttribute - add a schema name
8. Send to a remote cluster via HTTP/HTTPS
9. On the remote cluster, process and query to limit and convert data to a Hive-friendly format
10. Store in HDFS
11. NiFi generates DDL for an external Hive table
12. Query with Zeppelin
13. Hand off to another Zeppelin user, a Data Scientist, for machine learning and statistics
14. Profit!

Break Apart Sell Records

Setup Our Web Socket Client: use the Macintosh default Java certificate for SSL (the default password is changeit). Connect to the Web Socket, remove the extra '40', and throw away small files!
Route for Buys, Funds, Sells and Trades

EtherDelta Exchange
https://etherdelta.com/#PPT-ETH
https://github.com/etherdelta/bots
https://github.com/etherdelta/etherdelta.github.io/blob/master/docs/API.md

EtherDelta provides a WebSocket feed of their data, so I am ingesting that with Apache NiFi and breaking out the different types of data being published. This will let us ingest into different Apache Hive tables and run some queries and analytics in Apache Zeppelin on this data. We can then make it available to Data Scientists.

JsonPath Expressions

JsonPath expression for trades and funds:
$.*.*
Split Json Orders Buys
$.*.buys
Split Json Orders Sells
$.*.sells

Schemas

ethereumfunds
{ "type" : "record", "name" : "ethereumfunds", "fields" : [ { "name" : "txHash", "type" : "string", "doc" : "Type inferred from '\"0xc40e227f3d5c2e125791cd865e8dd36c4a1a86538f13905ce91d65d2ac721742\"'" }, { "name" : "date", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:52:19.000Z\"'" }, { "name" : "tokenAddr", "type" : "string", "doc" : "Type inferred from '\"0xfb1e5f5e984c28ad7e228cdaa1f8a0919bb6a09b\"'" }, { "name" : "kind", "type" : "string", "doc" : "Type inferred from '\"Deposit\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0xb975cf6c40f9cc5ad5cb7a335f16bdaab6cdcf0d\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"5986.826\"'" }, { "name" : "balance", "type" : "string", "doc" : "Type inferred from '\"5986.826\"'" } ] }
ethereumtrades
{ "type" : "record", "name" : "ethereumtrades", "fields" : [ { "name" : "txHash", "type" : "string", "doc" : "Type inferred from '\"0x0350b7b479c9372c07188d69aa642ced7637b05444735653d61316dd852a673c\"'" }, { "name" : "date", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T18:32:27.000Z\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.000049874\"'" }, { "name" : "side", "type" : "string", "doc" : "Type inferred from '\"buy\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"8800\"'" }, { "name" : "amountBase", "type" : "string", "doc" : "Type inferred from '\"0.4388912\"'" }, { "name" : "buyer", "type" : "string", "doc" : "Type inferred from '\"0xd170db528cd2dd6ca67b0b2e3f7cd6e24942dba2\"'" }, { "name" : "seller", "type" : "string", "doc" : "Type inferred from '\"0xecfd625bfc433e8f6c8ce4abb92b9e8f1db3e401\"'" }, { "name" : "tokenAddr", "type" : "string", "doc" : "Type inferred from '\"0x6888a16ea9792c15a4dcf2f6c623d055c8ede792\"'" } ] }
ethereumbuy
{ "type" : "record", "name" : "ethereumbuy", "fields" : [ { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"9c946737c29c807255c3aac7334e182e375cc3a32684c66ccda03e9f5c52e47e_buy\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.0002743\"'" }, { "name" : "tokenGet", "type" : "string", "doc" : "Type inferred from '\"0x99ea4db9ee77acd40b119bd1dc4e33e1c070b80d\"'" }, { "name" : "amountGet", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "tokenGive", "type" : "string", "doc" : "Type inferred from '\"0x0000000000000000000000000000000000000000\"'" }, { "name" : "amountGive", "type" : "string", "doc" : "Type inferred from '\"1880000083000000000\"'" }, { "name" : "expires", "type" : "string", "doc" : "Type inferred from '\"5590043\"'" }, { "name" : "nonce", "type" : "string", "doc" : "Type inferred from '\"7586490717308181\"'" }, { "name" : "v", "type" : "int", "doc" : "Type inferred from '28'" }, { "name" : "r", "type" : "string", "doc" : "Type inferred from '\"0xd04a8f0a1f86fe8e3bdefb717f1bc461cfdb998f705dff3a5fc5567d023ca116\"'" }, { "name" : "s", "type" : "string", "doc" : "Type inferred from '\"0x7a8b687a8b7faea852ac873c02428acd5c26457282a4105fe12b60776fd87d55\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0x0b419BCE1Cb87ADEa84A913Fa903593fB68D33B1\"'" }, { "name" : "updated", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:41:10.058Z\"'" }, { "name" : "availableVolume", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "ethAvailableVolume", "type" : "string", "doc" : "Type inferred from '\"6853.81\"'" }, { "name" : "availableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"1880000083000000000\"'" }, { "name" : "ethAvailableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"1.880000083\"'" }, { "name" : 
"amountFilled", "type" : "null", "doc" : "Type inferred from 'null'" } ] }
ethereumsell
{ "type" : "record", "name" : "ethereumsell", "fields" : [ { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"27b06f4b8caf4aaa6d05841f8daa077f5f2131145331489ae94febc5eddd8c56_sell\"'" }, { "name" : "deleted", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"-1.1627879935162941e+21\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.0004473\"'" }, { "name" : "tokenGet", "type" : "string", "doc" : "Type inferred from '\"0x0000000000000000000000000000000000000000\"'" }, { "name" : "amountGet", "type" : "string", "doc" : "Type inferred from '\"520115069499838335\"'" }, { "name" : "tokenGive", "type" : "string", "doc" : "Type inferred from '\"0xe3818504c1B32bF1557b16C238B2E01Fd3149C17\"'" }, { "name" : "amountGive", "type" : "string", "doc" : "Type inferred from '\"1162787993516294126989\"'" }, { "name" : "expires", "type" : "string", "doc" : "Type inferred from '\"5589988\"'" }, { "name" : "nonce", "type" : "string", "doc" : "Type inferred from '\"57125161\"'" }, { "name" : "v", "type" : "int", "doc" : "Type inferred from '28'" }, { "name" : "r", "type" : "string", "doc" : "Type inferred from '\"0xafd5497f6159ac6589fd1804d27fe05436ed13706e64002f0e82e93b471e1780\"'" }, { "name" : "s", "type" : "string", "doc" : "Type inferred from '\"0x76dc38410d35069d1a62c08a1976548fbb915d4c52d8c9789157149720b04a33\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0x7418b4B9327b2DD18AC90Ef2eF846b36F286adA4\"'" }, { "name" : "updated", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:36:55.000Z\"'" }, { "name" : "availableVolume", "type" : "string", "doc" : "Type inferred from '\"1.16278799351629411637773277515287799587448e+21\"'" }, { "name" : "ethAvailableVolume", "type" : "string", "doc" : "Type inferred from '\"1162.7879935162941\"'" }, { "name" : "availableVolumeBase", "type" : "string", "doc" : "Type inferred from 
'\"520115069499838340\"'" }, { "name" : "ethAvailableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"0.5201150694998383\"'" }, { "name" : "amountFilled", "type" : "null", "doc" : "Type inferred from 'null'" } ] }
Example JSON Data

{"id":"237ab7693be71d35783941da9686f340d32b6d1e7332eedd0636b3e7b3725b93_sell","deleted":true,"amount":"-6000000000000000000","price":"0.02","tokenGet":"0x0000000000000000000000000000000000000000","amountGet":"120000000000000000","tokenGive":"0x219218f117dc9348b358b8471c55a073e5e0da0b","amountGive":"6000000000000000000","expires":"5594927","nonce":"4013451455","v":27,"r":"0x3965d8a9b074c6dcf25ebe10d39833f5ec6aa2d892aaec057dffd4368cd39f46","s":"0x01820eb919fe725c142f560c501742a89d41ca86b5594fdedc94f0e6f91bc97f","user":"0x0d4F98cb588c18FCC2695e2341112f066A915f80","updated":"2018-05-11T13:58:48.064Z","availableVolume":"798131841361720650","ethAvailableVolume":"0.7981318413617207","availableVolumeBase":"15962636827234412","ethAvailableVolumeBase":"0.01596263682723441","amountFilled":null}
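For illustration only (the real flow routes on flow file content in RouteOnContent), here is a Python sketch that classifies a record like the example sell record above; the assumption of routing on the _buy/_sell suffix of the id field is mine, not the flow's:

```python
import json

# Rough Python analogue of the routing step: decide whether a parsed
# EtherDelta record is a buy or a sell. Routing on the id suffix is an
# assumption made for this sketch.
def route_record(raw_json):
    record = json.loads(raw_json)
    record_id = record.get("id", "")
    if record_id.endswith("_sell"):
        return "sell"
    if record_id.endswith("_buy"):
        return "buy"
    return "unmatched"

print(route_record('{"id": "abc123_sell", "price": "0.02"}'))  # → sell
```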
{
"type": "record",
"name": "ethereumfunds",
"fields": [
{
"name": "txHash",
"type": [
"string",
"null"
]
},
{
"name": "date",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"2018-05-10T15:52:19.000Z\"'"
},
{
"name": "tokenAddr",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"0xfb1e5f5e984c28ad7e228cdaa1f8a0919bb6a09b\"'"
},
{
"name": "kind",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"Deposit\"'"
},
{
"name": "user",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"0xb975cf6c40f9cc5ad5cb7a335f16bdaab6cdcf0d\"'"
},
{
"name": "amount",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"5986.826\"'"
},
{
"name": "balance",
"type": [
"string",
"null"
],
"doc": "Type inferred from '\"5986.826\"'"
}
]
}
SQL Table DDL

%jdbc(hive)
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumsell (id STRING, deleted BOOLEAN, amount STRING, price STRING, tokenGet STRING, amountGet STRING, tokenGive STRING, amountGive STRING, expires STRING, nonce STRING, v INT, r STRING, s STRING, `user` STRING, updated STRING, availableVolume STRING, ethAvailableVolume STRING, availableVolumeBase STRING, ethAvailableVolumeBase STRING, amountFilled STRING) STORED AS ORC
LOCATION '/etherdelta/sell'
%sql
select * from ethereumsell order by cast(price as double) desc
%jdbc(hive)
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumtrades (txHash STRING, `date` STRING, price STRING, side STRING, amount STRING, amountBase STRING, buyer STRING, seller STRING, tokenAddr STRING) STORED AS ORC
LOCATION '/etherdelta/trade'
%jdbc(hive)
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumbuy (id STRING, amount STRING, price STRING, tokenGet STRING, amountGet STRING, tokenGive STRING, amountGive STRING, expires STRING, nonce STRING, v INT, r STRING, s STRING, `user` STRING, updated STRING, availableVolume STRING, ethAvailableVolume STRING, availableVolumeBase STRING, ethAvailableVolumeBase STRING, amountFilled STRING) STORED AS ORC LOCATION '/etherdelta/buy'
SELECT * FROM ethereumbuy
order by cast(price as double) desc
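NiFi's Data Provenance exposes a hive.ddl attribute for each stored file; a hypothetical Python helper (the function name and column list are mine, for illustration) shows the shape of the statements above being generated from a schema:

```python
# Hypothetical sketch: build a CREATE EXTERNAL TABLE ... STORED AS ORC
# statement of the same shape as the hive.ddl attribute NiFi emits.
def external_orc_ddl(table, columns, location):
    cols = ", ".join(f"{name} {ctype}" for name, ctype in columns)
    return (f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({cols}) "
            f"STORED AS ORC LOCATION '{location}'")

ddl = external_orc_ddl(
    "ethereumtrades",
    [("txHash", "STRING"), ("price", "STRING"), ("side", "STRING")],
    "/etherdelta/trade",
)
print(ddl)
```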
We now have four tables full of different Ethereum trades, sells, buys and funds.

Schemas in Hortonworks Schema Registry

Storing The Data

Run SQL on the Flows

Reference:
https://community.hortonworks.com/articles/68378/nifi-websocket-support.html
https://gist.github.com/ijokarumawak/60b9ab2038ef906731ebf4c0eee97176
http://ijokarumawak.github.io/nifi/2016/11/04/nifi-websocket/
https://www.ethereum.org/cli
http://web3py.readthedocs.io/en/stable/

Source:

etherdelta-analysis-1.json
etherdelta-storage.xml
05-08-2018
01:17 PM
Quick tip: when you create a free Twitter app at apps.twitter.com, they will let you read and write, but not a lot. Don't tweet every 5 minutes for hours or that app's write ability will get blocked. Also be sure you are not reposting spam.
05-07-2018
08:22 PM
That is in com.dataflowdeveloper. It is a one-method class I wrote to hold the string.
05-07-2018
08:19 PM
2 Kudos
Flow

This is a simple version of the flow: just ingest tweets, run sentiment analysis and store them in a directory as clean JSON. We can remove Sentiment if you don't want to install my custom processor. You can drop the sentiment analysis and do it later; you can also run a Python script for that. I grab some fields I like; these are the fields I want to save.

We could make this even simpler and just have GetTwitter then PutFile. That would store the raw Twitter JSON, which is a very sparse nested JSON file. If you want the raw data, that is an option, but it's a pain to work with that format and it's not ideal for analytics. I flatten it and just grab what I have seen as the core attributes; you can add more or drop some of them easily. This is a simple version that could be used for art or personal projects, or by anyone who wants to store their own tweets and related items.

Get Your Twitter ID: https://tweeterid.com/
Documentation: https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object
Create Your Application: https://apps.twitter.com/ and https://apps.twitter.com/app/new

Application settings you need: Consumer Key (API Key) and Consumer Secret (API Secret). Access tokens you need: Access Token and Access Token Secret. Make sure you keep the secrets secure, as you don't want people tweeting in your name or reading your stuff. You will place these in the GetTwitter processor; click start once you add them. You can filter languages, like en for English and es for Spanish.

We just save these JSON files to a directory for later use. We could also aggregate and compress them if you like, or send them to Amazon S3, email them, or whatever. We could also retweet them, but now we are getting fancy and we already wrote that article this morning.

Custom Processor: https://github.com/tspannhw/nifi-corenlp-processor

Example Tweet in JSON Stored:

{
"msg" : "RT @PaasDev Tim said @ApacheNiFi is awesome",
"unixtime" : "1525724645676",
"friends_count" : "5268",
"sentiment" : "POSITIVE",
"hashtags" : "[\"ApacheNiFi\"]",
"listed_count" : "25",
"tweet_id" : "993587294715203584",
"user_name" : "Tim Spann",
"favourites_count" : "5348",
"source" : "NiFiTweetBot",
"placename" : "",
"media_url" : "[]",
"retweet_count" : "0",
"user_mentions_name" : "[]",
"geo" : "",
"urls" : "[]",
"countryCode" : "",
"user_url" : "",
"place" : "",
"timestamp" : "1525724645676",
"coordinates" : "",
"handle" : "PaasDev",
"profile_image_url" : "http://pbs.twimg.com/profile_images/34343/34343.jpg",
"time_zone" : "Eastern Time (US & Canada)",
"ext_media" : "[]",
"statuses_count" : "5994",
"followers_count" : "1963",
"location" : "Princeton, NJ",
"time" : "Mon May 07 20:24:05 +0000 2018",
"user_mentions" : "[]",
"user_description" : "Tim NiFi Guy"
}

Download and Import This Template to Apache NiFi

simplenifitwitter.xml

Setup

Get some Apache NiFi: https://www.apache.org/dyn/closer.lua?path=/nifi/1.6.0/nifi-1.6.0-bin.zip

Unzip it. On some Linux distributions you may need to apt-get install unzip or yum install unzip. You may need to be root, so you can do something like sudo su. You will need Java installed. For a low-cost small Linux server, you can use one of these services, which also tell you how to install Java. There are many low-cost options. This application is small enough to also run on your laptop, an old desktop PC or a small cloud instance.

https://www.digitalocean.com/community/tutorials/how-to-install-java-on-centos-and-fedora
https://www.linode.com/docs/development/java/install-java-on-centos/
https://www.digitalocean.com/community/tutorials/how-to-install-java-with-apt-get-on-ubuntu-16-04

Generally something like this:
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer

or

sudo yum install java-1.8.0-openjdk-devel

OpenJDK 8 or Oracle JDK 8 are perfect.

You can also run some Docker containers if you like that sort of thing: https://github.com/minyk/nifi-sandbox

You can also download one of the Hortonworks HDF 3.1 Sandboxes to run this: https://hortonworks.com/downloads/#sandbox. Those have Apache NiFi and Java preinstalled! Here are some Docker instructions:

https://hortonworks.com/tutorial/sandbox-deployment-and-install-guide/section/3/
https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.1.1/bk_installing-nifi/content/ch_nifi-installation.xml.html

Resources:

https://github.com/tspannhw?utf8=%E2%9C%93&tab=repositories&q=&type=source&language=
https://community.hortonworks.com/articles/81270/adding-stanford-corenlp-to-big-data-pipelines-apac-1.html
https://community.hortonworks.com/articles/80418/open-nlp-example-apache-nifi-processor.html
https://community.hortonworks.com/articles/177370/extracting-html-from-pdf-excel-and-word-documents.html
https://community.hortonworks.com/articles/76935/using-sentiment-analysis-and-nlp-tools-with-hdp-25.html
https://community.hortonworks.com/articles/163776/parsing-any-document-with-apache-nifi-15-with-apac.html
https://community.hortonworks.com/content/kbentry/189735/automating-social-media-sending-tweets-with-apache.html
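The flattened tweet JSON this flow stores can be read back with a few lines of Python; note that hashtags arrives as a JSON-encoded string inside the JSON. This is a sketch using a field subset from the example tweet above:

```python
import json

# A subset of the stored tweet fields from the example above. The hashtags
# value is itself a JSON-encoded string, so it needs a second parse.
tweet_json = ('{"msg": "RT @PaasDev Tim said @ApacheNiFi is awesome", '
              '"sentiment": "POSITIVE", "hashtags": "[\\"ApacheNiFi\\"]", '
              '"followers_count": "1963"}')

def summarize_tweet(raw):
    tweet = json.loads(raw)
    return {
        "sentiment": tweet["sentiment"],
        "hashtags": json.loads(tweet["hashtags"]),  # second parse
        "followers": int(tweet["followers_count"]),
    }

print(summarize_tweet(tweet_json))
```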
05-07-2018
01:39 PM
3 Kudos
Automating Social Media: Sending Tweets with Apache NiFi: Building A Smart Bot

This is the new processor, PutTwitterProcessor. It uses Twitter4J to send tweets from a parameter you set. This is an example tweet and my reply to it; it's a regular tweet. This is the result of a run: I put the tweet id and datetime stamp in the result attribute.

You need to create a Twitter application and obtain the Consumer Key, Consumer Secret, Access Token and Access Secret. If you have a latitude and longitude in your data, you can send those; it's optional. The message field takes Expression Language and lets you build a message.

JUnit Test of my Processor, and the JUnit run results. Most of the code uses the very easy to use Twitter4J library.

So what can I do with this? In my flow I am ingesting tweets, and with a streaming SQL query I can retweet ones that have no existing retweets and whose author has more than 1,000 followers. I can also feed all this information to a Hive table and then run some machine learning on it to figure out other parameters to filter on for my bot. This is a start. I put a scheduler on my PutTweet to only tweet every 10 minutes (600 seconds). Let's see how this goes.

SQL for QueryRecord

SELECT * FROM FLOWFILE WHERE CAST(retweet_count AS DOUBLE) <= 0 AND CAST(followers_count AS DOUBLE) > 1000

Example Message Building

${'user_name':append( ' '):append( ${'location'} ):append(' '):append( ${'hashtags'} ):append(' ' ):append(${'msg'})}

Source: https://github.com/tspannhw/nifi-puttwitter-processor

Download the NAR, install it in the Apache NiFi lib directory, and then restart: https://github.com/tspannhw/nifi-puttwitter-processor/releases/tag/1.0
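The QueryRecord filter can be restated as a small Python predicate (a sketch for illustration, not part of the flow), which makes the bot's rule easy to unit-test before wiring it into SQL:

```python
# Mirror of the QueryRecord SQL: retweet only tweets with no existing
# retweets whose author has more than 1,000 followers. Counts arrive as
# strings in the flattened tweet JSON, hence the float casts.
def should_retweet(record):
    return (float(record["retweet_count"]) <= 0
            and float(record["followers_count"]) > 1000)

print(should_retweet({"retweet_count": "0", "followers_count": "1963"}))  # → True
```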