Created on 05-15-2018 12:18 AM - edited 08-17-2019 07:24 AM
Ethereum: Accessing Feeds from Etherscan on Volume, Price and Transaction (Apache NiFi) - Cryptocurrency Part 2
Unlike EtherDelta, which we accessed in our first post (https://community.hortonworks.com/content/kbentry/191146/accessing-feeds-from-etherdelta-on-trades-f...), Etherscan offers a familiar HTTPS REST API that returns clean JSON. We read three REST APIs to acquire three sets of interesting Ethereum data: logs, current supply and current price.
Overview of Flow
REST API Ingest feed for getLogs
We split the response into individual JSON records for processing.
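In NiFi, a split like this is commonly done with SplitJson pointed at the result array ($.result). The Python sketch below only mimics that step outside NiFi so you can see what each record looks like. It assumes Etherscan's usual {"status", "message", "result": [...]} envelope; the function name split_log_records and the sample values are made up for illustration.

```python
import json
from typing import Iterator


def split_log_records(response_body: str) -> Iterator[str]:
    """Yield one JSON document per log entry in a getLogs response.

    Assumes Etherscan's envelope: {"status": ..., "message": ..., "result": [...]}.
    This imitates splitting on the JsonPath $.result; it is not NiFi code.
    """
    payload = json.loads(response_body)
    result = payload.get("result")
    if not isinstance(result, list):
        # Error responses (e.g. rate limiting) typically carry a string result; emit nothing.
        return
    for entry in result:
        # Each log entry becomes its own small JSON document, like one flow file.
        yield json.dumps(entry, sort_keys=True)


if __name__ == "__main__":
    # Made-up sample response with two log entries.
    sample = json.dumps({
        "status": "1",
        "message": "OK",
        "result": [
            {"address": "0xabc", "blockNumber": "0x5c958", "logIndex": "0x0"},
            {"address": "0xabc", "blockNumber": "0x5c959", "logIndex": "0x1"},
        ],
    })
    for record in split_log_records(sample):
        print(record)
```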
Ether Supply and Ether Last Price Flows
These two APIs return very little data, plus some flags I don't care about (status, message). So I use EvaluateJsonPath to extract just the useful fields. Then, with UpdateAttribute, I add a capture timestamp using now(). Finally, I use AttributesToJSON to replace the existing flow file content with a new JSON document built only from the fields I want. AttributesToJSON can also include the system-generated core attributes automagically, but I turned that off. Sometimes it's nice to have the uuid, filename and other system metadata for tracking, and you can also send that metadata elsewhere via Apache NiFi's built-in reporting tools.
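To make the EvaluateJsonPath, UpdateAttribute and AttributesToJSON chain concrete, here is a minimal Python sketch of what it does to the price response. It assumes Etherscan's {"status", "message", "result": {...}} envelope with the same four price fields used in the ethereumprice table later on. The function name flatten_price and the captureDate format are assumptions, since the flow's exact now() formatting is not shown.

```python
import json
from datetime import datetime, timezone
from typing import Optional

# Fields kept from the "result" object; status and message are dropped.
PRICE_FIELDS = ("ethusd", "ethbtc", "ethusd_timestamp", "ethbtc_timestamp")


def flatten_price(response_body: str, now: Optional[datetime] = None) -> str:
    """Return a new JSON document holding only the wanted fields plus captureDate.

    Mirrors EvaluateJsonPath ($.result.<field>) -> UpdateAttribute (captureDate)
    -> AttributesToJSON, with NiFi's core attributes (uuid, filename, ...) left out.
    """
    payload = json.loads(response_body)
    result = payload["result"]
    attributes = {field: str(result[field]) for field in PRICE_FIELDS}
    captured = now or datetime.now(timezone.utc)
    # Assumed timestamp format; the original flow's now() formatting is not shown.
    attributes["captureDate"] = captured.strftime("%Y-%m-%d %H:%M:%S")
    return json.dumps(attributes)
```

Passing `now` explicitly keeps the output deterministic, which is handy when comparing against what the NiFi flow produced.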
Our Save Everything Flow
We added three new feeds to the "save everything as Apache ORC files in HDFS" flow from the EtherDelta ingest. You can see them on the right in orange, indigo and violet: a rainbow of cryptocurrency storage. I am not sure what colors I will use for the next feeds.

The process works the same as it did for the other data. We set a schema name and a storage location in HDFS, use a QueryRecord processor to limit the data and convert it to Apache Avro, and then merge these smaller Apache Avro files into one bigger file (HDFS prefers fewer, bigger files; in Hadoop 3.1 this is less of a concern). The final step saves the data in HDFS at the location attribute passed in. In Data Provenance, this last step carries the hive.ddl and HDFS location needed to build an external Apache Hive table. You can copy that DDL into Apache Zeppelin, or hook up a PutHiveQL processor and have Apache NiFi build the table for you. I do it in Apache Zeppelin to keep control of table builds, and I also add my DDL to version control.

To build this schema, I temporarily added an InferAvroSchema processor to the flow and removed it afterwards. I pasted the generated schema, taken from the schema attribute in Data Provenance, into a new schema in Hortonworks Schema Registry, seen below. The schema registry supports versioning and comparing versions. Record processing in Apache NiFi needs a schema, and the registry is a convenient central place to keep it; Hortonworks Streaming Analytics Manager and other tools rely on it too. It is also very helpful for Apache Kafka.
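InferAvroSchema did the real work in the flow. Purely to show the shape of a generated schema, here is a toy Python version that maps each JSON value to a nullable Avro type. This is a rough heuristic of my own (the names avro_type and infer_record_schema are hypothetical), not InferAvroSchema's actual algorithm.

```python
import json
from typing import Any, Dict, List


def avro_type(value: Any) -> List[Any]:
    """Map one JSON value to a nullable Avro type (rough heuristic, not InferAvroSchema's logic)."""
    if isinstance(value, bool):  # test bool first: bool is a subclass of int
        base: Any = "boolean"
    elif isinstance(value, int):
        base = "long"
    elif isinstance(value, float):
        base = "double"
    elif isinstance(value, list):
        # Type the array from its first element; empty arrays default to nullable strings.
        base = {"type": "array", "items": avro_type(value[0]) if value else ["null", "string"]}
    else:
        # Strings, nulls and nested objects all fall back to string in this sketch.
        base = "string"
    return ["null", base]


def infer_record_schema(record: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Build an Avro record schema with one nullable field (default null) per JSON key."""
    return {
        "type": "record",
        "name": name,
        "fields": [
            {"name": key, "type": avro_type(value), "default": None}
            for key, value in record.items()
        ],
    }


if __name__ == "__main__":
    # Made-up supply record shaped like the flow's output.
    sample = {"totalEther": "99822479968800000000000000", "captureDate": "2018-05-15 00:18:00"}
    print(json.dumps(infer_record_schema(sample, "ethereumsupply"), indent=2))
```

Because every Etherscan value arrives as a JSON string, a schema inferred this way ends up all strings, which matches the STRING columns in the Hive tables.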
An example of our newly ingested supply data:
An example of a QueryRecord Apache Calcite query:
To learn more about querying with this awesome tool, see: http://calcite.apache.org/docs/
Apache Zeppelin Used to Create Our Tables
Apache Zeppelin Queries of the Data
Before we store data in HDFS, we have to create the directories and tables, and possibly set up security with either hdfs dfs -chmod or Apache Ranger.
HDFS Build Our Directories for Apache ORC File Storage
hdfs dfs -mkdir -p /ethereum/tx
hdfs dfs -mkdir -p /ethereum/supply
hdfs dfs -mkdir -p /ethereum/price
Create Our Tables in Hive View 2 or Beeline or Apache Zeppelin
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumprice (ethusd STRING, ethbtc STRING, ethusd_timestamp STRING, ethbtc_timestamp STRING, captureDate STRING) STORED AS ORC LOCATION '/ethereum/price';
-- timeStamp is quoted with backticks because TIMESTAMP is a reserved word in HiveQL.
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumtx (address STRING, topics ARRAY<STRING>, data STRING, blockNumber STRING, `timeStamp` STRING, gasPrice STRING, gasUsed STRING, logIndex STRING, transactionHash STRING, transactionIndex STRING) STORED AS ORC LOCATION '/ethereum/tx';
CREATE EXTERNAL TABLE IF NOT EXISTS ethereumsupply (totalEther STRING, captureDate STRING) STORED AS ORC LOCATION '/ethereum/supply';
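If you prefer to script these statements rather than copy the hive.ddl attribute out of Data Provenance by hand, a small helper can produce the same shape of statement from a column list. The helper below is hypothetical (the name and approach are mine, not part of NiFi or Hive). It backtick-quotes every column name, which Hive accepts for any identifier and which sidesteps reserved words such as timestamp.

```python
from typing import Sequence, Tuple


def external_orc_table_ddl(table: str, columns: Sequence[Tuple[str, str]], location: str) -> str:
    """Return a CREATE EXTERNAL TABLE ... STORED AS ORC statement for Hive.

    Every column name is wrapped in backticks so reserved words need no special handling.
    """
    column_list = ", ".join(f"`{name}` {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({column_list}) "
        f"STORED AS ORC LOCATION '{location}'"
    )


if __name__ == "__main__":
    print(external_orc_table_ddl(
        "ethereumsupply",
        [("totalEther", "STRING"), ("captureDate", "STRING")],
        "/ethereum/supply",
    ) + ";")
```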
SQL Queries to Check out the Data from Apache Zeppelin
select * from ethereumsupply order by capturedate desc;
-- ethusd is stored as STRING, so cast it before max() to compare numbers rather than text.
select max(cast(ethusd as double)) from ethereumprice;
select ethusd, capturedate from ethereumprice order by ethusd_timestamp desc limit 500;
select * from ethereumtx order by `timeStamp` desc;
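Every column in these tables is declared STRING, so an aggregate like max(ethusd) compares text, not numbers. A quick Python illustration with made-up prices shows why casting to a numeric type first matters:

```python
# Made-up ETH/USD prices stored as strings, as in the ethereumprice table.
prices = ["731.5", "99.8", "1050.2"]

# Comparing strings is lexicographic: "9" sorts after "7" and "1", so "99.8" wins.
text_max = max(prices)
# Converting first (like CAST(ethusd AS DOUBLE) in HiveQL) compares numerically.
numeric_max = max(prices, key=float)

print(text_max)     # 99.8
print(numeric_max)  # 1050.2
```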
getLogs API (Transactions) from Etherscan
https://api.etherscan.io/api?module=logs&action=getLogs&fromBlock=379224&toBlock=latest&address=0x33990122638b9132ca29c723bdf037f1a891a70c&topic0=0xf63780e752c6a54a94fc52715dbc5518a3b4c3c2833d301a204226548a2a8545&apikey=YourApiKeyToken
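The same request can be assembled programmatically, which avoids stray spaces or unescaped characters creeping into the query string. A minimal sketch using only Python's standard library; the function name get_logs_url is hypothetical, and the parameters are exactly those in the getLogs URL above.

```python
from urllib.parse import urlencode

ETHERSCAN_API = "https://api.etherscan.io/api"


def get_logs_url(address: str, topic0: str, from_block: int, api_key: str,
                 to_block: str = "latest") -> str:
    """Assemble an Etherscan getLogs request URL from its query parameters."""
    params = {
        "module": "logs",
        "action": "getLogs",
        "fromBlock": from_block,
        "toBlock": to_block,
        "address": address,
        "topic0": topic0,
        "apikey": api_key,
    }
    # urlencode keeps dict order and percent-encodes any characters that need it.
    return f"{ETHERSCAN_API}?{urlencode(params)}"
```

Calling it with the address, topic0, fromBlock 379224 and YourApiKeyToken reproduces the getLogs URL above character for character.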
Ethereum Price from Etherscan
Ethereum Volume / Supply from Etherscan
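These two feeds appear to use Etherscan's stats module, action=ethprice and action=ethsupply as documented by Etherscan at the time of writing; the ethprice fields ethusd, ethbtc, ethusd_timestamp and ethbtc_timestamp match the ethereumprice table. A minimal standard-library sketch of both calls follows. The helper names stats_url and fetch_result are hypothetical, and YourApiKeyToken is a placeholder for your own key.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

ETHERSCAN_API = "https://api.etherscan.io/api"


def stats_url(action: str, api_key: str) -> str:
    """Build a stats-module URL, e.g. action='ethprice' or action='ethsupply'."""
    return f"{ETHERSCAN_API}?{urlencode({'module': 'stats', 'action': action, 'apikey': api_key})}"


def fetch_result(url: str, timeout: float = 30.0):
    """GET the URL and return the 'result' field of Etherscan's JSON envelope."""
    with urlopen(url, timeout=timeout) as response:
        payload = json.loads(response.read().decode("utf-8"))
    # status "1" means success; anything else carries an error message instead of data.
    if payload.get("status") != "1":
        raise RuntimeError(f"Etherscan error: {payload.get('message')}: {payload.get('result')}")
    return payload["result"]
```

For example, fetch_result(stats_url("ethsupply", "YourApiKeyToken")) should return the total supply in wei as a string, which is why the QueryRecord SQL divides it by 10^18.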
Raw Data Parsed Into Fields By JSON Path
Apache Calcite / Apache NiFi - QueryRecord SQL
SELECT captureDate, (CAST(totalEther AS DOUBLE)/1000000000000000000) as totalEther FROM FLOWFILE WHERE CAST(totalEther AS DOUBLE) > 0
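This query converts the supply from wei to ether (1 ether = 10^18 wei) and drops rows with no supply. The same logic in Python is handy for spot-checking results outside NiFi. This sketch uses Decimal rather than DOUBLE so the division is exact; the function name supply_in_ether is hypothetical.

```python
from decimal import Decimal
from typing import Iterable, Iterator, Tuple

WEI_PER_ETHER = Decimal(10) ** 18  # 1 ether = 10^18 wei


def supply_in_ether(rows: Iterable[Tuple[str, str]]) -> Iterator[Tuple[str, Decimal]]:
    """Python analogue of the QueryRecord SQL: convert wei to ether, drop rows <= 0.

    rows are (captureDate, totalEther-in-wei) pairs, both strings as in the flow.
    Decimal keeps the division exact, whereas CAST(... AS DOUBLE) holds only
    about 15-17 significant digits and can round large wei values.
    """
    for capture_date, total_wei in rows:
        wei = Decimal(total_wei)
        if wei > 0:
            yield capture_date, wei / WEI_PER_ETHER
```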
Build a File Name
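The exact file-name expression isn't reproduced here. In NiFi this is typically an UpdateAttribute that sets the filename attribute, which PutHDFS then uses when writing. As a hypothetical illustration only (the feed_UTC-timestamp_uuid.orc pattern and the function name build_file_name are my assumptions), a unique name per merged file could be built like this:

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def build_file_name(feed: str, captured_at: Optional[datetime] = None,
                    unique_id: Optional[str] = None) -> str:
    """Hypothetical naming scheme: <feed>_<UTC yyyyMMddHHmmss>_<uuid>.orc.

    The unique suffix keeps merged files from overwriting each other
    when several land in the same HDFS directory.
    """
    captured_at = captured_at or datetime.now(timezone.utc)
    unique_id = unique_id or str(uuid.uuid4())
    return f"{feed}_{captured_at:%Y%m%d%H%M%S}_{unique_id}.orc"
```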
NiFi Flow Template: