
 

2021-01-20_10-21-26.jpg

Credits to @mbawa (Mandeep Singh Bawa) who co-built all the assets in this article. Thank you!

 

We (Mandeep and I) engaged on a customer use case where Cloudera Data Engineering (Spark) jobs were triggered once a file lands in S3 (details on how to trigger CDE from Lambda here). Triggering CDE jobs is quite simple; however, we needed much more. Here are a few of the requirements:

  • Decoupling Ingestion Layer / Processing Layer
  • Decoupling apps (sender) from Spark
    • Apps can send and forget payloads without the burden of configuring Spark (number of executors, memory/CPU, etc.), worrying about Spark availability (upgrades, resource availability, etc.), or being affected by CDE API changes
  • Real-time changes to where CDE jobs are sent (Multi CDE)
  • Monitor job status and send alerts
  • Monitor job run times and alert on out-of-spec run times
  • Failover to Secondary CDE
  • Throttling
  • Authentication

2021-01-20_10-32-51.jpg

It may look as though we are trying to make NiFi into an orchestration engine for CDE. That's not the case. Here we are trying to meet some core objectives by leveraging capabilities within the platform to accomplish the above-stated tasks. CDE comes with Apache Airflow, a much richer orchestration engine. Here we are integrating AWS triggers, multiple CDE clusters, monitoring, alerting, and a single API for multiple clusters.

Artifacts

High-Level WorkFlow

At a high level, the NiFi workflow does the following:

  • Exposes a single REST endpoint for CDE job submission
  • CDE workload balancing between multiple CDE clusters
    • If only a single CDE cluster is available, it will queue jobs until compute bandwidth is available
  • Queue jobs if CDE clusters are too busy
    • Queued jobs will be retried
    • If the number of retries for a job spec is greater than 3 (parameterized), an alert will be triggered
  • Monitor jobs from start to finish
  • Alert if a job
    • Fails
    • Exceeds the predetermined max run time
      • e.g. a job runs for 10 minutes while the max run time for jobs is set to 5 minutes
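
The alerting rule above can be sketched as a small check. This is only an illustration of the rule, not the NiFi flow itself; the function name and the status string "failed" are assumptions made for the example.

```python
def needs_alert(status: str, runtime_ms: int, max_runtime_ms: int) -> bool:
    """Return True when a job failed or ran longer than the allowed maximum.

    The status value "failed" is a hypothetical label, not a confirmed CDE status.
    """
    if status == "failed":
        return True
    return runtime_ms > max_runtime_ms


TEN_MINUTES_MS = 10 * 60 * 1000
FIVE_MINUTES_MS = 5 * 60 * 1000

# The example from the list: a 10 minute run against a 5 minute limit.
print(needs_alert("running", TEN_MINUTES_MS, FIVE_MINUTES_MS))  # True
```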

Setup

The following NiFi parameters will be required

  • api_token (CDE Token, more on this later)
    • Set to ${cdeToken}
  • job-runtime-threshold-ms
    • Max run time a job should run before an alert is triggered
  • kbrokers
    • Kafka brokers
  • ktopic-fail
    • Kafka topic: cde-job-failures
  • ktopic-inbound-jobs
    • Kafka topic: cde-jobs
  • ktopic-job-monitoring
    • Kafka topic: cde-job-monitoring
  • ktopic-job-runtime-over-limit
    • Kafka topic: cde-job-runtime-alert
  • ktopic-retry
    • Kafka topic: cde-retry
  • username
    • CDE Machine user
  • password
    • CDE machine user password
  • primary-vc-token-api
    • CDE token api (more on this later)
  • primary_vc_jobs_api
    • CDE Primary cluster jobs api (more on this later)
  • secondary-vc-available
    • Y/N
    • If secondary CDE cluster is available, set to Y, else N
  • secondary_vc_jobs_api
    • CDE secondary cluster jobs API if the secondary cluster is available
  • run_count_limit
    • Max number of concurrent running jobs per CDE cluster
    • e.g. 20
  • wait-count-max
    • Max retry count. If a job cannot be submitted to CDE (e.g. because the cluster is too busy), this is how many times NiFi should retry before adding the job to the Kafka ktopic-fail topic
    • e.g. 5
  • start_count_limit
    • Max number of concurrent starting jobs per CDE cluster
    • e.g. 20

Note: When you run the workflow for the first time, generally the Kafka topics will be automatically created for you.
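
Before starting the flow, it can help to check that every parameter is filled in. The following is a minimal sketch of such a check in plain Python, outside of NiFi. The parameter names and topic names come from the list above; the helper name and all other values (broker address, hostnames, credentials, threshold) are placeholders invented for the example.

```python
# Parameter names are taken from the article; the helper itself is hypothetical.
REQUIRED_PARAMETERS = [
    "api_token", "job-runtime-threshold-ms", "kbrokers", "ktopic-fail",
    "ktopic-inbound-jobs", "ktopic-job-monitoring",
    "ktopic-job-runtime-over-limit", "ktopic-retry", "username", "password",
    "primary-vc-token-api", "primary_vc_jobs_api", "secondary-vc-available",
    "run_count_limit", "wait-count-max", "start_count_limit",
]


def missing_parameters(params: dict) -> list:
    """Return the names of parameters that are absent or empty."""
    missing = [name for name in REQUIRED_PARAMETERS if not params.get(name)]
    # The secondary jobs API is only needed when a secondary cluster exists.
    if params.get("secondary-vc-available") == "Y" and not params.get("secondary_vc_jobs_api"):
        missing.append("secondary_vc_jobs_api")
    return missing


example = {
    "api_token": "${cdeToken}",
    "job-runtime-threshold-ms": "300000",        # placeholder: 5 minutes
    "kbrokers": "broker-1:9092",                 # placeholder
    "ktopic-fail": "cde-job-failures",
    "ktopic-inbound-jobs": "cde-jobs",
    "ktopic-job-monitoring": "cde-job-monitoring",
    "ktopic-job-runtime-over-limit": "cde-job-runtime-alert",
    "ktopic-retry": "cde-retry",
    "username": "machine-user",                  # placeholder
    "password": "change-me",                     # placeholder
    "primary-vc-token-api": "service.example",   # placeholder host
    "primary_vc_jobs_api": "jobs.example",       # placeholder host
    "secondary-vc-available": "N",
    "run_count_limit": "20",
    "wait-count-max": "5",
    "start_count_limit": "20",
}

print(missing_parameters(example))  # []
```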

Detailed WorkFlow

Once a CDE job spec is sent to NiFi, NiFi does the following:

  1. Write the job spec to the Kafka ktopic-inbound-jobs (NiFi parameter) topic
  2. Pull jobs from Kafka
    1. New jobs: ktopic-inbound-jobs (NiFi parameter) topic
    2. Retry jobs: ktopic-retry (NiFi parameter) topic
    3. Monitoring jobs: ktopic-job-monitoring (NiFi parameter) topic
  3. Fetch CDE API tokens
  4. Check if the primary cluster's current run count is less than run_count_limit (NiFi parameter)
  5. Check if the primary cluster's current starting count is less than start_count_limit (NiFi parameter)
    1. If the run or start counts are not within the limit, retry the same logic on the secondary cluster (if available, see secondary-vc-available)
    2. If the run/start counts are within the limit, the job spec will be submitted to CDE
    3. If the run/start counts are not within the limit for both primary and secondary CDE and the number of retries is less than wait-count-max (NiFi parameter), the job spec will be written to the Kafka ktopic-retry topic (NiFi parameter)
  6. Monitoring
    1. NiFi will call CDE to determine the current status of the job ID (pulled from ktopic-job-monitoring)
    2. If the job ends successfully, nothing more will happen here.
    3. If the job ends with a failure, the job spec will be written to the Kafka ktopic-fail topic
    4. If the job is still running
      1. If the run time is less than job-runtime-threshold-ms, write the job spec back to ktopic-job-monitoring
      2. Otherwise, send an alert (NiFi parameter)
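
The cluster selection in steps 4 and 5 above can be sketched roughly as follows. This is a simplified model of the decision only, not the NiFi implementation: the ClusterLoad type, the function names, and the returned labels are made up for illustration, and the "fail" branch follows the description of wait-count-max in the Setup section.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ClusterLoad:
    """Hypothetical snapshot of a CDE cluster's current job counts."""
    name: str
    running: int
    starting: int


def has_capacity(load: ClusterLoad, run_limit: int, start_limit: int) -> bool:
    """Both the running and the starting counts must be under their limits."""
    return load.running < run_limit and load.starting < start_limit


def route_job(primary: ClusterLoad, secondary: Optional[ClusterLoad],
              retries: int, run_limit: int, start_limit: int,
              wait_count_max: int) -> str:
    """Decide where a job spec goes next: a cluster, the retry topic, or the fail topic."""
    # Try the primary first, then the secondary when one is available.
    for cluster in (primary, secondary):
        if cluster is not None and has_capacity(cluster, run_limit, start_limit):
            return f"submit:{cluster.name}"
    # No capacity anywhere: requeue until the retry budget is used up.
    if retries < wait_count_max:
        return "retry"
    return "fail"


busy_primary = ClusterLoad("primary", running=20, starting=0)
idle_secondary = ClusterLoad("secondary", running=3, starting=0)
print(route_job(busy_primary, idle_secondary, retries=0,
                run_limit=20, start_limit=20, wait_count_max=5))  # submit:secondary
```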

CDE APIs

To get started, CDE primary and secondary (if available) cluster API details are needed in NiFi as parameters:

  1. To fetch the token API, click the pencil icon:
    2021-01-20_10-56-19.jpg
  2. Click on Grafana URL:
    2021-01-20_10-56-41.jpg
    The URL will look something like this:

 

 

https://service.cde-zzzzzz.moad-aw.aaaaa-aaaa.cloudera.site/grafana/d/sK1XDusZz/kubernetes?orgId=1&refresh=5s

 

 

  • Set the NiFi parameter primary-vc-token-api to the first part of the URL (the hostname):

 

 

service.cde-zzzzzz.moad-aw.aaaaa-aaaa.cloudera.site

 

 

Now get the Jobs API for both primary and secondary (if available). For a virtual cluster,

  1. Click the pencil icon
    2021-01-20_11-48-25.jpg
  2. Click Jobs API URL to copy the URL
    2021-01-20_11-48-40.jpg
    The jobs URL will look something like this:

 

 

https://aaa.cde-aaa.moad-aw.aaa-aaa.cloudera.site/dex/api/v1

 

 

  • Take the first part of the URL (the hostname) and set the NiFi parameter primary_vc_jobs_api to it. Repeat the same steps for secondary_vc_jobs_api

 

 

aaa.cde-aaa.moad-aw.aaa-aaa.cloudera.site
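
If you would rather not trim the URLs by hand, the "first part" of both the Grafana URL and the Jobs API URL can be pulled out with the Python standard library. This is just a convenience sketch; the function name is made up, and the URLs are the sample ones shown above.

```python
from urllib.parse import urlparse


def api_host(url: str) -> str:
    """Return only the hostname of a URL, which is what the NiFi parameters expect."""
    host = urlparse(url).hostname
    if host is None:
        raise ValueError(f"no hostname found in {url!r}")
    return host


grafana_url = ("https://service.cde-zzzzzz.moad-aw.aaaaa-aaaa.cloudera.site"
               "/grafana/d/sK1XDusZz/kubernetes?orgId=1&refresh=5s")
jobs_url = "https://aaa.cde-aaa.moad-aw.aaa-aaa.cloudera.site/dex/api/v1"

print(api_host(grafana_url))  # service.cde-zzzzzz.moad-aw.aaaaa-aaaa.cloudera.site
print(api_host(jobs_url))     # aaa.cde-aaa.moad-aw.aaa-aaa.cloudera.site
```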

 

 

Run a CDE job

Inside of the NiFi workflow, there is a test flow to verify the NiFi CDE jobs pipeline works:

2021-01-20_11-58-29.jpg

To run the flow, inside of InvokeHTTP, set the URL to one of the NiFi nodes. Run it, and if the integration is working, you will see a job running in CDE.
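
An application outside NiFi could send a job spec to the same endpoint in a similar way. The sketch below only builds the HTTP request and does not send it. Everything specific in it is an assumption: the node URL, the port, the use of POST with a JSON body, and the job spec fields are placeholders, so check the listener configuration in the flow for the real values.

```python
import json
from urllib.request import Request


def build_submit_request(nifi_url: str, job_spec: dict) -> Request:
    """Build (but do not send) a POST request carrying a job spec as JSON."""
    body = json.dumps(job_spec).encode("utf-8")
    return Request(
        nifi_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical NiFi node URL and job spec, for illustration only.
request = build_submit_request(
    "http://nifi-node-1.example.com:9999/",
    {"name": "example-job"},
)
print(request.get_method(), request.full_url)

# To actually send it: urllib.request.urlopen(request)
```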

Enjoy! Oh, by the way, I plan on publishing a video walking through the NiFi flow.
