Objective: Describe a process for performing a data integrity (DI) check using Spark JDBC and an Oozie Spark action.

Most ETL pipelines include DI checks between the source system and the ETL environment: the record counts must match, and the sum or average of certain significant columns is compared on both sides. This article shows how to perform a DI check with Spark JDBC and record the results for audit purposes. It also shows how to avoid storing a clear-text password when making the JDBC call from Spark.

The following components need to be written in order to accomplish this.

  • PySpark code that invokes the Java class to retrieve the clear-text password at runtime and performs the DI check.
  • Java class that decrypts the password using the Hadoop credential provider Java API.
  • Oozie Spark action that triggers the Spark code performing the DI check (counts).

PySpark Code for DI Check:

This Python module calls a Java method to decrypt the password and uses it in the Spark JDBC call to get the row count from the table. The record count of the file is then obtained with the Spark DataFrame count() method. The two counts are compared to determine whether they match. The same logic can be applied to compare the sum or average of specific columns between the RDBMS and the Hadoop table or file for further data quality checks.
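The module reads its connection settings from a JSON file whose path is passed as the first argument. The sketch below shows one way such a file might be generated. The field names (tblnm, url, driver, query, alias, provider) are the ones the module reads; every value (host, table, alias, JCEKS path) is a hypothetical placeholder, and build_db_props is an illustrative helper rather than part of the original code. The count query is wrapped as a parenthesized subquery with an alias because Spark places the dbtable option in a FROM clause, and the count column is named COUNT_FROM_TBL to match what the module reads.

```python
import json

def build_db_props(table, url, driver, alias, provider):
    """Return the settings record the DI check module reads (illustrative helper)."""
    # Spark places `dbtable` in a FROM clause, so the count query is wrapped
    # as a parenthesized subquery with an alias.
    query = "(SELECT COUNT(*) AS COUNT_FROM_TBL FROM {0}) cnt".format(table)
    return {
        "tblnm": table,
        "url": url,
        "driver": driver,
        "query": query,
        "alias": alias,
        "provider": provider,
    }

# All values below are hypothetical placeholders.
props = build_db_props(
    table="SALES.ORDERS",
    url="jdbc:oracle:thin:@//dbhost.example.com:1521/ORCL",
    driver="oracle.jdbc.OracleDriver",
    alias="orders.db.password",
    provider="jceks://hdfs/user/etl/orders.jceks",
)

# By default Spark's JSON reader expects one JSON object per line,
# so the record is serialized without indentation.
config_line = json.dumps(props, sort_keys=True)
print(config_line)
```

Note that the file holds only the alias and the provider path, never the password itself.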

import sys
from datetime import datetime
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

## Main Starts Here.

if __name__ == "__main__":

    conf = SparkConf().setAppName("Spark DI Check Code")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    ### Load the database properties (connection string, query, password alias, credential provider) from a JSON configuration file
    dbProps = sqlContext.read.json(sys.argv[1]).first()
    print(sys.argv[3])

    ### Extract password from alias & provider
    myjvm = sc._gateway.jvm
    # Create an instance of the Java class (the package must match the Java source)
    decryptionUtil = myjvm.com.hortonworks.code.util.DecryptionUtil()
    # Call the Java method to retrieve the password
    pswd = decryptionUtil.getPasswordFromCredentialProviders(dbProps['alias'], dbProps['provider'])

    ### Count the records in the unloaded Parquet files
    unloaded_count = sqlContext.read.parquet(sys.argv[3]).count()

    ### Run the count query on the source database over JDBC
    count_from_db = sqlContext.read.format('jdbc').load(url=dbProps.url, driver=dbProps.driver, dbtable=dbProps.query, user=sys.argv[5], password=pswd)
    count_from_db_int = int(count_from_db.rdd.map(lambda x: x.COUNT_FROM_TBL).first())
    print(count_from_db_int)
    print(unloaded_count)

    ### Build a one-row audit record of the comparison (the step that persists it is not shown here)
    rowdf = sqlContext.createDataFrame([(dbProps.tblnm, str(datetime.now()), count_from_db_int, unloaded_count)], ("table_name", "runtime", "rdbms_count", "hadoop_count"))

    if unloaded_count == count_from_db_int:
        print("******COUNT VALIDATION PASSED *****")
    else:
        print("*****COUNTS DID NOT MATCH. SENDING BAD RETURN CODE*****")
        # A non-zero exit code makes the Oozie action follow its error transition
        sys.exit(1)
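The same comparison can be generalized beyond row counts. The sketch below is plain Python and assumes the aggregates from both sides have already been collected into dictionaries (for example from the JDBC query and a DataFrame aggregation). The function name compare_metrics, the rel_tol parameter, and all sample values are illustrative and not part of the original module.

```python
def compare_metrics(source, target, rel_tol=0.0):
    """Compare aggregate metrics from the RDBMS and Hadoop sides.

    Returns a list of (metric, source_value, target_value) mismatches.
    """
    mismatches = []
    for name in sorted(source):
        src_val = source[name]
        tgt_val = target.get(name)
        if tgt_val is None:
            # Metric missing on the Hadoop side counts as a mismatch.
            mismatches.append((name, src_val, tgt_val))
            continue
        # Allow a relative tolerance for floating-point sums and averages.
        allowed = rel_tol * max(abs(src_val), abs(tgt_val))
        if abs(src_val - tgt_val) > allowed:
            mismatches.append((name, src_val, tgt_val))
    return mismatches

# Hypothetical aggregates from each side.
rdbms = {"row_count": 1000, "sum_amount": 52340.75, "avg_amount": 52.34075}
hadoop = {"row_count": 1000, "sum_amount": 52340.75000001, "avg_amount": 52.34075}

failed = compare_metrics(rdbms, hadoop, rel_tol=1e-9)
print(failed)
```

A tolerance of zero gives exact matching, which suits counts; a small relative tolerance is usually more practical for sums and averages computed by different engines.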

Java Class to decrypt password:

The Java class below uses the Hadoop credential provider API to extract the clear-text password, given the alias name and the path of the encrypted credential store.

package com.hortonworks.code.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import java.io.IOException;

public class DecryptionUtil {
 public DecryptionUtil() {}

 // Resolves the clear-text password for the alias from the given credential provider path
 public String getPasswordFromCredentialProviders(String alias, String provider) throws IOException {
  Configuration conf = new Configuration();
  conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
  char[] pass = conf.getPassword(alias);
  // Fail with a clear message instead of a NullPointerException from String.valueOf
  if (pass == null) {
   throw new IOException("No password found for alias " + alias);
  }
  return String.valueOf(pass);
 }
}
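The class above assumes the alias already exists in a credential store. Such a store is typically populated with the hadoop credential create command, which prompts for the secret when run interactively. A minimal sketch that assembles that command from Python follows; the alias and JCEKS path are hypothetical, and credential_create_command is an illustrative helper.

```python
def credential_create_command(alias, provider):
    """Build the argument list for `hadoop credential create`."""
    return ["hadoop", "credential", "create", alias, "-provider", provider]

cmd = credential_create_command(
    "orders.db.password",                  # hypothetical alias
    "jceks://hdfs/user/etl/orders.jceks",  # hypothetical provider path
)
print(" ".join(cmd))
# On a cluster node this list could be passed to subprocess.check_call(cmd).
```

The alias and provider path used here are the same two values the PySpark module passes to getPasswordFromCredentialProviders.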

Oozie spark action:

The Oozie Spark action invokes the PySpark code that performs the DI check.

<credentials>
    <credential name='hcat-creds' type='hcat'>
        <property>
            <name>hcat.metastore.uri</name>
            <value>${HCAT_METASTORE_URI}</value>
        </property>
        <property>
            <name>hcat.metastore.principal</name>
            <value>${HCAT_METASTORE_PRINCIPAL}</value>
        </property>
    </credential>
</credentials>
<action name="my-spark_di-check" cred="hcat-creds">
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${JOB_TRACKER}</job-tracker>
            <name-node>${NAMENODE}</name-node>
            <prepare>
                <delete path="${DI_RESULT_FILE}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${QUEUE_NAME}</value>
                </property>
                <property>
                    <name>oozie.launcher.yarn.app.mapreduce.am.env</name>
                    <value>SPARK_HOME=/usr/hdp/current/spark-client</value>
                </property>
            </configuration>
            <master>yarn-cluster</master>
            <mode>cluster</mode>
            <name>DI Check</name>
            <jar>${DI_CHECK_MODULE}</jar>
            <spark-opts> --files ${OOZIE_LIBPATH}/conf/hive-site.xml --executor-memory 4G --num-executors 2 --conf spark.yarn.appMasterEnv.hive.metastore.uris=${HIVE_METASTORE_URI}</spark-opts>
            <arg>${DBPROPS_FILE}</arg>
            <arg>${DI_RESULT_FILE}</arg>
            <arg>${TGT_BASE_TBL_DIR}</arg>
            <arg>${COUNTS_LOG}</arg>
            <arg>${SRC_USERNAME}</arg>
        </spark>
        <ok to="next-action" />
        <error to="fail"/>
    </action>
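The arg elements reach the PySpark script as positional arguments in the order listed, so sys.argv[1] is the DB properties file, sys.argv[3] is the target table directory, and sys.argv[5] is the source user name. A small sketch of mapping them to named fields, which may be easier to maintain than numeric indexes; the DiCheckArgs and parse_args names, the script name, and all sample paths are hypothetical.

```python
from collections import namedtuple

# Field order mirrors the <arg> elements of the Oozie action.
DiCheckArgs = namedtuple(
    "DiCheckArgs",
    ["dbprops_file", "di_result_file", "tgt_base_tbl_dir", "counts_log", "src_username"],
)

def parse_args(argv):
    """Map sys.argv-style input to named fields; argv[0] is the script name."""
    expected = len(DiCheckArgs._fields)
    if len(argv) - 1 != expected:
        raise ValueError(
            "expected {0} arguments, got {1}".format(expected, len(argv) - 1)
        )
    return DiCheckArgs(*argv[1:])

# Hypothetical values standing in for the Oozie workflow properties.
args = parse_args([
    "di_check.py",
    "/apps/etl/conf/dbprops.json",
    "/data/di/result",
    "/data/warehouse/orders",
    "/data/di/counts.log",
    "etl_user",
])
print(args.tgt_base_tbl_dir)
print(args.src_username)
```

In the script itself, parse_args(sys.argv) would replace the direct sys.argv[n] lookups.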
Last update: 06-16-2017 06:20 AM