
Running SQL on a stream is simple with SQL Stream Builder (SSB), which is built on Flink SQL. I recorded a live demo, and this article provides the supporting artifacts needed to run it.

 

Demo

Flink SSB Credit Card Fraud Demo

Prerequisite

Quick setup

Note

If you have trouble running SSB, run the following on the cluster as a sanity check:

 

 

flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>

 

 

If the command above succeeds, the environment is healthy and you can kill the YARN job. First, list the running applications:

 

yarn application -appStates RUNNING -list

This displays all running YARN applications, including yours. Then kill the application by its ID:

yarn application -kill application_xxxx

 

customer_inferences table

 

CREATE TEMPORARY TABLE customer_inferences (
  `customer_id` INT,
  `account_number` STRING,
  `center_inferred_lat` FLOAT,
  `center_inferred_lon` FLOAT,
  `max_inferred_distance` FLOAT,
  `max_inferred_amount` FLOAT
)
WITH (
  'connector' = 'faker',
  'rows-per-second' = '5',
  'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.account_number.expression' = '#{IdNumber.valid}',
  'fields.center_inferred_lat.expression' = '#{Address.latitude}',
  'fields.center_inferred_lon.expression' = '#{Address.longitude}',
  'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
  'fields.max_inferred_amount.expression' = '#{number.numberBetween ''8000'',''10000''}'
);

 

cc_charges table

 

CREATE TEMPORARY TABLE cc_charges (
  `customer_id` INT,
  `lat` FLOAT,
  `lon` FLOAT,
  `location` STRING,
  `charge_amount` FLOAT
)
WITH (
  'connector' = 'faker',
  'rows-per-second' = '5',
  'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.lat.expression' = '#{Address.latitude}',
  'fields.lon.expression' = '#{Address.longitude}',
  'fields.location.expression' = '#{Friends.location}',
  'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);

 

SQL to detect fraud

 

select ci.account_number, cc.charge_amount,
  2 * 3961 * asin(sqrt(
                    power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
                    + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
                    * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
                  )) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
  2 * 3961 * asin(sqrt(
                    power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
                    + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
                    * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
                  )) > ci.max_inferred_distance
  OR cc.charge_amount > ci.max_inferred_amount
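The distance expression in the query is the haversine formula in its asin form, with 3961 as the Earth's radius in miles. As a rough way to check the arithmetic outside of SSB, the same calculation can be sketched in plain JavaScript. The names haversineMiles, toRadians, and EARTH_RADIUS_MILES are illustrative only and are not part of the demo.

```javascript
// Haversine distance in miles, mirroring the asin form used in the SQL query.
// All names here are hypothetical helpers for local checking only.
const EARTH_RADIUS_MILES = 3961; // same constant as in the SQL expression

function toRadians(degrees) {
  return degrees * Math.PI / 180;
}

function haversineMiles(lat1, lon1, lat2, lon2) {
  // sin of half the latitude and longitude differences (inputs in degrees)
  const sinHalfDLat = Math.sin(toRadians((lat2 - lat1) / 2));
  const sinHalfDLon = Math.sin(toRadians((lon2 - lon1) / 2));

  // Both sine terms are squared individually; the cosine product only
  // scales the longitude term.
  const a = sinHalfDLat ** 2 +
    Math.cos(toRadians(lat1)) * Math.cos(toRadians(lat2)) * sinHalfDLon ** 2;

  return 2 * EARTH_RADIUS_MILES * Math.asin(Math.sqrt(a));
}

// One degree of latitude along a meridian is roughly 69 miles.
console.log(haversineMiles(0, 0, 1, 0).toFixed(2));
```

A quick check like this makes it easier to spot a misplaced power() or parenthesis before pasting the expression into the SSB console.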

 

DISTANCE_BETWEEN function

 

function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
  function toRad(x) {
    return x * Math.PI / 180;
  }

  lat1 = parseFloat(lat1);
  lon1 = parseFloat(lon1);
  lat2 = parseFloat(lat2);
  lon2 = parseFloat(lon2);
  
  var R = 6371; // km
  
  var x1 = lat2 - lat1;
  var dLat = toRad(x1);
  var x2 = lon2 - lon1;
  var dLon = toRad(x2);
  var a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
    Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) *
    Math.sin(dLon / 2) * Math.sin(dLon / 2); 
  var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
  var d = R * c;
  
  // convert to miles
  return (d / 1.60934);
}
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)

 

SQL to detect fraud using DISTANCE_BETWEEN function

 

select ci.account_number, cc.charge_amount,
  DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance,
  ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) > ci.max_inferred_distance
  OR cc.charge_amount > ci.max_inferred_amount
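To make the join and filter logic concrete, here is a minimal sketch in plain JavaScript that applies the same rule to a handful of in-memory rows. It is a batch simplification of what the streaming query does, not how SSB or Flink executes it. The sample rows and the names distanceBetween and detectFraud are made up for illustration.

```javascript
// Haversine distance in miles, following the same steps as DISTANCE_BETWEEN.
function distanceBetween(lat1, lon1, lat2, lon2) {
  const toRad = (x) => x * Math.PI / 180;
  const dLat = toRad(lat2 - lat1);
  const dLon = toRad(lon2 - lon1);
  const a = Math.sin(dLat / 2) ** 2 +
    Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) * Math.sin(dLon / 2) ** 2;
  const km = 6371 * 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
  return km / 1.60934; // convert to miles
}

// Hypothetical rows shaped like the customer_inferences and cc_charges tables.
const customerInferences = [
  { customer_id: 1, account_number: "acct-001",
    center_inferred_lat: 40.7, center_inferred_lon: -74.0,
    max_inferred_distance: 6000, max_inferred_amount: 9000 },
];
const ccCharges = [
  { customer_id: 1, lat: 40.8, lon: -73.9, charge_amount: 1500 },    // close, small
  { customer_id: 1, lat: -33.87, lon: 151.21, charge_amount: 2000 }, // far away
  { customer_id: 1, lat: 40.8, lon: -73.9, charge_amount: 9500 },    // large amount
  { customer_id: 2, lat: 10.0, lon: 10.0, charge_amount: 9999 },     // no matching customer
];

// Inner join on customer_id, then keep rows that break either threshold.
function detectFraud(charges, inferences) {
  const flagged = [];
  for (const cc of charges) {
    for (const ci of inferences) {
      if (cc.customer_id !== ci.customer_id) continue;
      const distance = distanceBetween(
        cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon);
      if (distance > ci.max_inferred_distance ||
          cc.charge_amount > ci.max_inferred_amount) {
        flagged.push({
          account_number: ci.account_number,
          charge_amount: cc.charge_amount,
          distance: distance,
          max_inferred_distance: ci.max_inferred_distance,
          max_inferred_amount: ci.max_inferred_amount,
        });
      }
    }
  }
  return flagged;
}

const flagged = detectFraud(ccCharges, customerInferences);
console.log(flagged.map((row) => row.charge_amount));
```

With these sample rows, only the far-away charge and the large charge are flagged. The charge for customer 2 is dropped because the inner join finds no matching inference row.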

 
