Member since
05-30-2018
1322
Posts
715
Kudos Received
148
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
3356 | 08-20-2018 08:26 PM | |
1481 | 08-15-2018 01:59 PM | |
1881 | 08-13-2018 02:20 PM | |
3360 | 07-23-2018 04:37 PM | |
4071 | 07-19-2018 12:52 PM |
05-05-2022
07:59 PM
2 Kudos
Cyber/Log data ingestion is super simple using CDP NiFi; however, it alone does not complete the full picture InfoSec & Threat hunters are interested in. Often these uber pros are looking for breadcrumbs/clues to shorten their "hunting" exercise. CDP DataViz immediately provides rich visualizations on high fidelity cyber/log data to shorten mean time to vulnerability detection. Additionally, I've observed these teams often need the ability to run ad hoc analysis, enrichment, and/or correlation on stream using an intuitive interface; not to mention, without writing code. Ah yes, SQL on Stream. The entire demo looks less than a few hours to build; proving in cyber, we don't always have to engineer (i.e. Grok) our way through difficult data challenges. Demo:
... View more
01-28-2022
03:11 PM
2 Kudos
Running SQL on stream is super simple with SQL Stream Builder, via Flink SQL. I recorded a live demo and this article is to provide supporting artifacts to run it.
Demo
Flink SSB Credit Card Fraud Demo
Prerequisite
Quick setup
Note
If you experience any challenges with running SSB, as a sanity check run the following on the cluster
flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>
If the above works as a sanity check, kill the yarn job.
yarn application -appStates RUNNING -list
That will display all running yarn apps. Your app will show up in this list. Then run this to kill the app
yarn application -kill application_xxxx
customer_inferences table
CREATE TEMPORARY TABLE customer_inferences (
`customer_id` INT,
`account_number` STRING,
`center_inferred_lat` FLOAT,
`center_inferred_lon` FLOAT,
`max_inferred_distance` STRING,
`max_inferred_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.account_number.expression' = '#{IdNumber.valid}',
'fields.center_inferred_lat.expression' = '#{Address.latitude}',
'fields.center_inferred_lon.expression' = '#{Address.longitude}',
'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
'fields.max_inferred_amount.expression' = '#{number.numberBetween ''8000'',''10000''}'
);
cc_charges table
CREATE TEMPORARY TABLE cc_charges (
`customer_id` INT,
`lat` FLOAT,
`lon` FLOAT,
`location` STRING,
`charge_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.lat.expression' = '#{Address.latitude}',
'fields.lon.expression' = '#{Address.longitude}',
'fields.location.expression' = '#{Friends.location}',
'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);
SQL to detect fraud
select ci.account_number, cc.charge_amount,
2 * 3961 * asin(sqrt(
power(
power((sin(radians((cc.lat - ci.center_inferred_lat) / 2))) , 2)
+ cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
* (sin(radians((cc.lon - ci.center_inferred_lon) / 2)))
, 2))) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
2 * 3961 * asin(sqrt(
power(
power((sin(radians((cc.lat - ci.center_inferred_lat) / 2))) , 2)
+ cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
* (sin(radians((cc.lon - ci.center_inferred_lon) / 2)))
, 2))) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
DISTANCE_BETWEEN function
function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
function toRad(x) {
return x * Math.PI / 180;
}
lat1 = parseFloat(lat1);
lon1 = parseFloat(lon1);
lat2 = parseFloat(lat2);
lon2 = parseFloat(lon2);
var R = 6371; // km
var x1 = lat2 - lat1;
var dLat = toRad(x1);
var x2 = lon2 - lon1;
var dLon = toRad(x2)
var a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) *
Math.sin(dLon / 2) * Math.sin(dLon / 2);
var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
var d = R * c;
// convert to miles
return (d / 1.60934);
}
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)
SQL to detect fraud using DISTANCE_BETWEEN function
select ci.account_number, cc.charge_amount, DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, center_inferred_lon) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
... View more
Labels:
01-27-2022
04:21 PM
A short how-to article on how to list CDP Data hub Kafka topics via command line: Prerequisites CDP DataHub Kafka cluster Workload username & password CDP PC trust store Steps Fetch CDP cert and store in a trust store Creating TLS truststore or simple fetch it from any node /var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks ssh into any Data hub Kafka node using CDP workload username/password Copy trust store onto the node Set broker host BROKER_HOST=$(hostname -f) Create jaas.conf KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<workload username>"
password="<workload password>";
}; Set KAFKA_OPTS environment variable to point to location where jaas.conf is located export KAFKA_OPTS=-Djava.security.auth.login.config=<LOCATION-OF-YOUR-JAASCONF>/jaas.conf Create client-kerberos.properties security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.kerberos.service.name=kafka
ssl.truststore.location=/<LOCATION-OF-YOUR-TRUSTSTORE>/truststore.jks
org.apache.kafka.common.security.plain.PlainLoginModule required username=<WORKLOAD-USER-NAME> password=<WORKLOAD-PASSWORD> Run Kafka-topics /opt/cloudera/parcels/CDH/bin/kafka-topics --list --bootstrap-server $BROKER_HOST:9093 --command-config <LOCATION-OF-YOUR-CLIENT-KERB-PROP-FILE>/client-kerberos.properties
... View more
Labels:
01-25-2022
01:30 PM
I often encounter the following question - "How to connect a Java application to CDP Kafka?". The good news is, it's simple.
Public docs: Connecting Kafka clients to Data Hub provisioned clusters
CDP Kafka, by default, is secure and governed. That means CDP Kafka uses TLS and CDP workload users for security and authorization. CDP Kafka supports all Apache Kafka authentication capabilities. Again, by default, CDP Kafka is automatically secured <SASL_SSL> and governed by Apache Ranger.
Prerequisites
CDP Public Cloud with a Kafka DataHub instance.
Connectivity requirements
To connect to CDP Kafka, the following is required:
CDP TrustStore
CDP Workload Username/Password
Kafka Broker Name
JaaS File
Java Example
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "connectivity-testing");
properties.put("sasl.kerberos.service.name", "kafka");
properties.put("ssl.truststore.location", "/home/sunilemanjee/truststore.jks");
properties.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CDP-WORKLOAD-USER>\" password=\"<CDP-WORKLOAD-PASSWORD>\";");
Full code here.
TrustStore
Fetch the TrustStore for CDP is well documented here.
CDP Workload Username/Password
Set and fetch CDP workload username/password
Kafka Broker Names
Fetch broker names from the CDP management console under DataHub, Hardware Tab
JaaS File
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cdp-workload-user"
password="cdp-workload-password;
};
That's it. Enjoy
... View more
11-16-2021
11:26 AM
My previous article demonstrated how to use the CDE-managed Airflow instance to read files from CML. To close the loop, here is a short demonstration on how to write files to CML using CDE's managed Airflow instance.
Artifacts
Airflow dag
Demo
... View more
10-26-2021
11:19 AM
1 Kudo
Short demo on how to use CDE's new Airflow UI to design pipelines. Additionally leveraging Airflow triggers to orchestrate a typical job pipeline forking logic (triggers). A "Typical job pipeline" often encapsulates logic regarding success and failure paths. Artifacts Demo Video https://youtu.be/G-53Uf4qrFw Git Repo https://github.com/sunileman/cde-airflow-branch-operator
... View more
10-20-2021
09:14 AM
3 Kudos
Apache Airflow and Spark running on K8s provide unreasonable scalability. Cloudera Machine Learning also runs on K8s and often the ML users leverage Apache Airflow to orchestrate their pipelines. Apache Airflow within Cloudera Data Engineering now has the capability to orchestrate CML jobs via the new Airflow CML Operator. This time instead of writing up a detailed article, I recorded a live demo. Assets used in the demo CML Operator https://github.com/curtishoward/cml_operator/ Airflow Dag https://github.com/sunileman/cde-airflow-branch-operator Demo https://www.youtube.com/watch?v=n9o_Qimnggo Enjoy!
... View more
09-15-2021
07:08 PM
Yes the above comment on the functionality not working any longer is accurate. NiFi has gone through many enhancements and therefore not reflected in this article. I do not plan on updating the article due to the following: The objective of this article was to get nifi running in K8s because during that time NiFi on K8s offering did not exist within Hortonworks HDF. Recently NiFi on K8s has been GA'd. It's call DataFlow experience. Docs here:https://docs.cloudera.com/dataflow/cloud/index.html. If you want NiFi on K8s, that is your new playground.
... View more
08-30-2021
01:20 PM
1 Kudo
File processing + autoscaling seems to have an antinomic relationship. It doesn't have to be..really. Autoscaling often drives inefficient behaviors..why? The concomitant response... "It Autoscales". Autoscaling still requires sound design principles. Without that, It won't autoscale well.
Autoscaling within a distributed framework requires proper data dichotomization along with processing/service layer decoupling
Anti-Pattern
Taking an example of where I've seen the most amount of heartburn.
Large files (i.e. zip/tar/etc) land in s3/Storage area. The knee-jerk reaction is to feed it into a distributed processing engine that autoscales. The "autoscaling" part appropriately. It may. Most likely you're flipping a coin and lighting a candle, hoping that all works out. What if the file sizes are heterogeneous and the variances between them could be quite significant? What about error handling? Does it have to be all or nothing (meaning all files need to be processing or all fail)?
What if autoscaling is driven through smaller processing units (groups)?
Here we have taken the same payloads, but defrag them into smaller units. Each unit requires a heterogeneous compute footprint driven resource consumption efficiencies.
Technical Details
For example, a payload (myExamplePayload.zip) containers 1000 JSON files. Instead of throwing this entire payload at a single compute cluster (requiring the maximum number of resources possible, aka top-line compute profile)...defrag the payload.
As the payload arrives in s3, CDF-X's (Cloudera Data Flow Experience) s3 processor listens for any files. CDF-X will pull the files, decompress, and write all the files back to s3. For example:
s3://My-S3-Bucket/decompressed/file1.json, 3://My-S3-Bucket/decompressed/file2.json, ...
In parallel CDF-X generates CDE (Cloudera Data Engineering, Spark) a job spec, a JSON payload. The job spec includes file locations. Each job would be provided with roughly ~100 file names+location. Since CDF-X knows about the file size, it can also hint to CDE how much compute would be required to process these files. This step is not necessary as we are already defragged the unit of work to something manageable, and therefore CDE autoscaling should kick in and perform well. Once the job spec is created, CDF-X calls CDE over the rest sending over the job spec. CDE accepts the job spec and arguments (file locations) and runs the micro workloads. Each workload has its own heterogeneous compute profile and auto-scales independently.
Wrapping Up
Defragmentation of large singleton payloads enables autoscaling to run more efficiently. Autoscale is an incredibly powerful capability often misused by not applying sound design principles. Leveraging these simple patterns allow for ease of operations, cost control, and manageability.
... View more
08-27-2021
02:24 PM
2 Kudos
Recently I was tasked to build a possible pattern regarding how to handle slowing changing dimensions (Type 2 to be specific) within CDP. The obvious answer is to use Hive ACID, but clearly, that answer is too generic. I needed to build a pipeline similar to what I used to do as an Informatica developer and verify legitimacy to the solution/pattern. Here we go.
Why?
The objective was to build a possible (one of many) pattern on how to handle SCD Type 2 dimensions with CDP. Outcome: I was able to repeat a typical ETL workflow with CDP. How? By taking off my horse blinders...Really. What are SCDs?
For those who may not be family with slowly changing dimensions within a EDW context, here is a great quick read: Types Of Dimension Tables
How?
Data lands in a sourcing area
Super typical
Pull source data & run it through a cleaning/processing layer for staging
Ready for merging against SCD table
Run an ACID merge between stage and SCD Table
Not rocket science
I don't believe you...I wanna try this in CDP Public Cloud or Private Cloud
Perfect. This article runs through a demo using CDP to do exactly that.
What is required for this demo?
CDP Public Cloud Account
Cloudera Data Engineering (CDE)...which includes airflow
Cloudera Data Warehouse (HiveLLAP)
Assets required to produce this demo:
Spark Code
https://github.com/sunileman/spark-CDE-SCD
Airflow Dag
https://github.com/sunileman/airflow-scd-dag
Data
products.psv (Type 2 dim)
product_changes.psv (changes that need to be applied to the product dimension table)
Workflow Details
Airflow will orchestrate the workflow. First, a CDE Spark job will pick up the product changes from s3. Spark will perform the required ETL and then write the output to a staging table (Hive External). Lastly, using the new Airflow CDW operator; a Hive ACID merge will be executed between the staging table and product dimension. Not rocket science. I know.
Data/Tables Setup
For this demo, there is a single dimension table; product. The other table is product_ext, which is an external table with raw data which needs to be applied to the product dimension. Very common stuff.
Product DDL
Note: Hive 3 tables are by default internal full acid support
CREATE TABLE IF NOT EXISTS product(
product_id INT,
product_name STRING,
aisle_id int,
department_id int,
start_date date,
end_date date,
is_current string DEFAULT 'Y')
Product_ext Schema
Note: Replace <YOUR-S3-BUCKET> with your s3 bucket.
CREATE EXTERNAL TABLE IF NOT EXISTS product_ext(
product_id INT,
product_name STRING,
aisle_id int,
department_id int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 's3a://<YOUR-S3-BUCKET>/sunman/products/'
tblproperties ("skip.header.line.count"="1");
Load the Production dimension table
insert into product
select product_id, product_name, aisle_id, department_id, current_date() as start_date, null as end_date, 'Y' as is_current from product_ext
Lastly, upload the product_changes.psv file to s3
Note: Replace <YOUR-S3-BUCKET> with your s3 bucket
s3a://<YOUR-S3-BUCKET>/sunman/product_changes/
Recap: A product dimension table has been created. A file with changes that need to be applied against the product dimension table has been uploaded to s3.
Cloudera Data Warehousing
Via CDW, a Hive ACID merge statement will merge the staging table with the product dimension. This will be triggered by Airflow using the new CDW operator. More on that later.
Grab the JDBC URL from the virtual warehouse
Note: SSO must be disabled
For example:
jdbc:hive2://zzz.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;
Create an Airflow CDW Connection
To execute a CDW HQL statement(s), an airflow connection to CDW is required. The connection is referenced in the airflow dag, more on that later.
How to create a CDW Airflow connection: Automating data pipelines using Apache Airflow in Cloudera Data Engineering.
Important: Make note of the conn Id. It will be used later in this article
Create a CDE Spark Job
This job reads product_changes.psv file (contains the changes which need to be applied to the product dimension), performs cleansing/ETL, and stages the changes as an external Hive table.
The code is available on my Github page. If you're not interested in building the code, no worries. I have also provided the Spark jar which can be downloaded instead.
https://github.com/sunileman/spark-CDE-SCD/blob/master/target/scala-2.11/spark-scd_2.11-0.1.jar
CDE Spark Job Details
Note: For this demo to work, all job specs/args/configs must match the screen shot below
Note: Update <YOUR-S3-BUCKET> with your s3 bucket
Create a CDE Airflow Job
The airflow dag flow is the following:
First, a Spark CDE job will be called to stage the product changes into an external Hive table. Then CDW will be called to perform the Hive ACID Merge between the product dimension and staging table. The code for the dag is here:
https://github.com/sunileman/airflow-scd-dag
The dag file is airflow-scd.py
Open it and update the cli_conn_id. This is the Airflow CDW connection created earlier:
##https://docs.cloudera.com/data-engineering/cloud/manage-jobs/topics/cde-airflow-dag-pipeline.html
hive_cdw_job = CDWOperator(
task_id='cdw_hive_job',
dag=dag,
cli_conn_id='<YOUR-AIRFLOW-CDW-CONNECTION-ID>',
hql=hive_merge,
schema='default',
use_proxy_user=False,
query_isolation=False
)
Run the SCD workflow
Generally, airflow jobs can be executed through the UI. Since I parametrized the dag, at this time the only way to execute airflow job with run time configs is through CLI.
Download CDE CLI: Using the Cloudera Data Engineering command line interface
Note: Update <YOUR-S3-BUCKET> with your s3 bucket name
./cde job run --config c_stageTable='product_staged' --config c_sourceLoc='s3a://<YOUR-S3-BUCKET>/sunman/product_changes/' --config c_stageCleansedTable=product_staged_cleansed --config c_dimTable=product --name EDW-SCD-WorkFlow
This will return an airflow job ID. Return to the CDE UI. After the dag completes, run the following:
SELECT * FROM PRODUCT WHERE PRODUCT_ID = 232
Product ID 232 is one of several products, which was updated.
Wrap It Up!
This end-to-end SCD pipeline demonstrated the capability to use the right tool for the right job to accomplish a highly important EDW task. All driven by using Airflow's super powerful orchestration engine to hand off work between Apache Spark and Apache Hive. Spark for data processing. Hive for EDW (Acid Merge).
... View more