Stock market is always a thing when it comes to data analysis and examples, so how can we easily create an ingestion pipeline that allows quick insights with Cloudera Data Platform?
How about using the new Iceberg table format available in CDP to get new features with everything integrated and secured? Introducing Apache Iceberg in Cloudera Data Platform
In this example, I'll show how to use Cloudera Public Cloud to get data from Rest API and (with only clicks/parameters input) start quickly querying your data, dashboard, and advanced analytics.
In this example we will:
Also with Iceberg, you can perform Schema Evolution and benefit of open source optimizations and engine interoperability using open formats (like parquet).
Although the stock information is D-1, we will schedule to run each 10 minutes to identify if there is a new stock ticker in the parameters and ingest the data.
Again all without coding and only using saved templates!! Following is the architecture for this example:
To download the stock information, we will use the free API (at the time of this writing) from Alpha Vantage. So, first you will need to register to get your API key that will be used and save it.
Also, you will need the name of the bucket path where you will save the data. Now with all the information that we need (API Key, Bucket) let's start!
Following is the list of parameters that we will need to fill:
The components of Cloudera Data Platform that needs to be available are:
Done! In minutes we will start receiving stock information into our bucket! If you want you can check in your bucket under the path s3a://<cdpbucket>/user/<yourusername>/<cdppath>/new:
For this, you can use the script to create the tables below in an Impala Virtual Warehouse connected to your SDX environment in CDP:
CREATE DATABASE stocks; CREATE TABLE IF NOT EXISTS stocks.stock_intraday_1min (interv STRING,output_size STRING,time_zone STRING,open DECIMAL(8,4),high DECIMAL(8,4),low DECIMAL(8,4),close DECIMAL(8,4),volume BIGINT ) PARTITIONED BY (ticker STRING,last_refreshed string,refreshed_at string) STORED AS iceberg;
Go to Cloudera Data Warehouse UI to access Hue and we will create an Iceberg table to be used by our queries:
Leave Hue open to query data later.
Note: You can change the database name for this and next example I'm using stocks.
Now we will use Cloudera Data Engineering to check the files in the object storage, compare if it's new data, and insert them into the Iceberg table. For this, download the jar, and in Cloudera CDE UI, go your Virtual Spark Cluster and click View Jobs
This job will run each 10 minutes to check if there's any new ticker. To run now for first time, click the 3 dots under actions of the job and click "Run Now":
Click in "Job Runs" to check if the job is done. It will take around 3 minutes to spin up the resources in Kubernetes and execute the pipeline to ingest into the final table the new data.
Also, you can check the cluster:
This application is very simple, it will:
After execution, the processed files will be in your bucket but under the "processed"+date directory:
Now let's query data!
Now we should have the data ingested, let's go back to Hue and select the data in the table stocks.stock_intraday_1min:
In Cloudera Data Visualization, I can also select this table to create a new Dataset called "Stocks" and create visualizations:
For example, stocks by volume:
Also, you can use Cloudera CDP tools to ingest data from other sources and create your own stock analyzer platform!
Apache Iceberg delivers the ability to:
And a lot of other things that you can benefit from it. Also, it's engine-independent and will use the optimization that each engine already has implemented natively.
Our example will load the intraday stock daily since the free API does not give real-time data, but we can change the Cloudera Dataflow Parameter to add one more ticker and we've scheduled to run hourly the CDE process. After this we will be able to see the new ticker information in the dashboard and also perform time travel using Iceberg!
Go to Dataflow, and in the Dashboard, click in your deployed flow and then Manage Deployment:
Now click Parameters:
Scroll down and change the stock_list to add the new ticker. I'm adding NVDA ticker but you can choose another one, after this click in Apply Changes:
The flow will be redeployed and it will also execute each minute, so you can check later if the new ticker is processed/loaded into the iceberg table by our CDE process that is scheduled to run periodically.
After some minutes (around 10 which is our schedule) you can check the table snapshots with the following query:
DESCRIBE HISTORY stocks.stock_intraday_1min;
In my case, the Spark process executed sometimes, and I can see the snapshot for each execution:
I'll query the tickers that I had before the last snapshot with the query below. Change the snapshotid to the value that you got with the first query:
SELECT count(*), ticker FROM stocks.stock_intraday_1min FOR SYSTEM_VERSION AS OF <snapshotid> GROUP BY ticker;
Now let's query the table without the snapshot id:
We can see NVDA is reflected in the last snapshot!!!
We've created a full ingestion, processing pipeline in a few clicks. That's the power of Cloudera Data Platform, it's an end-to-end use case that can be easily deployed following only parameters.
You can extend your work with Cloudera Machine Learning; there's an example in this blog where some changes will be needed to point to the table that we've created.
Lastly, we've seen a little of the power of Apache Iceberg, already integrated into Cloudera Data Platform in Public Cloud.