Created on 12-22-2020 09:16 AM - edited on 12-22-2020 08:36 PM by subratadas
{ "name" : "dt", "type" : ["long"], "default": 1, "logicalType": "timestamp-millis"}
You can see the entire schema here:
https://raw.githubusercontent.com/tspannhw/SmartStocks/main/stocks.avsc
We will also want a topic for Stock Alerts, which we will create later with Flink SQL, so let's define a schema for that as well.
https://raw.githubusercontent.com/tspannhw/SmartStocks/main/stockalerts.avsc
For our data today, we will use AVRO data with AVRO schemas for the messages inside Kafka topics and for whatever consumes them.
-- Impala DDL: Kudu-backed real-time data mart table for stock ticks.
-- Numeric fields (open/close/high/low/volume) arrive as STRING from the
-- ingest pipeline; cast them at query time.
-- Fixed: the original one-liner fused ")PARTITION" and "4STORED", which
-- would fail to parse.
CREATE TABLE stocks (
    uuid STRING,
    `datetime` STRING,
    `symbol` STRING,
    `open` STRING,
    `close` STRING,
    `high` STRING,
    `volume` STRING,
    `ts` TIMESTAMP,
    `dt` TIMESTAMP,
    `low` STRING,
    PRIMARY KEY (uuid, `datetime`)
)
PARTITION BY HASH PARTITIONS 4
STORED AS KUDU
-- Single replica: fine for an edge/demo cluster; use 3 in production.
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');
Using Apache Hue integrated in CDP, I can examine my Real-Time Data Mart table and then query my table.
I can connect to Flink SQL from the command line Flink SQL Client to start exploring my Kafka and Kudu data, create temporary tables and launch some applications (Insert statements). The environment lets me see all the different catalogs available, including registry (Cloudera Cloud Schema Registry), Hive (Cloud Native Database table) and Kudu (Cloudera Real-Time Cloud Data Mart) tables.
# Start a detached (-d) Flink session on YARN: 2048 MB per TaskManager (-tm),
# 2 task slots per TaskManager (-s).
flink-yarn-session -tm 2048 -s 2 -d
# Launch the Flink SQL client in embedded mode, loading catalog definitions
# (Schema Registry, Hive, Kudu) from the environment file (-e).
flink-sql-client embedded -e sql-env.yaml
-- Browse the raw stocks Kafka topic through the Schema Registry catalog.
select * from registry.default_database.stocks;
-- Browse the Kudu-backed Impala table through the Kudu catalog.
select * from kudu.default_database.impala::default.stocks;
-- Switch back to Flink's built-in catalog before creating temporary tables.
use catalog default_catalog;
-- Flink table over the 'stocks' Kafka topic, deserialized with the Cloudera
-- Schema Registry AVRO format (legacy 'connector.*' descriptor properties,
-- matching the Flink version used in this demo).
CREATE TABLE stockEvents (
    symbol STRING,
    uuid STRING,
    ts BIGINT,
    dt BIGINT,
    datetime STRING,
    open STRING,
    close STRING,
    high STRING,
    volume STRING,
    low STRING,
    -- Derive an event-time column from the epoch-millis ts field.
    event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
    -- Tolerate up to 5 seconds of out-of-order events.
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'stocks',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.bootstrap.servers' = 'edge2ai-1.dim.local:9092',
    'format.type' = 'registry',
    'format.registry.properties.schema.registry.url' = 'http://edge2ai-1.dim.local:7788/api/v1'
);
root |-- symbol: STRING |-- uuid: STRING |-- ts: BIGINT |-- dt: BIGINT |-- datetime: STRING |-- open: STRING |-- close: STRING |-- high: STRING |-- volume: STRING |-- low: STRING |-- event_time: TIMESTAMP(3) AS CAST(FROM_UNIXTIME(FLOOR(ts / 1000)) AS TIMESTAMP(3)) |-- WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
-- Per-symbol average high price over 1-minute tumbling event-time windows.
SELECT
    symbol,
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS tumbleStart,
    TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS tumbleEnd,
    AVG(CAST(high AS DOUBLE)) AS avgHigh
FROM stockEvents
WHERE symbol IS NOT NULL
GROUP BY
    TUMBLE(event_time, INTERVAL '1' MINUTE),
    symbol;
-- Top-3 most frequently seen symbols per 10-minute tumbling window.
-- Fixed: the original left both derived tables unaliased, which is invalid
-- in most SQL engines and fragile even where tolerated; each subquery now
-- carries an explicit alias.
SELECT *
FROM (
    -- Rank symbols within each window by event count.
    SELECT
        windowed.*,
        ROW_NUMBER() OVER (
            PARTITION BY window_start
            ORDER BY num_stocks DESC
        ) AS rownum
    FROM (
        -- Count events per symbol per 10-minute tumbling window.
        SELECT
            TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS window_start,
            symbol,
            COUNT(*) AS num_stocks
        FROM stockEvents
        GROUP BY
            symbol,
            TUMBLE(event_time, INTERVAL '10' MINUTE)
    ) AS windowed
) AS ranked
WHERE rownum <= 3;
-- Continuously publish "new high" alerts to the stockalerts Kafka topic,
-- spreading writes across partitions round-robin via the sink hint.
INSERT INTO stockalerts
/*+ OPTIONS('sink.partitioner'='round-robin') */
SELECT
    CAST(symbol AS STRING) symbol,
    CAST(uuid AS STRING) uuid,
    ts,
    dt,
    open,
    close,
    high,
    volume,
    low,
    datetime,
    'new-high' message,
    'nh' alertcode,
    CAST(CURRENT_TIMESTAMP AS BIGINT) alerttime
FROM stocks st
-- Guard against missing symbols: SQL NULL, the literal string 'null'
-- emitted by some producers, and blank/whitespace-only values.
WHERE symbol IS NOT NULL
  AND symbol <> 'null'
  AND trim(symbol) <> ''
  -- Demo threshold for a "new high" alert.
  AND CAST(close AS DOUBLE) > 11;
Created on 12-22-2020 11:42 AM
Amazing work here sir!