Community Articles
Find and share helpful community-sourced technical articles.
Check out our newest addition to the community, the Cloudera Innovation Accelerator group hub.
Labels (1)
Super Guru

Ethereum: Accessing Feeds from Etherscan on Volume, Price and Transaction (Apache NiFi) - Cryptocurrency Part 2

As opposed to EtherDelta that we accessed in our first post ( Etherscan has a familiar SSL HTTP REST call returning clean JSON. We are reading three REST APIs for acquiring three sets of interesting Ethereum data including logs, current supply and current price.



Overview of Flow


REST API Ingest feed for getLogs


We split into individual JSON records for processing.

Ether Supply and Ether Last Price Flows


For these two flows they return very minimal amounts of data with some flags I don't care about (status, message). So I use EvaluateJsonPath to extract just the good fields. Then with UpdateAttribute I add a just created time stamp (now()). Finally I use AttributesToJSON to replace the existing flow file with a new JSON file built from the fields I want only. You can choose to include the system generated metadata attributes automagically, but I turned that off. Sometimes it's nice to have the uuid, filename and other system metadata for tracking. You can also send that data somewhere else via built-in Apache NiFi reporting tools.

Our Save Everything Flow


We added these three feeds:


I added three new feeds to our save everything as Apache ORC files in HDFS flow from the EtherDelta ingest. You can see them on the right in Orange, Indigo and Violet. A rainbow of cryptocurrency storage, I am not sure what colors I will add next for additional feeds. The process works the same as it did for the other data, set a schema name and storage location in HDFS, QueryRecord processor to limit the data and convert to Apache AVRO and then finally merge these smaller Apache AVRO files into one bigger file (HDFS likes fewer, bigger files. In Hadoop 3.1, this is not as big a deal). Final step is to save in HDFS at the location field passed in. This last step in the Data Provenance will have the hive.ddl and HDFS location to build an external Apache Hive table. We can copy that and do that in Apache Zeppelin or you can hook up a PutHiveQL processor and have Apache NiFi build the table for you. I do this in Apache Zeppelin to control table builds. I also add mine to version control. To build this schema, I used the InferAvroSchema earlier then removed it from the flow. I paste the generated schema I grabbed from the schema attribute in Data Provenance into a new schema in Hortonworks Schema Registry, seen below. The schema registry allows for versioning and comparing versions. It's very useful and required for doing record processing in Apache NiFi, Hortonworks Streaming Analytics Manager and other tools. It's also very helpful for Apache Kafka.



An example of our newly ingested supply data:


An example of a QueryRecord Apache Calcite query:


To learn more about querying with this awesome tool, see:

Apache Zeppelin Used to Create Our Tables


Apache Zeppelin Queries of the Data


Before we store data in HDFS, we have to build tables and possibly setup security with either hdfs dfs chmod or with Apache Range.

HDFS Build Our Directories for Apache ORC File Storage

hdfs dfs -mkdir -p /ethereum/tx

hdfs dfs -mkdir -p /ethereum/supply

hdfs dfs -mkdir -p /ethereum/price

Create Our Tables in Hive View 2 or Beeline or Apache Zeppelin

CREATE EXTERNAL TABLE IF NOT EXISTS ethereumprice (ethusd STRING, ethbtc STRING, ethusd_timestamp STRING, ethbtc_timestamp STRING, captureDate STRING) 
LOCATION '/ethereum/price';

CREATE EXTERNAL TABLE IF NOT EXISTS ethereumtx (address STRING, topics ARRAY<STRING>, data STRING, blockNumber STRING, `timeStamp` STRING, gasPrice STRING, gasUsed STRING, logIndex STRING, transactionHash STRING, transactionIndex STRING) 
LOCATION '/ethereum/tx';

CREATE EXTERNAL TABLE IF NOT EXISTS ethereumsupply (totalEther STRING, captureDate STRING) 
LOCATION '/ethereum/supply';

SQL Queries to Check out the Data from Apache Zeppelin

select * from ethereumsupply order by capturedate desc;

select max(ethusd)  from ethereumprice p;

select ethusd, capturedate from ethereumprice order by ethusd_timestamp desc limit 500;

select * from ethereumtx order by `timeStamp` desc;

select * from ethereumsupply order by capturedate desc;

getLogs API (Transactions) from EtherScan &fromBlock=379224 &toBlock=latest &address=0x33990122638b9132ca29c723bdf037f1a891a70c &topic0=0xf63780e752c6a54a94fc52715dbc5518a3b4c3c2833d301a204226548a2a8545 &apikey=YourApiKeyToken

Ethereum Price from EtherScan

Ethereum Volume / Supply from EtherScan

Raw Data Parsed Into Fields By JSON Path

  • $.status
  • $.message
  • $.result /
  • $.result.ethbtc
  • $.result.ethbtc_timestamp
  • $.result.ethusd
  • $.result.ethusd_timestamp

Apache Calcite / Apache NiFi - QueryRecord SQL

SELECT captureDate, (CAST(totalEther AS DOUBLE)/1000000000000000000) as totalEther
WHERE CAST(totalEther AS DOUBLE)  > 0 

Build a File Name



NiFi Flow Template:


Super Guru

To Build YourApiKeyToken

  • Create an account as and confirm it.
  • Login
  • Go to My Account
  • Click Developers
  • Click Create Api Key


  • Add an appname


  • Copy the API KEY to end of URL in NiFi
Don't have an account?
Version history
Last update:
‎08-17-2019 07:24 AM
Updated by:
Top Kudoed Authors