Created on 01-28-2022 03:11 PM - edited on 04-26-2022 10:01 PM by subratadas
Running SQL on a stream is simple with SQL Stream Builder (SSB), which uses Flink SQL. I recorded a live demo, and this article provides the supporting artifacts needed to run it.
Demo
Flink SSB Credit Card Fraud Demo
Note
If you have trouble running SSB, run the following on the cluster as a sanity check:
# Sanity check: start a Flink session on YARN with the given Kerberos keytab and principal.
# Replace <keytab_filename> and <workload_username> with your own values.
# NOTE(review): -d presumably starts the session detached — confirm against the Flink CLI help.
flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>
If the sanity check succeeds, kill the YARN job it started. First, list the running YARN applications:
# List the YARN applications that are currently in the RUNNING state.
yarn application -appStates RUNNING -list
This displays all running YARN applications; your application will appear in the list. Then run the following to kill it, replacing application_xxxx with your application's ID:
# Kill the application; application_xxxx is a placeholder for the ID shown by the list command.
yarn application -kill application_xxxx
customer_inferences table
-- Synthetic per-customer profile stream generated by the 'faker' connector:
-- 5 rows per second, each column filled from its 'fields.<column>.expression'
-- option below. The fraud queries join this table to cc_charges on customer_id.
-- NOTE(review): max_inferred_distance is declared STRING although its generator
-- produces a number and the fraud queries compare it with a numeric distance;
-- confirm whether STRING is intentional.
CREATE TEMPORARY TABLE customer_inferences (
    `customer_id`           INT,
    `account_number`        STRING,
    `center_inferred_lat`   FLOAT,
    `center_inferred_lon`   FLOAT,
    `max_inferred_distance` STRING,
    `max_inferred_amount`   FLOAT
)
WITH (
    -- connector-level settings
    'connector'       = 'faker',
    'rows-per-second' = '5',

    -- per-column value generators
    'fields.customer_id.expression'           = '#{number.numberBetween ''0'',''1000''}',
    'fields.account_number.expression'        = '#{IdNumber.valid}',
    'fields.center_inferred_lat.expression'   = '#{Address.latitude}',
    'fields.center_inferred_lon.expression'   = '#{Address.longitude}',
    'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
    'fields.max_inferred_amount.expression'   = '#{number.numberBetween ''8000'',''10000''}'
);
cc_charges table
-- Synthetic credit-card charge stream generated by the 'faker' connector:
-- 5 rows per second, each column filled from its 'fields.<column>.expression'
-- option below. customer_id uses the same 0..1000 generator as
-- customer_inferences, which the fraud queries join on.
CREATE TEMPORARY TABLE cc_charges (
    `customer_id`   INT,
    `lat`           FLOAT,
    `lon`           FLOAT,
    `location`      STRING,
    `charge_amount` FLOAT
)
WITH (
    -- connector-level settings
    'connector'       = 'faker',
    'rows-per-second' = '5',

    -- per-column value generators
    'fields.customer_id.expression'   = '#{number.numberBetween ''0'',''1000''}',
    'fields.lat.expression'           = '#{Address.latitude}',
    'fields.lon.expression'           = '#{Address.longitude}',
    'fields.location.expression'      = '#{Friends.location}',
    'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);
SQL to detect fraud
-- Flags suspicious charges: a row is returned when the charge was made farther
-- from the customer's inferred centre than max_inferred_distance, or when its
-- amount exceeds max_inferred_amount.
--
-- distance is the great-circle (haversine) distance between the charge location
-- and the customer's inferred centre, with coordinates converted from degrees:
--     a        = sin^2(dlat / 2) + cos(lat1) * cos(lat2) * sin^2(dlon / 2)
--     distance = 2 * R * asin(sqrt(a))
-- R = 3961 is approximately the Earth's radius in miles, so distance is in miles.
-- Only the sin(dlon / 2) term is squared inside the sum; squaring the whole sum
-- (as an earlier version of this query did) does not give the haversine distance.
--
-- The distance is computed once in the derived table so the SELECT list and the
-- filter cannot drift apart.
SELECT
    scored.account_number,
    scored.charge_amount,
    scored.distance,
    scored.max_inferred_distance,
    scored.max_inferred_amount
FROM (
    SELECT
        ci.account_number,
        cc.charge_amount,
        2 * 3961 * ASIN(SQRT(
            POWER(SIN(RADIANS((cc.lat - ci.center_inferred_lat) / 2)), 2)
            + COS(RADIANS(ci.center_inferred_lat)) * COS(RADIANS(cc.lat))
            * POWER(SIN(RADIANS((cc.lon - ci.center_inferred_lon) / 2)), 2)
        )) AS distance,
        ci.max_inferred_distance,
        ci.max_inferred_amount
    FROM cc_charges AS cc
    INNER JOIN customer_inferences AS ci
        ON cc.customer_id = ci.customer_id
) AS scored
WHERE
    -- max_inferred_distance is declared STRING in customer_inferences; cast it
    -- explicitly so the comparison is numeric rather than relying on implicit coercion.
    scored.distance > CAST(scored.max_inferred_distance AS DOUBLE)
    OR scored.charge_amount > scored.max_inferred_amount
DISTANCE_BETWEEN function
/**
 * Great-circle (haversine) distance between two latitude/longitude points.
 *
 * Every argument is passed through parseFloat, so numeric strings are accepted;
 * an argument that does not parse makes the result NaN. Coordinates are treated
 * as degrees (they are converted with PI / 180 before the trigonometry).
 *
 * @param lat1 latitude of the first point
 * @param lon1 longitude of the first point
 * @param lat2 latitude of the second point
 * @param lon2 longitude of the second point
 * @returns the distance in miles (computed in km with radius 6371, then divided by 1.60934)
 */
function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
    var EARTH_RADIUS_KM = 6371;
    var KM_PER_MILE = 1.60934;

    function degToRad(degrees) {
        return degrees * Math.PI / 180;
    }

    var startLat = parseFloat(lat1);
    var startLon = parseFloat(lon1);
    var endLat = parseFloat(lat2);
    var endLon = parseFloat(lon2);

    // Sines of the half-differences, reused for the squared terms below.
    var sinHalfDLat = Math.sin(degToRad(endLat - startLat) / 2);
    var sinHalfDLon = Math.sin(degToRad(endLon - startLon) / 2);

    // Haversine term: sin^2(dLat/2) + cos(lat1) * cos(lat2) * sin^2(dLon/2)
    var haversine = sinHalfDLat * sinHalfDLat +
        Math.cos(degToRad(startLat)) * Math.cos(degToRad(endLat)) *
        sinHalfDLon * sinHalfDLon;

    var centralAngle = 2 * Math.atan2(Math.sqrt(haversine), Math.sqrt(1 - haversine));
    var distanceKm = EARTH_RADIUS_KM * centralAngle;

    return (distanceKm / KM_PER_MILE);
}
// Call the function with the $p0..$p3 placeholders.
// NOTE(review): presumably SSB substitutes these with the UDF's positional arguments — confirm against the SSB UDF documentation.
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)
SQL to detect fraud using DISTANCE_BETWEEN function
-- Flags suspicious charges using the DISTANCE_BETWEEN UDF: a row is returned
-- when the charge was made farther from the customer's inferred centre than
-- max_inferred_distance, or when its amount exceeds max_inferred_amount.
--
-- The UDF is evaluated once in the derived table so the reported distance and
-- the filtered distance are always the same expression, and every column is
-- qualified with its table alias.
-- NOTE(review): max_inferred_distance is declared STRING in customer_inferences,
-- so the distance comparison relies on implicit coercion; check it against the
-- output type registered for DISTANCE_BETWEEN.
SELECT
    scored.account_number,
    scored.charge_amount,
    scored.distance,
    scored.max_inferred_distance,
    scored.max_inferred_amount
FROM (
    SELECT
        ci.account_number,
        cc.charge_amount,
        DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) AS distance,
        ci.max_inferred_distance,
        ci.max_inferred_amount
    FROM cc_charges AS cc
    INNER JOIN customer_inferences AS ci
        ON cc.customer_id = ci.customer_id
) AS scored
WHERE scored.distance > scored.max_inferred_distance
    OR scored.charge_amount > scored.max_inferred_amount