07:59 PM
2 Kudos
Cyber/log data ingestion is super simple using CDP NiFi; however, it alone does not complete the full picture InfoSec teams and threat hunters are interested in. Often these uber pros are looking for breadcrumbs/clues to shorten their "hunting" exercise. CDP DataViz immediately provides rich visualizations on high-fidelity cyber/log data to shorten mean time to vulnerability detection. Additionally, I've observed these teams often need the ability to run ad hoc analysis, enrichment, and/or correlation on stream using an intuitive interface, without writing code. Ah yes, SQL on Stream. The entire demo took less than a few hours to build, proving that in cyber we don't always have to engineer (i.e. Grok) our way through difficult data challenges. Demo:
03:11 PM
2 Kudos
Running SQL on stream is super simple with SQL Stream Builder, via Flink SQL. I recorded a live demo, and this article provides the supporting artifacts to run it.
Flink SSB Credit Card Fraud Demo
Quick setup
If you experience any challenges with running SSB, run the following on the cluster as a sanity check:
flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>
If the above works as a sanity check, kill the yarn job.
yarn application -appStates RUNNING -list
That will display all running yarn apps. Your app will show up in this list. Then run this to kill the app
yarn application -kill application_xxxx
customer_inferences table
CREATE TEMPORARY TABLE customer_inferences (
`customer_id` INT,
`account_number` STRING,
`center_inferred_lat` FLOAT,
`center_inferred_lon` FLOAT,
`max_inferred_distance` STRING,
`max_inferred_amount` FLOAT
) WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.account_number.expression' = '#{IdNumber.valid}',
'fields.center_inferred_lat.expression' = '#{Address.latitude}',
'fields.center_inferred_lon.expression' = '#{Address.longitude}',
'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
'fields.max_inferred_amount.expression' = '#{number.numberBetween ''8000'',''10000''}'
);
cc_charges table
CREATE TEMPORARY TABLE cc_charges (
`customer_id` INT,
`lat` FLOAT,
`lon` FLOAT,
`location` STRING,
`charge_amount` FLOAT
) WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.lat.expression' = '#{Address.latitude}',
'fields.lon.expression' = '#{Address.longitude}',
'fields.location.expression' = '#{Friends.location}',
'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);
SQL to detect fraud
select ci.account_number, cc.charge_amount,
2 * 3961 * asin(sqrt(
power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
+ cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
* power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
)) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
where
2 * 3961 * asin(sqrt(
power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
+ cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
* power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
)) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
DISTANCE_BETWEEN function (JavaScript UDF)
function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
  // Convert degrees to radians
  function toRad(x) {
    return x * Math.PI / 180;
  }
  lat1 = parseFloat(lat1);
  lon1 = parseFloat(lon1);
  lat2 = parseFloat(lat2);
  lon2 = parseFloat(lon2);
  var R = 6371; // km
  var x1 = lat2 - lat1;
  var dLat = toRad(x1);
  var x2 = lon2 - lon1;
  var dLon = toRad(x2);
  // Haversine formula
  var a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
    Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) *
    Math.sin(dLon / 2) * Math.sin(dLon / 2);
  var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
  var d = R * c;
  // convert to miles
  return (d / 1.60934);
}
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)
SQL to detect fraud using DISTANCE_BETWEEN function
select ci.account_number, cc.charge_amount, DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
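To sanity-check the fraud rule outside of SSB, the same logic can be sketched in plain Python. This is a minimal sketch, not part of the demo: it mirrors the DISTANCE_BETWEEN function (haversine, kilometers converted to miles) and the WHERE clause above. The helper names distance_between and is_suspicious and the sample rows are assumptions made for illustration.

```python
import math

EARTH_RADIUS_KM = 6371.0   # same radius as the JavaScript UDF
KM_PER_MILE = 1.60934      # same conversion factor as the JavaScript UDF


def distance_between(lat1, lon1, lat2, lon2):
    """Great-circle (haversine) distance in miles between two lat/lon points."""
    lat1, lon1, lat2, lon2 = (float(v) for v in (lat1, lon1, lat2, lon2))
    d_lat = math.radians(lat2 - lat1)
    d_lon = math.radians(lon2 - lon1)
    a = (math.sin(d_lat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(d_lon / 2) ** 2)
    a = min(1.0, max(0.0, a))  # guard against floating-point drift
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return EARTH_RADIUS_KM * c / KM_PER_MILE


def is_suspicious(charge, inference):
    """Hypothetical helper: flag a charge that is too far away or too large."""
    distance = distance_between(charge["lat"], charge["lon"],
                                inference["center_inferred_lat"],
                                inference["center_inferred_lon"])
    return (distance > float(inference["max_inferred_distance"])
            or charge["charge_amount"] > inference["max_inferred_amount"])


# Made-up sample rows shaped like cc_charges and customer_inferences
inference = {"center_inferred_lat": 40.0, "center_inferred_lon": -74.0,
             "max_inferred_distance": "6000", "max_inferred_amount": 9000.0}
charge = {"lat": 40.5, "lon": -73.5, "charge_amount": 9500.0}
print(is_suspicious(charge, inference))
```

Note that max_inferred_distance is a STRING in the table definition, so the sketch converts it to a float before comparing.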
04:21 PM
A short how-to article on how to list CDP Data Hub Kafka topics via the command line:
Prerequisites
CDP DataHub Kafka cluster
Workload username & password
CDP PC trust store
Steps
Fetch the CDP cert and store it in a trust store (see Creating TLS truststore), or simply fetch it from any node:
/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks
ssh into any Data Hub Kafka node using your CDP workload username/password
Copy the trust store onto the node
Set the broker host
BROKER_HOST=$(hostname -f)
Create jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<workload username>"
password="<workload password>";
};
Set the KAFKA_OPTS environment variable to point to the location where jaas.conf is located
export KAFKA_OPTS="-Djava.security.auth.login.config=<LOCATION-OF-YOUR-JAASCONF>/jaas.conf"
Create the client properties file
security.protocol=SASL_SSL
ssl.truststore.location=/<LOCATION-OF-YOUR-TRUSTSTORE>/truststore.jks
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<WORKLOAD-USER-NAME>" password="<WORKLOAD-PASSWORD>";
Run kafka-topics
/opt/cloudera/parcels/CDH/bin/kafka-topics --list --bootstrap-server $BROKER_HOST:9093 --command-config <LOCATION-OF-YOUR-CLIENT-KERB-PROP-FILE>/
01:30 PM
I often encounter the following question - "How to connect a Java application to CDP Kafka?". The good news is, it's simple.
Public docs: Connecting Kafka clients to Data Hub provisioned clusters
CDP Kafka, by default, is secure and governed. That means CDP Kafka uses TLS and CDP workload users for security and authorization. CDP Kafka supports all Apache Kafka authentication capabilities. Again, by default, CDP Kafka is automatically secured <SASL_SSL> and governed by Apache Ranger.
CDP Public Cloud with a Kafka DataHub instance.
Connectivity requirements
To connect to CDP Kafka, the following is required:
CDP TrustStore
CDP Workload Username/Password
Kafka Broker Name
JaaS File
Java Example
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "connectivity-testing");
properties.put("sasl.kerberos.service.name", "kafka");
properties.put("ssl.truststore.location", "/home/sunilemanjee/truststore.jks");
properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CDP-WORKLOAD-USER>\" password=\"<CDP-WORKLOAD-PASSWORD>\";");
Full code here.
Fetching the TrustStore for CDP is well documented here.
CDP Workload Username/Password
Set and fetch CDP workload username/password
Kafka Broker Names
Fetch broker names from the CDP management console under DataHub, Hardware tab.
JaaS File
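KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<CDP-WORKLOAD-USER>"
password="<CDP-WORKLOAD-PASSWORD>";
};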
That's it. Enjoy
11:26 AM
My previous article demonstrated how to use the CDE-managed Airflow instance to read files from CML. To close the loop, here is a short demonstration on how to write files to CML using CDE's managed Airflow instance.
Airflow dag
11:19 AM
1 Kudo
Short demo on how to use CDE's new Airflow UI to design pipelines. It also leverages Airflow triggers to orchestrate the forking logic of a typical job pipeline. A "typical job pipeline" often encapsulates logic regarding success and failure paths.
Artifacts
Demo Video
Git Repo
09:14 AM
3 Kudos
Apache Airflow and Spark running on K8s provide unreasonable scalability. Cloudera Machine Learning also runs on K8s, and often the ML users leverage Apache Airflow to orchestrate their pipelines. Apache Airflow within Cloudera Data Engineering now has the capability to orchestrate CML jobs via the new Airflow CML Operator. This time, instead of writing up a detailed article, I recorded a live demo.
Assets used in the demo
CML Operator
Airflow Dag
Demo
Enjoy!
07:08 PM
Yes, the above comment that the functionality no longer works is accurate. NiFi has gone through many enhancements that are not reflected in this article. I do not plan on updating the article for the following reason: the objective of this article was to get NiFi running in K8s, because at that time a NiFi on K8s offering did not exist within Hortonworks HDF. Recently NiFi on K8s has been GA'd. It's called DataFlow Experience. Docs here: If you want NiFi on K8s, that is your new playground.
01:20 PM
1 Kudo
File processing + autoscaling seems to have an antinomic relationship. It doesn't have to be...really. Autoscaling often drives inefficient behaviors...why? The concomitant response: "It autoscales". Autoscaling still requires sound design principles. Without them, it won't autoscale well.
Autoscaling within a distributed framework requires proper data dichotomization along with processing/service layer decoupling.
Take an example of where I've seen the most heartburn.
Large files (e.g. zip/tar/etc.) land in an s3/storage area. The knee-jerk reaction is to feed them into a distributed processing engine that autoscales and trust the "autoscaling" part to handle them appropriately. It may. Most likely you're flipping a coin and lighting a candle, hoping that it all works out. What if the file sizes are heterogeneous and the variance between them is significant? What about error handling? Does it have to be all or nothing (meaning all files are processed or all fail)?
What if autoscaling is driven through smaller processing units (groups)?
Here we have taken the same payloads but defragged them into smaller units. Each unit has its own heterogeneous compute footprint, which drives resource consumption efficiencies.
Technical Details
For example, a payload contains 1000 JSON files. Instead of throwing this entire payload at a single compute cluster (requiring the maximum number of resources possible, aka top-line compute profile)...defrag the payload.
As the payload arrives in s3, CDF-X's (Cloudera Data Flow Experience) s3 processor listens for any files. CDF-X will pull the files, decompress, and write all the files back to s3. For example:
s3://My-S3-Bucket/decompressed/file1.json, s3://My-S3-Bucket/decompressed/file2.json, ...
In parallel, CDF-X generates a job spec (a JSON payload) for CDE (Cloudera Data Engineering, Spark). The job spec includes file locations. Each job would be provided with roughly 100 file names and locations. Since CDF-X knows the file sizes, it can also hint to CDE how much compute would be required to process these files. This step is not necessary, as we have already defragged the unit of work into something manageable, and therefore CDE autoscaling should kick in and perform well. Once the job spec is created, CDF-X calls CDE over REST, sending over the job spec. CDE accepts the job spec and arguments (file locations) and runs the micro workloads. Each workload has its own heterogeneous compute profile and autoscales independently.
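The defrag step can be illustrated with a small Python sketch that splits a list of decompressed files into job specs of roughly 100 files each and carries the total byte count along as a compute hint. This is only an illustration of the pattern: the build_job_specs helper and the job spec field names (job_index, file_locations, total_bytes) are assumptions, not the actual CDF-X or CDE payload format.

```python
import json


def build_job_specs(files, batch_size=100):
    """Split (path, size_bytes) pairs into job specs of at most batch_size files.

    The dictionary layout is hypothetical; a real CDE job spec would follow
    whatever the CDE jobs API expects.
    """
    specs = []
    for start in range(0, len(files), batch_size):
        batch = files[start:start + batch_size]
        specs.append({
            "job_index": len(specs),
            "file_locations": [path for path, _ in batch],
            # Optional sizing hint for the processing engine
            "total_bytes": sum(size for _, size in batch),
        })
    return specs


# Made-up payload: 1000 decompressed JSON files of varying size
files = [(f"s3://My-S3-Bucket/decompressed/file{i}.json", 1024 * i)
         for i in range(1, 1001)]
specs = build_job_specs(files)
print(len(specs), "job specs")
print(json.dumps({k: v for k, v in specs[0].items() if k != "file_locations"}))
```

Each resulting spec can then be submitted as its own micro workload, so a failure or an oversized batch affects only that unit rather than the whole payload.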
Wrapping Up
Defragmentation of large singleton payloads enables autoscaling to run more efficiently. Autoscaling is an incredibly powerful capability that is often misused by not applying sound design principles. Leveraging these simple patterns allows for ease of operations, cost control, and manageability.
02:24 PM
2 Kudos
Recently I was tasked to build a possible pattern for handling slowly changing dimensions (Type 2 to be specific) within CDP. The obvious answer is to use Hive ACID, but clearly, that answer is too generic. I needed to build a pipeline similar to what I used to do as an Informatica developer and verify the legitimacy of the solution/pattern. Here we go.
The objective was to build a possible (one of many) pattern on how to handle SCD Type 2 dimensions with CDP. Outcome: I was able to repeat a typical ETL workflow with CDP. How? By taking off my horse blinders...Really.
What are SCDs?
For those who may not be familiar with slowly changing dimensions within an EDW context, here is a great quick read: Types Of Dimension Tables
Data lands in a sourcing area
Super typical
Pull source data & run it through a cleaning/processing layer for staging
Ready for merging against SCD table
Run an ACID merge between stage and SCD Table
Not rocket science
I don't believe you...I wanna try this in CDP Public Cloud or Private Cloud
Perfect. This article runs through a demo using CDP to do exactly that.
What is required for this demo?
CDP Public Cloud Account
Cloudera Data Engineering (CDE)...which includes airflow
Cloudera Data Warehouse (HiveLLAP)
Assets required to produce this demo:
Spark Code
Airflow Dag
products.psv (Type 2 dim)
product_changes.psv (changes that need to be applied to the product dimension table)
Workflow Details
Airflow will orchestrate the workflow. First, a CDE Spark job will pick up the product changes from s3. Spark will perform the required ETL and then write the output to a staging table (Hive external). Lastly, using the new Airflow CDW operator, a Hive ACID merge will be executed between the staging table and the product dimension. Not rocket science. I know.
Data/Tables Setup
For this demo, there is a single dimension table; product. The other table is product_ext, which is an external table with raw data which needs to be applied to the product dimension. Very common stuff.
Product DDL
Note: Hive 3 tables are internal (managed) with full ACID support by default
CREATE TABLE product (
product_id INT,
product_name STRING,
aisle_id int,
department_id int,
start_date date,
end_date date,
is_current string DEFAULT 'Y');
Product_ext Schema
Note: Replace <YOUR-S3-BUCKET> with your s3 bucket.
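-- Assumption: the .psv files are pipe-delimited, hence the ROW FORMAT clause
CREATE EXTERNAL TABLE product_ext (
product_id INT,
product_name STRING,
aisle_id int,
department_id int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION 's3a://<YOUR-S3-BUCKET>/sunman/products/'
tblproperties ("skip.header.line.count"="1");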
Load the Product dimension table
insert into product
select product_id, product_name, aisle_id, department_id, current_date() as start_date, null as end_date, 'Y' as is_current from product_ext;
Lastly, upload the product_changes.psv file to s3
Note: Replace <YOUR-S3-BUCKET> with your s3 bucket
Recap: A product dimension table has been created. A file with changes that need to be applied against the product dimension table has been uploaded to s3.
Cloudera Data Warehousing
Via CDW, a Hive ACID merge statement will merge the staging table with the product dimension. This will be triggered by Airflow using the new CDW operator. More on that later.
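For readers new to Type 2 semantics, here is a minimal in-memory Python sketch of what such a merge accomplishes against a dimension with start_date, end_date, and is_current columns: a changed product gets its current row expired and a new current row appended, an unchanged product is left alone, and a brand-new product is inserted. This is not the Hive merge statement used in the demo; the apply_scd2 helper and the sample rows are assumptions for illustration only.

```python
from datetime import date

TRACKED = ("product_name", "aisle_id", "department_id")


def apply_scd2(dimension, changes, today):
    """Return a new dimension (list of dict rows) with Type 2 changes applied."""
    result = [dict(row) for row in dimension]  # copy so the input is untouched
    current = {r["product_id"]: r for r in result if r["is_current"] == "Y"}
    for change in changes:
        existing = current.get(change["product_id"])
        if existing is not None:
            if all(existing[col] == change[col] for col in TRACKED):
                continue  # nothing changed, keep the current row as-is
            # Expire the old version
            existing["end_date"] = today
            existing["is_current"] = "N"
        # Insert the new current version
        new_row = {"product_id": change["product_id"],
                   **{col: change[col] for col in TRACKED},
                   "start_date": today, "end_date": None, "is_current": "Y"}
        result.append(new_row)
        current[change["product_id"]] = new_row
    return result


# Made-up sample data
dim = [{"product_id": 232, "product_name": "Old Name", "aisle_id": 1,
        "department_id": 2, "start_date": date(2021, 1, 1),
        "end_date": None, "is_current": "Y"}]
staged = [{"product_id": 232, "product_name": "New Name",
           "aisle_id": 1, "department_id": 2}]
for row in apply_scd2(dim, staged, date(2021, 6, 1)):
    print(row["product_id"], row["product_name"], row["end_date"], row["is_current"])
```

In Hive, the same outcome is reached with an ACID merge between the staging table and the dimension rather than row-by-row logic.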
Grab the JDBC URL from the virtual warehouse
Note: SSO must be disabled
For example:
Create an Airflow CDW Connection
To execute a CDW HQL statement(s), an airflow connection to CDW is required. The connection is referenced in the airflow dag, more on that later.
How to create a CDW Airflow connection: Automating data pipelines using Apache Airflow in Cloudera Data Engineering.
Important: Make note of the conn Id. It will be used later in this article
Create a CDE Spark Job
This job reads product_changes.psv file (contains the changes which need to be applied to the product dimension), performs cleansing/ETL, and stages the changes as an external Hive table.
The code is available on my Github page. If you're not interested in building the code, no worries. I have also provided the Spark jar which can be downloaded instead.
CDE Spark Job Details
Note: For this demo to work, all job specs/args/configs must match the screen shot below
Note: Update <YOUR-S3-BUCKET> with your s3 bucket
Create a CDE Airflow Job
The airflow dag flow is the following:
First, a Spark CDE job will be called to stage the product changes into an external Hive table. Then CDW will be called to perform the Hive ACID Merge between the product dimension and staging table. The code for the dag is here:
The dag file is
Open it and update the cli_conn_id. This is the Airflow CDW connection created earlier:
hive_cdw_job = CDWOperator(
Run the SCD workflow
Generally, Airflow jobs can be executed through the UI. Since I parametrized the dag, at this time the only way to execute an Airflow job with runtime configs is through the CLI.
Download CDE CLI: Using the Cloudera Data Engineering command line interface
Note: Update <YOUR-S3-BUCKET> with your s3 bucket name
./cde job run --config c_stageTable='product_staged' --config c_sourceLoc='s3a://<YOUR-S3-BUCKET>/sunman/product_changes/' --config c_stageCleansedTable=product_staged_cleansed --config c_dimTable=product --name EDW-SCD-WorkFlow
This will return an airflow job ID. Return to the CDE UI. After the dag completes, run the following:
Product ID 232 is one of several products that were updated.
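If the workflow needs to be launched from a script rather than typed by hand, the cde job run invocation shown earlier can be assembled programmatically. The sketch below only builds and prints the command line using the flags that appear in that command (--config key=value pairs and --name); the build_cde_run_command helper is hypothetical, and the ./cde binary location is an assumption.

```python
import shlex


def build_cde_run_command(job_name, configs, cde_binary="./cde"):
    """Build the argument list for `cde job run` with runtime configs."""
    args = [cde_binary, "job", "run"]
    for key, value in configs.items():
        args += ["--config", f"{key}={value}"]
    args += ["--name", job_name]
    return args


cmd = build_cde_run_command(
    "EDW-SCD-WorkFlow",
    {
        "c_stageTable": "product_staged",
        "c_sourceLoc": "s3a://<YOUR-S3-BUCKET>/sunman/product_changes/",
        "c_stageCleansedTable": "product_staged_cleansed",
        "c_dimTable": "product",
    },
)
# shlex.join quotes arguments so the line is safe to paste into a shell
print(shlex.join(cmd))
```

The resulting list can be passed to subprocess.run once the CDE CLI is downloaded and configured; replace <YOUR-S3-BUCKET> first.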
Wrap It Up!
This end-to-end SCD pipeline demonstrated the capability to use the right tool for the right job to accomplish a highly important EDW task, all driven by Airflow's super powerful orchestration engine handing off work between Apache Spark and Apache Hive. Spark for data processing. Hive for EDW (ACID merge).
