- Subscribe to RSS Feed
- Mark as New
- Mark as Read
- Bookmark
- Subscribe
- Printer Friendly Page
- Report Inappropriate Content
Created on
01-28-2022
03:11 PM
- edited on
04-26-2022
10:01 PM
by
subratadas
Running SQL on stream is super simple with SQL Stream Builder, via Flink SQL. I recorded a live demo and this article is to provide supporting artifacts to run it.
Demo
Flink SSB Credit Card Fraud Demo
Prerequisite
Note
If you experience any challenges with running SSB, as a sanity check run the following on the cluster
flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>
If the above works as a sanity check, kill the yarn job.
yarn application -appStates RUNNING -list
That will display all running yarn apps. Your app will show up in this list. Then run this to kill the app
yarn application -kill application_xxxx
customer_inferences table
CREATE TEMPORARY TABLE customer_inferences (
`customer_id` INT,
`account_number` STRING,
`center_inferred_lat` FLOAT,
`center_inferred_lon` FLOAT,
`max_inferred_distance` STRING,
`max_inferred_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.account_number.expression' = '#{IdNumber.valid}',
'fields.center_inferred_lat.expression' = '#{Address.latitude}',
'fields.center_inferred_lon.expression' = '#{Address.longitude}',
'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
'fields.max_inferred_amount.expression' = '#{number.numberBetween ''8000'',''10000''}'
);
cc_charges table
CREATE TEMPORARY TABLE cc_charges (
`customer_id` INT,
`lat` FLOAT,
`lon` FLOAT,
`location` STRING,
`charge_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.lat.expression' = '#{Address.latitude}',
'fields.lon.expression' = '#{Address.longitude}',
'fields.location.expression' = '#{Friends.location}',
'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);
SQL to detect fraud
select ci.account_number, cc.charge_amount,
2 * 3961 * asin(sqrt(
power(
power((sin(radians((cc.lat - ci.center_inferred_lat) / 2))) , 2)
+ cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
* (sin(radians((cc.lon - ci.center_inferred_lon) / 2)))
, 2))) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
2 * 3961 * asin(sqrt(
power(
power((sin(radians((cc.lat - ci.center_inferred_lat) / 2))) , 2)
+ cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
* (sin(radians((cc.lon - ci.center_inferred_lon) / 2)))
, 2))) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
DISTANCE_BETWEEN function
function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
function toRad(x) {
return x * Math.PI / 180;
}
lat1 = parseFloat(lat1);
lon1 = parseFloat(lon1);
lat2 = parseFloat(lat2);
lon2 = parseFloat(lon2);
var R = 6371; // km
var x1 = lat2 - lat1;
var dLat = toRad(x1);
var x2 = lon2 - lon1;
var dLon = toRad(x2)
var a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) *
Math.sin(dLon / 2) * Math.sin(dLon / 2);
var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
var d = R * c;
// convert to miles
return (d / 1.60934);
}
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)
SQL to detect fraud using DISTANCE_BETWEEN function
select ci.account_number, cc.charge_amount, DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, center_inferred_lon) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount