Introduction

 

Stock market data is a popular subject for data analysis examples, so how can we easily create an ingestion pipeline that allows quick insights with Cloudera Data Platform?

 

How about using the new Iceberg table format available in CDP to get new features with everything integrated and secured? For background, see "Introducing Apache Iceberg in Cloudera Data Platform".

 

In this example, I'll show how to use Cloudera Public Cloud to get data from a REST API and, with only clicks and parameter input, quickly start querying your data, building dashboards, and running advanced analytics.

 

In this example we will:

  1. Load intraday stock data into the object storage periodically (D-1 data) with Cloudera Dataflow;
  2. Process the data with Spark in Cloudera Data Engineering (scheduled with Airflow);
    This process will check whether the data is new and then MERGE INTO (new) the stock ICEBERG (new) table. Code is here.
  3. Analyze and query data with Cloudera Data Warehouse/Cloudera Data Visualization;
  4. Perform TIME TRAVEL/SNAPSHOTS (new) in the table;

Also, with Iceberg you can perform Schema Evolution and benefit from open source optimizations and engine interoperability using open formats (like Parquet).

 

Although the stock information is D-1, we will schedule the job to run every 10 minutes to check whether a new stock ticker has been added to the parameters and ingest its data.

 

Again, all of this is done without coding, using only saved templates! The architecture for this example is as follows:

 

carrossoni_0-1646089909165.png

Pre-Requisites

To download the stock information, we will use the free API (at the time of this writing) from Alpha Vantage. So, first you will need to register to get your API key and save it.
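
For orientation, here is a minimal sketch of what a request to Alpha Vantage's intraday endpoint looks like. The function name TIME_SERIES_INTRADAY, the 1min interval, and the demo key follow Alpha Vantage's public documentation. The helper build_intraday_url is hypothetical, and the DataFlow template used later may assemble its request differently.

```python
from urllib.parse import urlencode

BASE_URL = "https://www.alphavantage.co/query"


def build_intraday_url(symbol: str, api_key: str, interval: str = "1min") -> str:
    """Build an intraday request URL (hypothetical helper, for illustration only)."""
    params = {
        "function": "TIME_SERIES_INTRADAY",
        "symbol": symbol,
        "interval": interval,
        "apikey": api_key,
    }
    return f"{BASE_URL}?{urlencode(params)}"


# The "demo" key only returns IBM data, as noted later in the parameter list.
url = build_intraday_url("IBM", "demo")
print(url)
```

Replacing "demo" with your own key is what unlocks other tickers.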

 

Also, you will need the name of the bucket path where you will save the data. Now with all the information that we need (API Key, Bucket) let's start!

 

Following is the list of parameters that we will need to fill:

  • Alpha Vantage API Key;
  • Root Bucket used by CDP;
  • Workload username;
  • Workload password;

The components of Cloudera Data Platform that need to be available are:

 

  • Cloudera Data Warehouse with Visualization enabled;
  • Cloudera Dataflow;
  • Cloudera Data Engineering (Spark 3) with Airflow enabled;

Create the flow to ingest stock data via API to Object Storage

 

  1. First, download the template file located here, then open Cloudera DataFlow, where we will upload the template to start loading data into your object storage.
  2. In the Cloudera Dataflow UI, click "Catalog" → "Import Flow Definition":
    carrossoni_1-1646089909179.png
  3. Enter a name and description for your flow and select the CDF template file that you downloaded:
    carrossoni_2-1646089909139.png
  4. Click Import;
  5. After importing, select your flow and, in the menu, click the blue button (Deploy);
  6. Select the CDP Flow Environment that you are using and then continue:
    carrossoni_3-1646089909100.png
  7. Enter your deployment name and click Next;
  8. Do not change any NiFi configuration and click Next;
  9. Now it will ask for the parameters that we need to provide. Enter your:
    • CDP Password: Workload password in CDP
    • CDP User: Workload user in CDP
    • S3 Path: Subfolder that will be created under the main <bucket>/user/<youruser>/<s3path>  (only the name without s3 etc)
    • S3 Bucket: The main CDP Bucket used by CDP (only the name without s3 etc)
    • API Alpha Key: The API key that will be used (demo can only get IBM stock data)
    • Stock_list: The list of stocks that you want to load, one ticker per line
      Note: It is possible to change the stock list after deploying to ingest new ticker data. We will do this to demonstrate the Iceberg time travel feature.
      carrossoni_4-1646089908919.png
  10. Click Next and select the size of the NiFi node and max scaling:
    carrossoni_5-1646089909269.png
  11. Here you can define KPIs. We will leave this as is and click Next;
  12. Review and click Deploy

Done! In minutes we will start receiving stock information into our bucket! If you want you can check in your bucket under the path s3a://<cdpbucket>/user/<yourusername>/<cdppath>/new:

 

carrossoni_6-1646089909294.png
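
To make the path layout concrete, the sketch below assembles the landing directory from the deployment parameters. The helper name and the validation rule are illustrative assumptions. The only thing taken from the article is the s3a://<bucket>/user/<user>/<path>/new layout and the rule that bucket and path are entered as bare names.

```python
def landing_path(bucket: str, user: str, s3_path: str) -> str:
    """Return the directory where newly ingested files are expected to land."""
    # The flow parameters expect bare names, so reject anything with a scheme.
    for name, value in (("bucket", bucket), ("s3_path", s3_path)):
        if "://" in value:
            raise ValueError(f"{name} should be a bare name, got {value!r}")
    return f"s3a://{bucket}/user/{user}/{s3_path.strip('/')}/new"


# Placeholder values, not real resources.
print(landing_path("my-cdp-bucket", "jdoe", "stocks"))
# s3a://my-cdp-bucket/user/jdoe/stocks/new
```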

Create Iceberg Tables

You can use the script below to create the database and table in an Impala Virtual Warehouse connected to your SDX environment in CDP:

CREATE DATABASE stocks;

CREATE TABLE IF NOT EXISTS stocks.stock_intraday_1min (
  interv STRING,
  output_size STRING,
  time_zone STRING,
  open DECIMAL(8,4),
  high DECIMAL(8,4),
  low DECIMAL(8,4),
  close DECIMAL(8,4),
  volume BIGINT
)
PARTITIONED BY (ticker STRING, last_refreshed STRING, refreshed_at STRING)
STORED AS ICEBERG;
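
One detail worth keeping in mind when choosing tickers: DECIMAL(8,4) holds 8 digits in total, 4 of them after the decimal point, so the largest price the table can store is 9999.9999. The sketch below is a client-side check of that limit. The helper name fits_decimal is hypothetical and is not part of the pipeline.

```python
from decimal import Decimal


def fits_decimal(value: str, precision: int = 8, scale: int = 4) -> bool:
    """True if value fits DECIMAL(precision, scale) without overflow or rounding."""
    d = Decimal(value)
    limit = Decimal(10) ** (precision - scale)  # 10^4 for DECIMAL(8,4)
    quantum = Decimal(1).scaleb(-scale)         # 0.0001 for scale 4
    # Check magnitude first, then check that no digits beyond the scale are lost.
    return abs(d) < limit and d == d.quantize(quantum)


print(fits_decimal("135.67"))     # True
print(fits_decimal("9999.9999"))  # True
print(fits_decimal("10000"))      # False
print(fits_decimal("1.23456"))    # False
```

If you plan to track instruments priced above that range, widening the precision in the CREATE TABLE statement is one option.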

To run it, go to the Cloudera Data Warehouse UI and open Hue, where we will create the Iceberg table used by our queries:

 

carrossoni_0-1646250619658.png

 

Leave Hue open to query data later.

Note: You can change the database name. For this and the next example, I'm using stocks.

Process and Ingest Iceberg using CDE

Now we will use Cloudera Data Engineering to check the files in the object storage, determine whether they contain new data, and insert them into the Iceberg table. For this, download the jar and, in the Cloudera CDE UI, go to your Virtual Spark Cluster and click View Jobs:

 

carrossoni_8-1646089908938.png

 

  1. Click Jobs → Create Jobs:
    carrossoni_9-1646089909074.png
    • Name: Put the job name Ex: StockIceberg
    • File: Upload the jar file stockdatabase_2.11-1.0.jar (Create a resource in the drop-down button)
    • Main Class: com.cloudera.cde.stocks.StockProcessIceberg
    • Arguments:
      • <databasename> → (ex: stocks)
      • <S3 Bucket> → (Same bucket used in Dataflow with the complete path ex: s3a://carrossoni-sa-east/)
      • <S3 Path> →  (Same path used in Dataflow ex: stocks)
      • <CDP User> → (Same user used in Dataflow ex: carrossoni)
  2. Schedule: Enable it and set it to run every 10 minutes with the crontab configuration */10 * * * *
    Click Schedule.
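
The expression */10 * * * * reads as "every minute divisible by 10, in every hour, on every day". Here is a small sketch of how the minute field expands. It handles only `*`, `*/n`, and a single number, which is far from full cron syntax, and the function name is made up for this illustration.

```python
def expand_minute_field(field: str) -> list[int]:
    """Expand a cron minute field into the minutes of the hour it matches."""
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(0, 60, step))
    return [int(field)]


print(expand_minute_field("*/10"))  # [0, 10, 20, 30, 40, 50]
```

So the job starts six times per hour, at minutes 0, 10, 20, 30, 40, and 50.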

 

This job will run every 10 minutes to check if there's any new ticker. To run it now for the first time, click the 3 dots under the job's actions and click "Run Now":

carrossoni_10-1646089909255.png

Click "Job Runs" to check if the job is done. It will take around 3 minutes to spin up the resources in Kubernetes and execute the pipeline that ingests the new data into the final table.

carrossoni_11-1646089909200.png

 

Also, you can check the cluster:

carrossoni_12-1646089908955.png

This application is very simple. It will:

  • Check for new files in the new directory;
  • Create a temp table in Spark, cache it, and identify duplicated rows (in case NiFi loaded the same data again);
  • MERGE INTO the final table: INSERT new rows, or UPDATE rows that already exist;
  • Archive files in the bucket;
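
The deduplicate-then-merge logic in the steps above can be sketched in plain Python as follows. This is only a conceptual model: the real job runs Spark's MERGE INTO against the Iceberg table. The key columns (ticker, refreshed_at) are an assumption made for this illustration, not taken from the job's code, and the sample prices are made up.

```python
def dedupe(rows, key_cols):
    """Keep the last occurrence of each key, dropping repeated loads."""
    unique = {}
    for row in rows:
        unique[tuple(row[c] for c in key_cols)] = row
    return list(unique.values())


def merge_into(target, source, key_cols):
    """Upsert source rows into target (a dict keyed by key tuple).

    Returns (inserted, updated) counts.
    """
    inserted = updated = 0
    for row in dedupe(source, key_cols):
        key = tuple(row[c] for c in key_cols)
        if key in target:
            updated += 1   # WHEN MATCHED THEN UPDATE
        else:
            inserted += 1  # WHEN NOT MATCHED THEN INSERT
        target[key] = row
    return inserted, updated


KEY = ("ticker", "refreshed_at")  # assumed merge key
table = {}
batch = [  # made-up sample rows
    {"ticker": "IBM", "refreshed_at": "2022-02-25 16:00:00", "close": "124.18"},
    {"ticker": "IBM", "refreshed_at": "2022-02-25 16:00:00", "close": "124.18"},
    {"ticker": "IBM", "refreshed_at": "2022-02-25 15:59:00", "close": "124.10"},
]

first = merge_into(table, batch, KEY)
second = merge_into(table, batch, KEY)  # same files loaded again
print(first, second, len(table))        # (2, 0) (0, 2) 2
```

Loading the same batch twice leaves the table at two rows, which is why a repeated NiFi load does not create duplicates.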

After execution, the processed files will be in your bucket but under the "processed"+date directory:

carrossoni_13-1646089909219.png

 

Now let's query data!

Query Iceberg Tables in Hue and Cloudera Data Visualization

Now that the data should be ingested, let's go back to Hue and select the data in the table stocks.stock_intraday_1min:

carrossoni_14-1646089909384.png

In Cloudera Data Visualization, I can also select this table to create a new Dataset called "Stocks" and create visualizations:

carrossoni_1-1646250860983.png

For example, stocks by volume:

carrossoni_16-1646089909296.png

Also, you can use Cloudera CDP tools to ingest data from other sources and create your own stock analyzer platform!

Iceberg Advanced Features

Apache Iceberg delivers the ability to:

  • Time Travel
  • Schema Evolution
  • Partition Evolution

It also offers many other features that you can benefit from. In addition, it's engine-independent and will use the optimizations that each engine has already implemented natively.

 

Our example loads the intraday stock data daily, since the free API does not provide real-time data. However, we can change the Cloudera Dataflow parameter to add one more ticker, and the CDE process is scheduled to run every 10 minutes. After this, we will be able to see the new ticker information in the dashboard and also perform time travel using Iceberg!

Go to Dataflow and, in the Dashboard, click your deployed flow and then Manage Deployment:

carrossoni_17-1646089909362.png

Now click Parameters:

carrossoni_23-1646090298462.png

Scroll down and change the stock_list to add the new ticker. I'm adding the NVDA ticker, but you can choose another one. After this, click Apply Changes:

carrossoni_24-1646090315712.png

 

The flow will be redeployed and will also execute every minute, so you can check later whether the new ticker has been processed and loaded into the Iceberg table by our CDE process, which is scheduled to run periodically.

After a few minutes (around 10, which is our schedule), you can check the table snapshots with the following query:

DESCRIBE HISTORY stocks.stock_intraday_1min;

In my case, the Spark process executed a few times, and I can see the snapshot for each execution:

carrossoni_25-1646090336205.png

I'll query the tickers that I had before the last snapshot with the query below. Change the snapshotid to the value that you got with the first query:

SELECT count(*), ticker
FROM stocks.stock_intraday_1min
FOR SYSTEM_VERSION AS OF <snapshotid>
GROUP BY ticker;
carrossoni_21-1646089909242.png

Now let's query the table without the snapshot id:

carrossoni_22-1646089909197.png

We can see NVDA is reflected in the last snapshot!!!
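
To build intuition for why the two queries return different results, here is a toy model of snapshot-based reads. It is a conceptual sketch only. Real Iceberg snapshots reference metadata and data files instead of copying rows, and real snapshot IDs are long integers, not the small counters used here. The class and function names are invented for this illustration.

```python
from collections import Counter


class SnapshotTable:
    """Toy append-only table that keeps every committed version."""

    def __init__(self):
        self._snapshots = []  # (snapshot_id, rows) in commit order

    def commit(self, new_rows):
        """Append rows and record the resulting table state as a new snapshot."""
        previous = self._snapshots[-1][1] if self._snapshots else ()
        snapshot_id = len(self._snapshots) + 1
        self._snapshots.append((snapshot_id, previous + tuple(new_rows)))
        return snapshot_id

    def history(self):
        """List snapshot IDs, loosely analogous to DESCRIBE HISTORY."""
        return [sid for sid, _ in self._snapshots]

    def scan(self, as_of=None):
        """Return rows of the latest snapshot, or of snapshot `as_of`."""
        if not self._snapshots:
            return ()
        if as_of is None:
            return self._snapshots[-1][1]
        for sid, rows in self._snapshots:
            if sid == as_of:
                return rows
        raise KeyError(f"unknown snapshot id: {as_of}")


def count_by_ticker(rows):
    return dict(Counter(row["ticker"] for row in rows))


table = SnapshotTable()
first = table.commit([{"ticker": "IBM"}, {"ticker": "IBM"}])
second = table.commit([{"ticker": "NVDA"}])  # ticker added later

print(count_by_ticker(table.scan(as_of=first)))  # {'IBM': 2}
print(count_by_ticker(table.scan()))             # {'IBM': 2, 'NVDA': 1}
```

Reading as of the first snapshot hides NVDA, just as the FOR SYSTEM_VERSION AS OF query does in Hue.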

Summary 

We've created a full ingestion and processing pipeline in a few clicks. That's the power of Cloudera Data Platform: an end-to-end use case that can be easily deployed by providing only parameters.

 

You can extend your work with Cloudera Machine Learning; there's an example in this blog where some changes will be needed to point to the table that we've created.

 

Lastly, we've seen a little of the power of Apache Iceberg, already integrated into Cloudera Data Platform in Public Cloud.
