05-05-2022 07:59 PM
2 Kudos
Cyber/log data ingestion is super simple using CDP NiFi; however, ingestion alone does not complete the full picture that InfoSec teams and threat hunters are interested in. Often these uber pros are looking for breadcrumbs/clues to shorten their "hunting" exercise. CDP DataViz immediately provides rich visualizations on high-fidelity cyber/log data to shorten mean time to vulnerability detection. Additionally, I've observed these teams often need the ability to run ad hoc analysis, enrichment, and/or correlation on stream using an intuitive interface; not to mention, without writing code. Ah yes, SQL on Stream. The entire demo took less than a few hours to build, proving that in cyber, we don't always have to engineer (i.e., Grok) our way through difficult data challenges. Demo:
01-28-2022 03:11 PM
2 Kudos
Running SQL on stream is super simple with SQL Stream Builder (SSB), via Flink SQL. I recorded a live demo, and this article provides the supporting artifacts to run it yourself.
Demo
Flink SSB Credit Card Fraud Demo
Prerequisite
Quick setup
Note
If you experience any challenges running SSB, run the following on the cluster as a sanity check:
flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>
If the above sanity check works, kill the YARN job. First, list the running applications:
yarn application -appStates RUNNING -list
This displays all running YARN applications; your Flink session will appear in the list. Then run this to kill it:
yarn application -kill application_xxxx
customer_inferences table
CREATE TEMPORARY TABLE customer_inferences (
`customer_id` INT,
`account_number` STRING,
`center_inferred_lat` FLOAT,
`center_inferred_lon` FLOAT,
`max_inferred_distance` FLOAT,
`max_inferred_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.account_number.expression' = '#{IdNumber.valid}',
'fields.center_inferred_lat.expression' = '#{Address.latitude}',
'fields.center_inferred_lon.expression' = '#{Address.longitude}',
'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
'fields.max_inferred_amount.expression' = '#{number.numberBetween ''8000'',''10000''}'
);
cc_charges table
CREATE TEMPORARY TABLE cc_charges (
`customer_id` INT,
`lat` FLOAT,
`lon` FLOAT,
`location` STRING,
`charge_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.lat.expression' = '#{Address.latitude}',
'fields.lon.expression' = '#{Address.longitude}',
'fields.location.expression' = '#{Friends.location}',
'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);
SQL to detect fraud
select ci.account_number, cc.charge_amount,
  2 * 3961 * asin(sqrt(
      power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
      + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
      * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
  )) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
  2 * 3961 * asin(sqrt(
      power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
      + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
      * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
  )) > ci.max_inferred_distance
  OR cc.charge_amount > ci.max_inferred_amount
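For reference, the expression above is the haversine great-circle distance (the same formula implemented by the DISTANCE_BETWEEN JavaScript function below), with the Earth's radius taken in miles:
a = \sin^2\!\left(\tfrac{\Delta\mathrm{lat}}{2}\right) + \cos(\mathrm{lat}_1)\,\cos(\mathrm{lat}_2)\,\sin^2\!\left(\tfrac{\Delta\mathrm{lon}}{2}\right), \qquad d = 2R\,\arcsin\!\left(\sqrt{a}\right), \quad R \approx 3961\ \text{miles}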
DISTANCE_BETWEEN function (JavaScript UDF)
function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
function toRad(x) {
return x * Math.PI / 180;
}
lat1 = parseFloat(lat1);
lon1 = parseFloat(lon1);
lat2 = parseFloat(lat2);
lon2 = parseFloat(lon2);
var R = 6371; // km
var x1 = lat2 - lat1;
var dLat = toRad(x1);
var x2 = lon2 - lon1;
var dLon = toRad(x2);
var a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) *
Math.sin(dLon / 2) * Math.sin(dLon / 2);
var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
var d = R * c;
// convert to miles
return (d / 1.60934);
}
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)
SQL to detect fraud using DISTANCE_BETWEEN function
select ci.account_number, cc.charge_amount,
  DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance,
  ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
01-27-2022 04:21 PM
A short how-to article on listing CDP Data Hub Kafka topics via the command line.
Prerequisites
CDP Data Hub Kafka cluster
Workload username & password
CDP Public Cloud trust store
Steps
1. Fetch the CDP certificate and store it in a trust store (see Creating TLS truststore), or simply fetch it from any node: /var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks
2. SSH into any Data Hub Kafka node using your CDP workload username/password.
3. Copy the trust store onto the node.
4. Set the broker host:
BROKER_HOST=$(hostname -f)
5. Create jaas.conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<workload username>"
password="<workload password>";
};
6. Set the KAFKA_OPTS environment variable to point to the location of jaas.conf:
export KAFKA_OPTS=-Djava.security.auth.login.config=<LOCATION-OF-YOUR-JAASCONF>/jaas.conf
7. Create client-kerberos.properties:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.kerberos.service.name=kafka
ssl.truststore.location=/<LOCATION-OF-YOUR-TRUSTSTORE>/truststore.jks
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<WORKLOAD-USER-NAME>" password="<WORKLOAD-PASSWORD>";
8. Run kafka-topics:
/opt/cloudera/parcels/CDH/bin/kafka-topics --list --bootstrap-server $BROKER_HOST:9093 --command-config <LOCATION-OF-YOUR-CLIENT-KERB-PROP-FILE>/client-kerberos.properties
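If you prefer to list topics programmatically rather than via the CLI, here is a minimal sketch using the Kafka AdminClient with the same SASL_SSL settings; the broker host, trust store path, and credentials are placeholders you must substitute:
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListCdpKafkaTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Mirrors client-kerberos.properties above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<BROKER_HOST>:9093");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("ssl.truststore.location", "/<LOCATION-OF-YOUR-TRUSTSTORE>/truststore.jks");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"<WORKLOAD-USER-NAME>\" password=\"<WORKLOAD-PASSWORD>\";");

        // listTopics().names() resolves to the set of topic names visible to this user
        try (AdminClient admin = AdminClient.create(props)) {
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}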
01-25-2022 01:30 PM
I often encounter the following question: "How do I connect a Java application to CDP Kafka?" The good news is, it's simple.
Public docs: Connecting Kafka clients to Data Hub provisioned clusters
CDP Kafka, by default, is secure and governed: it uses TLS and CDP workload users for security and authorization. CDP Kafka supports all Apache Kafka authentication capabilities. Again, by default, CDP Kafka is automatically secured (SASL_SSL) and governed by Apache Ranger.
Prerequisites
CDP Public Cloud with a Kafka DataHub instance.
Connectivity requirements
To connect to CDP Kafka, the following is required:
CDP TrustStore
CDP Workload Username/Password
Kafka Broker Name
JaaS File
Java Example
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "connectivity-testing");
properties.put("sasl.kerberos.service.name", "kafka");
properties.put("ssl.truststore.location", "/home/sunilemanjee/truststore.jks");
properties.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CDP-WORKLOAD-USER>\" password=\"<CDP-WORKLOAD-PASSWORD>\";");
Full code here.
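As a connectivity sanity check, here is a minimal sketch (not the full example linked above) that uses these properties to send a single test record; the topic name connectivity-test is an assumption, so substitute a topic that exists in your cluster:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Assumes this runs inside a method that declares `throws Exception`
// and that `properties` is the Properties object built above.
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    ProducerRecord<String, String> record =
        new ProducerRecord<>("connectivity-test", "test-key", "hello from CDP"); // hypothetical topic
    RecordMetadata metadata = producer.send(record).get(); // block until the broker acks
    System.out.printf("Wrote to %s-%d at offset %d%n",
        metadata.topic(), metadata.partition(), metadata.offset());
}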
TrustStore
Fetching the TrustStore for CDP is well documented here.
CDP Workload Username/Password
Set and fetch CDP workload username/password
Kafka Broker Names
Fetch the broker names from the CDP Management Console, under the DataHub Hardware tab.
JaaS File
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cdp-workload-user"
password="cdp-workload-password;
};
That's it. Enjoy!
11-16-2021 11:26 AM
My previous article demonstrated how to use the CDE-managed Airflow instance to read files from CML. To close the loop, here is a short demonstration on how to write files to CML using CDE's managed Airflow instance.
Artifacts
Airflow dag
Demo
11-09-2021 08:29 AM
1 Kudo
Cloudera docs run through a written example of using SparklyR within Cloudera Machine Learning (CML) to read from a Hive ACID table (Direct Reader Mode). Sometimes seeing is believing, and therefore, I'm posting a live demo of doing just that. All the artifacts used in the demo are publicly available.
CML SparklyR code
Use Direct Reader Mode with SparklyR
Demo
10-26-2021 11:19 AM
1 Kudo
A short demo on how to use CDE's new Airflow UI to design pipelines, additionally leveraging Airflow triggers to orchestrate the forking logic of a typical job pipeline. A typical job pipeline often encapsulates logic regarding success and failure paths.
Artifacts
Demo Video: https://youtu.be/G-53Uf4qrFw
Git Repo: https://github.com/sunileman/cde-airflow-branch-operator
10-20-2021 09:14 AM
3 Kudos
Apache Airflow and Spark running on K8s provide unreasonable scalability. Cloudera Machine Learning also runs on K8s, and ML users often leverage Apache Airflow to orchestrate their pipelines. Apache Airflow within Cloudera Data Engineering now has the capability to orchestrate CML jobs via the new Airflow CML Operator. This time, instead of writing up a detailed article, I recorded a live demo.
Assets used in the demo
CML Operator: https://github.com/curtishoward/cml_operator/
Airflow Dag: https://github.com/sunileman/cde-airflow-branch-operator
Demo: https://www.youtube.com/watch?v=n9o_Qimnggo
Enjoy!
09-15-2021 07:08 PM
Yes, the above comment that the functionality no longer works is accurate. NiFi has gone through many enhancements that are not reflected in this article. I do not plan on updating the article, for the following reason: its objective was to get NiFi running on K8s at a time when a NiFi-on-K8s offering did not exist within Hortonworks HDF. Recently, NiFi on K8s was GA'd. It's called DataFlow Experience. Docs here: https://docs.cloudera.com/dataflow/cloud/index.html. If you want NiFi on K8s, that is your new playground.
08-30-2021 01:20 PM
1 Kudo
File processing and autoscaling seem to have an antinomic relationship. They don't have to...really. Autoscaling often drives inefficient behaviors. Why? The concomitant response: "It autoscales." Autoscaling still requires sound design principles; without them, it won't autoscale well.
Autoscaling within a distributed framework requires proper data dichotomization along with processing/service-layer decoupling.
Anti-Pattern
Take an example of where I've seen the most heartburn.
Large files (i.e., zip/tar/etc.) land in an S3/storage area. The knee-jerk reaction is to feed them into a distributed processing engine that autoscales, trusting that the "autoscaling" part behaves appropriately. It may. More likely, you're flipping a coin and lighting a candle, hoping it all works out. What if the file sizes are heterogeneous and the variance between them is quite significant? What about error handling? Does it have to be all or nothing (meaning all files are processed or all fail)?
What if autoscaling is driven through smaller processing units (groups)?
Here we take the same payloads but defrag them into smaller units. Each unit gets its own heterogeneous compute footprint, driving resource consumption efficiencies.
Technical Details
For example, a payload (myExamplePayload.zip) contains 1,000 JSON files. Instead of throwing the entire payload at a single compute cluster (requiring the maximum number of resources possible, aka a top-line compute profile)...defrag the payload.
As the payload arrives in S3, CDF-X's (Cloudera DataFlow Experience) S3 processor listens for new files. CDF-X pulls the files, decompresses them, and writes the individual files back to S3. For example:
s3://My-S3-Bucket/decompressed/file1.json, s3://My-S3-Bucket/decompressed/file2.json, ...
In parallel, CDF-X generates a CDE (Cloudera Data Engineering, i.e., Spark) job spec as a JSON payload. The job spec includes file locations; each job is provided roughly ~100 file names and locations. Since CDF-X knows the file sizes, it can also hint to CDE how much compute is required to process them. This step is not strictly necessary: we have already defragged the unit of work to something manageable, so CDE autoscaling should kick in and perform well. Once the job spec is created, CDF-X calls CDE over REST, sending the job spec. CDE accepts the job spec and arguments (file locations) and runs the micro workloads. Each workload has its own heterogeneous compute profile and autoscales independently.
Wrapping Up
Defragmenting large singleton payloads enables autoscaling to run more efficiently. Autoscaling is an incredibly powerful capability that is often misused when sound design principles are not applied. Leveraging these simple patterns allows for ease of operations, cost control, and manageability.