Member since: 05-30-2018
Posts: 1322
Kudos Received: 712
Solutions: 148
05-05-2022
07:59 PM
2 Kudos
Cyber/log data ingestion is super simple using CDP NiFi; however, it alone does not complete the full picture InfoSec & threat hunters are interested in. Often these uber pros are looking for breadcrumbs/clues to shorten their "hunting" exercise. CDP DataViz immediately provides rich visualizations on high-fidelity cyber/log data to shorten the mean time to vulnerability detection. Additionally, I've observed these teams often need the ability to run ad hoc analysis, enrichment, and/or correlation on stream using an intuitive interface; not to mention, without writing code. Ah yes, SQL on Stream. The entire demo took less than a few hours to build, proving that in cyber we don't always have to engineer (e.g. Grok) our way through difficult data challenges. Demo:
01-28-2022
03:11 PM
2 Kudos
Running SQL on stream is super simple with SQL Stream Builder, via Flink SQL. I recorded a live demo, and this article provides the supporting artifacts to run it.
Demo
Flink SSB Credit Card Fraud Demo
Prerequisite
Quick setup
Note
If you experience any challenges with running SSB, as a sanity check run the following on the cluster
flink-yarn-session -d \
-D security.kerberos.login.keytab=<keytab_filename>.keytab \
-D security.kerberos.login.principal=<workload_username>
If the above works as a sanity check, kill the yarn job.
yarn application -appStates RUNNING -list
That will display all running yarn apps. Your app will show up in this list. Then run this to kill the app
yarn application -kill application_xxxx
customer_inferences table
CREATE TEMPORARY TABLE customer_inferences (
`customer_id` INT,
`account_number` STRING,
`center_inferred_lat` FLOAT,
`center_inferred_lon` FLOAT,
`max_inferred_distance` STRING,
`max_inferred_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.account_number.expression' = '#{IdNumber.valid}',
'fields.center_inferred_lat.expression' = '#{Address.latitude}',
'fields.center_inferred_lon.expression' = '#{Address.longitude}',
'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
'fields.max_inferred_amount.expression' = '#{number.numberBetween ''8000'',''10000''}'
);
cc_charges table
CREATE TEMPORARY TABLE cc_charges (
`customer_id` INT,
`lat` FLOAT,
`lon` FLOAT,
`location` STRING,
`charge_amount` FLOAT
)
WITH (
'connector' = 'faker',
'rows-per-second' = '5',
'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
'fields.lat.expression' = '#{Address.latitude}',
'fields.lon.expression' = '#{Address.longitude}',
'fields.location.expression' = '#{Friends.location}',
'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
);
SQL to detect fraud
select ci.account_number, cc.charge_amount,
2 * 3961 * asin(sqrt(
power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
+ cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
* power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
)) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
2 * 3961 * asin(sqrt(
power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
+ cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
* power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
)) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
DISTANCE_BETWEEN function
function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
function toRad(x) {
return x * Math.PI / 180;
}
lat1 = parseFloat(lat1);
lon1 = parseFloat(lon1);
lat2 = parseFloat(lat2);
lon2 = parseFloat(lon2);
var R = 6371; // km
var x1 = lat2 - lat1;
var dLat = toRad(x1);
var x2 = lon2 - lon1;
var dLon = toRad(x2)
var a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) *
Math.sin(dLon / 2) * Math.sin(dLon / 2);
var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
var d = R * c;
// convert to miles
return (d / 1.60934);
}
DISTANCE_BETWEEN($p0, $p1, $p2, $p3)
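For a quick offline sanity check of the distance math (outside of SSB), here is a small standalone Python sketch of the same haversine calculation in miles; the two coordinate pairs are made-up examples, not part of the demo data:
import math

def distance_between(lat1, lon1, lat2, lon2):
    # Haversine distance in miles between two lat/lon points (same math as the UDF above)
    d_lat = math.radians(lat2 - lat1)
    d_lon = math.radians(lon2 - lon1)
    a = (math.sin(d_lat / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lon / 2) ** 2)
    km = 6371 * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return km / 1.60934  # convert km to miles

# New York City to Los Angeles is roughly 2,450 miles
print(distance_between(40.7128, -74.0060, 34.0522, -118.2437))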
SQL to detect fraud using DISTANCE_BETWEEN function
select ci.account_number, cc.charge_amount, DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
01-27-2022
04:21 PM
A short how-to article on how to list CDP Data Hub Kafka topics via the command line.
Prerequisites
- CDP DataHub Kafka cluster
- Workload username & password
- CDP PC trust store
Steps
1. Fetch the CDP cert and store it in a trust store (Creating TLS truststore), or simply fetch it from any node: /var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks
2. SSH into any Data Hub Kafka node using your CDP workload username/password.
3. Copy the trust store onto the node.
4. Set the broker host:
BROKER_HOST=$(hostname -f)
5. Create jaas.conf:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="<workload username>"
password="<workload password>";
};
6. Set the KAFKA_OPTS environment variable to point to the location of jaas.conf:
export KAFKA_OPTS=-Djava.security.auth.login.config=<LOCATION-OF-YOUR-JAASCONF>/jaas.conf
7. Create client-kerberos.properties:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.kerberos.service.name=kafka
ssl.truststore.location=/<LOCATION-OF-YOUR-TRUSTSTORE>/truststore.jks
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<WORKLOAD-USER-NAME>" password="<WORKLOAD-PASSWORD>";
8. Run kafka-topics:
/opt/cloudera/parcels/CDH/bin/kafka-topics --list --bootstrap-server $BROKER_HOST:9093 --command-config <LOCATION-OF-YOUR-CLIENT-KERB-PROP-FILE>/client-kerberos.properties
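For completeness, the same check can be done from Python with the kafka-python package (this is an assumption on my part, not part of the CDP parcel; kafka-python also needs the CA exported to PEM rather than the JKS truststore):
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers="<BROKER_HOST>:9093",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="<WORKLOAD-USER-NAME>",
    sasl_plain_password="<WORKLOAD-PASSWORD>",
    ssl_cafile="<LOCATION-OF-YOUR-PEM>/cdp-ca.pem",  # CDP cert exported to PEM
)
print(consumer.topics())  # set of topic names visible to this user
consumer.close()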
01-25-2022
01:30 PM
I often encounter the following question - "How to connect a Java application to CDP Kafka?". The good news is, it's simple.
Public docs: Connecting Kafka clients to Data Hub provisioned clusters
CDP Kafka, by default, is secure and governed. That means CDP Kafka uses TLS and CDP workload users for security and authorization. CDP Kafka supports all Apache Kafka authentication capabilities. Again, by default, CDP Kafka is automatically secured (SASL_SSL) and governed by Apache Ranger.
Prerequisites
CDP Public Cloud with a Kafka DataHub instance.
Connectivity requirements
To connect to CDP Kafka, the following is required:
CDP TrustStore
CDP Workload Username/Password
Kafka Broker Name
JaaS File
Java Example
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.mechanism", "PLAIN");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "connectivity-testing");
properties.put("sasl.kerberos.service.name", "kafka");
properties.put("ssl.truststore.location", "/home/sunilemanjee/truststore.jks");
properties.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CDP-WORKLOAD-USER>\" password=\"<CDP-WORKLOAD-PASSWORD>\";");
Full code here.
TrustStore
Fetching the TrustStore for CDP is well documented here.
CDP Workload Username/Password
Set and fetch CDP workload username/password
Kafka Broker Names
Fetch broker names from the CDP management console under DataHub, Hardware Tab
JaaS File
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cdp-workload-user"
password="cdp-workload-password;
};
That's it. Enjoy
11-16-2021
11:26 AM
My previous article demonstrated how to use the CDE-managed Airflow instance to read files from CML. To close the loop, here is a short demonstration on how to write files to CML using CDE's managed Airflow instance.
Artifacts
Airflow dag
Demo
11-09-2021
08:29 AM
Cloudera docs run through a written example of using SparklyR within Cloudera Machine Learning (CML) to read from a Hive ACID table (Direct Reader Mode). Sometimes seeing is believing, and therefore, I'm posting a live demo of doing just that. All the artifacts used in the demo are publicly available.
CML SparklyR code
Use Direct Reader Mode with SparklyR
Demo
10-26-2021
11:19 AM
1 Kudo
A short demo on how to use CDE's new Airflow UI to design pipelines, additionally leveraging Airflow triggers to orchestrate the forking logic of a typical job pipeline. A "typical job pipeline" often encapsulates logic regarding success and failure paths.
Artifacts
Demo Video: https://youtu.be/G-53Uf4qrFw
Git Repo: https://github.com/sunileman/cde-airflow-branch-operator
10-20-2021
09:14 AM
2 Kudos
Apache Airflow and Spark running on K8s provide unreasonable scalability. Cloudera Machine Learning also runs on K8s, and ML users often leverage Apache Airflow to orchestrate their pipelines. Apache Airflow within Cloudera Data Engineering now has the capability to orchestrate CML jobs via the new Airflow CML Operator. This time, instead of writing up a detailed article, I recorded a live demo.
Assets used in the demo
CML Operator: https://github.com/curtishoward/cml_operator/
Airflow Dag: https://github.com/sunileman/cde-airflow-branch-operator
Demo: https://www.youtube.com/watch?v=n9o_Qimnggo
Enjoy!
09-15-2021
07:08 PM
Yes, the above comment about the functionality no longer working is accurate. NiFi has gone through many enhancements, and those are not reflected in this article. I do not plan on updating the article, for the following reason: the objective of this article was to get NiFi running on K8s because, at that time, a NiFi-on-K8s offering did not exist within Hortonworks HDF. Recently, NiFi on K8s has been GA'd. It's called DataFlow Experience. Docs here: https://docs.cloudera.com/dataflow/cloud/index.html. If you want NiFi on K8s, that is your new playground.
08-30-2021
01:20 PM
1 Kudo
File processing and autoscaling seem to have an antinomic relationship. It doesn't have to be that way... really. Autoscaling often drives inefficient behaviors. Why? The concomitant response: "It autoscales." Autoscaling still requires sound design principles; without them, it won't autoscale well.
Autoscaling within a distributed framework requires proper data dichotomization along with processing/service layer decoupling
Anti-Pattern
Take the example where I've seen the most heartburn.
Large files (i.e. zip/tar/etc.) land in an S3/storage area. The knee-jerk reaction is to feed them into a distributed processing engine that autoscales, trusting the "autoscaling" part to behave appropriately. It may. More likely you're flipping a coin and lighting a candle, hoping it all works out. What if the file sizes are heterogeneous and the variance between them is quite significant? What about error handling? Does it have to be all or nothing (meaning all files are processed or all fail)?
What if autoscaling is driven through smaller processing units (groups)?
Here we have taken the same payloads but defragmented them into smaller units. Each unit requires its own, smaller compute footprint, driving resource consumption efficiencies.
Technical Details
For example, a payload (myExamplePayload.zip) contains 1000 JSON files. Instead of throwing this entire payload at a single compute cluster (requiring the maximum number of resources possible, aka a top-line compute profile)... defrag the payload.
As the payload arrives in s3, CDF-X's (Cloudera Data Flow Experience) s3 processor listens for any files. CDF-X will pull the files, decompress, and write all the files back to s3. For example:
s3://My-S3-Bucket/decompressed/file1.json, 3://My-S3-Bucket/decompressed/file2.json, ...
In parallel, CDF-X generates a CDE (Cloudera Data Engineering, Spark) job spec, a JSON payload. The job spec includes file locations; each job would be provided with roughly ~100 file names and locations. Since CDF-X knows the file sizes, it can also hint to CDE how much compute would be required to process these files. This step is not strictly necessary, since we have already defragmented the unit of work into something manageable, so CDE autoscaling should kick in and perform well. Once the job spec is created, CDF-X calls CDE over REST, sending over the job spec. CDE accepts the job spec and arguments (file locations) and runs the micro workloads. Each workload has its own heterogeneous compute profile and autoscales independently.
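To make the defrag idea concrete, here is a hedged Python sketch: the job-spec fields are illustrative placeholders, not the actual CDE API schema, but the chunking logic mirrors the ~100-files-per-job grouping described above.
# Split the decompressed file listing into micro-batches of ~100 files;
# each batch becomes its own job spec and, therefore, its own right-sized CDE run.
def build_job_specs(file_keys, batch_size=100):
    specs = []
    for i in range(0, len(file_keys), batch_size):
        batch = file_keys[i:i + batch_size]
        specs.append({
            "files": batch,            # ~100 file locations per job
            "file_count": len(batch)   # optional sizing hint for CDE
        })
    return specs

# 1000 decompressed JSON files -> 10 independent job specs
keys = ["s3://My-S3-Bucket/decompressed/file%d.json" % n for n in range(1, 1001)]
print(len(build_job_specs(keys)))  # 10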
Wrapping Up
Defragmentation of large singleton payloads enables autoscaling to run more efficiently. Autoscaling is an incredibly powerful capability that is often misused when sound design principles aren't applied. Leveraging these simple patterns allows for ease of operations, cost control, and manageability.
08-27-2021
02:24 PM
2 Kudos
Recently I was tasked with building a possible pattern for handling slowly changing dimensions (Type 2, to be specific) within CDP. The obvious answer is to use Hive ACID, but clearly that answer is too generic. I needed to build a pipeline similar to what I used to do as an Informatica developer and verify the legitimacy of the solution/pattern. Here we go.
Why?
The objective was to build a possible (one of many) pattern for handling SCD Type 2 dimensions with CDP. Outcome: I was able to reproduce a typical ETL workflow with CDP. How? By taking off my horse blinders... really.
What are SCDs?
For those who may not be familiar with slowly changing dimensions within an EDW context, here is a great quick read: Types Of Dimension Tables
How?
Data lands in a sourcing area
Super typical
Pull source data & run it through a cleaning/processing layer for staging
Ready for merging against SCD table
Run an ACID merge between stage and SCD Table
Not rocket science
I don't believe you...I wanna try this in CDP Public Cloud or Private Cloud
Perfect. This article runs through a demo using CDP to do exactly that.
What is required for this demo?
CDP Public Cloud Account
Cloudera Data Engineering (CDE)...which includes airflow
Cloudera Data Warehouse (HiveLLAP)
Assets required to produce this demo:
Spark Code
https://github.com/sunileman/spark-CDE-SCD
Airflow Dag
https://github.com/sunileman/airflow-scd-dag
Data
products.psv (Type 2 dim)
product_changes.psv (changes that need to be applied to the product dimension table)
Workflow Details
Airflow will orchestrate the workflow. First, a CDE Spark job will pick up the product changes from s3. Spark will perform the required ETL and then write the output to a staging table (Hive external). Lastly, using the new Airflow CDW operator, a Hive ACID merge will be executed between the staging table and the product dimension. Not rocket science. I know.
Data/Tables Setup
For this demo, there is a single dimension table: product. The other table is product_ext, an external table with raw data that needs to be applied to the product dimension. Very common stuff.
Product DDL
Note: Hive 3 tables are internal (managed) with full ACID support by default
CREATE TABLE IF NOT EXISTS product(
product_id INT,
product_name STRING,
aisle_id int,
department_id int,
start_date date,
end_date date,
is_current string DEFAULT 'Y')
Product_ext Schema
Note: Replace <YOUR-S3-BUCKET> with your s3 bucket.
CREATE EXTERNAL TABLE IF NOT EXISTS product_ext(
product_id INT,
product_name STRING,
aisle_id int,
department_id int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION 's3a://<YOUR-S3-BUCKET>/sunman/products/'
tblproperties ("skip.header.line.count"="1");
Load the product dimension table
insert into product
select product_id, product_name, aisle_id, department_id, current_date() as start_date, null as end_date, 'Y' as is_current from product_ext
Lastly, upload the product_changes.psv file to s3
Note: Replace <YOUR-S3-BUCKET> with your s3 bucket
s3a://<YOUR-S3-BUCKET>/sunman/product_changes/
Recap: A product dimension table has been created. A file with changes that need to be applied against the product dimension table has been uploaded to s3.
Cloudera Data Warehousing
Via CDW, a Hive ACID merge statement will merge the staging table with the product dimension. This will be triggered by Airflow using the new CDW operator. More on that later.
Grab the JDBC URL from the virtual warehouse
Note: SSO must be disabled
For example:
jdbc:hive2://zzz.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3;
Create an Airflow CDW Connection
To execute a CDW HQL statement(s), an airflow connection to CDW is required. The connection is referenced in the airflow dag, more on that later.
How to create a CDW Airflow connection: Automating data pipelines using Apache Airflow in Cloudera Data Engineering.
Important: Make note of the conn Id. It will be used later in this article
Create a CDE Spark Job
This job reads product_changes.psv file (contains the changes which need to be applied to the product dimension), performs cleansing/ETL, and stages the changes as an external Hive table.
The code is available on my Github page. If you're not interested in building the code, no worries. I have also provided the Spark jar which can be downloaded instead.
https://github.com/sunileman/spark-CDE-SCD/blob/master/target/scala-2.11/spark-scd_2.11-0.1.jar
CDE Spark Job Details
Note: For this demo to work, all job specs/args/configs must match the screen shot below
Note: Update <YOUR-S3-BUCKET> with your s3 bucket
Create a CDE Airflow Job
The airflow dag flow is the following:
First, a Spark CDE job will be called to stage the product changes into an external Hive table. Then CDW will be called to perform the Hive ACID Merge between the product dimension and staging table. The code for the dag is here:
https://github.com/sunileman/airflow-scd-dag
The dag file is airflow-scd.py
Open it and update the cli_conn_id. This is the Airflow CDW connection created earlier:
##https://docs.cloudera.com/data-engineering/cloud/manage-jobs/topics/cde-airflow-dag-pipeline.html
hive_cdw_job = CDWOperator(
task_id='cdw_hive_job',
dag=dag,
cli_conn_id='<YOUR-AIRFLOW-CDW-CONNECTION-ID>',
hql=hive_merge,
schema='default',
use_proxy_user=False,
query_isolation=False
)
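The hql argument above references a hive_merge variable defined in the dag; the actual statement lives in the repo. As a hedged sketch only (assuming the staging table is product_staged_cleansed and the product DDL shown earlier), a Type 2 merge commonly expires the matching current rows and then inserts the new versions:
# Hedged sketch -- the real HQL is defined in airflow-scd.py; names follow the DDL above.
hive_merge = """
MERGE INTO product AS t
USING product_staged_cleansed AS s
  ON t.product_id = s.product_id AND t.is_current = 'Y'
WHEN MATCHED THEN
  UPDATE SET end_date = current_date(), is_current = 'N';

INSERT INTO product
SELECT product_id, product_name, aisle_id, department_id,
       current_date() AS start_date, NULL AS end_date, 'Y' AS is_current
FROM product_staged_cleansed;
"""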
Run the SCD workflow
Generally, Airflow jobs can be executed through the UI. Since I parameterized the dag, at this time the only way to execute the Airflow job with runtime configs is through the CLI.
Download CDE CLI: Using the Cloudera Data Engineering command line interface
Note: Update <YOUR-S3-BUCKET> with your s3 bucket name
./cde job run --config c_stageTable='product_staged' --config c_sourceLoc='s3a://<YOUR-S3-BUCKET>/sunman/product_changes/' --config c_stageCleansedTable=product_staged_cleansed --config c_dimTable=product --name EDW-SCD-WorkFlow
This will return an airflow job ID. Return to the CDE UI. After the dag completes, run the following:
SELECT * FROM PRODUCT WHERE PRODUCT_ID = 232
Product ID 232 is one of several products that were updated.
Wrap It Up!
This end-to-end SCD pipeline demonstrated the capability to use the right tool for the right job to accomplish a highly important EDW task. All driven by using Airflow's super powerful orchestration engine to hand off work between Apache Spark and Apache Hive. Spark for data processing. Hive for EDW (Acid Merge).
01-20-2021
12:55 PM
2 Kudos
Credits to @mbawa (Mandeep Singh Bawa), who co-built all the assets in this article. Thank you!
We (Mandeep and I) engaged on a customer use case where Cloudera Data Engineering (Spark) jobs were triggered once a file lands in S3 (details on how to trigger CDE from Lambda here). Triggering CDE jobs is quite simple; however, we needed much more. Here are a few of the requirements:
- Decoupling the ingestion layer from the processing layer
- Decoupling apps (senders) from Spark: apps can send and forget payloads without the burden of configuring Spark (number of executors, memory/CPU, etc.), the concern of Spark availability (upgrades, resource availability, etc.), or application impacts from CDE API changes
- Real-time changes to where CDE jobs are sent (multi-CDE)
- Monitoring of job status, with alerts
- Monitoring of job run times, with alerts for out-of-spec runtimes
- Failover to a secondary CDE
- Throttling
- Authentication
It may look as though we are trying to make NiFi into an orchestration engine for CDE. That's not the case. Here we are trying to fill some core objectives and leveraging capabilities within the platform to accomplish the above-stated task. CDE comes with Apache Airflow, a much richer orchestration engine. Here we are integrating AWS triggers, multiple CDE clusters, monitoring, alerting, and a single API for multiple clusters.
Artifacts
- NiFi CDE Jobs Pipeline Workflow
- Streams Messaging cluster (Kafka)
- CDF clusters (NiFi)
- Heavy usage of NiFi parameters
High-Level Workflow
At a high level, the NiFi workflow does the following:
- Exposes a single REST endpoint for CDE job submission
- Balances CDE workloads between multiple CDE clusters; if only a single CDE cluster is available, it will queue jobs until compute bandwidth is available
- Queues jobs if the CDE clusters are too busy; queued jobs will be re-run
- Triggers an alert if the number of retries for a job spec is greater than 3 (parameterized)
- Monitors jobs from start to finish
- Alerts if a job fails or its run time exceeds a predetermined max run time (i.e. the job runs for 10 minutes while the max run time is set to 5 minutes)
Setup
The following NiFi parameters are required:
- api_token: CDE token (more on this later); set to ${cdeToken}
- job-runtime-threshold-ms: max time a job should run before an alert is triggered
- kbrokers: Kafka brokers
- ktopic-fail: Kafka topic cde-job-failures
- ktopic-inbound-jobs: Kafka topic cde-jobs
- ktopic-job-monitoring: Kafka topic cde-job-monitoring
- ktopic-job-runtime-over-limit: Kafka topic cde-job-runtime-alert
- ktopic-retry: Kafka topic cde-retry
- username: CDE machine user
- password: CDE machine user password
- primary-vc-token-api: CDE token API (more on this later)
- primary_vc_jobs_api: CDE primary cluster jobs API (more on this later)
- secondary-vc-available: Y/N; if a secondary CDE cluster is available, set to Y, else N
- secondary_vc_jobs_api: CDE secondary cluster jobs API, if the secondary cluster is available
- run_count_limit: max number of concurrently running jobs per CDE cluster, i.e. 20
- wait-count-max: max retry count; if a job cannot be submitted to CDE (i.e. due to it being too busy), how many times NiFi should retry before adding the job to the ktopic-fail topic, i.e. 5
- start_count_limit: max number of concurrently starting jobs per CDE cluster, i.e. 20
Note: When you run the workflow for the first time, the Kafka topics will generally be created for you automatically.
Detailed Workflow
Once a CDE job spec is sent to NiFi, NiFi does the following:
- Writes the job spec to the Kafka ktopic-inbound-jobs (NiFi parameter) topic
- Pulls jobs from Kafka: new jobs from ktopic-inbound-jobs, retry jobs from ktopic-retry, and monitoring jobs from ktopic-job-monitoring (all NiFi parameters)
- Fetches CDE API tokens
- Checks whether the primary cluster's current run count is less than run_count_limit (NiFi parameter)
- Checks whether the primary cluster's current starting count is less than start_count_limit (NiFi parameter)
- If the run or start counts are not within limits, retries the same logic on the secondary cluster (if available, secondary-vc-available)
- If the run/start counts are within limits, the job spec is submitted to CDE
- If the run/start counts are not within limits for the primary and secondary CDE and the number of retries is less than wait-count-max (NiFi parameter), the job spec is written to the Kafka ktopic-retry topic (NiFi parameter)
Monitoring
NiFi calls CDE to determine the current status of each job ID (pulled from ktopic-job-monitoring):
- If the job ended successfully, nothing more happens here
- If the job ended with a failure, the job spec is written to the Kafka ktopic-fail topic
- If the job is still running and its run time is less than job-runtime-threshold-ms, the job spec is written back to ktopic-job-monitoring; otherwise an alert is sent (NiFi parameter)
CDE APIs
To get started, the CDE primary and secondary (if available) cluster API details are needed in NiFi as parameters. To fetch the token API, click the pencil icon, then click the Grafana URL. The URL will look something like this:
https://service.cde-zzzzzz.moad-aw.aaaaa-aaaa.cloudera.site/grafana/d/sK1XDusZz/kubernetes?orgId=1&refresh=5s
Set the NiFi parameter primary-vc-token-api to the first part of the URL:
service.cde-zzzzzz.moad-aw.aaaaa-aaaa.cloudera.site
Now get the jobs API for both the primary and secondary (if available). For a virtual cluster, click the pencil icon, then click Jobs API URL to copy the URL. The jobs URL will look something like this:
https://aaa.cde-aaa.moad-aw.aaa-aaa.cloudera.site/dex/api/v1
Fetch the first part of the URL and set the NiFi parameter primary_vc_jobs_api (do the same for secondary_vc_jobs_api):
aaa.cde-aaa.moad-aw.aaa-aaa.cloudera.site
Run a CDE job
Inside the NiFi workflow, there is a test flow to verify that the NiFi CDE jobs pipeline works. To run the flow, inside InvokeHTTP, set the URL to one of the NiFi nodes. Run it, and if the integration is working successfully, you will see a job running in CDE. Enjoy! Oh, by the way, I plan on publishing a video walking through the NiFi flow.
11-09-2020
01:34 PM
1 Kudo
Recently I ran into a scenario requiring me to connect my Spark IntelliJ IDE to Kafka DataHub. I'm not going to claim the status of a pro at secure IDE setup; therefore, novices in the security realm alike may find this article useful. This article goes through the steps of setting up a Spark Scala IDE (IntelliJ), with a supplied working code example, to connect securely to a Kafka DataHub over the SASL_SSL protocol using the PLAIN SASL mechanism.
Artifacts
https://github.com/sunileman/spark-kafka-streaming
Scala Object
https://github.com/sunileman/spark-kafka-streaming/blob/master/src/main/scala/KafkaSecureStreamSimpleLocalExample.scala
The Scala object accepts 2 inputs: the target Kafka topic and the Kafka broker(s).
Prerequisites
- Kafka DataHub instance
- Permissions set up in Ranger to be able to read/write from Kafka
- IntelliJ (or similar) with the Scala plugin installed
- Workload username and password
TrustStore
Andre Sousa Dantas De Araujo did a great job explaining (very simply) how to get the certificate from CDP and create a truststore. Just a few simple steps here: https://github.com/asdaraujo/cdp-examples#tls-truststore
I stored it here on my local machine, which is referenced in the Spark Scala code: ./src/main/resources/truststore.jks
JaaS Setup
Create a jaas.conf file:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="YOUR-WORKLOAD-USER"
password="YOUR-WORKLOAD-PASSWORD";
};
I stored mine here, which is referenced in the Spark Scala code: ./src/main/resources/jaas.conf
Spark Session (Scala Code)
- Master is set to local
- Set spark.driver.extraJavaOptions and spark.executor.extraJavaOptions to the location of your jaas.conf
- Set spark.kafka.ssl.truststore.location to the location of your truststore
val spark = SparkSession.builder
.appName("Spark Kafka Secure Structured Streaming Example")
.master("local")
.config("spark.kafka.bootstrap.servers", kbrokers)
.config("spark.kafka.sasl.kerberos.service.name", "kafka")
.config("spark.kafka.security.protocol", "SASL_SSL")
.config("kafka.sasl.mechanism", "PLAIN")
.config("spark.driver.extraJavaOptions", "-Djava.security.auth.login.config=./src/main/resources/jaas.conf")
.config("spark.executor.extraJavaOptions", "-Djava.security.auth.login.config=./src/main/resources/jaas.conf")
.config("spark.kafka.ssl.truststore.location", "./src/main/resources/truststore.jks")
.getOrCreate()
Write to Kafka
The data in the dataframe is hydrated via a CSV file. Here I will simply read the dataframe and write it back out to a Kafka topic:
val ds = streamingDataFrame.selectExpr("CAST(id AS STRING)", "CAST(text AS STRING) as value")
.writeStream.format("kafka")
.outputMode("update")
.option("kafka.bootstrap.servers", kbrokers)
.option("topic", ktargettopic)
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.ssl.truststore.location", "./src/main/resources/truststore.jks")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("checkpointLocation", "/tmp/spark-checkpoint2/")
.start()
.awaitTermination()
Run
Supply the JVM option providing the location of the jaas.conf:
-Djava.security.auth.login.config=/PATH-TO-YOUR-jaas.conf
Supply the program arguments. My code takes 2: the Kafka topic and the Kafka broker(s):
sunman my-kafka-broker:9093
That's it! Run it and enjoy secure SparkStreaming+Kafka glory.
09-11-2020
12:47 PM
1 Kudo
Recently I was engaged in a use case where CDE processing was required to be triggered once data landed on s3. The s3 trigger in AWS would be via a Lambda function. As the files/data land in s3, an AWS Lambda function would be triggered to then call CDE to process the data/files. Lambda functions at trigger time include the names and locations of the files the trigger was executed upon. The file locations/names would be passed onto the CDE engine to pick up and process accordingly.
Prerequisites to run this demo
AWS account
s3 Bucket
Some knowledge of Lambda
CDP and CDE
Artifacts
AWS Lambda function code
https://github.com/sunileman/spark-kafka-streaming/blob/master/src/main/awslambda/triggerCDE.py
CDE Spark Job, main class com.cloudera.examples.SimpleCDERun
Code for class com.cloudera.examples.SimpleCDERun
https://github.com/sunileman/spark-kafka-streaming
Prebuilt jar
https://sunileman.s3.amazonaws.com/CDE/spark-kafka-streaming_2.11-1.0.jar
Processing Steps
Create a CDE Job (Jar provided above)
Create a Lambda function on an s3 bucket (Code provided above)
Trigger on put/post
Load a file or files on s3 (any file)
AWS Lambda is triggered by this event which calls CDE. The call to CDE will include the locations and names of all files the trigger was executed upon
CDE will launch, processing the files, and end gracefully
It's quite simple.
Create a CDE Job
Name: Any Name. I called it testjob
Spark Application: Jar file provided above
Main Class: com.cloudera.examples.SimpleCDERun
Lambda
Create an AWS Lambda function to trigger on put/post for s3. The lambda function code is simple. It will call CDE for each file posted to s3. Lambda function provided in the artifacts section above.
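For orientation, here is a hedged Python sketch of the idea behind triggerCDE.py: pull the bucket/key of each uploaded file out of the S3 event and hand it to CDE. The CDE_JOBS_ENDPOINT and payload shape below are placeholders; the actual REST call, including authentication, is implemented in the linked Lambda code.
import json
import urllib3

http = urllib3.PoolManager()
CDE_JOBS_ENDPOINT = "https://<YOUR-CDE-JOBS-API>"  # placeholder, see the repo for the real call

def lambda_handler(event, context):
    # S3 put/post events carry the bucket name and object key for each uploaded file
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        payload = {"bucket": bucket, "key": key}  # placeholder job-spec shape
        resp = http.request("POST", CDE_JOBS_ENDPOINT,
                            body=json.dumps(payload),
                            headers={"Content-Type": "application/json"})
        print("Triggered CDE for s3://%s/%s, status %s" % (bucket, key, resp.status))
    return {"statusCode": 200}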
The following are the s3 properties:
Trigger CDE
Upload a file to s3. Lambda will trigger the CDE job. For example, I uploaded a file test.csv to s3. Once the file was uploaded, Lambda calls CDE to execute a job on that file
Lambda Log
The first arrow shows the file name (test.csv). The second arrow shows the CDE JobID, which in this case returned the number 14.
In CDE, Job Run ID: 14
In CDE, the stdout logs show that the job received the location and name of the file which Lambda was triggered upon.
As I said in my last post, CDE is making things super simple. Enjoy.
08-28-2020
09:44 AM
2 Kudos
The all-new Cloudera Data Engineering Experience
I recently had the opportunity to work with Cloudera Data Engineering to stream data from Kafka. It's quite interesting how I was able to deploy code without much worry about how to configure the back-end components.
Demonstration
This demo will pull from the Twitter API using NiFi and write the payload to a Kafka topic named "twitter". Spark Streaming on Cloudera Data Engineering Experience (CDE) will pull from the twitter topic, extract the text field from the payload (which is the tweet itself), and write it back to another Kafka topic named "tweet". The following is an example of a Twitter payload; the objective is to extract only the text field.
What is Cloudera Data Engineering?
Cloudera Data Engineering (CDE) is a serverless service for Cloudera Data Platform that allows you to submit Spark jobs to an auto-scaling cluster. CDE enables you to spend more time on your applications and less time on infrastructure.
How do I begin with Cloudera Data Engineering (CDE)?
Complete setup instructions here.
Prerequisites
- Access to CDE
- Some understanding of Apache Spark
- Access to a Kafka cluster (in this demo, I use Cloudera DataHub Streams Messaging for rapid deployment of a Kafka cluster on AWS)
- An IDE (I use IntelliJ; I do provide the jar later on in this article)
- Twitter API developer access: https://developer.twitter.com/en/portal/dashboard
Setting up a Twitter stream
I use Apache NiFi deployed via Cloudera DataHub on AWS.
Source Code
I posted all my source code here. If you're not interested in building the jar, that's fine; I've made the job jar available here. Oct 26, 2020 update: I added source code for how to connect CDE to Kafka DataHub, available here. Users should be able to run the code as is without the need for a jaas file or keytab.
Kafka Setup
This article is focused on Spark Structured Streaming with CDE, so I'll be super brief here. Create two Kafka topics:
- twitter: used to ingest the firehose data from the Twitter API
- tweet: used post tweet-extraction, performed via Spark Structured Streaming
NiFi Setup
Again, I'll be super brief here. Use the GetTwitter processor (which requires a Twitter API developer account, free) and write to the Kafka twitter topic.
Spark Code (Scala)
Load up the Spark code on your machine from here: https://github.com/sunileman/spark-kafka-streaming
Fire off an sbt clean and package. A new jar will be available under target: spark-kafka-streaming_2.11-1.0.jar. The jar is also available here.
What does the code do? It pulls from the source Kafka topic (twitter), extracts the text value from the payload (which is the tweet itself), and writes it to the target topic (tweet).
CDE
Assuming CDE access is available, navigate to Virtual Clusters > View Jobs and click on Create Job.
Job Details
- Name: job name
- Spark Application File: the jar created from the sbt package, spark-kafka-streaming_2.11-1.0.jar (another option is to simply provide the URL where the jar is available: https://sunileman.s3.amazonaws.com/CDE/spark-kafka-streaming_2.11-1.0.jar)
- Main Class: com.cloudera.examples.KafkaStreamExample
- Arguments: arg1, source Kafka topic: twitter; arg2, target Kafka topic: tweet; arg3, Kafka brokers: kafka1:9092,kafka2:9092,kafka3:9092
From here, jobs can be created and run, or simply created. Click on Create and Run to view the job run and the metrics about the streaming. At this point, only the text (tweet) from the Twitter payload is being written to the tweet Kafka topic. That's it!
You now have a Spark Structured Streaming job running on CDE, fully autoscaled. Enjoy!
08-25-2020
07:16 AM
I am trying to run NiFi on AWS with an ELB fronting it. I use ListenHTTP (port 7777) and need to do a health check via the ELB. It seems ListenHTTP only supports HEAD and POST, while the ELB only supports health checks via GET. Any ideas how I can determine the health status of port 7777 using ELB health checks?
Labels: Apache NiFi
08-11-2020
01:40 PM
2 Kudos
Image Courtesy: k9s
I recently ran into a scenario where I needed to gather Hive logs on the new Data Warehouse Experience on AWS. The "old" way of fetching logs was to SSH into the nodes. Data Warehouse Experience is now deployed on K8s, so SSHing is off the table. Therefore a tool like K9s is key. This is a raw article to quickly demonstrate how to use K9s to fetch Data Warehouse Experience logs which are deployed on AWS K8s
Prerequisites
Data Warehouse Experience
K9s installed on your machine
AWS ARN (instructions provided below)
AWS CLI configured to point to your AWS environment. Simply run aws configure via the CLI and point to the correct AWS account
AWS ARN
Your AWS ARN is required to successfully connect K9s to CDW(DW-X)
On AWS, go to IAM > Users > Search for your user name:
Click on your username to fetch the ARN:
Kubeconfig
Connecting to DW-X using K9s requires kubeconfig. DW-X makes this available under DW-X-> Environments > Your Environment > Show Kubeconfig.
Click on the copy option and make the contents available within a file in your machine file system. For example, I stored the kubeconfig contents here: /Users/sunile.manjee/.k9s/kubeconfig.yml
ARN
To access K8s from K9s, your ARN will need to be added under Grant Access:
K9s
Now all is set up to connect to DW-X K8s using K9s. Reference kubeconfig.yml file when using K9s
k9s --kubeconfig /Users/sunile.manjee/.k9s/kubeconfig.yml
That's it. From here the logs are made available and a ton of other metrics. For more information on how to use K9s, see k9scli.io
05-04-2020
10:38 AM
1 Kudo
The EFM (Edge Flow Manager) makes it super simple to write flows for MiNiFi to execute wherever it may be located (laptops, refineries, phones, OpenShift, etc.). All agents (MiNiFi) are assigned an agentClass. Once an agent is turned on, it will phone home to EFM for run-time instructions. The run-time instructions are set at the class level, meaning all agents within a class run the same instruction (flow) set. There can be 0 to many classes. In this example, I will capture Windows Security Events via MiNiFi and ship them to NiFi over Site2Site.
Download the MiNiFi MSI and set the classname. In this example, I set the classname to test6. This property is set at install time (MSI) or by going directly into minifi.properties. Also notice the setting nifi.c2.enable=true. This informs MiNiFi that run-time flow instructions will be received from EFM. Start MiNiFi.
MiNiFi can be configured to send data to multiple endpoints (i.e. Kafka, NiFi, EventHub, etc.). In this example, data will be sent to NiFi over S2S. On NiFi, create an input port:
Capture the port ID. This will be used in EFM later on:
On EFM, open class test6. This is where we design the flow for all agents with their class is set to test6:
To capture Windows events via MiNiFi, add ConsumeWindowsEventLog processor to the canvas:
Configure the processor to pull events. In this example, MiNiFi will listen for Windows Security Events:
To send data from MiNiFi to NiFi, add Remote Process Group to the canvas. Provide a NiFi endpoint:
Connect ConsumeWindowsEventLog processor to the Remote Process Group. Provide the NiFi Input Port ID captured earlier:
Flow is ready to publish:
Click on Publish. MiNiFi will phone home at a set interval (nifi.c2.agent.heartbeat.period). Once that occurs, MiNiFi will receive new run time flow instructions. At that time data will start flowing into NiFi.
The EFM makes it super simple to capture Windows events and universally ship anywhere without the ball and chain of most agent/platform designs.
04-13-2020
07:54 AM
@haris_khan Good observation. The clustering concept in k8s in general requires a fair bit of understanding of how the underlying software behaves during these various scenarios. Apache NiFi engineers have built a k8s operator which handles scaling up and down. I believe you may want to seriously look at NiFi Stateless or MiNiFi (both on k8s) if rapid scaling up/down is of interest.
04-07-2020
08:30 PM
1 Kudo
Application deployment has proliferated significantly with Kubernetes. However, true universal log capture with multi-endpoint (downstream) support is lacking. Apache NiFi Stateless provides a possibility to bridge the gap between rapid application deployment and InfoSec's desire to continue to capture and monitor behaviors.
What is NiFi Stateless?
NiFi-Fn is a library for running NiFi flows as stateless functions. It provides delivery guarantees similar to NiFi, without the need for an on-disk repository, by waiting to confirm receipt of incoming data until it has been written to the destination (source NIFI-5922).
Try it out
Prerequisites
K8s (local or cluster). In this demonstration, Azure Kubernetes Service is used.
Some familiarity with K8s & NiFi
Assets Used
NiFi on K8s
https://github.com/sunileman/AKS-YAMLS/blob/master/apache-nifi.yaml
Any instance of NiFi will do here. It does not need to run on K8s.
NiFi Registry on K8s
https://github.com/sunileman/AKS-YAMLS/blob/master/nifi-registry.yml
Any instance of NiFi Registry will do here. It does not need to run on K8s.
Laying the groundwork
NiFi Stateless will pull an existing flow from NiFi Registry. The following is a simple NiFi flow designed in NiFi:
TailFile processor will tail the application log file /var/log/app.txt. The application deployed will write log entries to this file:
The flow is checked into NiFi Registry. NiFi Registry URL, Bucket Identifier & Flow Identifier will be used by NiFi Stateless at run time. More about this soon.
Time to deploy
The flow has been registered in NiFi Registry, therefore the application pod can be deployed. A NiFi Stateless container will be deployed in the same application pod (sidecar) to capture the log data generated by the application. The application being deployed is simple: it is a dummy application that generates a timestamp log entry every 5 seconds into a log file (/var/log/app.txt). NiFi Stateless will tail this file and ship the events. The events can be shipped virtually anywhere due to NiFi's inherent universal log forward compatibility (Kafka/Splunk/ElasticSearch/Mongo/Kinesis/EventHub/S3/ADLS/etc.). All NiFi processors are listed at https://nifi.apache.org/docs.html. For this demonstration, the log events will be shipped to a NiFi cluster over Site2Site.
Here is the K8s YAML to deploy the Pod (application with NiFi Stateless sidecar): https://github.com/sunileman/AKS-YAMLS/blob/master/nifi-stateless-sidecar.yml
In that YAML file, NiFi Registry URL, bucketId, and flowId will need to be updated. These values are from the NiFi registry. NiFi Stateless binds itself at runtime to a specific flow to execute.
args: ["RunFromRegistry", "Continuous", "--json", "{\"registryUrl\":\"http://nifiregistry-service\",\"bucketId\":\"71efc3ea-fe1d-4307-97ce-589f78be05fb\",\"flowId\":\"c9092508-4deb-45d2-b6a4-b6a4da71db47\"}"]
To deploy the Pod, run the following:
kubectl apply -f nifi-stateless-sidecar.yml
Once the pod is up and running, application log events are immediately captured by the NiFi Stateless container and shipped downstream.
Wrapping Up
FluentD and similar offerings are great for getting started to capture application log data. However, enterprises require much richer connectivity (Universal Log Forward Compatibility) to enable InfoSec to perform their vital role. NiFi Stateless bridges that current gap.
Tags: K8, kubernetes, NiFi, stateless
01-15-2020
11:57 AM
Over the last few weeks, as customers have started to ramp up their usage of CDP cloud assets (like DataHub, experiences, etc.), I have observed many of the ways they are leveraging on-prem engineering assets (code) in the cloud. The concept of write-once-deploy-anywhere is fundamental to a well-designed data strategy. It's NOT a sales pitch; it's a reality for enterprises who have invested in a Modern Data Architecture. However, unlike on-prem, where storage and services are tightly coupled, CDP flips that concept on its head. We can now launch services independently and choose only the capabilities we need for the task at hand. For example, streaming use cases typically require NiFi, Kafka, and Spark Streaming. Each of those services would be a separate DataHub cluster and scale independently. This article focuses on using PySpark to read from a secured Kafka instance. To be clear, this is one way (not the only way) of using PySpark to read from Kafka. Both (DE & SM) clusters are launched via the CDP control plane.
CDP DataHub assets used in this article
- Data Engineering cluster (DE)
- Streams Messaging (SM)
Launching DataHub DE and SM clusters is well documented here.
PreWork
- Secure Kafka and generate certs/truststore: https://docs.cloudera.com/runtime/7.0.2/kafka-securing/topics/kafka-secure-tls.html (in this article I refer to the truststore as kafka.client.truststore)
- Get a list of all Kafka brokers and ports. For this article I will call the brokers k1.cloudera.com:9093, k2.cloudera.com:9093, k3.cloudera.com:9093
- Create a kafka.properties file:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/home/sunilemanjee/kafka.client.truststore.jks
ssl.truststore.password=password
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="machine-user" password="password";
Create a Kafka jaas file called jaas.conf (this can be named whatever you like):
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="machine-user"
password="password";
};
Create a Kafka topic
The easiest way to create a Kafka topic is via SMM (Streams Messaging Manager), which ships with the Streams Messaging cluster. Click on the SMM URL within DataHub and then click on "Topics" located on the right menu bar. Click on "Add New" to create a new Kafka topic. Enter the topic name "demo", set partitions to 1, and set the cleanup policy to "delete". The demo topic should now be available.
Generate Data
For PySpark to consume from a secured instance of Kafka, we need the ability to write to Kafka. Here we will use the Kafka console producer.
- SSH into one of the broker nodes
- Update k*.cloudera.com:9093 with your broker list and ports
- Upload kafka.properties (created earlier) onto this node
- Update the location of your kafka.properties file
After the below command is executed, you can start to write data (messages) to Kafka. We will come back to this in a moment.
kafka-console-producer --broker-list k1.cloudera.com:9093,k2.cloudera.com:9093,k3.cloudera.com:9093 --producer.config /home/csunilemanjee/kafka.properties --topic demo
Read from Kafka using PySpark
- SSH into any node within the DE cluster
- Upload jaas.conf and kafka.client.truststore
- Update the location of jaas.conf and kafka.client.truststore
- Launch the PySpark shell using the following command:
pyspark --files "/home/csunilemanjee/jaas.conf#jaas.conf,/home/sunilemanjee/kafka.client.truststore.jks#kafka.client.truststore.jks" --driver-java-options "-Djava.security.auth.login.config=/home/sunilemanjee/jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/sunilemanjee/jaas.conf"
Once the PySpark shell is up, it may be easier to store the Kafka brokers in a variable like this:
KAFKA_BROKERS = "k1.cloudera.com:9093,k2.cloudera.com:9093,k3.cloudera.com:9093"
Create a structured stream to read from Kafka. Update the following: kafka.ssl.truststore.location, kafka.ssl.truststore.password, username, password.
df_kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", KAFKA_BROKERS).option("subscribe", "demo").option("kafka.security.protocol", "SASL_SSL").option("kafka.sasl.mechanism", "PLAIN").option("kafka.ssl.truststore.location", "./kafka.client.truststore.jks").option("kafka.ssl.truststore.password", "password").option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"machine-user\" password=\"password\" serviceName=\"kafka\";").load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
To start viewing Kafka messages on the console (PySpark shell) from the "demo" topic:
stream = df_kafka.writeStream.format("console").trigger(continuous="1 second").start()
stream.awaitTermination()
##once you are finished, to kill the stream run this
stream.stop()
Go back to your Kafka console producer and start writing messages (anything you like). You will see those messages show up in your PySpark shell console. That's it. Again, this is one way (not the only way) to use PySpark to consume from a secured Kafka instance. I see it as an emerging pattern in CDP for streaming use cases. Enjoy.
Tags: CDP, Kafka, public-cloud, pyspark, stream-processing
11-18-2019
08:29 PM
@Kalyan77 Good question, and I haven't tried it yet. In the next few weeks I have an engagement which will require me to find out. Will keep you posted.
09-04-2019
11:47 AM
2 Kudos
My Kubernetes series (Part1, Part2) was strictly focused on MiNiFi on K8S. NiFi and MiNiFi may communicate over Site2Site; however, often the pattern is to leverage Kafka for a clean message handoff. NiFi within these patterns is generally the central router and transformer of messages. Think of it like "FedEx" for data. Till now, most have deployed NiFi on bare metal or VMs. Natural evolution kicks in: deploy NiFi on k8s, and yes, it's super simple. In this article I will demonstrate how to deploy both a NiFi and a ZooKeeper cluster (neither being a single pod!) on Azure Kubernetes Service (AKS); however, all artifacts may be leveraged to deploy on virtually any Kubernetes offering.
Prerequisites
- Some knowledge of Kubernetes and NiFi
- AKS / k8s cluster
- kubectl cli
- NiFi image available in a registry (i.e. DockerHub)
Deployment
All the contents in the ymls below can be placed into a single file for deployment. For this demonstration, chunking it into smaller components makes it easier to explain. I have loaded a NiFi image into Azure Container Registry. You can use the NiFi image available here in DockerHub.
ZooKeeper
NiFi uses ZooKeeper for several state management functions. More on that here. ZooKeeper for NiFi can be deployed in embedded or standalone mode. Here I will deploy 3 pods of ZK on k8s. Deploying ZK on k8s is super simple:
kubectl apply -f https://k8s.io/examples/application/zookeeper/zookeeper.yaml
After a few minutes, 3 pods of ZK will be available for NiFi to use:
kubectl get pods -w -l app=zk
Once the ZK pods become available, proceed to deploy NiFi on k8s.
NiFi
Below is the k8s deployment yml for NiFi (named nifi.yml). A few fields to highlight:
- replicas: the number of NiFi pods to deploy (cluster size)
- image: replace with your image location
apiVersion: extensions/v1beta1
kind: Service #+
apiVersion: v1 #+
metadata: #+
name: nifi-service #+
spec: #+
selector: #+
app: nifi #+
ports: #+
- protocol: TCP #+
targetPort: 8080 #+
port: 8080 #+
name: ui #+
- protocol: TCP #+
targetPort: 9088 #+
port: 9088 #+
name: node-protocol-port #+
- protocol: TCP #+
targetPort: 8888 #+
port: 8888 #+
name: s2s #+
type: LoadBalancer #+
---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
name: nifi
spec:
replicas: 2
selector:
matchLabels:
app: nifi
template:
metadata:
labels:
app: nifi
spec:
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: nifi-container
image: sunmanregistry.azurecr.io/nifi:latest
ports:
- containerPort: 8080
name: http
- containerPort: 22
name: ssh
resources:
requests:
cpu: ".5"
memory: "6Gi"
limits:
cpu: "1"
env:
- name: VERSION
value: "1.9"
- name: NIFI_CLUSTER_IS_NODE
value: "true"
- name: NIFI_CLUSTER_NODE_PROTOCOL_PORT
value: "9088"
- name: NIFI_ELECTION_MAX_CANDIDATES
value: "1"
- name: NIFI_ZK_CONNECT_STRING
value: "zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181" To deploy this NiFi k8s manifest: kubectl apply -f nifi.yml Few minutes later you will have a fully functioning NiFi cluster on Kubernetes I use super simple a lot. I know. Deploying MiNiFi & NiFi on k8s is just that, super simple. Enjoy.
08-27-2019
09:06 PM
2 Kudos
Part 2 of Autoscaling MiNiFi on K8S is focused on deploying the artifacts on EKS, the Amazon Elastic Kubernetes Service. My knee-jerk reaction was that all Kubernetes-as-a-Service offerings would play well together, but that is definitely not the case; hence why GCP's Anthos product direction for this space is key. The net-net of my observation is that a k8s app deployment built for any single cloud vendor will cause deployment complexities on any other k8s deployment, cloud or on-prem. Vendor lock-in theory is ALIVE and WELL. In Azure I leveraged ACS for EFM and NiFi Registry; however, the natural evolution was to deploy EFM and NiFi Registry (NR) on K8S. EFM, NR, and MiNiFi are integrated components (refer to part 1 on architecture). I will leverage several key out-of-the-box k8s components to make this all work together. The good news is, the deployment is super simple!
Prerequisites
- Some knowledge of Kubernetes and EKS
- EKS cluster
- kubectl cli
- eksctl cli
- VPC
- 2 public subnets within the VPC
- NR, EFM, and MiNiFi images uploaded to and available in ECR (refer to part 1 on image locations)
Create an EKS Cluster
eksctl makes this simple. I tried using aws eks and it was painful.
eksctl create cluster \
--name sunman-k8s \
--version 1.13 \
--nodegroup-name standard-workers \
--node-type t3.medium \
--nodes 3 \
--nodes-min 1 \
--nodes-max 4 \
--vpc-public-subnets=subnet-067d0ffbc09152382,subnet-037d8c6750c5de236 \
--node-ami auto
Deployment
All the contents in the ymls below can be placed into a single file for deployment. For this demonstration, chunking it into smaller components makes it easier to explain.
NiFi Registry (NR)
Edge Flow Manager has a dependency on NR; flow versions are stored in NR. Here is nifiregistry.yml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: nifiregistry
spec:
replicas: 1
selector:
matchLabels:
app: nifiregistry
template:
metadata:
labels:
app: nifiregistry
spec:
containers:
- name: nifiregistry-container
image: your-image-location/nifiregistry
ports:
- containerPort: 18080
name: http
- containerPort: 22
name: ssh
resources:
requests:
cpu: ".5"
memory: "2Gi"
limits:
cpu: "1"
env:
- name: VERSION
value: "11"
---
kind: Service #+
apiVersion: v1 #+
metadata: #+
name: nifiregistry-service #+
spec: #+
selector: #+
app: nifiregistry #+
ports: #+
- protocol: TCP #+
targetPort: 18080 #+
port: 80 #+
name: http #+
- protocol: TCP #+
targetPort: 22 #+
port: 22 #+
name: ssh #+
type: LoadBalancer #+
loadBalancerSourceRanges:
- 0.0.0.0/0
Update the following line in nifiregistry.yml with the location of your NR image:
image: your-image-location/nifiregistry
Also note that the load balancer for NR is open to the world; you may want to lock this down. Deploy NR on k8s:
kubectl apply -f nifiregistry.yml
Edge Flow Manager (EFM)
Next, deploy EFM on k8s. Here is efm.yml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: efm
spec:
replicas: 1
selector:
matchLabels:
app: efm
template:
metadata:
labels:
app: efm
spec:
containers:
- name: efm-container
image: your-image-location/efm
ports:
- containerPort: 10080
name: http
- containerPort: 22
name: ssh
resources:
requests:
cpu: ".5"
memory: "2Gi"
limits:
cpu: "1"
env:
- name: VERSION
value: "11"
- name: NIFI_REGISTRY_ENABLED
value: "true"
- name: NIFI_REGISTRY_BUCKETNAME
value: "testbucket"
- name: NIFI_REGISTRY
value: "<a href="<a href="<a href="<a href="<a href="http://nifiregistry-service.default.svc.cluster.local" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a>" target="_blank"><a href="http://nifiregistry-service.default.svc.cluster.local</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a>>" target="_blank"><a href="<a href="http://nifiregistry-service.default.svc.cluster.local</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a>" target="_blank"><a href="http://nifiregistry-service.default.svc.cluster.local</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a>>>" target="_blank"><a href="<a href="<a href="http://nifiregistry-service.default.svc.cluster.local</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a>" target="_blank"><a href="http://nifiregistry-service.default.svc.cluster.local</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a>>" target="_blank"><a href="<a href="http://nifiregistry-service.default.svc.cluster.local</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a>" target="_blank"><a href="http://nifiregistry-service.default.svc.cluster.local</a</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a</a>>>>" target="_blank"><a href="<a href="<a href="<a href="http://nifiregistry-service.default.svc.cluster.local</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a>" target="_blank"><a href="http://nifiregistry-service.default.svc.cluster.local</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a>>" target="_blank"><a href="<a href="http://nifiregistry-service.default.svc.cluster.local</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a>" target="_blank"><a href="http://nifiregistry-service.default.svc.cluster.local</a</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a</a>>>" target="_blank"><a href="<a href="<a href="http://nifiregistry-service.default.svc.cluster.local</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a>" target="_blank"><a href="http://nifiregistry-service.default.svc.cluster.local</a</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a</a>>" target="_blank"><a href="<a href="http://nifiregistry-service.default.svc.cluster.local</a</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a</a>" target="_blank"><a href="http://nifiregistry-service.default.svc.cluster.local</a</a</a</a" target="_blank">http://nifiregistry-service.default.svc.cluster.local</a</a</a</a</a>>>>>"
---
kind: Service #+
apiVersion: v1 #+
metadata: #+
name: efm-service #+
spec: #+
selector: #+
app: efm #+
ports: #+
- protocol: TCP #+
targetPort: 10080 #+
port: 80 #+
name: http #+
- protocol: TCP #+
targetPort: 22 #+
port: 22 #+
name: ssh #+
type: LoadBalancer #+
loadBalancerSourceRanges:
  - 0.0.0.0/0

Update the following line in efm.yml with the location of your EFM image:

image: your-image-location/efm

Also take note that the load balancer for EFM is open to the world. You may want to lock this down.

Deploy EFM on k8s:

kubectl apply -f efm.yml

MiNiFi

Lastly, deploy MiNiFi on k8s. Here is minifi.yml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: minifi
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minifi
  template:
    metadata:
      labels:
        app: minifi
    spec:
      containers:
      - name: minifi-container
        image: your-image-location/minifi-azure-aws
        ports:
        - containerPort: 10080
          name: http
        - containerPort: 6065
          name: listenhttp
        - containerPort: 22
          name: ssh
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "1"
        env:
        - name: NIFI_C2_ENABLE
          value: "true"
        - name: MINIFI_AGENT_CLASS
          value: "listenSysLog"
        - name: NIFI_C2_REST_URL
          value: "http://efm-service.default.svc.cluster.local/efm/api/c2-protocol/heartbeat"
        - name: NIFI_C2_REST_URL_ACK
          value: "http://efm-service.default.svc.cluster.local/efm/api/c2-protocol/acknowledge"
---
kind: Service #+
apiVersion: v1 #+
metadata: #+
  name: minifi-service #+
spec: #+
  selector: #+
    app: minifi #+
  ports: #+
  - protocol: TCP #+
    targetPort: 10080 #+
    port: 10080 #+
    name: http #+
  - protocol: TCP #+
    targetPort: 9877 #+
    port: 9877 #+
    name: tcpsyslog
  - protocol: TCP #+
    targetPort: 9878 #+
    port: 9878 #+
    name: udpsyslog
  - protocol: TCP #+
    targetPort: 22 #+
    port: 22 #+
    name: ssh #+
  - protocol: TCP #+
    targetPort: 6065 #+
    port: 6065 #+
    name: listenhttp #+
  type: LoadBalancer #+
  loadBalancerSourceRanges:
  - 0.0.0.0/0

Update the following line in minifi.yml with the location of your MiNiFi image:

image: your-image-location/minifi-azure-aws

Also take note that the load balancer for MiNiFi is open to the world. You may want to lock this down.

Deploy MiNiFi on k8s:

kubectl apply -f minifi.yml

That's it! Part 1 of this series demonstrated how to autoscale MiNiFi, and those same k8s commands can be used here to scale this out properly. The next part of this series will add the concept of k8s stateful sets and their impact on EFM/NR/MiNiFi for a resilient backend persistence layer.
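As a quick sanity check after the three kubectl apply commands, you can confirm the deployments came up and grab the external addresses Kubernetes provisioned. This is a minimal sketch, assuming the default namespace and the service names used in the manifests above:

# List the deployments and their pods
kubectl get deployments
kubectl get pods -l app=efm
kubectl get pods -l app=minifi

# The EXTERNAL-IP column shows the load balancer address of each service
kubectl get svc nifiregistry-service efm-service minifi-service

Once efm-service has an EXTERNAL-IP assigned, the EFM UI should be reachable on port 80 of that address (per the service definition above).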
08-14-2019
06:45 PM
3 Kudos
MiNiFi (Java version) is essentially NiFi with a few differences, which is why it runs so well on containers/Kubernetes. The use case is to have a single management console (Edge Flow Manager) manage zero-to-many MiNiFi agents that require autoscaling on Kubernetes based on some arbitrary metric, for example a CPU/RAM threshold. EFM and NiFi Registry are required but don't need autoscaling; therefore, these services will be deployed on Azure Container Service. MiNiFi, on the other hand, often benefits from autoscaling, and hence it will be deployed on Azure Kubernetes Service.

Required for this demonstration:

Azure subscription
Container Registry (demo will leverage Azure Container Registry)
Kubernetes Service (demo will leverage Azure Kubernetes Service)
Azure CLI

The following images need to be stored in Azure Container Registry:

Edge Flow Manager: https://github.com/sunileman/efm1.0.0.0-docker
NiFi Registry: https://github.com/sunileman/NiFi-Registry-Service
MiNiFi (Java): https://github.com/sunileman/CEM1.0-Java-MiNiFi (this image comes precooked with Azure/AWS NARs)

Architecture

This is a 10k-foot view of the architecture. EFM communicates with MiNiFi agents about the work they need to do. EFM also communicates with NiFi Registry to store/version control the flows that will get passed to the MiNiFi agents.

Deploy NiFi Registry and EFM on Azure Container Service

Since EFM and Registry don't really benefit from autoscaling, they are both a great fit for Azure Container Service (mostly static installs). ACS will guarantee EFM and NiFi Registry are always up with one container instance each. EFM, MiNiFi, and Registry have all been imported into my container registry on Azure.

Create NiFi Registry on ACS

NiFi Registry variables to note:

--name Name of the NiFi Registry container
--dns-name-label Prefix for the DNS name of the registry service. This will be used as an input into the EFM container environment variables.

az container create --resource-group sunmanCentralRG --name mynifiregistry --image sunmanregistry.azurecr.io/nifiregistry:latest --dns-name-label nifiregistry --ports 18080 --registry-username ****** --registry-password ******

Create EFM on ACS

EFM variables to note:

NIFI_REGISTRY should match the NiFi Registry container DNS (fully qualified server name)
--dns-name-label DNS prefix

az container create --resource-group sunmanCentralRG --name efm --image sunmanregistry.azurecr.io/efm:latest --dns-name-label myefm --ports 10080 --registry-username ***** --registry-password **** --environment-variables 'NIFI_REGISTRY_ENABLED'='true' 'NIFI_REGISTRY_BUCKETNAME'='testbucket' 'NIFI_REGISTRY'='http://mynifiregistry.centralus.azurecontainer.io:18080'

Create a 'testbucket' on NiFi Registry

MiNiFi flows will be designed using EFM and stored in the NiFi Registry bucket 'testbucket'. This bucket name was identified as a variable during EFM container creation: 'NIFI_REGISTRY_BUCKETNAME'='testbucket'

NiFi Registry will be available under YourNiFiRegistryDNS:18080/nifi-registry/, for example http://mynifiregistry.centralus.azurecontainer.io:18080/nifi-registry/

Click on "NEW BUCKET", enter bucket name - testbucket, and click create.

Validate EFM is up

The EFM UI will be available under http://YourEfmDnsPrefix.centralus.azurecontainer.io:10080/efm/ui, for example http://myefm.centralus.azurecontainer.io:10080/efm/ui

Run MiNiFi Kubernetes Deployment

The easiest way to run a deployment in k8s is to build a manifest file. To learn more about k8s manifest files, go here. Look for < > in the manifest below, as these are the variables to change prior to your deployment (only a few, super simple).

Variable to note: MINIFI_AGENT_CLASS - this will be the agent class published to EFM. To learn more about EFM, go here.

Kubernetes Manifest File:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: minifi
spec:
  replicas: 1
  selector:
    matchLabels:
      app: minifi
  template:
    metadata:
      labels:
        app: minifi
    spec:
      containers:
      - name: minifi-container
        image: <Your Container Registry>/minifi-azure-aws:latest
        ports:
        - containerPort: 10080
          name: http
        - containerPort: 6065
          name: listenhttp
        - containerPort: 22
          name: ssh
        resources:
          requests:
            cpu: ".05"
            memory: "1Gi"
          limits:
            cpu: "1"
        env:
        - name: NIFI_C2_ENABLE
          value: "true"
        - name: MINIFI_AGENT_CLASS
          value: "test"
        - name: NIFI_C2_REST_URL
          value: "http://<Your EFM servername>.centralus.azurecontainer.io:10080/efm/api/c2-protocol/heartbeat"
        - name: NIFI_C2_REST_URL_ACK
          value: "http://<Your EFM servername>.centralus.azurecontainer.io:10080/efm/api/c2-protocol/acknowledge"
---
kind: Service #+
apiVersion: v1 #+
metadata: #+
  name: minifi-service #+
spec: #+
  selector: #+
    app: minifi #+
  ports: #+
  - protocol: TCP #+
    targetPort: 10080 #+
    port: 10080 #+
    name: http #+
  - protocol: TCP #+
    targetPort: 22 #+
    port: 22 #+
    name: ssh #+
  - protocol: TCP #+
    targetPort: 6065 #+
    port: 6065 #+
    name: listenhttp #+
  type: LoadBalancer #+

Once the manifest file has been updated, store it as minifi.yml (this can be any name). Deploy on k8s using:

kubectl apply -f minifi.yml

Output:

sunile.manjee@hwx:~/Documents/GitHub/AKS-YAMLS(master⚡) » kubectl apply -f minifi.yml
deployment.extensions/minifi created
service/minifi-service created
sunile.manjee@hwx:~/Documents/GitHub/AKS-YAMLS(master⚡) »

MiNiFi has been successfully deployed. To verify successful deployment, visit EFM. EFM should show the agent class name 'test', matching the class name used in the MiNiFi k8s manifest file. Open the class and design any flow. Here I simply used GenerateFlowFile and terminated the success relationship with 3 concurrent threads. Click on publish, and soon thereafter MiNiFi will be executing the flow.

AutoScale MiNiFi

At this time a single MiNiFi container/agent is executing flows. I purposefully set the MiNiFi CPU allocation (in the manifest file) to a small number to force the autoscaling. First, let's check the number of MiNiFi pods running on k8s: a single MiNiFi pod. Let's check if autoscaling is enabled for this deployment. To enable autoscaling on k8s:

kubectl autoscale deployment minifi --cpu-percent=25 --min=1 --max=3

minifi is the deployment name. If CPU utilization exceeds 25%, the autoscaler increases the pods up to a maximum of 3 instances; a minimum of 1 instance is defined for the deployment. Verify autoscaling is enabled on the minifi deployment. Number of MiNiFi pods after autoscaling was enabled: 3. Kubernetes added 2 additional MiNiFi pods. Let's kill one of the pods and see what happens: Kubernetes immediately launched a new MiNiFi container after a MiNiFi pod was killed. Enjoy AutoScale on MiNiFi!
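As a side note, the imperative kubectl autoscale command above can also be captured declaratively, which is handy if you keep everything in manifests. A minimal sketch, assuming the autoscaling/v1 API and the minifi deployment from this article; apply and inspect it like any other manifest:

kubectl apply -f - <<EOF
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: minifi
spec:
  scaleTargetRef:
    kind: Deployment
    name: minifi
  minReplicas: 1
  maxReplicas: 3
  targetCPUUtilizationPercentage: 25
EOF

# Watch the autoscaler and the pod count react to load
kubectl get hpa minifi
kubectl get pods -l app=minifi -w

Note that the autoscaler needs a CPU request set on the container (which the manifest above does) in order to compute utilization.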
06-21-2019
08:02 PM
1 Kudo
Building an Apache NiFi processor is super easy. I have seen/read several articles on how to get started by executing Maven commands via the CLI. This article is geared towards individuals who like to use an IDE (specifically IntelliJ) to do the imports instead of running via the CLI.

In IntelliJ, click on Create Project, check "Create from archetype", click on "Add Archetype" and enter the following:

GroupId: org.apache.nifi
ArtifactId: nifi-processor-bundle-archetype
Version: <YourVersionOfNifi>

and then click "OK". Now your new NiFi archetype has been created; select it. Enter a GroupId, ArtifactId, and Version of your choice. A final attribute we need to add is artifactBaseName. This is mandatory. Click on "+" and enter:

Name: artifactBaseName
Value: whatEverYouLike

Now a project is ready to build a custom processor. Enjoy!
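For reference, if you ever do want the CLI route, the same archetype can be generated with Maven directly. A minimal sketch; the group/artifact IDs, versions, and base name below are placeholders, and Maven will prompt interactively for any required archetype property you leave out (for example nifiVersion):

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.nifi \
  -DarchetypeArtifactId=nifi-processor-bundle-archetype \
  -DarchetypeVersion=1.9.2 \
  -DgroupId=com.example \
  -DartifactId=demo-processors \
  -Dversion=1.0-SNAPSHOT \
  -DartifactBaseName=demo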
12-11-2018
09:47 PM
6 Kudos
I came across an article on how to set up NiFi to write into ADLS, which required cobbling together various integration pieces and launching HDI. Since then there have been many updates in NiFi enabling a much easier integration. Combined with CloudBreak's rapid deployment of HDF clusters, this provides an incredible ease of user experience. ADLS is Azure's native cloud storage (look and feel of HDFS), and the capability to read/write via NiFi is key. This article will demonstrate how to use a CloudBreak Recipe to rapidly deploy an HDF NiFi "ADLS Enabled" cluster.

Assumptions

A CloudBreak instance is available
Azure credentials available
Moderate familiarity with Azure
Using HDF 3.2+

From Azure you will need:

ADLS URL
Application ID
Application Password
Directory ID

NiFi requires the ADLS jars, core-site.xml, and hdfs-site.xml. The recipe I built will fetch these resources for you. Simply download the recipe/script from: https://s3-us-west-2.amazonaws.com/sunileman1/scripts/setAdlsEnv.sh

Open it, scroll all the way to the bottom, and update the following:

Your_ADLS_URL: with your adls url
Your_APP_ID: with your application ID
Your_APP_Password: with your application password
Your_Directory_ID: with your directory id

Once the updates are completed, simply add the script under CloudBreak Recipes. Make sure to select "post-cluster-install". Begin provisioning an HDF cluster via CloudBreak. Once the Recipes page is shown, add the recipe to run on the NiFi nodes. Once the cluster is up, use the PutHDFS processor to write to ADLS.

Configure PutHDFS Properties

Hadoop Configuration Resources: /home/nifi/sites/core-site.xml,/home/nifi/sites/hdfs-sites.xml
Additional Classpath Resources: /home/nifi/adlsjars
Directory: /

The above resources are all available on each node due to the recipe. All you have to do is point to the location of the resources in the PutHDFS processor. That's it! Enjoy
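If PutHDFS complains that it cannot find those resources, a quick check on one of the NiFi nodes confirms the recipe populated them. A minimal sketch, assuming SSH access to the node and the paths referenced above:

ls /home/nifi/adlsjars
ls /home/nifi/sites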
12-11-2018
09:43 PM
I came across an article on how to set up NiFi to write into ADLS which required users to cobble together various integration pieces and launch HDI. Since then there have been many updates in NiFi enabling a much easier integration. Combined with CloudBreak's rapid deployment of an HDF cluster, this provides incredible ease of use. ADLS is native cloud storage provided by Azure (look and feel of HDFS), and the capability to read/write via NiFi is key. This article will demonstrate how to use CloudBreak to rapidly deploy an HDF NiFi "ADLS Enabled" cluster.
11-15-2018
08:05 PM
9 Kudos
Objective of this article is to demonstrate how to rapidly deploy a demo Druid & LLAP cluster preloaded with 20 years (nearly 113 million records) of airline data, ready for analytics, using CloudBreak on any IaaS. The entire deployment is UI driven, without the need for a large overhead of administration. All artifacts mentioned in this article are publicly available for reuse to try on your own.

Prolegomenon

Time series is an incredible capability highly leveraged within the IoT space. Current solution sets are either non-scalable and expensive, or distributed processing engines lacking low-latency OLAP speeds. Druid is an OLAP time series engine backed by a Lambda architecture. Druid's out-of-the-box SQL capabilities are severely limited, with no join support. Layering HiveQL over Druid brings the best of both worlds. Hive 3 also offers HiveQL over Kafka, essentially making Hive a true SQL federation engine. With Druid's native integration with Kafka, streaming data from Kafka directly into Druid while executing real-time SQL queries via HiveQL offers a comprehensive time series solution for the IoT space. On with the demo...

To begin the demonstration, launch a CloudBreak deployer instance on any IaaS or on-prem VM. Quick start makes this super simple. Launching the CloudBreak deployer is well documented here. Once the CloudBreak deployer is up, add your Azure, AWS, GCP, or OpenStack credentials within the CloudBreak UI. This will allow deployment of the same cluster on any IaaS.

Druid Blue Print

To launch a Druid/LLAP cluster, an Ambari blueprint will be required. Click on Blueprints, click on CREATE BLUEPRINT, name the blueprint, and enter the following URL to import it into CloudBreak:

https://s3-us-west-2.amazonaws.com/sunileman1/ambari-blueprints/hdp3/druid+llap+Ambari+Blueprint

Recipes
Druid requires a metastore. Import the recipe to create the metastore into CloudBreak: under Cluster Extensions, click on Recipes.
Enter a name for the recipe and select "pre-ambari-start" to run this recipe prior to Ambari starting. Under URL, enter the following to import the recipe into CloudBreak:

https://s3-us-west-2.amazonaws.com/sunileman1/scripts/druid+metastore+install.sh

This cluster will come preloaded with 20 years of airline data. Enter a name for the recipe and select "post-cluster-install" to run this recipe once HDP services are up. Under URL, enter the following to import the recipe into CloudBreak:

https://s3-us-west-2.amazonaws.com/sunileman1/scripts/airline-data.sh
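If you want to see what these recipes actually do before importing them, you can pull the scripts down locally first; a quick, optional sanity check using the same URLs listed above:

curl -sL "https://s3-us-west-2.amazonaws.com/sunileman1/scripts/druid+metastore+install.sh" | less
curl -sL "https://s3-us-west-2.amazonaws.com/sunileman1/scripts/airline-data.sh" | less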
Create a Cluster
Now that all recipes are in place, the next step is to create a cluster:

Select an IaaS to deploy on (credential)
Enter a cluster name
Select HDP 3.0
Select cluster type: Druid LLAP HDP 3
This is the new blueprint which was imported in the previous steps
Select an image. Base image will do for most deployments
Select instance types
Note - I used 64 GB of RAM per node. Additionally, I added 3 compute nodes.
Select network to deploy the cluster on. If one is not pre-created, CloudBreak will create one
CloudBreak can be configured to use S3, ADLS, WASB, GCS
Configuring CloudBreak for S3: here
Configuring CloudBreak for ADLS: here
Configuring CloudBreak for WASB: here
Configuring CloudBreak for GCS

On the Worker node, attach the get-airline-data recipe. On the Druid Broker node, attach druid-metastore-install. External metastores (databases) can be bootstrapped to the cluster; this demo does not require it.
Knox will not be used for this demo
Attach a security group to each host group. If a SG is not pre-created, CloudBreak will create one
Lastly, provide an Ambari password and ssh key
Cluster deployment and provisioning will begin. Within the next few minutes a cluster will be ready and an Ambari URL will be available.
A Zeppelin notebook can be imported using the URL below:

https://s3-us-west-2.amazonaws.com/sunileman1/Zeppelin-NoteBooks/airline_druid.json

Here I demonstrated how to rapidly launch a Druid/LLAP cluster preloaded with airline data using CloudBreak. Enjoy Druid, it's crazy fast. HiveQL makes Druid easy to work with, and CloudBreak makes the deployment super quick.
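If you prefer a terminal over Zeppelin, you can also poke at the data through HiveServer2 Interactive (LLAP). A minimal sketch; the host, port, credentials, and table names are placeholders that depend on your deployment and on what the airline-data recipe loads:

beeline -u "jdbc:hive2://<llap-host>:10500/default" -n hive \
  -e "SHOW TABLES;"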