Created on 06-16-2017 06:20 AM
Objective: Describe a process to perform a DI (data integrity) check using Spark JDBC and an Oozie Spark action.
Most ETL pipelines include DI checks between the source system and the ETL environment: record counts are compared, and often the sum or average of significant columns is compared on both sides as well. We will see how a DI check can be performed with Spark JDBC and how the results can be stored for audit purposes. We will also cover how to avoid storing a clear-text password for the JDBC call made from Spark.
The following components need to be written in order to accomplish this.
PySpark Code for DI Check:
This Python module calls a Java method to decrypt the password and uses it in the Spark JDBC call to get the row count from the source table. The record count of the Hadoop-side file is then obtained with the DataFrame count() method, and the two counts are compared to determine whether they match. The same logic can be applied to compare the sum or average of specific columns on both the RDBMS and the Hadoop table/file for further data quality checks; a sketch of such a sum comparison follows the code below.
import sys
from datetime import datetime
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

## Main Starts Here.
if __name__ == "__main__":
    conf = SparkConf().setAppName("Spark DI Check Code")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    ### Load the database properties (connection string, driver, count query,
    ### password alias, credential provider path) from a JSON configuration file
    dbProps = sqlContext.read.json(sys.argv[1]).first()
    print sys.argv[3]

    ### Extract the clear-text password from the alias & provider via the JVM
    myjvm = sc._gateway.jvm
    # Instantiate the Java helper class
    decryptionUtil = myjvm.com.hortonworks.code.util.DecryptionUtil()
    # Call the Java method that resolves the alias against the credential provider
    pswd = decryptionUtil.getPasswordFromCredentialProviders(dbProps['alias'], dbProps['provider'])

    ### Count the records in the Hadoop-side Parquet file
    unloaded_count = sqlContext.read.parquet(sys.argv[3]).count()

    ### Run the count query against the source RDBMS over JDBC
    count_from_db = sqlContext.read.format('jdbc').load(url=dbProps.url,
                                                        driver=dbProps.driver,
                                                        dbtable=dbProps.query,
                                                        user=sys.argv[5],
                                                        password=pswd)
    count_from_db_int = int(count_from_db.rdd.map(lambda x: x.COUNT_FROM_TBL).first())
    print count_from_db_int
    print unloaded_count

    ### Build the audit record (table name, run time, RDBMS count, Hadoop count);
    ### it can be persisted (e.g., to the DI result path) for audit purposes
    rowdf = sqlContext.createDataFrame(
        [(dbProps.tblnm, str(datetime.now()), count_from_db_int, unloaded_count)],
        ("table_name", "runtime", "rdbms_count", "hadoop_count"))

    if unloaded_count == count_from_db_int:
        print "******COUNT VALIDATION PASSED*****"
    else:
        print "*****COUNTS DID NOT MATCH. SENDING BAD RETURN CODE*****"
        sys.exit(1)
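The module reads its database properties from the JSON file passed as the first argument. The original post does not show that file, but based on the fields the code references (url, driver, query, tblnm, alias, provider), a minimal example might look like the following; all values are placeholders, and it is pretty-printed here for readability (sqlContext.read.json expects each record on a single line):

{
  "url": "jdbc:oracle:thin:@//dbhost.example.com:1521/ORCL",
  "driver": "oracle.jdbc.OracleDriver",
  "query": "(SELECT COUNT(*) AS COUNT_FROM_TBL FROM SALES.ORDERS) cnt",
  "tblnm": "SALES.ORDERS",
  "alias": "sales.db.password.alias",
  "provider": "jceks://hdfs/user/etl/creds/sales.jceks"
}

As mentioned earlier, the same pattern extends to column-level checks. A minimal sketch, assuming a numeric column named AMOUNT and a source query that aliases the result as SUM_FROM_TBL (neither is part of the original module):

# Hypothetical extension: compare SUM(AMOUNT) on both sides.
from pyspark.sql import functions as F

hadoop_sum = sqlContext.read.parquet(sys.argv[3]).agg(F.sum("AMOUNT")).first()[0]
db_sum_df = sqlContext.read.format('jdbc').load(url=dbProps.url, driver=dbProps.driver,
    dbtable="(SELECT SUM(AMOUNT) AS SUM_FROM_TBL FROM SALES.ORDERS) s",
    user=sys.argv[5], password=pswd)
db_sum = db_sum_df.rdd.map(lambda x: x.SUM_FROM_TBL).first()
print hadoop_sum, db_sum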
Java Class to decrypt password:
The Java class below uses the Hadoop Credential Provider API to extract the clear-text password, given the alias name and the path of the encrypted password file.
package com.hortonworks.code.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.alias.CredentialProviderFactory;

import java.io.IOException;

public class DecryptionUtil {

    public static final String CREDENTIAL_PROVIDER_PATH = "hadoop.security.credential.provider.path";

    public DecryptionUtil() {}

    // Resolves the given alias against the credential provider (e.g., a JCEKS file)
    // and returns the clear-text password.
    public String getPasswordFromCredentialProviders(String alias, String provider) throws IOException {
        char[] pass = null;
        try {
            Configuration conf = new Configuration();
            conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider);
            pass = conf.getPassword(alias);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return String.valueOf(pass);
    }
}
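The alias/provider pair that the PySpark code passes to this class has to be created up front with the Hadoop credential CLI. A minimal sketch, using the placeholder alias and JCEKS path from the example configuration above:

# Create the credential store and add the password under the alias;
# the command prompts for the password, so it is never typed on the command line.
hadoop credential create sales.db.password.alias -provider jceks://hdfs/user/etl/creds/sales.jceks

# Verify that the alias is stored
hadoop credential list -provider jceks://hdfs/user/etl/creds/sales.jceks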
Oozie spark action:
The Oozie Spark action invokes the PySpark module that performs the DI check.
<credentials>
  <credential name='hcat-creds' type='hcat'>
    <property>
      <name>hcat.metastore.uri</name>
      <value>${HCAT_METASTORE_URI}</value>
    </property>
    <property>
      <name>hcat.metastore.principal</name>
      <value>${HCAT_METASTORE_PRINCIPAL}</value>
    </property>
  </credential>
</credentials>

<action name="my-spark_di-check" cred="hcat-creds">
  <spark xmlns="uri:oozie:spark-action:0.1">
    <job-tracker>${JOB_TRACKER}</job-tracker>
    <name-node>${NAMENODE}</name-node>
    <prepare>
      <delete path="${DI_RESULT_FILE}"/>
    </prepare>
    <configuration>
      <property>
        <name>mapred.job.queue.name</name>
        <value>${QUEUE_NAME}</value>
      </property>
      <property>
        <name>oozie.launcher.yarn.app.mapreduce.am.env</name>
        <value>SPARK_HOME=/usr/hdp/current/spark-client</value>
      </property>
    </configuration>
    <master>yarn-cluster</master>
    <mode>cluster</mode>
    <name>DI Check</name>
    <jar>${DI_CHECK_MODULE}</jar>
    <spark-opts>--files ${OOZIE_LIBPATH}/conf/hive-site.xml --executor-memory 4G --num-executors 2 --conf spark.yarn.appMasterEnv.hive.metastore.uris=${HIVE_METASTORE_URI}</spark-opts>
    <arg>${DBPROPS_FILE}</arg>
    <arg>${DI_RESULT_FILE}</arg>
    <arg>${TGT_BASE_TBL_DIR}</arg>
    <arg>${COUNTS_LOG}</arg>
    <arg>${SRC_USERNAME}</arg>
  </spark>
  <ok to="next-action"/>
  <error to="fail"/>
</action>
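For reference, the <arg> elements are passed to the PySpark module in order: ${DBPROPS_FILE} arrives as sys.argv[1], ${DI_RESULT_FILE} as sys.argv[2], ${TGT_BASE_TBL_DIR} as sys.argv[3], ${COUNTS_LOG} as sys.argv[4], and ${SRC_USERNAME} as sys.argv[5]; the snippet shown above only reads argv[1], argv[3] and argv[5]. Outside of Oozie, the same module can be exercised with a plain spark-submit; the file names below are placeholders (di_check.py stands for the PySpark module, decryption-util.jar for the jar containing DecryptionUtil):

spark-submit --master yarn-cluster \
    --jars decryption-util.jar \
    --files /etc/hive/conf/hive-site.xml \
    di_check.py dbprops.json /tmp/di_result /data/target/table /tmp/counts.log etl_user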
Created on 12-26-2023 03:31 AM
This way, isn't the password that is provided to the connection exposed in plain text?