Created on 01-28-2022 03:11 PM - edited on 04-26-2022 10:01 PM by subratadas
Running SQL on a stream is simple with SQL Stream Builder (SSB), which is built on Flink SQL. I recorded a live demo, and this article provides the supporting artifacts needed to run it.
Demo
Flink SSB Credit Card Fraud Demo
Note
If you run into problems running SSB, run the following command on the cluster as a sanity check:
# Sanity check: start a Flink session on YARN (-d runs it detached) using
# Kerberos credentials. Replace <keytab_filename> and <workload_username>
# with your own values.
# NOTE: the backslash must be the very last character on each continued line;
# a trailing space after it breaks the line continuation.
flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>
If the command above succeeds, the sanity check has passed and you can kill the YARN job it started. First, list the running applications:
# List the YARN applications that are currently in the RUNNING state.
yarn application -appStates RUNNING -list
This displays all running YARN applications, and your application will appear in the list. Then run the following command to kill it, substituting its application ID:
# Kill the application; replace application_xxxx with the ID shown in the list output.
yarn application -kill application_xxxx
customer_inferences table
-- Synthetic per-customer profile used as the reference side of the fraud
-- check: an inferred "home" coordinate plus the maximum distance and charge
-- amount considered normal for that customer.
-- Rows are generated by the 'faker' connector at 5 rows per second.
-- NOTE: max_inferred_distance is declared STRING, unlike the other numeric
-- columns; queries that compare it with a number should cast it.
CREATE TEMPORARY TABLE customer_inferences (
    `customer_id`           INT,
    `account_number`        STRING,
    `center_inferred_lat`   FLOAT,
    `center_inferred_lon`   FLOAT,
    `max_inferred_distance` STRING,
    `max_inferred_amount`   FLOAT
) WITH (
    -- generator settings
    'connector'       = 'faker',
    'rows-per-second' = '5',

    -- one faker expression per column
    'fields.customer_id.expression'           = '#{number.numberBetween ''0'',''1000''}',
    'fields.account_number.expression'        = '#{IdNumber.valid}',
    'fields.center_inferred_lat.expression'   = '#{Address.latitude}',
    'fields.center_inferred_lon.expression'   = '#{Address.longitude}',
    'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
    'fields.max_inferred_amount.expression'   = '#{number.numberBetween ''8000'',''10000''}'
);
cc_charges table
-- Synthetic stream of credit card charges: who was charged (customer_id),
-- where (lat/lon and a location name) and how much (charge_amount).
-- Rows are generated by the 'faker' connector at 5 rows per second.
CREATE TEMPORARY TABLE cc_charges (
    `customer_id`   INT,
    `lat`           FLOAT,
    `lon`           FLOAT,
    `location`      STRING,
    `charge_amount` FLOAT
) WITH (
    -- generator settings
    'connector'       = 'faker',
    'rows-per-second' = '5',

    -- one faker expression per column
    'fields.customer_id.expression'   = '#{number.numberBetween ''0'',''1000''}',
    'fields.lat.expression'           = '#{Address.latitude}',
    'fields.lon.expression'           = '#{Address.longitude}',
    'fields.location.expression'      = '#{Friends.location}',
    'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);
SQL to detect fraud
-- Flags a charge as suspicious when it is either farther from the customer's
-- inferred centre point than max_inferred_distance, or larger than
-- max_inferred_amount.
--
-- distance is the haversine great-circle distance:
--   a = sin^2(dlat / 2) + cos(lat1) * cos(lat2) * sin^2(dlon / 2)
--   d = 2 * R * asin(sqrt(a))
-- Each sine term is squared individually; the sum itself is NOT squared.
-- R = 3961 is presumably the Earth's radius in miles -- confirm the unit
-- matches max_inferred_distance.
select
  ci.account_number,
  cc.charge_amount,
  2 * 3961 * asin(sqrt(
    power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
    + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
    * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
  )) as distance,
  ci.max_inferred_distance,
  ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
  -- max_inferred_distance is declared STRING in customer_inferences, so cast
  -- it explicitly to get a numeric comparison instead of relying on implicit
  -- type coercion.
  2 * 3961 * asin(sqrt(
    power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
    + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
    * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
  )) > CAST(ci.max_inferred_distance AS DOUBLE)
  OR cc.charge_amount > ci.max_inferred_amount
DISTANCE_BETWEEN function
/**
 * Haversine great-circle distance between two points given in degrees.
 *
 * Every argument is passed through parseFloat, so numeric strings are
 * accepted as well as numbers; a non-numeric argument yields NaN.
 *
 * @param lat1 latitude of the first point, in degrees
 * @param lon1 longitude of the first point, in degrees
 * @param lat2 latitude of the second point, in degrees
 * @param lon2 longitude of the second point, in degrees
 * @returns the distance in miles (computed in km, then divided by 1.60934)
 */
function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
  var EARTH_RADIUS_KM = 6371;
  var KM_PER_MILE = 1.60934;

  var degToRad = function (degrees) {
    return degrees * Math.PI / 180;
  };

  var fromLat = parseFloat(lat1);
  var fromLon = parseFloat(lon1);
  var toLat = parseFloat(lat2);
  var toLon = parseFloat(lon2);

  // Sines of half the latitude / longitude differences (in radians).
  var sinHalfDLat = Math.sin(degToRad(toLat - fromLat) / 2);
  var sinHalfDLon = Math.sin(degToRad(toLon - fromLon) / 2);

  // Haversine term: sin^2(dLat/2) + cos(lat1) * cos(lat2) * sin^2(dLon/2).
  var haversine = sinHalfDLat * sinHalfDLat +
    Math.cos(degToRad(fromLat)) * Math.cos(degToRad(toLat)) *
    sinHalfDLon * sinHalfDLon;

  // Central angle between the two points, in radians.
  var centralAngle = 2 * Math.atan2(Math.sqrt(haversine), Math.sqrt(1 - haversine));

  var distanceKm = EARTH_RADIUS_KM * centralAngle;
  return (distanceKm / KM_PER_MILE);
}
// Entry-point call for the UDF. $p0..$p3 are presumably placeholders that SSB
// replaces with the arguments passed from SQL -- confirm against the SSB docs.
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)
SQL to detect fraud using DISTANCE_BETWEEN function
-- Same fraud check as the inline-haversine query, but the distance between
-- the charge location and the customer's inferred centre point is computed
-- by the DISTANCE_BETWEEN UDF.
-- A charge is flagged when that distance exceeds max_inferred_distance or the
-- charge amount exceeds max_inferred_amount.
select
  ci.account_number,
  cc.charge_amount,
  DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance,
  ci.max_inferred_distance,
  ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
  -- max_inferred_distance is declared STRING in customer_inferences. Both
  -- sides are cast so the comparison is numeric whatever return type the UDF
  -- was registered with (a STRING-to-STRING comparison would be lexicographic).
  CAST(DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) AS DOUBLE)
    > CAST(ci.max_inferred_distance AS DOUBLE)
  OR cc.charge_amount > ci.max_inferred_amount
