Member since 05-30-2018

1322 Posts | 715 Kudos Received | 148 Solutions

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 4006 | 08-20-2018 08:26 PM |
| | 1882 | 08-15-2018 01:59 PM |
| | 2339 | 08-13-2018 02:20 PM |
| | 4060 | 07-23-2018 04:37 PM |
| | 4955 | 07-19-2018 12:52 PM |
05-05-2022 07:59 PM (2 Kudos)
Cyber/log data ingestion is super simple using CDP NiFi; however, it alone does not complete the full picture InfoSec and threat hunters are interested in. Often these uber pros are looking for breadcrumbs/clues to shorten their "hunting" exercise. CDP DataViz immediately provides rich visualizations on high-fidelity cyber/log data to shorten mean time to vulnerability detection. Additionally, I've observed these teams often need the ability to run ad hoc analysis, enrichment, and/or correlation on stream using an intuitive interface, and without writing code. Ah yes, SQL on Stream. The entire demo took less than a few hours to build, proving that in cyber we don't always have to engineer (i.e., Grok) our way through difficult data challenges.

Demo:
						
					
01-28-2022 03:11 PM (2 Kudos)
Running SQL on stream is super simple with SQL Stream Builder (SSB), via Flink SQL. I recorded a live demo, and this article provides the supporting artifacts to run it.
   
Demo: Flink SSB Credit Card Fraud Demo
Prerequisite: Quick setup
Note: If you experience any challenges with running SSB, as a sanity check run the following on the cluster:
 
   
   
 flink-yarn-session -d \ 
-D security.kerberos.login.keytab=<keytab_filename>.keytab \ 
-D security.kerberos.login.principal=<workload_username> 
   
   
If the above works, the sanity check passed; kill the YARN job.

yarn application -appStates RUNNING -list

This lists all running YARN applications; your session will show up in the list. Then kill it with:

yarn application -kill application_xxxx
   
 customer_inferences table 
   
 CREATE TEMPORARY TABLE customer_inferences (
  `customer_id` INT,
  `account_number` STRING,
  `center_inferred_lat` FLOAT,
  `center_inferred_lon` FLOAT,
  `max_inferred_distance` STRING,
  `max_inferred_amount` FLOAT
)
WITH (
  'connector' = 'faker',
  'rows-per-second' = '5',
  'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.account_number.expression' = '#{IdNumber.valid}',
  'fields.center_inferred_lat.expression' = '#{Address.latitude}',
  'fields.center_inferred_lon.expression' = '#{Address.longitude}',
  'fields.max_inferred_distance.expression' = '#{number.numberBetween ''6000'',''11000''}',
  'fields.max_inferred_amount.expression' = '#{number.numberBetween ''8000'',''10000''}'
); 
   
 cc_charges table 
   
 CREATE TEMPORARY TABLE cc_charges (
  `customer_id` INT,
  `lat` FLOAT,
  `lon` FLOAT,
  `location` STRING,
  `charge_amount` FLOAT
)
WITH (
  'connector' = 'faker',
  'rows-per-second' = '5',
  'fields.customer_id.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.lat.expression' = '#{Address.latitude}',
  'fields.lon.expression' = '#{Address.longitude}',
  'fields.location.expression' = '#{Friends.location}',
  'fields.charge_amount.expression' = '#{number.numberBetween ''1000'',''10000''}'
); 
   
 SQL to detect fraud 
   
select ci.account_number, cc.charge_amount,
  2 * 3961 * asin(sqrt(
                    power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
                    + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
                    * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
                  )) as distance, ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE
  2 * 3961 * asin(sqrt(
                    power(sin(radians((cc.lat - ci.center_inferred_lat) / 2)), 2)
                    + cos(radians(ci.center_inferred_lat)) * cos(radians(cc.lat))
                    * power(sin(radians((cc.lon - ci.center_inferred_lon) / 2)), 2)
                  )) > ci.max_inferred_distance
  OR cc.charge_amount > ci.max_inferred_amount
   
 DISTANCE_BETWEEN function 
   
 function DISTANCE_BETWEEN(lat1, lon1, lat2, lon2) {
  function toRad(x) {
    return x * Math.PI / 180;
  }
  lat1 = parseFloat(lat1);
  lon1 = parseFloat(lon1);
  lat2 = parseFloat(lat2);
  lon2 = parseFloat(lon2);
  
  var R = 6371; // km
  
  var x1 = lat2 - lat1;
  var dLat = toRad(x1);
  var x2 = lon2 - lon1;
  var dLon = toRad(x2)
  var a = Math.sin(dLat / 2) * Math.sin(dLat / 2) +
    Math.cos(toRad(lat1)) * Math.cos(toRad(lat2)) *
    Math.sin(dLon / 2) * Math.sin(dLon / 2); 
  var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
  var d = R * c;
  
  // convert to miles
  return (d / 1.60934);
}
DISTANCE_BETWEEN($p0, $p1, $p2, $p3) 
   
 SQL to detect fraud using DISTANCE_BETWEEN function 
   
select ci.account_number, cc.charge_amount,
  DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) as distance,
  ci.max_inferred_distance, ci.max_inferred_amount
from cc_charges cc
join customer_inferences ci on cc.customer_id = ci.customer_id
WHERE DISTANCE_BETWEEN(cc.lat, cc.lon, ci.center_inferred_lat, ci.center_inferred_lon) > ci.max_inferred_distance
OR cc.charge_amount > ci.max_inferred_amount
   
						
					
01-27-2022 04:21 PM
A short how-to on listing CDP Data Hub Kafka topics from the command line.

Prerequisites

 CDP Data Hub Kafka cluster 
 Workload username & password 
 CDP Public Cloud trust store 

Steps

Fetch the CDP cert and store it in a trust store (see Creating TLS truststore), or simply grab it from any node:

/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_truststore.jks

ssh into any Data Hub Kafka node using your CDP workload username/password, and copy the trust store onto the node.

Set the broker host:

BROKER_HOST=$(hostname -f)

Create jaas.conf:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="<workload username>"
  password="<workload password>";
};

Set the KAFKA_OPTS environment variable to point to the location of jaas.conf:

export KAFKA_OPTS=-Djava.security.auth.login.config=<LOCATION-OF-YOUR-JAASCONF>/jaas.conf

Create client-kerberos.properties:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.kerberos.service.name=kafka
ssl.truststore.location=/<LOCATION-OF-YOUR-TRUSTSTORE>/truststore.jks
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<WORKLOAD-USER-NAME>" password="<WORKLOAD-PASSWORD>";

Run kafka-topics:

/opt/cloudera/parcels/CDH/bin/kafka-topics --list --bootstrap-server $BROKER_HOST:9093 --command-config <LOCATION-OF-YOUR-CLIENT-KERB-PROP-FILE>/client-kerberos.properties
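
If you'd rather list topics from code than from the CLI, the same connection settings map onto a Python client. A minimal sketch, assuming the confluent-kafka package is installed and the CDP CA certificate has been exported from the JKS truststore into a PEM file (librdkafka cannot read JKS); broker, credentials, and paths below are placeholders.

# Sketch: list topics with confluent-kafka instead of the kafka-topics CLI.
# Assumes the CDP CA cert was exported from the JKS truststore to a PEM file.
from confluent_kafka.admin import AdminClient

conf = {
    "bootstrap.servers": "<BROKER_HOST>:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<WORKLOAD-USER-NAME>",
    "sasl.password": "<WORKLOAD-PASSWORD>",
    "ssl.ca.location": "/<LOCATION-OF-YOUR-CA-PEM>/cdp-ca.pem",
}

admin = AdminClient(conf)
# list_topics() returns cluster metadata; .topics is a dict keyed by topic name.
for name in sorted(admin.list_topics(timeout=10).topics):
    print(name)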
						
					
01-25-2022 01:30 PM
							 
I often encounter the following question: "How do I connect a Java application to CDP Kafka?" The good news is, it's simple.
   
 Public docs: Connecting Kafka clients to Data Hub provisioned clusters 
   
CDP Kafka is secure and governed by default: it uses TLS and CDP workload users for security and authorization, and it supports all Apache Kafka authentication capabilities. In other words, out of the box CDP Kafka is automatically secured (SASL_SSL) and governed by Apache Ranger.
 Prerequisites 
 CDP Public Cloud with a Kafka DataHub instance. 
 Connectivity requirements 
 To connect to CDP Kafka, the following is required: 
 
 CDP TrustStore 
 CDP Workload Username/Password 
 Kafka Broker Name 
 JaaS File 
 
 Java Example 
 
   
  Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093,<KAFKA-BROKER>:9093");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "connectivity-testing");
        properties.put("sasl.kerberos.service.name", "kafka");
        properties.put("ssl.truststore.location", "/home/sunilemanjee/truststore.jks");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CDP-WORKLOAD-USER>\" password=\"<CDP-WORKLOAD-PASSWORD>\";"); 
   
 Full code here. 
TrustStore 
Fetching the TrustStore for CDP is well documented here. 
 CDP Workload Username/Password 
 Set and fetch CDP workload username/password 
    
 Kafka Broker Names 
 Fetch broker names from the CDP management console under DataHub, Hardware Tab 
    
 JaaS File 
   
 KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="cdp-workload-user"
password="cdp-workload-password";
}; 
   
 That's it. Enjoy 
						
					
11-16-2021 11:26 AM
							 
 My previous article demonstrated how to use the CDE-managed Airflow instance to read files from CML. To close the loop, here is a short demonstration on how to write files to CML using CDE's managed Airflow instance. 
   
 Artifacts 
 
 Airflow dag 
 Demo     
 
   
 
						
					
10-26-2021 11:19 AM (1 Kudo)
A short demo on how to use CDE's new Airflow UI to design pipelines, and on leveraging Airflow branching/trigger rules to orchestrate the forking logic of a typical job pipeline. A "typical job pipeline" often encapsulates logic regarding success and failure paths; a rough sketch of that forking pattern follows the artifact list below.

Artifacts

 Demo Video: https://youtu.be/G-53Uf4qrFw 
 Git Repo: https://github.com/sunileman/cde-airflow-branch-operator
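
The real dag lives in the repo above; purely as an illustration of the success/failure fork (not the demo's dag), here is a minimal Airflow sketch using the stock BranchPythonOperator, assuming Airflow 2.x. The task names and the check inside choose_path are hypothetical.

# Illustrative only: a minimal success/failure fork, not the dag from the repo above.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator


def choose_path(**context):
    # Hypothetical check; a real dag would inspect the upstream job outcome here.
    job_succeeded = True
    return "success_path" if job_succeeded else "failure_path"


with DAG(
    dag_id="forking_pipeline_sketch",
    start_date=datetime(2021, 10, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # The branch task returns the task_id of the path to follow; the other path is skipped.
    fork = BranchPythonOperator(task_id="fork", python_callable=choose_path)
    success_path = DummyOperator(task_id="success_path")
    failure_path = DummyOperator(task_id="failure_path")
    fork >> [success_path, failure_path]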
						
					
10-20-2021 09:14 AM (3 Kudos)
Apache Airflow and Spark running on K8s provide unreasonable scalability. Cloudera Machine Learning also runs on K8s, and ML users often leverage Apache Airflow to orchestrate their pipelines. Apache Airflow within Cloudera Data Engineering can now orchestrate CML jobs via the new Airflow CML Operator. This time, instead of writing up a detailed article, I recorded a live demo.

Assets used in the demo

 CML Operator: https://github.com/curtishoward/cml_operator/ 
 Airflow Dag: https://github.com/sunileman/cde-airflow-branch-operator 
 Demo: https://www.youtube.com/watch?v=n9o_Qimnggo 

Enjoy!
						
					
09-15-2021 07:08 PM
Yes, the above comment about the functionality no longer working is accurate. NiFi has gone through many enhancements that are not reflected in this article. I do not plan on updating the article for the following reason: the objective of this article was to get NiFi running on K8s because, at the time, a NiFi-on-K8s offering did not exist within Hortonworks HDF. Recently, NiFi on K8s has been GA'd; it's called DataFlow Experience. Docs here: https://docs.cloudera.com/dataflow/cloud/index.html. If you want NiFi on K8s, that is your new playground.
						
					
08-30-2021 01:20 PM (1 Kudo)
							    
File processing and autoscaling seem to have an antinomic relationship. They don't have to...really. Autoscaling often drives inefficient behaviors. Why? The common response: "It autoscales." Autoscaling still requires sound design principles; without them, it won't autoscale well.
   
Autoscaling within a distributed framework requires proper data dichotomization along with processing/service layer decoupling.
Anti-Pattern
Take the example where I've seen the greatest amount of heartburn.

Large files (i.e., zip/tar/etc.) land in an s3/storage area. The knee-jerk reaction is to feed them into a distributed processing engine that autoscales, trusting the "autoscaling" part to handle it appropriately. It may. More likely, you're flipping a coin and lighting a candle, hoping it all works out. What if the file sizes are heterogeneous and the variance between them is significant? What about error handling? Does it have to be all or nothing (meaning all files are processed or all fail)?
 What if autoscaling is driven through smaller processing units (groups)? 
   
    
Here we take the same payloads but defrag them into smaller units. Each unit gets its own heterogeneous compute footprint, driving resource consumption efficiencies.
 Technical Details 
For example, a payload (myExamplePayload.zip) contains 1000 JSON files. Instead of throwing this entire payload at a single compute cluster (requiring the maximum number of resources possible, aka a top-line compute profile)...defrag the payload.
    
As the payload arrives in s3, CDF-X's (Cloudera Data Flow Experience) s3 processor listens for any files. CDF-X pulls the files, decompresses them, and writes all the files back to s3. For example:

s3://My-S3-Bucket/decompressed/file1.json, s3://My-S3-Bucket/decompressed/file2.json, ...

In parallel, CDF-X generates a CDE (Cloudera Data Engineering, i.e. Spark) job spec, a JSON payload. The job spec includes file locations; each job would be handed roughly ~100 file names and locations. Since CDF-X knows the file sizes, it can also hint to CDE how much compute is required to process those files. This step is not strictly necessary, since the unit of work has already been defragged into something manageable, so CDE autoscaling should kick in and perform well. Once the job spec is created, CDF-X calls CDE over REST, sending the job spec. CDE accepts the job spec and arguments (file locations) and runs the micro workloads. Each workload has its own heterogeneous compute profile and autoscales independently. A purely illustrative job spec is sketched below.
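
To make the idea concrete, here is a purely illustrative sketch of such a job spec, written as a Python dict. Every field name is hypothetical; this is not the CDE job API schema, just the shape of the information one defragged unit of work carries.

# Hypothetical job spec for one defragged unit of work (~100 files per job).
# Field names are illustrative only, not the actual CDE job API schema.
job_spec = {
    "job_name": "process-myExamplePayload-part-007",
    "files": [
        "s3://My-S3-Bucket/decompressed/file700.json",
        "s3://My-S3-Bucket/decompressed/file701.json",
        # ...roughly 100 file locations per unit of work
    ],
    # Sized from the known total bytes of this unit, so the job starts with a sensible footprint.
    "compute_hint": {"executors": 2, "executor_memory": "4g"},
}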
 Wrapping Up 
Defragmentation of large singleton payloads enables autoscaling to run more efficiently. Autoscaling is an incredibly powerful capability that is often misused by not applying sound design principles. Leveraging these simple patterns allows for ease of operations, cost control, and manageability.
						
					
08-27-2021 02:24 PM (2 Kudos)
							 
    
Recently I was tasked with building a possible pattern for handling slowly changing dimensions (Type 2, to be specific) within CDP. The obvious answer is to use Hive ACID, but clearly, that answer is too generic. I needed to build a pipeline similar to what I used to build as an Informatica developer and verify the legitimacy of the solution/pattern. Here we go.
Why?
The objective was to build one possible pattern (of many) for handling SCD Type 2 dimensions with CDP. Outcome: I was able to reproduce a typical ETL workflow with CDP. How? By taking off my horse blinders...really.
What are SCDs?
For those who may not be familiar with slowly changing dimensions within an EDW context, here is a great quick read: Types Of Dimension Tables
How?

1. Data lands in a sourcing area (super typical).
2. Pull the source data and run it through a cleaning/processing layer for staging (ready for merging against the SCD table).
3. Run an ACID merge between the stage and SCD tables (not rocket science).

 I don't believe you...I wanna try this in CDP Public Cloud or Private Cloud 
 Perfect. This article runs through a demo using CDP to do exactly that.   
 What is required for this demo? 
 
 CDP Public Cloud Account 
 Cloudera Data Engineering (CDE)...which includes airflow 
 Cloudera Data Warehouse (HiveLLAP) 
 
 Assets required to produce this demo: 
 
 Spark Code
 
 https://github.com/sunileman/spark-CDE-SCD 
 
 
 Airflow Dag
 
 https://github.com/sunileman/airflow-scd-dag 
 
 
 Data
 
 products.psv (Type 2 dim) 
 product_changes.psv (changes that need to be applied to the product dimension table) 
 
 
 
Workflow Details
Airflow will orchestrate the workflow. First, a CDE Spark job will pick up the product changes from s3. Spark will perform the required ETL and then write the output to a staging table (Hive external). Lastly, using the new Airflow CDW operator, a Hive ACID merge will be executed between the staging table and the product dimension. Not rocket science, I know.
    
Data/Tables Setup
For this demo, there is a single dimension table: product. The other table is product_ext, an external table holding the raw data that needs to be applied to the product dimension. Very common stuff. 
Product DDL 
Note: Hive 3 tables are internal with full ACID support by default.
 
   
 CREATE TABLE IF NOT EXISTS product(
  product_id INT,
  product_name STRING,
  aisle_id int,
  department_id int,
  start_date date,
  end_date date,
  is_current string DEFAULT 'Y') 
   
 Product_ext Schema 
 Note: Replace <YOUR-S3-BUCKET> with your s3 bucket.   
   
 CREATE EXTERNAL TABLE IF NOT EXISTS product_ext(
  product_id INT,
  product_name STRING,
  aisle_id int,
  department_id int)
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '|'
  STORED AS TEXTFILE
  LOCATION 's3a://<YOUR-S3-BUCKET>/sunman/products/'
  tblproperties ("skip.header.line.count"="1"); 
   
Load the product dimension table
   
 insert into product
select product_id, product_name, aisle_id, department_id, current_date() as start_date, null as end_date, 'Y' as is_current from product_ext 
   
 Lastly, upload the product_changes.psv file to s3 
 Note: Replace <YOUR-S3-BUCKET> with your s3 bucket 
   
 s3a://<YOUR-S3-BUCKET>/sunman/product_changes/ 
   
 Recap: A product dimension table has been created. A file with changes that need to be applied against the product dimension table has been uploaded to s3.   
 Cloudera Data Warehousing 
 Via CDW, a Hive ACID merge statement will merge the staging table with the product dimension. This will be triggered by Airflow using the new CDW operator. More on that later. 
 Grab the JDBC URL from the virtual warehouse 
 Note: SSO must be disabled 
    
   
   
   
   
   
   
   
 For example: 
   
 jdbc:hive2://zzz.cloudera.site/default;transportMode=http;httpPath=cliservice;socketTimeout=60;ssl=true;retries=3; 
   
 Create an Airflow CDW Connection 
 To execute a CDW HQL statement(s), an airflow connection to CDW is required. The connection is referenced in the airflow dag, more on that later. 
 How to create a CDW Airflow connection: Automating data pipelines using Apache Airflow in Cloudera Data Engineering. 
   
 Important: Make note of the conn Id. It will be used later in this article 
Create a CDE Spark Job
This job reads the product_changes.psv file (containing the changes that need to be applied to the product dimension), performs cleansing/ETL, and stages the changes as an external Hive table.
The code is available on my GitHub page. If you're not interested in building the code, no worries; I have also provided the Spark jar, which can be downloaded instead.
   
 https://github.com/sunileman/spark-CDE-SCD/blob/master/target/scala-2.11/spark-scd_2.11-0.1.jar 
   
CDE Spark Job Details
Note: For this demo to work, all job specs/args/configs must match the screenshot below.
Note: Update <YOUR-S3-BUCKET> with your s3 bucket.
    
 Create a CDE Airflow Job 
 The airflow dag flow is the following:  
    
 First, a Spark CDE job will be called to stage the product changes into an external Hive table.  Then CDW will be called to perform the Hive ACID Merge between the product dimension and staging table.  The code for the dag is here: 
   
 https://github.com/sunileman/airflow-scd-dag 
   
 The dag file is airflow-scd.py 
 Open it and update the cli_conn_id. This is the Airflow CDW connection created earlier: 
   
 ##https://docs.cloudera.com/data-engineering/cloud/manage-jobs/topics/cde-airflow-dag-pipeline.html
hive_cdw_job = CDWOperator(
    task_id='cdw_hive_job',
    dag=dag,
    cli_conn_id='<YOUR-AIRFLOW-CDW-CONNECTION-ID>',
    hql=hive_merge,
    schema='default',
    use_proxy_user=False,
    query_isolation=False
) 
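
The hql argument above points at a hive_merge string defined in the dag; the authoritative statement is the one in airflow-scd.py. Purely as a sketch, assuming the staging table written by the Spark job is named product_staged_cleansed (the value passed via c_stageCleansedTable below) and using the product DDL from earlier, the merge could look something like this.

# Sketch only: a possible hive_merge string for the CDWOperator above.
# The real statement ships with airflow-scd.py; the staging table name is
# assumed to be product_staged_cleansed, columns come from the product DDL above.
hive_merge = """
MERGE INTO product AS t
USING product_staged_cleansed AS s
ON t.product_id = s.product_id AND t.is_current = 'Y'
WHEN MATCHED THEN
  UPDATE SET end_date = current_date(), is_current = 'N'
WHEN NOT MATCHED THEN
  INSERT VALUES (s.product_id, s.product_name, s.aisle_id, s.department_id,
                 current_date(), NULL, 'Y')
"""
# Note: closing the current row and inserting the new version of a changed
# product are often handled as separate passes (or via a staging table that
# carries both), since a single MERGE cannot insert for rows it also matched.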
   
   
    
   
   
   
   
   
   
   
   
   
   
   
   
Run the SCD workflow
Generally, airflow jobs can be executed through the UI. Since I parametrized the dag, at this time the only way to execute the airflow job with runtime configs is through the CLI.
 Download CDE CLI: Using the Cloudera Data Engineering command line interface 
 Note: Update <YOUR-S3-BUCKET> with your s3 bucket name 
   
 ./cde job run --config c_stageTable='product_staged' --config c_sourceLoc='s3a://<YOUR-S3-BUCKET>/sunman/product_changes/' --config c_stageCleansedTable=product_staged_cleansed  --config c_dimTable=product --name EDW-SCD-WorkFlow 
   
 This will return an airflow job ID. Return to the CDE UI. After the dag completes, run the following: 
   
 SELECT * FROM PRODUCT WHERE PRODUCT_ID = 232 
   
Product ID 232 is one of several products that were updated.
Wrap It Up!
This end-to-end SCD pipeline demonstrated the capability to use the right tool for the right job to accomplish a highly important EDW task, all driven by Airflow's super powerful orchestration engine handing off work between Apache Spark and Apache Hive: Spark for data processing, Hive for EDW (ACID merge).
						
					