Created on 12-22-2020 09:16 AM - edited on 12-22-2020 08:36 PM by subratadas
Smart Stocks with FLaNK (NiFi, Kafka, Flink SQL)
- Kafka Topic
- Kafka Schema
- Kudu Table
- Flink Prep
- Flink SQL Client Run
- Flink SQL Client Configuration
If you know your data, build a schema and share it to the registry. For example, here is the timestamp field from our stocks schema:
{ "name" : "dt", "type" : ["long"], "default": 1, "logicalType": "timestamp-millis"}
You can see the entire schema here:
https://raw.githubusercontent.com/tspannhw/SmartStocks/main/stocks.avsc
We will also want a topic for Stock Alerts, which we will create later with Flink SQL, so let's define a schema for that as well.
https://raw.githubusercontent.com/tspannhw/SmartStocks/main/stockalerts.avsc
For our data today, we will use Avro data with Avro schemas, both inside the Kafka topics and for whoever consumes them.
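To make the schema idea concrete, here is a minimal Python sketch (standard library only) of how a default value in a schema can fill a missing field. The dt field is copied from above; the record name and the symbol field are placeholders I made up for illustration, not the published stocks.avsc, and a real pipeline would rely on the Schema Registry and an Avro library instead of this hand-rolled helper.

```python
import json
from datetime import datetime, timezone

# "dt" is the field shown in the article; the record name and the
# "symbol" field are illustrative assumptions, not the real schema.
SCHEMA_JSON = """
{
  "type": "record",
  "name": "stocks",
  "fields": [
    {"name": "symbol", "type": ["null", "string"], "default": null},
    {"name": "dt", "type": ["long"], "default": 1, "logicalType": "timestamp-millis"}
  ]
}
"""


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the UNIX epoch."""
    return int(moment.timestamp() * 1000)


def apply_defaults(schema: dict, record: dict) -> dict:
    """Return a copy of the record with schema defaults filled in for missing fields."""
    filled = dict(record)
    for field in schema["fields"]:
        if field["name"] not in filled:
            if "default" not in field:
                raise ValueError("missing required field: " + field["name"])
            filled[field["name"]] = field["default"]
    return filled


schema = json.loads(SCHEMA_JSON)
created = datetime(2020, 12, 22, 9, 16, tzinfo=timezone.utc)
record = apply_defaults(schema, {"dt": to_epoch_millis(created)})
empty = apply_defaults(schema, {})  # every field falls back to its default
```

Here record keeps the dt value we supplied and gets a null symbol, while empty ends up with dt set to the schema default of 1.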
How to Build a Smart Stock DataFlow in X Easy Steps
- Retrieve data from source (example: InvokeHTTP against SSL REST Feed - say TwelveData) with a schedule
- Set a Schema Name (UpdateAttribute)
- ForkRecord: We use this to split out records from the header (/values) using RecordPath syntax.
- QueryRecord: Convert type and manipulate data with SQL. We aren't doing anything in this one, but this is an option to change fields, add fields, etc.
- UpdateRecord: In this first one, I set some fields in the record from attributes and add a current timestamp. I also reformat the timestamp for conversion.
- UpdateRecord: I convert dt to a numeric UNIX timestamp.
- UpdateRecord: I set datetime to my formatted String date time.
- (LookupRecord): I don't have this step yet, as I don't have an internal record for this company in my Real-Time Data Mart (Apache Kudu in a CDP public cloud deployment). I will probably add this step to augment or check my data.
- (ValidateRecord): For less reliable data sources, I may want to validate my data against our schema; otherwise we will get warnings or errors.
- PublishKafkaRecord_2_0: Convert from JSON to Avro and send to our Kafka topic with headers, including a reference to the correct schema, stocks, and its version, 1.0.
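For readers who think in code rather than NiFi processors, the ForkRecord and UpdateRecord steps above can be roughly sketched in Python. This is only an illustration of the record shaping, not what NiFi runs: the response layout (a meta header plus a values array), the datetime format, the UTC timezone, and the choice to derive dt from the record's own datetime are all assumptions on my part.

```python
import uuid
from datetime import datetime, timezone
from typing import Dict, List

DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # assumed input format


def fork_values(response: Dict) -> List[Dict]:
    """ForkRecord-like split: one record per entry under /values, tagged with the header symbol."""
    symbol = response.get("meta", {}).get("symbol")
    return [dict(row, symbol=symbol) for row in response.get("values", [])]


def enrich(record: Dict, now: datetime) -> Dict:
    """UpdateRecord-like enrichment: add uuid, ts (current time), and a numeric dt."""
    parsed = datetime.strptime(record["datetime"], DATETIME_FORMAT).replace(tzinfo=timezone.utc)
    enriched = dict(record)
    enriched["uuid"] = str(uuid.uuid4())
    enriched["ts"] = int(now.timestamp() * 1000)     # processing time in epoch millis
    enriched["dt"] = int(parsed.timestamp() * 1000)  # record time in epoch millis
    enriched["datetime"] = parsed.strftime(DATETIME_FORMAT)
    return enriched


# Hypothetical REST response, shaped only for this sketch.
sample = {
    "meta": {"symbol": "IBM"},
    "values": [
        {"datetime": "2020-12-22 09:15:00", "open": "123.10", "high": "123.90",
         "low": "122.80", "close": "123.50", "volume": "1200"},
        {"datetime": "2020-12-22 09:14:00", "open": "122.95", "high": "123.20",
         "low": "122.70", "close": "123.10", "volume": "900"},
    ],
}

now = datetime(2020, 12, 22, 9, 16, tzinfo=timezone.utc)
records = [enrich(row, now) for row in fork_values(sample)]
```

Each resulting record carries the symbol from the header next to the price fields, which is the flat shape the Kafka topic and the Kudu table expect.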
Viewing, Monitoring, Checking and Alerting On Our Streaming Data in Kafka
How to Store Our Streaming Data to Our Real-Time DataMart in the Cloud
CREATE TABLE stocks (
  uuid STRING,
  `datetime` STRING,
  `symbol` STRING,
  `open` STRING,
  `close` STRING,
  `high` STRING,
  `volume` STRING,
  `ts` TIMESTAMP,
  `dt` TIMESTAMP,
  `low` STRING,
  PRIMARY KEY (uuid, `datetime`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
Using Apache Hue integrated in CDP, I can examine my Real-Time Data Mart table and then query my table.
How to Build a Smart Stock Streaming Analytics in X Easy Steps
From the command line, I can use the Flink SQL Client to start exploring my Kafka and Kudu data, create temporary tables, and launch some applications (INSERT statements). The environment lets me see all the different catalogs available, including registry (Cloudera Cloud Schema Registry), Hive (Cloud Native Database table) and Kudu (Cloudera Real-Time Cloud Data Mart) tables.
flink-yarn-session -tm 2048 -s 2 -d
flink-sql-client embedded -e sql-env.yaml
Run Flink SQL
Cross Catalog Query to Stocks Kafka Topic
select * from registry.default_database.stocks;
Cross Catalog Query to Stocks Kudu/Impala Table
select * from kudu.default_database.impala::default.stocks;
Default Catalog
use catalog default_catalog;
CREATE TABLE stockEvents ( symbol STRING, uuid STRING, ts BIGINT, dt BIGINT, datetime STRING, open STRING, close STRING, high STRING, volume STRING, low STRING,
event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND )
WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal',
'connector.topic' = 'stocks',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.bootstrap.servers' = 'edge2ai-1.dim.local:9092',
'format.type' = 'registry',
'format.registry.properties.schema.registry.url' = 'http://edge2ai-1.dim.local:7788/api/v1' );
root
 |-- symbol: STRING
 |-- uuid: STRING
 |-- ts: BIGINT
 |-- dt: BIGINT
 |-- datetime: STRING
 |-- open: STRING
 |-- close: STRING
 |-- high: STRING
 |-- volume: STRING
 |-- low: STRING
 |-- event_time: TIMESTAMP(3) AS CAST(FROM_UNIXTIME(FLOOR(ts / 1000)) AS TIMESTAMP(3))
 |-- WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
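The event_time and WATERMARK columns in the table above can be hard to picture, so here is a small Python sketch of the idea. It is a simplified model, not Flink's implementation: it updates the watermark on every event (Flink emits watermarks periodically), it treats an event at or before the current watermark as late, and it uses UTC, whereas FROM_UNIXTIME in Flink follows the session time zone. The function names are my own.

```python
from datetime import datetime, timedelta, timezone

WATERMARK_DELAY = timedelta(seconds=5)  # matches INTERVAL '5' SECOND


def event_time(ts_millis: int) -> datetime:
    """Mirror FLOOR(ts / 1000): drop the milliseconds and build a second-precision time."""
    return datetime.fromtimestamp(ts_millis // 1000, tz=timezone.utc)


def trace_watermarks(ts_millis_values):
    """Return (event_time, watermark_after_event, is_late) for each event in arrival order."""
    max_event_time = None
    trace = []
    for ts_millis in ts_millis_values:
        current = event_time(ts_millis)
        # The watermark in force when this event arrives.
        watermark = None if max_event_time is None else max_event_time - WATERMARK_DELAY
        is_late = watermark is not None and current <= watermark
        if max_event_time is None or current > max_event_time:
            max_event_time = current
        trace.append((current, max_event_time - WATERMARK_DELAY, is_late))
    return trace


# Arrival order: 10 s, 20.999 s, 12 s, 16 s after the epoch.
trace = trace_watermarks([10_000, 20_999, 12_000, 16_000])
late_flags = [is_late for _, _, is_late in trace]
```

In this run, the event at 12 s arrives after the watermark has already advanced to 15 s, so it is the one flagged as late; the event at 16 s is out of order but still within the five-second allowance.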
Simple Select All Query
Tumbling Window
SELECT symbol,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) as tumbleStart,
TUMBLE_END(event_time, INTERVAL '1' MINUTE) as tumbleEnd,
AVG(CAST(high as DOUBLE)) as avgHigh
FROM stockEvents
WHERE symbol is not null
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), symbol;
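If the tumbling window query is new to you, this Python sketch shows the same grouping on a small in-memory list: each event falls into exactly one fixed, non-overlapping one-minute bucket, and the high values are averaged per bucket and symbol. It is a batch illustration under my own naming; it ignores watermarks and late data, which Flink handles for you.

```python
from collections import defaultdict

WINDOW_MILLIS = 60_000  # INTERVAL '1' MINUTE


def tumbling_avg_high(events):
    """Average CAST(high AS DOUBLE) per (one-minute window, symbol), skipping null symbols."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [sum of highs, count]
    for event in events:
        if event.get("symbol") is None:
            continue  # WHERE symbol is not null
        event_millis = (event["ts"] // 1000) * 1000  # same floor as event_time
        window_start = event_millis - event_millis % WINDOW_MILLIS
        bucket = totals[(window_start, event["symbol"])]
        bucket[0] += float(event["high"])
        bucket[1] += 1
    rows = []
    for (window_start, symbol), (total, count) in sorted(totals.items()):
        rows.append({
            "symbol": symbol,
            "tumbleStart": window_start,
            "tumbleEnd": window_start + WINDOW_MILLIS,
            "avgHigh": total / count,
        })
    return rows


# Made-up events; ts is in epoch milliseconds.
events = [
    {"symbol": "IBM", "ts": 5_000, "high": "10"},
    {"symbol": "IBM", "ts": 59_999, "high": "20"},
    {"symbol": "IBM", "ts": 60_000, "high": "30"},
    {"symbol": None, "ts": 61_000, "high": "99"},
]
rows = tumbling_avg_high(events)
```

The first two IBM events land in the window starting at 0 and the third starts a new window at 60000, so the result has two rows for the same symbol.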
Top 3
SELECT *
FROM
( SELECT * , ROW_NUMBER() OVER
( PARTITION BY window_start ORDER BY num_stocks desc ) AS rownum
FROM (
SELECT TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start,
symbol,
COUNT(*) AS num_stocks
FROM stockEvents
GROUP BY symbol,
TUMBLE(event_time, INTERVAL '10' MINUTE) ) )
WHERE rownum <=3;
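The Top 3 query nests three things: a ten-minute tumbling count, a ROW_NUMBER ranking inside each window, and a filter on that rank. A rough Python equivalent on in-memory data is below. Breaking ties by symbol name is my assumption to keep the output deterministic; the SQL leaves the order of tied rows unspecified.

```python
from collections import Counter, defaultdict


def top_n_per_window(events, window_millis=600_000, n=3):
    """Count events per (window, symbol), then keep the n most frequent symbols per window."""
    counts = Counter()
    for event in events:
        event_millis = (event["ts"] // 1000) * 1000
        window_start = event_millis - event_millis % window_millis
        counts[(window_start, event["symbol"])] += 1

    by_window = defaultdict(list)
    for (window_start, symbol), num_stocks in counts.items():
        by_window[window_start].append((symbol, num_stocks))

    rows = []
    for window_start in sorted(by_window):
        # ORDER BY num_stocks DESC, with symbol as an assumed tie-breaker.
        ranked = sorted(by_window[window_start], key=lambda item: (-item[1], str(item[0])))
        for rownum, (symbol, num_stocks) in enumerate(ranked[:n], start=1):
            rows.append({"window_start": window_start, "symbol": symbol,
                         "num_stocks": num_stocks, "rownum": rownum})
    return rows


# Made-up events: four symbols in the first window, one in the second.
events = (
    [{"symbol": "A", "ts": 1_000}] * 3
    + [{"symbol": "B", "ts": 2_000}] * 2
    + [{"symbol": "C", "ts": 3_000}, {"symbol": "D", "ts": 4_000}]
    + [{"symbol": "E", "ts": 600_000}]
)
top_rows = top_n_per_window(events)
```

Symbol D is dropped from the first window because only three rows per window survive the rownum filter.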
Stock Alerts
INSERT INTO stockalerts
/*+ OPTIONS('sink.partitioner'='round-robin') */
SELECT CAST(symbol as STRING) symbol,
CAST(uuid as STRING) uuid, ts, dt, open, close, high, volume, low,
datetime, 'new-high' message, 'nh' alertcode, CAST(CURRENT_TIMESTAMP AS BIGINT) alerttime FROM stocks st
WHERE symbol is not null
AND symbol <> 'null'
AND trim(symbol) <> ''
AND CAST(close as DOUBLE) > 11;
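The WHERE clause of the alert insert is easy to test outside Flink. This Python sketch mirrors the filter and the constant columns added to each alert. Treating an unparseable close as "no alert" and passing alerttime in as a plain integer are assumptions of the sketch; the query above does not say what unit CAST(CURRENT_TIMESTAMP AS BIGINT) yields.

```python
CLOSE_THRESHOLD = 11.0  # from: CAST(close as DOUBLE) > 11


def is_alert(record, threshold=CLOSE_THRESHOLD):
    """Mirror the WHERE clause: a usable symbol and a close above the threshold."""
    symbol = record.get("symbol")
    if symbol is None or symbol == "null" or symbol.strip() == "":
        return False
    try:
        return float(record["close"]) > threshold
    except (KeyError, TypeError, ValueError):
        return False  # assumption: rows with a bad close are skipped, not failed


def to_alert(record, alerttime):
    """Add the constant alert columns from the SELECT list to a copy of the record."""
    alert = dict(record)
    alert.update({"message": "new-high", "alertcode": "nh", "alerttime": alerttime})
    return alert


# Made-up rows to exercise each condition.
candidates = [
    {"symbol": "IBM", "close": "12.5"},
    {"symbol": "null", "close": "50"},
    {"symbol": "  ", "close": "50"},
    {"symbol": "CLDR", "close": "11"},
]
alerts = [to_alert(row, alerttime=1608628560) for row in candidates if is_alert(row)]
```

Only the IBM row passes: the literal string 'null' and the blank symbol are filtered out, and a close of exactly 11 does not exceed the threshold.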
Monitoring Flink Jobs
Data Lineage and Governance
References
- https://github.com/cloudera/flink-tutorials/tree/master/flink-sql-tutorial
- https://github.com/tspannhw/FlinkSQLWithCatalogsDemo
- https://github.com/tspannhw/ClouderaFlinkSQLForPartners/blob/main/README.md
- https://github.com/tspannhw/ApacheConAtHome2020/tree/main/scripts
- https://github.com/tspannhw/SmartWeather
- https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
- https://www.datainmotion.dev/2020/11/flank-smart-weather-applications-with.html
- https://www.datainmotion.dev/2020/11/flank-smart-weather-websocket.html
Created on 12-22-2020 11:42 AM
Amazing work here sir!