Member since: 05-30-2018
Posts: 1322
Kudos Received: 715
Solutions: 148
04-09-2024
11:33 PM
Hi @sunile_manjee, thank you for sharing the ingestion patterns. Could you also tell me whether NiFi can be considered a worthy alternative to Logstash? The Logstash log-processing pipeline is very easy to configure, and I am trying to find an alternative to it. Any advice would be helpful.
10-24-2022
06:09 AM
@MaarufB Please make a new post with as much detail as you can around your question and use case. This is an old topic and will not get a good response in the comments. Feel free to @ tag me in the new post.
05-05-2022
07:59 PM
2 Kudos
Cyber/log data ingestion is super simple using CDP NiFi; however, it alone does not complete the full picture InfoSec and threat hunters are interested in. Often these uber pros are looking for breadcrumbs/clues to shorten their "hunting" exercise. CDP DataViz immediately provides rich visualizations on high-fidelity cyber/log data to shorten mean time to vulnerability detection. Additionally, I've observed these teams often need the ability to run ad hoc analysis, enrichment, and/or correlation on stream using an intuitive interface, without writing code. Ah yes, SQL on stream. The entire demo took less than a few hours to build, proving that in cyber, we don't always have to engineer (i.e., Grok) our way through difficult data challenges. Demo:
02-01-2022
08:45 PM
Some more examples here: https://github.com/asdaraujo/cdp-examples
01-28-2022
03:11 PM
2 Kudos
Running SQL on a stream is super simple with SQL Stream Builder (SSB), via Flink SQL. I recorded a live demo, and this article provides the supporting artifacts to run it.
Demo
Flink SSB Credit Card Fraud Demo
Prerequisite
Quick setup
Note
If you experience any challenges with running SSB, run the following on the cluster as a sanity check:
flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>
If the above works as a sanity check, kill the YARN job:
yarn application -appStates RUNNING -list
That will display all running YARN apps; yours will show up in the list. Then run this to kill the app:
yarn application -kill application_xxxx
customer_inferences table
CREATE TEMPORARY TABLE customer_inferences (
`customer_id` INT,
`account_number` STRING,
`center_inferred_lat` FLOAT,
`center_inferred_lon` FLOAT,
`max_inferred_distance` FLOAT,
`max_inferred_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.account_number.expression' = '#{IdNumber.valid}',
'fields.center_inferred_lat.expression' = '#{Address.latitude}',
'fields.center_inferred_lon.expression' = '#{Address.longitude}',
'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
'fields.max_inferred_amount.expression' = '#{number.numberBetween ''8000'',''10000''}'
);
cc_charges table
CREATE TEMPORARY TABLE cc_charges (
`customer_id` INT,
`lat` FLOAT,
`lon` FLOAT,
`location` STRING,
`charge_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.lat.expression' = '#{Address.latitude}',
'fields.lon.expression' = '#{Address.longitude}',
'fields.location.expression' = '#{Friends.location}',
'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);
SQL to detect fraud
select ci.account_number, cc.charge_amount,
  2 * 3961 * asin(sqrt(
    power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
    + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
    * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
  )) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
  2 * 3961 * asin(sqrt(
    power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
    + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
    * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
  )) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
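As a sanity check on the distance term, here is a small self-contained Java port of the haversine computation, using the same 3961-mile Earth radius as the SQL. The class name and the coordinates in main (roughly New York to Los Angeles) are illustrative, not part of the demo:

```java
public class Haversine {
    static final double EARTH_RADIUS_MILES = 3961.0;

    // Great-circle distance in miles between two lat/lon points (degrees).
    static double distanceMiles(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.pow(Math.sin(dLat / 2), 2)
                 + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                 * Math.pow(Math.sin(dLon / 2), 2);
        return 2 * EARTH_RADIUS_MILES * Math.asin(Math.sqrt(a));
    }

    public static void main(String[] args) {
        // New York (40.7128, -74.0060) to Los Angeles (34.0522, -118.2437)
        System.out.println(distanceMiles(40.7128, -74.0060, 34.0522, -118.2437));
    }
}
```

If the SQL and this port disagree on the same inputs, the parenthesization of the squared sine terms is the first thing to check.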
DISTANCE_BETWEEN function
function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
function toRad(x) {
return x * Math.PI / 180;
}
lat1 = parseFloat(lat1);
lon1 = parseFloat(lon1);
lat2 = parseFloat(lat2);
lon2 = parseFloat(lon2);
var R = 6371; // km
var x1 = lat2 - lat1;
var dLat = toRad(x1);
var x2 = lon2 - lon1;
var dLon = toRad(x2);
var a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) *
Math.sin(dLon / 2) * Math.sin(dLon / 2);
var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
var d = R * c;
// convert to miles
return (d / 1.60934);
}
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)
SQL to detect fraud using DISTANCE_BETWEEN function
select ci.account_number, cc.charge_amount,
  DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance,
  ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
01-25-2022
01:30 PM
I often encounter the following question: "How do I connect a Java application to CDP Kafka?" The good news is, it's simple.
Public docs: Connecting Kafka clients to Data Hub provisioned clusters
CDP Kafka is secure and governed by default: it uses TLS and CDP workload users for authentication and authorization. CDP Kafka supports all Apache Kafka authentication capabilities, and out of the box it is secured with SASL_SSL and governed by Apache Ranger.
Prerequisites
CDP Public Cloud with a Kafka DataHub instance.
Connectivity requirements
To connect to CDP Kafka, the following is required:
CDP TrustStore
CDP Workload Username/Password
Kafka Broker Name
JaaS File
Java Example
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "connectivity-testing");
properties.put("sasl.kerberos.service.name", "kafka");
properties.put("ssl.truststore.location", "/home/sunilemanjee/truststore.jks");
properties.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CDP-WORKLOAD-USER>\" password=\"<CDP-WORKLOAD-PASSWORD>\";");
Full code here.
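To make the mapping from the connectivity requirements to client configuration concrete, here is a minimal sketch that assembles the same producer configuration using plain string property keys, so it compiles without the kafka-clients dependency. The class name, broker names, truststore path, and credentials are placeholders, not part of the article's code:

```java
import java.util.Properties;

public class CdpKafkaConfig {
    // Substitute real broker FQDNs, truststore path, and CDP workload credentials.
    static Properties producerConfig(String brokers, String truststore,
                                     String user, String password) {
        Properties p = new Properties();
        p.put("bootstrap.servers", brokers);            // <KAFKA-BROKER>:9093,...
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("security.protocol", "SASL_SSL");         // the CDP default: TLS + SASL
        p.put("sasl.mechanism", "PLAIN");
        p.put("ssl.truststore.location", truststore);   // CDP TrustStore (jks)
        p.put("sasl.jaas.config",
              "org.apache.kafka.common.security.plain.PlainLoginModule required"
              + " username=\"" + user + "\" password=\"" + password + "\";");
        return p;
    }

    public static void main(String[] args) {
        Properties p = producerConfig("broker1:9093", "/tmp/truststore.jks", "wl-user", "secret");
        System.out.println(p.getProperty("security.protocol")); // SASL_SSL
    }
}
```

The same Properties object can then be handed to a KafkaProducer constructor once kafka-clients is on the classpath.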
TrustStore
Fetching the TrustStore for CDP is well documented here.
CDP Workload Username/Password
Set and fetch CDP workload username/password
Kafka Broker Names
Fetch the broker names from the CDP Management Console, under the DataHub Hardware tab.
JaaS File
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cdp-workload-user"
password="cdp-workload-password";
};
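If you use a JaaS file instead of the inline sasl.jaas.config property shown in the Java example, the client JVM needs to be pointed at it. A minimal sketch; the class name and the /path/to/jaas.conf path are placeholders:

```java
public class JaasSetup {
    public static void main(String[] args) {
        // Point the JVM at the JaaS file before any Kafka client is constructed.
        // Equivalent to launching with -Djava.security.auth.login.config=/path/to/jaas.conf
        System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf");
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```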
That's it. Enjoy!
11-16-2021
11:26 AM
My previous article demonstrated how to use the CDE-managed Airflow instance to read files from CML. To close the loop, here is a short demonstration on how to write files to CML using CDE's managed Airflow instance.
Artifacts
Airflow dag
Demo
10-26-2021
11:19 AM
1 Kudo
Short demo on how to use CDE's new Airflow UI to design pipelines, additionally leveraging Airflow triggers to orchestrate typical job-pipeline forking logic. A "typical job pipeline" often encapsulates logic around success and failure paths.
Artifacts
Demo Video: https://youtu.be/G-53Uf4qrFw
Git Repo: https://github.com/sunileman/cde-airflow-branch-operator
10-25-2021
08:27 PM
One problem: SQLException: ERROR 1088 (44A19): Cannot create an index on a mutable table that has a ROW_TIMESTAMP column. tableName=<blah> So if you pick this strategy, you can only index your table if you set its rows as immutable.
10-20-2021
09:14 AM
3 Kudos
Apache Airflow and Spark running on K8s provide unreasonable scalability. Cloudera Machine Learning also runs on K8s, and ML users often leverage Apache Airflow to orchestrate their pipelines. Apache Airflow within Cloudera Data Engineering now has the capability to orchestrate CML jobs via the new Airflow CML Operator. This time, instead of writing up a detailed article, I recorded a live demo.
Assets used in the demo
CML Operator: https://github.com/curtishoward/cml_operator/
Airflow Dag: https://github.com/sunileman/cde-airflow-branch-operator
Demo: https://www.youtube.com/watch?v=n9o_Qimnggo
Enjoy!