Community Articles
Find and share helpful community-sourced technical articles
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.
Labels (1)
Super Guru

Accessing Feeds from EtherDelta on Trades, Funds, Buys and Sells (Cryptocurrency Analysis)


EtherDelta lets you trade Ether or Ethereum-based tokens. Ethereum (https://www.ethereum.org/) is an open-source block chain platform for running smart contracts. They provide a fast web-socket feed of all data coming through the system. We can tap this websocket feed with Apache NiFi to examine and ingest all the trades, funds, buys and sells coming through the system as JSON. Once we ingest, clean up, parse and schematize the data we can run queries on it with Apache Spark SQL and Apache Hive in Apache Zeppelin notebooks. Now the data scientists have a continuing stream of data to play with. Next we will start adding additional feeds from other Ethereum exchanges, Bit Coin APIs and other sources of data. Some of the APIs are REST, some are websockets and some are SDKs. All of these are easy to ingest with Apache NiFi.

72765-pahab.png

Initial Ingest from EtherDelta via Web Sockets API

72769-feedingestpart1.png

Route To The Correct Type

72770-feedingestpart2.png

To process this feed with Apache NiFi 1.6+:

  1. ConnectWebSocket - wss://socket.etherdelta.com/socket.io/?transport=websocket
  2. ReplaceText - three to remove extraneous data outside of the JSON (40, 42, 0 - websocket junk text)
  3. RouteOnAttribute - filter out too small files
  4. RouteOnContent - sell, buy, trades and funds
  5. SplitJson buys arrays ($.*.buys) or ($.*.sells)
  6. SplitJson buys and sells into individual JSON Records ($.[*])
  7. UpdateAttribute add a schema name
  8. Send to Remote Cluster via HTTP/HTTPS
  9. On Remote Cluster process and query to limit and convert data to Hive friendly format
  10. Store in HDFS
  11. NiFi generates DDL for an external Hive table
  12. Query with Zeppelin
  13. Hand off to other Zeppelin user who is a Data Scientist for machine learning and statistics.
  14. Profit!



Break apart Sell Records

72768-sells.png

Setup Our Web Socket Client

72772-websocketconfiguration.png

Macintosh Default Java Certificate for SSL (default password is changeit)

72773-sslonmac.png

Connect to the Web Socket

72774-websocketclient.png

Remove the extra '40'

72775-removeleading40.png

Throw away small files!

72776-throwawaysmallorempty.png

Route for Buys, Funds, Sells and Trades

72777-routeoncontent.png


EtherDelta Exchange

EtherDelta provides a WebSocket feed of their data, so I am ingesting that with Apache NiFi and breaking out different types of data being published. This will let us ingest into different Apache Hive tables and run some queries and analytics in Apache Zeppelin on this data. We can then make it available to Data Scientists.

JsonPath Expressions

JsonPath Expression for trades and funds
$.*.* 

Split Json Orders Buys
$.*.buys 

Split Json Orders Sells
$.*.sells


Schemas

ethereumfunds
{ "type" : "record", "name" : "ethereumfunds", "fields" : [ { "name" : "txHash", "type" : "string", "doc" : "Type inferred from '\"0xc40e227f3d5c2e125791cd865e8dd36c4a1a86538f13905ce91d65d2ac721742\"'" }, { "name" : "date", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:52:19.000Z\"'" }, { "name" : "tokenAddr", "type" : "string", "doc" : "Type inferred from '\"0xfb1e5f5e984c28ad7e228cdaa1f8a0919bb6a09b\"'" }, { "name" : "kind", "type" : "string", "doc" : "Type inferred from '\"Deposit\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0xb975cf6c40f9cc5ad5cb7a335f16bdaab6cdcf0d\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"5986.826\"'" }, { "name" : "balance", "type" : "string", "doc" : "Type inferred from '\"5986.826\"'" } ] }
ethereumtrades
{ "type" : "record", "name" : "ethereumtrades", "fields" : [ { "name" : "txHash", "type" : "string", "doc" : "Type inferred from '\"0x0350b7b479c9372c07188d69aa642ced7637b05444735653d61316dd852a673c\"'" }, { "name" : "date", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T18:32:27.000Z\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.000049874\"'" }, { "name" : "side", "type" : "string", "doc" : "Type inferred from '\"buy\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"8800\"'" }, { "name" : "amountBase", "type" : "string", "doc" : "Type inferred from '\"0.4388912\"'" }, { "name" : "buyer", "type" : "string", "doc" : "Type inferred from '\"0xd170db528cd2dd6ca67b0b2e3f7cd6e24942dba2\"'" }, { "name" : "seller", "type" : "string", "doc" : "Type inferred from '\"0xecfd625bfc433e8f6c8ce4abb92b9e8f1db3e401\"'" }, { "name" : "tokenAddr", "type" : "string", "doc" : "Type inferred from '\"0x6888a16ea9792c15a4dcf2f6c623d055c8ede792\"'" } ] }
ethereumbuy
{ "type" : "record", "name" : "ethereumbuy", "fields" : [ { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"9c946737c29c807255c3aac7334e182e375cc3a32684c66ccda03e9f5c52e47e_buy\"'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.0002743\"'" }, { "name" : "tokenGet", "type" : "string", "doc" : "Type inferred from '\"0x99ea4db9ee77acd40b119bd1dc4e33e1c070b80d\"'" }, { "name" : "amountGet", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "tokenGive", "type" : "string", "doc" : "Type inferred from '\"0x0000000000000000000000000000000000000000\"'" }, { "name" : "amountGive", "type" : "string", "doc" : "Type inferred from '\"1880000083000000000\"'" }, { "name" : "expires", "type" : "string", "doc" : "Type inferred from '\"5590043\"'" }, { "name" : "nonce", "type" : "string", "doc" : "Type inferred from '\"7586490717308181\"'" }, { "name" : "v", "type" : "int", "doc" : "Type inferred from '28'" }, { "name" : "r", "type" : "string", "doc" : "Type inferred from '\"0xd04a8f0a1f86fe8e3bdefb717f1bc461cfdb998f705dff3a5fc5567d023ca116\"'" }, { "name" : "s", "type" : "string", "doc" : "Type inferred from '\"0x7a8b687a8b7faea852ac873c02428acd5c26457282a4105fe12b60776fd87d55\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0x0b419BCE1Cb87ADEa84A913Fa903593fB68D33B1\"'" }, { "name" : "updated", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:41:10.058Z\"'" }, { "name" : "availableVolume", "type" : "string", "doc" : "Type inferred from '\"6.85381e+21\"'" }, { "name" : "ethAvailableVolume", "type" : "string", "doc" : "Type inferred from '\"6853.81\"'" }, { "name" : "availableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"1880000083000000000\"'" }, { "name" : "ethAvailableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"1.880000083\"'" }, { "name" : "amountFilled", "type" : "null", "doc" : "Type inferred from 'null'" } ] }
ethereumsell
{ "type" : "record", "name" : "ethereumsell", "fields" : [ { "name" : "id", "type" : "string", "doc" : "Type inferred from '\"27b06f4b8caf4aaa6d05841f8daa077f5f2131145331489ae94febc5eddd8c56_sell\"'" }, { "name" : "deleted", "type" : "boolean", "doc" : "Type inferred from 'true'" }, { "name" : "amount", "type" : "string", "doc" : "Type inferred from '\"-1.1627879935162941e+21\"'" }, { "name" : "price", "type" : "string", "doc" : "Type inferred from '\"0.0004473\"'" }, { "name" : "tokenGet", "type" : "string", "doc" : "Type inferred from '\"0x0000000000000000000000000000000000000000\"'" }, { "name" : "amountGet", "type" : "string", "doc" : "Type inferred from '\"520115069499838335\"'" }, { "name" : "tokenGive", "type" : "string", "doc" : "Type inferred from '\"0xe3818504c1B32bF1557b16C238B2E01Fd3149C17\"'" }, { "name" : "amountGive", "type" : "string", "doc" : "Type inferred from '\"1162787993516294126989\"'" }, { "name" : "expires", "type" : "string", "doc" : "Type inferred from '\"5589988\"'" }, { "name" : "nonce", "type" : "string", "doc" : "Type inferred from '\"57125161\"'" }, { "name" : "v", "type" : "int", "doc" : "Type inferred from '28'" }, { "name" : "r", "type" : "string", "doc" : "Type inferred from '\"0xafd5497f6159ac6589fd1804d27fe05436ed13706e64002f0e82e93b471e1780\"'" }, { "name" : "s", "type" : "string", "doc" : "Type inferred from '\"0x76dc38410d35069d1a62c08a1976548fbb915d4c52d8c9789157149720b04a33\"'" }, { "name" : "user", "type" : "string", "doc" : "Type inferred from '\"0x7418b4B9327b2DD18AC90Ef2eF846b36F286adA4\"'" }, { "name" : "updated", "type" : "string", "doc" : "Type inferred from '\"2018-05-10T15:36:55.000Z\"'" }, { "name" : "availableVolume", "type" : "string", "doc" : "Type inferred from '\"1.16278799351629411637773277515287799587448e+21\"'" }, { "name" : "ethAvailableVolume", "type" : "string", "doc" : "Type inferred from '\"1162.7879935162941\"'" }, { "name" : "availableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"520115069499838340\"'" }, { "name" : "ethAvailableVolumeBase", "type" : "string", "doc" : "Type inferred from '\"0.5201150694998383\"'" }, { "name" : "amountFilled", "type" : "null", "doc" : "Type inferred from 'null'" } ] }




Example JSON Data

{"id":"237ab7693be71d35783941da9686f340d32b6d1e7332eedd0636b3e7b3725b93_sell","deleted":true,"amount":"-6000000000000000000","price":"0.02","tokenGet":"0x0000000000000000000000000000000000000000","amountGet":"120000000000000000","tokenGive":"0x219218f117dc9348b358b8471c55a073e5e0da0b","amountGive":"6000000000000000000","expires":"5594927","nonce":"4013451455","v":27,"r":"0x3965d8a9b074c6dcf25ebe10d39833f5ec6aa2d892aaec057dffd4368cd39f46","s":"0x01820eb919fe725c142f560c501742a89d41ca86b5594fdedc94f0e6f91bc97f","user":"0x0d4F98cb588c18FCC2695e2341112f066A915f80","updated":"2018-05-11T13:58:48.064Z","availableVolume":"798131841361720650","ethAvailableVolume":"0.7981318413617207","availableVolumeBase":"15962636827234412","ethAvailableVolumeBase":"0.01596263682723441","amountFilled":null}

{
 "type": "record",
 "name": "ethereumfunds",
 "fields": [
  {
   "name": "txHash",
   "type": [
    "string",
    "null"
   ]
  },
  {
   "name": "date",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"2018-05-10T15:52:19.000Z\"'"
  },
  {
   "name": "tokenAddr",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"0xfb1e5f5e984c28ad7e228cdaa1f8a0919bb6a09b\"'"
  },
  {
   "name": "kind",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"Deposit\"'"
  },
  {
   "name": "user",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"0xb975cf6c40f9cc5ad5cb7a335f16bdaab6cdcf0d\"'"
  },
  {
   "name": "amount",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"5986.826\"'"
  },
  {
   "name": "balance",
   "type": [
    "string",
    "null"
   ],
   "doc": "Type inferred from '\"5986.826\"'"
  }
 ]
}






SQL Table DDL

%jdbc(hive)


CREATE EXTERNAL TABLE IF NOT EXISTS ethereumsell (id STRING, deleted BOOLEAN, amount STRING, price STRING, tokenGet STRING, amountGet STRING, tokenGive STRING, amountGive STRING, expires STRING, nonce STRING, v INT, r STRING, s STRING, `user` STRING, updated STRING, availableVolume STRING, ethAvailableVolume STRING, availableVolumeBase STRING, ethAvailableVolumeBase STRING, amountFilled STRING) STORED AS ORC
LOCATION '/etherdelta/sell'

%sql

select * from ethereumsell order by cast(price as double) desc


%jdbc(hive)


CREATE EXTERNAL TABLE IF NOT EXISTS ethereumtrades (txHash STRING, `date` STRING, price STRING, side STRING, amount STRING, amountBase STRING, buyer STRING, seller STRING, tokenAddr STRING) STORED AS ORC
LOCATION '/etherdelta/trade'

%jdbc(hive)


CREATE EXTERNAL TABLE IF NOT EXISTS ethereumbuy (id STRING, amount STRING, price STRING, tokenGet STRING, amountGet STRING, tokenGive STRING, amountGive STRING, expires STRING, nonce STRING, v INT, r STRING, s STRING, `user` STRING, updated STRING, availableVolume STRING, ethAvailableVolume STRING, availableVolumeBase STRING, ethAvailableVolumeBase STRING, amountFilled STRING) STORED AS ORC LOCATION '/etherdelta/buy'




SELECT * FROM ethereumbuy
 order by  cast(price as double) desc











We now have four tables full of different Ethereum Trades, Sells, Buys and Funds

72788-zepbuyandfunds.png

72789-zepethereumbuy.png

72790-zeptrades.png

Schemas in Hortonworks Schema Registry

72791-ethereumschema2.png

Storing The Data

72792-flowremoteinputs.png

72797-etherdeltastorage2.png

Run SQL on the Flows

72793-queryinstream.png

Reference:

Source


flowpricesql.pngprocessingpart1.png
963 Views
Don't have an account?
Coming from Hortonworks? Activate your account here
Version history
Revision #:
2 of 2
Last update:
‎08-17-2019 07:31 AM
Updated by:
 
Contributors
Top Kudoed Authors