Created on 01-25-2019 09:01 PM - edited 08-17-2019 04:56 AM
Introduction
SoChain provides a fast set of free public APIs (don't abuse them) for accessing information on various blockchain networks.
If you need this for critical work, please donate: https://chain.so/address/devfund.
One of the things you will see in this simple flow is that NiFi excels at ingesting REST APIs and working with JSON: you can split it up, shred it, filter it, manipulate it and extract from it. With the resulting usable objects we build a schema that allows us to do record processing. Once we have a set of records with a schema I can store it to
I just hosted a Future of Data Princeton Meetup in Woodbridge, New Jersey, with some amazing speakers sponsored by ChainNinja. While this was all about Blockchain for Enterprise and no cryptocurrency was involved, it made me want to investigate some cryptocurrency data. As you can see, manipulating complex JSON data, and filtering, modifying, routing and scripting with its values, is trivial in Apache NiFi.
In my next article, I will investigate Hyperledger and Ethereum for enterprise solution integration with Apache NiFi, Impala, Hive, Kudu, HBase, Spark, Kafka and other enterprise technologies.
Steps
We read from the SoChain REST URL with GetHTTP.
I send the original file to immutable HDFS storage.
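The flow does this inside NiFi. As an outside illustration of write-once storage, the WebHDFS REST API can create a file with `overwrite=false`, so a second write to the same path fails instead of replacing the original. This is a sketch under stated assumptions: the NameNode address, port 9870 (the Hadoop 3 default HTTP port), the user name and the target path are placeholders, simple (non-Kerberos) authentication is assumed, and both helper names are hypothetical.

```python
import urllib.error
import urllib.parse
import urllib.request


def webhdfs_create_url(namenode: str, hdfs_path: str, user: str) -> str:
    """Step-1 URL for a WebHDFS CREATE that refuses to overwrite an existing file."""
    query = urllib.parse.urlencode({"op": "CREATE", "overwrite": "false", "user.name": user})
    return f"{namenode.rstrip('/')}/webhdfs/v1{urllib.parse.quote(hdfs_path)}?{query}"


def webhdfs_put_once(namenode: str, hdfs_path: str, payload: bytes, user: str) -> int:
    """Two-step WebHDFS CREATE: the NameNode answers with a 307 redirect naming a
    DataNode, and the bytes are then PUT to that location.

    Assumption: simple auth via user.name; secured clusters need Kerberos/SPNEGO.
    """
    step1 = urllib.request.Request(webhdfs_create_url(namenode, hdfs_path, user), method="PUT")
    try:
        # urllib does not follow redirects for PUT; it raises HTTPError instead.
        urllib.request.urlopen(step1).close()
        raise RuntimeError("expected a 307 redirect from the NameNode")
    except urllib.error.HTTPError as err:
        if err.code != 307:
            raise  # any other status (for example, path already exists) is surfaced
        location = err.headers["Location"]
    step2 = urllib.request.Request(
        location, data=payload, method="PUT",
        headers={"Content-Type": "application/octet-stream"},
    )
    with urllib.request.urlopen(step2) as response:
        return response.status  # 201 Created on success


# Placeholder host, path and user; only builds the URL, no network call.
print(webhdfs_create_url("http://namenode.example:9870", "/data/chainso/btc.json", "nifi"))
```

Keeping the URL builder separate from the network call makes the write-once flag easy to verify without a cluster.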
In another branch, I use EvaluateJsonPath to pull out one attribute, which is then used to fetch the detail records:
$.data.blocks
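In plain Python, the JSONPath `$.data.blocks` amounts to walking two keys into the parsed get_info response; in the flow the result lands in a `block_no` attribute, as the URL in the next step expects. A minimal sketch, assuming the response shape of the LTC sample at the end of this article; `json_path_get` is a hypothetical helper that handles only simple dotted paths, not full JSONPath.

```python
import json
from typing import Any


def json_path_get(doc: Any, path: str) -> Any:
    """Resolve a simple dotted JSONPath such as '$.data.blocks'.

    Assumption: only '$' followed by dot-separated object keys is supported;
    filters, wildcards and array indexes are out of scope.
    """
    if not path.startswith("$"):
        raise ValueError(f"JSONPath must start with '$': {path!r}")
    node = doc
    for key in (k for k in path[1:].split(".") if k):
        if not isinstance(node, dict) or key not in node:
            raise KeyError(f"{path!r}: missing key {key!r}")
        node = node[key]
    return node


# Abbreviated get_info response for LTC (values from the sample below).
response = json.loads('{"status": "success", "data": {"network": "LTC", "blocks": 1567929}}')
block_no = json_path_get(response, "$.data.blocks")
print(block_no)  # 1567929
```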
I use that attribute to build a deeper REST call to get the details for the latest block.
https://chain.so/api/v2/block/BTC/${block_no}
This call is made with InvokeHTTP, whose URL can reference flowfile attributes through NiFi Expression Language, which makes it a scriptable HTTP(S) call. This comes in handy often.
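Outside NiFi, the same lookup can be sketched with the standard library: substitute the block number into the block endpoint the way `${block_no}` is substituted by Expression Language, then issue the GET. This is a sketch only; it assumes the v2 endpoint above still answers unauthenticated requests, and `block_url` and `fetch_block` are hypothetical names.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "https://chain.so/api/v2"  # endpoint prefix used throughout this article


def block_url(network: str, block_no: int) -> str:
    """Equivalent of https://chain.so/api/v2/block/BTC/${block_no}."""
    return f"{BASE_URL}/block/{urllib.parse.quote(network)}/{int(block_no)}"


def fetch_block(network: str, block_no: int, timeout: float = 10.0) -> dict:
    """GET the block details and return the parsed JSON document.

    Assumption: no API key or extra headers are required by the v2 API.
    """
    request = urllib.request.Request(
        block_url(network, block_no), headers={"Accept": "application/json"}
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


print(block_url("BTC", 559950))  # https://chain.so/api/v2/block/BTC/559950
```

The printed URL matches the `invokehttp.request.url` value in the provenance listing below.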
In the next EvaluateJsonPath, I pull out all the high-level attributes of the JSON file. I want these as master fields on every record, so they are repeated on each one.
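A minimal sketch of that extraction in Python, assuming the block details sit under a top-level `data` object and that the master fields are the scalar values there. The key list is taken from the provenance attributes further down; `extract_master_fields` is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, Mapping

# Block-level fields that appear as attributes in the provenance example below.
MASTER_KEYS = (
    "network", "block_no", "blockhash", "previous_blockhash", "next_blockhash",
    "time", "size", "fee", "sent_value", "mining_difficulty", "merkleroot",
    "nonce", "bits",
)


def extract_master_fields(
    doc: Mapping[str, Any], keys: Iterable[str] = MASTER_KEYS
) -> Dict[str, str]:
    """Copy scalar values from doc["data"] into a flat string map, roughly what
    one EvaluateJsonPath property per key does when writing flowfile attributes."""
    data = doc.get("data", {})
    fields: Dict[str, str] = {}
    for key in keys:
        if key not in data:
            continue  # absent in this response: leave it out
        value = data[key]
        if isinstance(value, (dict, list)):
            continue  # nested arrays/objects belong to the split step instead
        # Attributes are strings; assumption: JSON null becomes "" (cf. next_blockhash).
        fields[key] = "" if value is None else str(value)
    return fields


sample = {"data": {"network": "BTC", "block_no": 559950, "next_blockhash": None, "fee": "0.05142179"}}
print(extract_master_fields(sample))
# {'network': 'BTC', 'block_no': '559950', 'next_blockhash': '', 'fee': '0.05142179'}
```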
After that, I split the two arrays of data beneath that level into two separate branches. I will break these down into individual records for parsing. I could also apply a schema and handle them as groups of records.
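The split-and-repeat step can be sketched as a generator: each element of one array becomes its own record, and the master fields are copied onto it. This assumes the arrays hold JSON objects like the input record shown at the end of this article; the array key `inputs` is a placeholder, since the article does not name the two arrays, and `split_with_master` is hypothetical.

```python
import json
from typing import Any, Dict, Iterator, Mapping


def split_with_master(
    doc: Mapping[str, Any], array_key: str, master: Mapping[str, str]
) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per element of doc["data"][array_key] (like SplitJson),
    with the repeated master fields copied onto each record.

    If a field name appears in both, the element's own value wins.
    """
    for element in doc.get("data", {}).get(array_key, []):
        record: Dict[str, Any] = dict(master)
        record.update(element)
        yield record


# "inputs" is a placeholder array name; the element is trimmed from the input record below.
block = {"data": {"block_no": 559950, "inputs": [
    {"input_no": 0, "address": "3N7Vid17hE1ofGcWR6bWEmtQBQ8kKQ7iKW", "value": "0.20993260"},
]}}
for rec in split_with_master(block, "inputs", {"network": "BTC", "block_no": "559950"}):
    print(json.dumps(rec))
```

Letting the element override the master copy keeps per-record values authoritative if a name ever collides.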
This is an example of reading a REST API and creating a unique name per call. Also notice it's easy to handle HTTPS as well as HTTP.
SoChain Ingest Flow for REST API Calls
Example Unique File Name we can script
${filename:append('btc.'):append(${now():format('yyyyMMddHHmmss'):append(${md5}):append('.json')})}
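The naming expression can be mirrored in Python to see what it produces. In the Java date patterns that NiFi's `format()` uses, `MM` is the month, `mm` is minutes, `HH` is the 24-hour clock and `ss` is seconds, so a sortable stamp is `yyyyMMddHHmmss` (Python's `%Y%m%d%H%M%S`). The sketch assumes the `md5` attribute holds an MD5 hex digest of the payload, which it computes itself as a stand-in; `unique_filename` is a hypothetical helper.

```python
import hashlib
from datetime import datetime, timezone
from typing import Optional


def unique_filename(payload: bytes, prefix: str = "btc.", now: Optional[datetime] = None) -> str:
    """Build prefix + timestamp + MD5 of the payload + '.json'.

    The timestamp keeps files sortable; the digest makes two calls within the
    same second distinct unless their payloads are identical.
    Assumption: the digest stands in for NiFi's ${md5} attribute.
    """
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y%m%d%H%M%S")  # Java pattern yyyyMMddHHmmss
    digest = hashlib.md5(payload).hexdigest()
    return f"{prefix}{stamp}{digest}.json"


# btc.20190124205407, then the 32-hex-digit MD5, then .json
print(unique_filename(b'{"status": "success"}', now=datetime(2019, 1, 24, 20, 54, 7)))
```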
REST URLs
https://chain.so/api/v2/get_info/BTC
https://chain.so/api/v2/get_price/BTC/USD
https://chain.so/api/v2/get_info/DOGE
https://chain.so/api/v2/get_info/LTC
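A sketch of polling those endpoints from Python with only the standard library. It assumes every response uses the `{"status": ..., "data": ...}` envelope seen in the LTC sample below; `fetch_data` and its injectable `opener` parameter (handy for trying it without network access) are hypothetical.

```python
import io
import json
import urllib.request
from typing import Any, Callable, Dict, List, Sequence

BASE_URL = "https://chain.so/api/v2"


def sochain_urls(networks: Sequence[str] = ("BTC", "DOGE", "LTC")) -> List[str]:
    """The get_info URL for each network plus the BTC/USD price URL."""
    urls = [f"{BASE_URL}/get_info/{network}" for network in networks]
    urls.append(f"{BASE_URL}/get_price/BTC/USD")
    return urls


def fetch_data(url: str, opener: Callable = urllib.request.urlopen, timeout: float = 10.0) -> Dict[str, Any]:
    """GET a SoChain URL and return its 'data' object, failing on a non-success status."""
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    with opener(request, timeout=timeout) as response:
        doc = json.loads(response.read().decode("utf-8"))
    if doc.get("status") != "success":  # assumption: envelope as in the LTC sample
        raise RuntimeError(f"{url}: status={doc.get('status')!r}")
    return doc["data"]


# Offline usage: a fake opener returns a canned body instead of calling chain.so.
def fake_opener(request, timeout):
    body = {"status": "success", "data": {"network": "LTC", "blocks": 1567929}}
    return io.BytesIO(json.dumps(body).encode("utf-8"))


print(fetch_data(sochain_urls()[2], opener=fake_opener)["blocks"])  # 1567929
```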
Example of the value of Apache NiFi provenance (these are the attributes acquired for one flowfile):
Access-Control-Allow-Headers: Origin,Accept,Content-Type,X-Requested-With,X-CSRF-Token
Access-Control-Allow-Methods: GET,POST
Access-Control-Allow-Origin: *
CF-RAY: 49e564b17e23923c-EWR
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Connection: keep-alive
Content-Type: application/json; charset=utf-8
Date: Thu, 24 Jan 2019 20:54:07 GMT
Expect-CT: max-age=604800, report-uri="https://report-uri.cloudflare.com/cdn-cgi/beacon/expect-ct"
Expires: Fri, 01 Jan 1990 00:00:00 GMT
Pragma: no-cache
Server: cloudflare
Set-Cookie: __cfduid=d6f52ee1552c73223442296ff7230e9fd1548363246; expires=Fri, 24-Jan-20 20:54:06 GMT; path=/; domain=.chain.so; HttpOnly, _mkra_ctxt=1a7dafd219c4972a7562f232dc63f524--200; path=/; max-age=5
Status: 200 OK
Strict-Transport-Security: max-age=31536000;includeSubDomains
Transfer-Encoding: chunked
X-Content-Type-Options: nosniff
X-Download-Options: noopen
X-Frame-Options: SAMEORIGIN
X-Request-Id: 20d3f592-50b6-40cf-a496-a6f915eb463b
X-Runtime: 1.018401
X-XSS-Protection: 1; mode=block
bits: 172fd633
block_no: 559950
blockhash: 0000000000000000001c68f61ddcc30568536a583c843a7d0c9606b9582fd7e5
fee: 0.05142179
filename: btc.201949241501759.json
fragment.count: 1
fragment.identifier: cec10691-82e9-402b-84a9-7901b084f10a
fragment.index: 0
gethttp.remote.source: chain.so
invokehttp.remote.dn: CN=ssl371663.cloudflaressl.com,OU=PositiveSSL Multi-Domain,OU=Domain Control Validated
invokehttp.request.url: https://chain.so/api/v2/block/BTC/559950
invokehttp.status.code: 200
invokehttp.status.message: OK
invokehttp.tx.id: bc8a0a18-0685-4a2c-97fa-34541b9ea929
merkleroot: 41eb6f68477e96c9239ae1bbe4e5d4d02529c6f7faebc4ad801730d09609a0ef
mime.type: application/json; charset=utf-8
mining_difficulty: 5883988430955.408
network: BTC
next_blockhash: Empty string set
nonce: 1358814296
path: ./
previous_blockhash: 0000000000000000001b2b3d3b5741462fe31981a6c0ae9335ed8851e936664b
schema: chainsotxinputinfo
schema.name: chainsotxinputinfo
segment.original.filename: btc.201949241501759.json
sent_value: 3977.10078351
size: 470242
time: 1548362873
uuid: 3c1d72b4-e993-4b32-a679-0741a44aeefb
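Every value in that listing is a string, because flowfile attributes are strings; giving them types is the job of a record schema. A minimal sketch of that typing step in Python for a few block-level attributes; the `BlockSummary` dataclass and the choice of `Decimal` for amounts are assumptions, not the article's schema.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Mapping


@dataclass(frozen=True)
class BlockSummary:
    """Hypothetical typed view of a few block attributes (assumed types)."""
    network: str
    block_no: int
    blockhash: str
    time: datetime        # block timestamp, converted from Unix seconds
    size: int             # as reported by the API
    fee: Decimal          # Decimal keeps the 8 decimal places exact
    sent_value: Decimal

    @classmethod
    def from_attributes(cls, attrs: Mapping[str, str]) -> "BlockSummary":
        return cls(
            network=attrs["network"],
            block_no=int(attrs["block_no"]),
            blockhash=attrs["blockhash"],
            time=datetime.fromtimestamp(int(attrs["time"]), tz=timezone.utc),
            size=int(attrs["size"]),
            fee=Decimal(attrs["fee"]),
            sent_value=Decimal(attrs["sent_value"]),
        )


# Values copied from the provenance listing above.
summary = BlockSummary.from_attributes({
    "network": "BTC", "block_no": "559950",
    "blockhash": "0000000000000000001c68f61ddcc30568536a583c843a7d0c9606b9582fd7e5",
    "time": "1548362873", "size": "470242",
    "fee": "0.05142179", "sent_value": "3977.10078351",
})
print(summary.time.isoformat())  # 2019-01-24T20:47:53+00:00
```

The block time lands about six minutes before the response's `Date` header, which is a quick sanity check on the conversion.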
An example input record:
{ "input_no" : 0, "address" : "3N7Vid17hE1ofGcWR6bWEmtQBQ8kKQ7iKW", "value" : "0.20993260", "received_from" : { "txid" : "4e0f00cddb8e3d98de7f645684dc7526468d1dc33efbbf0bc173ed19c6556896", "output_no" : 4 } }
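The same idea applied to the input record: the amount arrives as a JSON string, so parsing it with `Decimal` avoids float rounding. The hypothetical `TxInput` type flattens `received_from` into two fields; the field types are assumptions rather than the `chainsotxinputinfo` schema itself.

```python
import json
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class TxInput:
    """Hypothetical typed input record; types are assumed, not from the schema."""
    input_no: int
    address: str
    value: Decimal            # "0.20993260" stays exact as a Decimal
    received_from_txid: str   # flattened from received_from.txid
    received_from_output_no: int

    @classmethod
    def from_json(cls, text: str) -> "TxInput":
        raw = json.loads(text)
        source = raw["received_from"]
        return cls(
            input_no=int(raw["input_no"]),
            address=raw["address"],
            value=Decimal(raw["value"]),
            received_from_txid=source["txid"],
            received_from_output_no=int(source["output_no"]),
        )


record = TxInput.from_json(
    '{ "input_no" : 0, "address" : "3N7Vid17hE1ofGcWR6bWEmtQBQ8kKQ7iKW", '
    '"value" : "0.20993260", "received_from" : { "txid" : '
    '"4e0f00cddb8e3d98de7f645684dc7526468d1dc33efbbf0bc173ed19c6556896", "output_no" : 4 } }'
)
print(record.value, record.received_from_output_no)  # 0.20993260 4
```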
An example get_info response for LTC:
{ "status" : "success", "data" : { "name" : "Litecoin", "acronym" : "LTC", "network" : "LTC", "symbol_htmlcode" : "Ł", "url" : "http://www.litecoin.com/", "mining_difficulty" : "6399667.35869154", "unconfirmed_txs" : 8, "blocks" : 1567929, "price" : "0.00000000", "price_base" : "BTC", "price_update_time" : 1548451214, "hashrate" : "178582229079753" } }
Example NiFi Flow