Member since
01-24-2017
22
Posts
9
Kudos Received
3
Solutions
07-05-2020
02:12 PM
1 Kudo
Introduction
This document describes the details of Knox SSO integration with a SAML identity provider (for the purposes of this article, the IDP is Ping Federate). Ping Federate acts as the identity provider and can connect to any authentication source such as AD or LDAP.
The following diagram depicts the process at a high level:
The SAML authentication process has three components:
Identity Provider: authenticates users against an authentication source and sends a SAML response XML back to the service provider.
Service Provider: an application that requires users to authenticate.
Authentication source: a directory service such as AD or LDAP. The service provider uses the authentication token and does not need to know the authentication source.
As part of this integration, both IDP (Ping Fed) and SP (Knox server) exchange information for the handshake to work successfully.
The SP shares the following information with the IDP:
Service Provider EntityID= https://knoxhost.domain:8443/gateway/knoxsso/api/v1/websso?pac4jCallback=true&client_name=SAML2Client
Assertion Consumer service URL (IDP sends SAML response to this endpoint) = https://knoxhost.domain:8443/gateway/knoxsso/api/v1/websso?pac4jCallback=true&client_name=SAML2Client
Knox hostnames and IP addresses so that ports can be opened on firewalls, etc.
The IDP shares the following information with the SP:
IDP metadata URL
Knox configuration
Configure knoxsso.xml to use the pac4j provider, which references the identity provider metadata that allows Knox to communicate with the IDP. A sample knoxsso.xml is available in the Appendix section below.
Configure the Service XML that uses the SSOCookieProvider mechanism to route the initial service request to KnoxSSO.
Request the metadata XML file from the IDP team. You may instead receive a metadata URL from the IDP team; this URL can be added in the Knox SSO XML under the property saml.identityProviderMetadataPath.
Alternatively, the IDP metadata URL can be opened in a browser. Copy the XML content and paste it into a file such as /etc/knox/conf/idp-metadata.xml. This file path can then be used as the value for the property saml.identityProviderMetadataPath.
Ensure that the file is owned by Knox: $ chown knox:knox /etc/knox/conf/idp-metadata.xml
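If the IDP metadata URL is reachable from the Knox host, the file can also be fetched from the command line. A minimal sketch, assuming the metadata URL below is a placeholder supplied by your IDP team:
$ curl -k -o /etc/knox/conf/idp-metadata.xml "https://pingfed.domain:9031/idp/metadata"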
The saml.serviceProviderMetadataPath property should be set to a file path on the Knox server, such as /etc/knox/conf/sp-metadata.xml. The property goes in the knoxsso.xml file; the file it points to does not need to exist beforehand.
Ensure that the service provider EntityID is shared with the IDP team and that it matches what is configured in knoxsso.xml.
The Service Provider EntityID looks like the following: https://knoxhost.domain:8443/gateway/knoxsso/api/v1/websso?pac4jCallback=true&client_name=SAML2Client
In our scenario, the above URL was shared with the IDP team for them to update the SP metadata. However, we ran into an issue where the Entity IDs did not match between the SP and the IDP. The SAML trace showed that the "amp;" was removed. The solution is to remove "amp;" from the SP Entity ID configured on the IDP:
SP Entity ID on Knox (XML-escaped in knoxsso.xml): https://knoxhost.domain:8443/gateway/knoxsso/api/v1/websso?pac4jCallback=true&amp;client_name=SAML2Client
SP Entity ID shared with Ping Federate: https://knoxhost.domain:8443/gateway/knoxsso/api/v1/websso?pac4jCallback=true&client_name=SAML2Client
Create a Knox topology XML that uses the SSOCookieProvider and lists the services that should be placed behind the Knox SSO. The example file is present in the Appendix section.
Ensure that the knoxsso topology content has been added to the Ambari Knox configuration in the Advanced knoxsso-topology section, and restart the Knox service.
Ensure that both topology files (knoxsso.xml and the service topology XML) are owned by the knox user: $ chown knox:knox /etc/knox/conf/topologies/knoxsso.xml
$ chown knox:knox /etc/knox/conf/topologies/clustername-sso.xml (this file is shown in the Appendix)
Issues Encountered:
ISSUE 1
An attempt to access a Knox URL failed with the following error while Knox was trying to communicate with the Ping Federate server:
javax.servlet.ServletException: org.pac4j.core.exception.TechnicalException: java.lang.UnsupportedOperationException: trusted certificate entries are not password-protected
This error indicates that the keystore has a certificate of type "trustedCertEntry". Pac4j expects all certificates in the Knox keystore to be of type "PrivateKeyEntry".
To resolve this issue, convert the certificate from "trustedCertEntry" to "PrivateKeyEntry" as follows:
Obtain the public key and private key for the certificate.
Create a PKCS12 formatted keystore from the above keys: $ openssl pkcs12 -export -in cert -inkey key -out pkcert.p12 -name pkcert.alias
Import the certificate from the PKCS12-formatted keystore into the Knox Java keystore: $ keytool -importkeystore -deststorepass knoxgwpasswd -destkeypass knoxgwkeypass -destkeystore gateway.jks -srckeystore pkcert.p12 -srcstoretype PKCS12 -srcstorepass somepasswd -alias pkcert.alias
Delete any certificates of type "trustedCertEntry" from the Knox gateway.jks: $ keytool -delete -noprompt -alias xxxxxx -keystore gateway.jks -storepass xxxxxx
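To confirm that the remaining entries are of the expected type, list the keystore contents. A quick sketch (the keystore password here is an example):
$ keytool -list -v -keystore gateway.jks -storepass xxxxxx | grep "Entry type"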
Once this is done, restart Knox and test again by entering a Knox URL for a Hadoop service (Ranger, Atlas, YARN UI, Spark UI, Zeppelin UI, etc.).
ISSUE 2
Another error encountered during our test: upon entering the URL for a service UI (for example, the YARN UI), the browser redirects to the IDP login page, and after the credentials are entered, the user is taken back to the IDP login page again.
This issue occurred because the Knox certificate was regenerated or recreated and the Knox public key was not replaced in the configuration of services such as YARN, MapReduce, Ranger, Spark, and Atlas.
To resolve this issue, ensure that whenever the Knox certificate or keystore is modified, the public key is extracted and added to the YARN and other service configurations. Once all of the above configuration is in place, the Knox SSO service URL can be entered in the browser:
https://knoxhost.domain:8443/gateway/topology/yarn/
The topology in the above URL is not the KNOXSSO topology. It should be a separate topology that uses the SSOCookieProvider and contains the service URL mappings.
Once you enter the local AD credentials, the IDP (Ping Fed) validates them and sends the SAML response back to Knox. Knox SSO then redirects back to the YARN UI.
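For reference, one way to extract the Knox public key after the keystore changes so that it can be redistributed to the service configurations. A minimal sketch, assuming the default gateway-identity alias; the keystore path may differ in your environment:
$ keytool -export -rfc -alias gateway-identity -keystore /var/lib/knox/data/security/keystores/gateway.jks -file /tmp/knox-public.pem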
Appendix
The following are the Knox gateway logs during successful round trips from Knox to Ping Federate:
knoxsso.xml
<topology>
<gateway>
<provider>
<role>federation</role>
<name>pac4j</name>
<enabled>true</enabled>
<param>
<name>pac4j.callbackUrl</name>
<value>https://knoxhost.domain:8443/gateway/knoxsso/api/v1/websso</value>
</param>
<param>
<name>clientName</name>
<value>SAML2Client</value>
</param>
<param>
<name>saml.identityProviderMetadataPath</name>
<value>/etc/knox/conf/idp-metadata.xml</value>
</param>
<param>
<name>saml.serviceProviderMetadataPath</name>
<value>/etc/knox/conf/sp-metadata.xml</value>
</param>
<param>
<name>saml.serviceProviderEntityId</name>
<value>https://knoxhost.domain:8443/gateway/knoxsso/api/v1/websso?pac4jCallback=true&amp;client_name=SAML2Client</value>
</param>
</provider>
</gateway>
<service>
<role>KNOXSSO</role>
<param>
<name>knoxsso.cookie.secure.only</name>
<value>true</value>
</param>
<param>
<name>knoxsso.token.ttl</name>
<value>3600000</value>
</param>
<param>
<name>knoxsso.redirect.whitelist.regex</name>
<value>.*$</value>
</param>
</service>
</topology>
Knox topology XML to expose the services (clustername-sso.xml):
<topology>
<generated>true</generated>
<gateway>
<provider>
<role>webappsec</role>
<name>WebAppSec</name>
<enabled>true</enabled>
<param>
<name>cors.enabled</name>
<value>true</value>
</param>
<param><name>xframe.options.enabled</name><value>true</value></param>
</provider>
<provider>
<role>federation</role>
<name>SSOCookieProvider</name>
<enabled>true</enabled>
<param>
<name>sso.authentication.provider.url</name>
<value>https://knoxhost.domain:8443/gateway/knoxsso/api/v1/websso</value>
</param>
</provider>
<provider>
<role>identity-assertion</role>
<name>Default</name>
<enabled>true</enabled>
</provider>
</gateway>
<service>
<role>ATLAS-API</role>
<url>https://knoxhost.domain:21443</url>
<url>https://knoxhost.domain:21443</url>
</service>
<service>
<role>ATLAS</role>
<url>https://knoxhost.domain:21443</url>
<url>https://knoxhost.domain:21443</url>
</service>
<service>
<role>RANGER</role>
<url>https://knoxhost.domain:6182</url>
</service>
<service>
<role>RANGERUI</role>
<url>https://knoxhost.domain:6182</url>
</service>
<service>
<role>YARNUI</role>
<url>https://knoxhost.domain:8090</url>
<url>https://knoxhost.domain:8090</url>
</service>
</topology>
06-29-2020
01:45 PM
1 Kudo
Introduction
This article assumes that KnoxSSO for NiFi UI is enabled and is working as expected.
To configure a Knox topology for accessing the NiFi REST API using a Knox-based NiFi URL, do the following:
Access the NiFi UI through a Knox SSO based URL such as the following: https://<knox.fqdn>:8443/gateway/<nifi-topology>/nifi-app/nifi/
This exposes only the Knox host and port (not the NiFi hosts) and also authenticates users via SSO before they log in to the NiFi UI.
Once SSO is enabled on the NiFi UI, the NiFi URL for the UI or the REST API automatically redirects to the SSO page.
Usually, NiFi REST API access involves obtaining a Bearer token and using it in subsequent API calls. With a Knox proxy-based NiFi endpoint, this Bearer token does not work because Knox does not recognize it; on top of that, the URL always redirects to the SSO URL.
To establish access to Knox-based NiFi URLs, both Knox and NiFi need to be configured to generate a Knox JWT token and honor it when the NiFi REST API is accessed.
Configuration
The following is the process to enable NiFi API access to NiFi instances that are protected by KnoxSSO:
Knox
Create a KNOXTOKEN service in one of the Knox topologies to allow users to obtain a Knox token. Any existing topology can be used for this purpose. In this example, knoxsso.xml is used to add the KNOXTOKEN service.
Add the following content at the end of the Advanced knoxsso-topology in the Ambari UI and restart the Knox service:
<service>
<role>KNOXTOKEN</role>
<param>
<name>knox.token.ttl</name>
<value>18000000</value>
</param>
<param>
<name>knox.token.audiences</name>
<value>hdftopology</value>
</param>
<param>
<name>knox.token.target.url</name>
<value>https://knox.fqdn:8443/gateway/hdftopology</value>
</param>
</service>
Create a new Knox topology XML file for NiFi API access via Knox token. This topology should use the JWTProvider for authentication which honors the Knox token.
Place this file in the /etc/knox/conf/topologies/ folder and ensure it is owned by the "knox" user.
<topology>
<gateway>
<provider>
<role>federation</role>
<name>JWTProvider</name>
<enabled>true</enabled>
<param>
<name>knox.token.audiences</name>
<value>hdftopology</value>
</param>
</provider>
<provider>
<role>identity-assertion</role>
<name>Default</name>
<enabled>true</enabled>
</provider>
<provider>
<role>authorization</role>
<name>XASecurePDPKnox</name>
<enabled>true</enabled>
</provider>
</gateway>
<service>
<role>NIFI</role>
<url>https://nifi-1.domain:9091</url>
<url>https://nifi-2.domain:9091</url>
<url>https://nifi-3.domain:9091</url>
<param>
<name>useTwoWaySsl</name>
<value>true</value>
</param>
</service>
<service>
<role>NIFI-API</role>
<url>https://nifi-1.domain:9091</url>
<url>https://nifi-2.domain:9091</url>
<url>https://nifi-3.domain:9091</url>
<param>
<name>useTwoWaySsl</name>
<value>true</value>
</param>
</service>
</topology>
Run the following command on the Knox server: $ chown knox:knox /etc/knox/conf/topologies/hdftopology.xml
As soon as the topology file is added, browse the Knox logs to ensure that the topology is activated successfully.
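A quick way to confirm the activation, assuming the default HDP Knox log location (the path may differ in your environment):
$ grep -i "hdftopology" /var/log/knox/gateway.log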
NiFi
In the NiFi configuration, add a new entry to nifi.web.proxy.context.path so that NiFi allows this URL path: "gateway/hdftopology/nifi-app".
This path uses the new JWTProvider topology that we created for NiFi API access.
After making the changes, restart NiFi.
Validation
Generate a Knox token by using the following curl command: $ curl -ivku <username>:<password> https://knox.fqdn:8443/gateway/knoxsso/knoxtoken/api/v1/token
Extract the "access_token" value from the response so that it can be used in subsequent NiFi API calls. This access token is simply the Knox JWT token.
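One way to capture the token value directly from the command line; a sketch that assumes the token service returns a JSON document containing an access_token field (verify the field name against your Knox version):
$ curl -sku <username>:<password> https://knox.fqdn:8443/gateway/knoxsso/knoxtoken/api/v1/token | python -c 'import sys,json; print(json.load(sys.stdin)["access_token"])'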
This token should be used as the Bearer token in NiFi API calls. An example NiFi API call looks like the following:
[root@knox.fqdn topologies]# curl -ivk -H "Authorization: Bearer xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" https://knox.fqdn:8443/gateway/hdftopology/nifi-app/nifi-api/flow/cluster/summary
* About to connect() to knox.fqdn port 8443 (#0)
* Trying 10.xx.xxx.xx...
* Connected to knox.fqdn (10.xx.xxx.xx) port 8443 (#0)
...
...
* Connection #0 to host knox.fqdn left intact
{"clusterSummary":{"connectedNodes":"3 / 3","connectedNodeCount":3,"totalNodeCount":3,"connectedToCluster":true,"clustered":true}
02-14-2020
04:18 PM
1 Kudo
This document explains the configuration steps required to access the NiFi UI using Knox SSO along with the Knox proxy.
This eliminates the need to access the NiFi UI using the regular port 9091 on the NiFi nodes.
Configure Knox SSO for NiFi:
Create the CA signed Knox certificate:
Ensure that the Knox certificate is signed by the same CA that signed the NiFi certificates. By default, Knox creates a self-signed certificate, which needs to be replaced by a CA-signed certificate.
Go to the Knox server and do the following:
Copy the CA-signed certificate keystore on the Knox host to gateway.jks in the Knox keystore path:
$ cp /etc/security/certs/keystore.jks /var/lib/knox/data-3.1.0.0-78/security/keystores/gateway.jks
$ chown knox:knox /var/lib/knox/data-3.1.0.0-78/security/keystores/gateway.jks
The Knox certificate's keystore password should match the Knox master secret (let us say it is Hadoop*123).
Now, change the new Knox keystore's password:
$ keytool -storepasswd -keystore /var/lib/knox/data-3.1.0.0-78/security/keystores/gateway.jks
Enter existing password: <changeit>
New password: <Hadoop*123>
Re-enter new password:
Change the key's password for the Knox certificate:
$ keytool -keypasswd -alias gateway-identity -keystore /var/lib/knox/data-3.1.0.0-78/security/keystores/gateway.jks
enter keystore password: <Hadoop*123>
enter key's existing password: <changeit>
enter key's new password: <Hadoop*123>
Restart the Knox service using the Ambari UI.
Deploy Knox Certificate on NiFi Nodes:
Run the following commands on all NiFi nodes:
Ensure that the correct Knox server is used in the command below:
$ openssl s_client -connect knox.example.com:8443</dev/null| openssl x509 -out /usr/hdf/current/nifi/conf/knox.pem
$ chown nifi:nifi /usr/hdf/current/nifi/conf/knox.pem
Add Knox hostname as a user in NiFi UI:
This step is very important and needs to be done before modifying the NiFi configuration and restarting NiFi.
Log in to the NiFi UI as the admin user, add the Knox hostname (DN) as a user, and grant the proxy request privilege to that Knox host user.
Once the Knox DN is added as a user, go to Policies, select the "proxy user requests" policy, and click the Add User button. Select the Knox host user you just added and click ADD.
NiFi Configuration changes in Ambari UI:
Log in to the HDF Ambari UI and make the following configuration changes.
Get the Knox server DN from the Knox certificate. To do this:
Log in to the Knox server and run the command:
$ keytool -list -keystore /var/lib/knox/data-3.1.0.0-78/security/keystores/gateway.jks -v
Enter the keystore password: <Hadoop*123>
Look for the word "gateway-identity" in the output and capture the DN of the "Owner". It will look something like this: CN=knox.host.example.com,
Take the DN obtained from the step above and add it as a node identity in the NiFi configuration as shown below. The last entry in the screenshot is the Knox DN:
Advanced nifi-ambari-ssl-config:
nifi.security.needClientAuth = true
Add an entry to the "Node Identities" field on the Ambari NiFi configuration page with the full DN of the Knox host from the Knox certificate.
Advanced nifi-properties:
Make the following changes in Ambari.
Note that nifi.security.user.login.identity.provider should be empty when you configure Knox SSO. Usually "ldap-provider" is set in this property; if you see it, clear it.
In the following configuration, replace "mytopo" with the name of your topology. The Knox hostname and port should be entered in "nifi.web.proxy.host".
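For reference, the Advanced nifi-properties values involved typically look like the following. These values are illustrative and the Knox-related property names are assumptions to verify against your HDF version:
nifi.security.user.login.identity.provider=
nifi.web.proxy.context.path=/gateway/mytopo
nifi.web.proxy.host=knox.host.example.com:8443
nifi.security.user.knox.url=https://knox.host.example.com:8443/gateway/knoxsso/api/v1/websso
nifi.security.user.knox.publicKey=/usr/hdf/current/nifi/conf/knox.pem
nifi.security.user.knox.cookieName=hadoop-jwt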
Restart the NiFi service in Ambari.
Knox proxy configuration:
Configure the Knox proxy in your Knox topology file by adding the entries below. The topology file should be owned by the "knox" user.
<topology>
<gateway>
<provider>
<role>federation</role>
<name>SSOCookieProvider</name>
<enabled>true</enabled>
<param>
<name>sso.authentication.provider.url</name>
<value>https://knox.host.example.com:8443/gateway/knoxsso/api/v1/websso</value>
</param>
</provider>
<provider>
<role>identity-assertion</role>
<name>Default</name>
<enabled>true</enabled>
</provider>
</gateway>
<service>
<role>NIFI</role>
<url>https://nifi1.example.com:9091</url>
<url>https://nifi2.example.com:9091</url>
<url>https://nifi3.example.com:9091</url>
<param>
<name>useTwoWaySsl</name>
<value>true</value>
</param>
</service>
</topology>
Add the NiFi IP addresses and hostnames to your Windows hosts file (if you are using private IPs).
Once this is done, enter the following URL in the browser:
https://knox.host.example.com:8443/gateway/mytopo/nifi-app/nifi
It will take you to the Knox SSO page; after you enter the user credentials, it will take you to the NiFi UI canvas.
09-09-2017
11:30 PM
1 Kudo
This article walks you through a process developed to classify a given set of images into one of x categories with the help of training datasets (of images), the deep learning image recognition model "InceptionV3", and the RandomForest classification algorithm. The technologies used are TensorFlow and Spark on the Hadoop platform. It involves the following modules:
Technology Stack: Spark 2.1.0, Python 3.5, HDP 2.6.1
Prepare the Training Datasets:
1. Install the ImageMagick package using "yum install imagemagick"
2. Obtain sample images from the customer along with the label that needs to be associated with each of them
3. Extract the label (metadata) from each image using the "ImageMagick" tool and place each image into a different folder
prepareTrainingData.sh
This bash script extracts the metadata from each image and puts the images into different bins, say category 1 through 5. These images were manually classified by inspecting them with the naked eye and will be used for training our model.
#!/bin/bash
IMAGES="/home/arun/image-classification/Images"
UNLABELED="./UnlabeledImages.txt"
TRAININGDATA="/home/arun/image-classification/TrainingData"
rm "$UNLABELED"
for i in $(ls -1 "$IMAGES")
do
for j in $(ls -1 "$IMAGES"/"$i")
do
rating=`identify -verbose "$IMAGES"/"$i"/"$j" | grep xmp:Rating | cut -d':' -f3`
rtng=`echo "$rating" | awk '{$1=$1};1'`
case "$rtng" in
1) echo " this is cat 1"
cp "$IMAGES"/"$i"/"$j" "$TRAININGDATA"/cat1/
;;
2) echo "this is cat 2"
cp "$IMAGES"/"$i"/"$j" "$TRAININGDATA"/cat2/
;;
3) echo "this is cat 3"
cp "$IMAGES"/"$i"/"$j" "$TRAININGDATA"/cat3/
;;
4) echo "this is cat 4"
cp "$IMAGES"/"$i"/"$j" "$TRAININGDATA"/cat4/
;;
5) echo "this is cat 5"
cp "$IMAGES"/"$i"/"$j" "$TRAININGDATA"/cat5/
;;
*) echo "this is something else"
echo "$j" >> "$UNLABELED"
;;
esac
done
done
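The PySpark code in the next section reads the training images from HDFS (img_dir = "/user/arun/TrainingData"), so the locally prepared category folders need to be copied there first. A minimal sketch using the example paths above (adjust to your environment):
$ hdfs dfs -mkdir -p /user/arun/TrainingData
$ hdfs dfs -put /home/arun/image-classification/TrainingData/cat* /user/arun/TrainingData/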
Classify Images:
1. Install the Python packages numpy, keras, tensorflow, nose, pillow, h5py, and py4j on all the gateway and worker nodes of the cluster. You can use either pip or Anaconda for this.
2. Start a PySpark session and download the Spark Deep Learning library from Databricks, which runs on top of TensorFlow and uses the other Python packages installed above. This Spark DL library provides an interface to perform functions such as reading images into a Spark dataframe, applying the InceptionV3 model, and extracting features from the images.
3. In the PySpark session, read the images into a dataframe and split them into training and test dataframes.
4. Create a Spark ML pipeline and add the stages: 1) DeepImageFeaturizer, 2) RandomForest classifier.
5. Execute the fit function and obtain a model.
6. Predict using the model and calculate the prediction accuracy.
### Fire up a pyspark session
export PYSPARK_PYTHON=/opt/anaconda3/bin/python3
export SPARK_HOME=/usr/hdp/current/spark2-client
$SPARK_HOME/bin/pyspark --packages databricks:spark-deep-learning:0.1.0-spark2.1-s_2.11 --master yarn --executor-memory 3g --driver-memory 5g --conf spark.yarn.executor.memoryOverhead=5120
### Add the spark deep-learning jars into the classpath
import sys,glob,os
sys.path.extend(glob.glob(os.path.join(os.path.expanduser("~"),".ivy2/jars/*.jar")))
### PySpark code to read images, create spark ml pipeline, train the mode & predict
from sparkdl import readImages
from pyspark.sql.functions import lit
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
img_dir = "/user/arun/TrainingData"
cat1_df = readImages(img_dir + "/cat1").withColumn("label", lit(1))
cat2_df = readImages(img_dir + "/cat2").withColumn("label", lit(2))
cat3_df = readImages(img_dir + "/cat3").withColumn("label", lit(3))
cat4_df = readImages(img_dir + "/cat4").withColumn("label", lit(4))
cat5_df = readImages(img_dir + "/cat5").withColumn("label", lit(5))
# Split the images: 90% of them go to training data, 10% go to test data
cat1_train, cat1_test = cat1_df.randomSplit([0.9, 0.1])
cat2_train, cat2_test = cat2_df.randomSplit([0.9, 0.1])
cat3_train, cat3_test = cat3_df.randomSplit([0.9, 0.1])
cat4_train, cat4_test = cat4_df.randomSplit([0.9, 0.1])
cat5_train, cat5_test = cat5_df.randomSplit([0.9, 0.1])
train_df = cat1_train.unionAll(cat2_train).unionAll(cat3_train).unionAll(cat4_train).unionAll(cat5_train)
test_df = cat1_test.unionAll(cat2_test).unionAll(cat3_test).unionAll(cat4_test).unionAll(cat5_test)
featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
p = Pipeline(stages=[featurizer, rf])
p_model = p.fit(train_df)
predictions = p_model.transform(test_df)
predictions.select("filePath", "label", "prediction").show(200,truncate=False)
preds_vs_labels = predictions.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("accuracy of predictions by model = " + str(evaluator.evaluate(preds_vs_labels)))
# TRY TO CLASSIFY CAT 5 IMAGES, AND SEE HOW CLOSE THEY GET IN PREDICTING
cat5_imgs = readImages(img_dir + "/cat5").withColumn("label", lit(5))
pred5 = p_model.transform(cat5_imgs)
pred5.select("filePath","label","prediction").show(200,truncate=False)
Reference: https://medium.com/linagora-engineering/making-image-classification-simple-with-spark-deep-learning-f654a8b876b8
06-16-2017
06:20 AM
Objective: Describe a process to perform a DI (data integrity) check using Spark JDBC and an Oozie Spark action. Most ETL pipelines involve DI checks between the source system and the ETL environment to ensure that record counts match; the sum/average of certain significant columns is also compared on both the source and ETL environments. We will see how the DI check can be performed and the results stored for audit purposes using Spark JDBC. We will also cover how to avoid storing a clear-text password for making the JDBC call from Spark. The following components need to be written to accomplish this:
1. PySpark code that invokes the Java class, retrieves the clear-text password at runtime, and performs the DI check
2. A Java class to decrypt the password using the Hadoop Credentials Java API
3. An Oozie Spark action to trigger the Spark code that performs the DI check (counts)
PySpark Code for DI Check: This Python module calls a Java method to decrypt the password and uses it in the Spark JDBC call to get the row count from the table. The record count from the file is then obtained using the Spark dataframe count() method. Both counts can now be compared to determine whether they match. The same logic can be applied to compare the sum or average of specific columns on both the RDBMS and the Hadoop table/file for further data quality checks.
import sys
from datetime import datetime
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from py4j.java_gateway import java_import, JavaGateway
## Main starts here.
if __name__ == "__main__":
    conf = SparkConf().setAppName("Spark DI Check Code")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    ### Load the database properties (connection string, user, password alias, credential file) from a JSON configuration file
    dbProps = sqlContext.read.json(sys.argv[1]).first()
    print sys.argv[3]
    ### Extract the password from the alias & provider
    myjvm = sc._gateway.jvm
    # The line below creates the Java class object
    decryptionUtil = myjvm.com.hortonworks.code.util.DecryptionUtil()
    # The line below calls the Java class method and extracts the password
    pswd = decryptionUtil.getPasswordFromCredentialProviders(dbProps['alias'], dbProps['provider'])
    ### Load the file into a data frame and get its record count
    unloaded_count = sqlContext.read.parquet(sys.argv[3]).count()
    count_from_db = sqlContext.read.format('jdbc').load(url=dbProps.url, driver=dbProps.driver, dbtable=dbProps.query, user=sys.argv[5], password=pswd)
    count_from_db_int = int(count_from_db.rdd.map(lambda x: x.COUNT_FROM_TBL).first())
    print count_from_db_int
    print unloaded_count
    rowdf = sqlContext.createDataFrame([(dbProps.tblnm, str(datetime.now()), count_from_db_int, unloaded_count)], ("table_name", "runtime", "rdbms_count", "hadoop_count"))
    if unloaded_count == count_from_db_int:
        print "******COUNT VALIDATION PASSED *****"
    else:
        print "*****COUNTS DID NOT MATCH. SENDING BAD RETURN CODE*****"
        sys.exit(1)
Java Class to decrypt the password: The Java class below makes use of the Hadoop Credentials API and extracts the clear-text password, given the alias name and the path of the credential provider file.
package com.hortonworks.code.util;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProvider.CredentialEntry;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.List;
public class DecryptionUtil {
public static final String CREDENTIAL_PROVIDER_PATH="hadoop.security.credential.provider.path";
public DecryptionUtil() {}
public String getPasswordFromCredentialProviders(String alias,String provider) throws IOException {
char[] pass = null;
try {
Configuration conf = new Configuration();
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,provider);
pass = conf.getPassword(alias);
// System.out.println("pswd : " + String.valueOf(pass));
} catch (Exception e) {
e.printStackTrace();
}
return String.valueOf(pass);
}
}
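For reference, the alias and provider consumed by this class can be created ahead of time with the Hadoop credential CLI. A sketch where the alias name and JCEKS path are examples:
$ hadoop credential create db.password.alias -provider jceks://hdfs/user/arun/db.password.jceks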
Oozie Spark action: The Oozie Spark action invokes the PySpark code that performs the DI check.
<credentials>
<credential name='hcat-creds' type='hcat'>
<property>
<name>hcat.metastore.uri</name>
<value>${HCAT_METASTORE_URI}</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>${HCAT_METASTORE_PRINCIPAL}</value>
</property>
</credential>
</credentials>
<action name="my-spark_di-check" cred="hcat-creds">
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${JOB_TRACKER}</job-tracker>
<name-node>${NAMENODE}</name-node>
<prepare>
<delete path="${DI_RESULT_FILE}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${QUEUE_NAME}</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.env</name>
<value>SPARK_HOME=/usr/hdp/current/spark-client</value>
</property>
</configuration>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>DI Check</name>
<jar>${DI_CHECK_MODULE}</jar>
<spark-opts> --files ${OOZIE_LIBPATH}/conf/hive-site.xml --executor-memory 4G --num-executors 2 --conf spark.yarn.appMasterEnv.hive.metastore.uris=${HIVE_METASTORE_URI}</spark-opts>
<arg>${DBPROPS_FILE}</arg>
<arg>${DI_RESULT_FILE}</arg>
<arg>${TGT_BASE_TBL_DIR}</arg>
<arg>${COUNTS_LOG}</arg>
<arg>${SRC_USERNAME}</arg>
</spark>
<ok to="next-action"/>
<error to="fail"/>
</action>
06-15-2017
07:25 PM
You can persist the data with partitioning by using partitionBy(colName) while writing the data frame to a file. The next time you use the dataframe, it won't cause shuffles. There is a JIRA for the issue you mentioned, which is fixed in 2.2. You can still work around it by increasing spark.driver.maxResultSize.
SPARK-12837
06-14-2017
02:04 PM
1 Kudo
Hi Jean, can you please try the following and let us know if the query performance improved?
1. Set the shuffle partitions to a number higher than 200, because 200 is the default value for shuffle partitions (spark.sql.shuffle.partitions=500 or 1000).
2. While loading the Hive ORC tables into dataframes, use the "CLUSTER BY" clause with the join key. Something like:
df1 = sqlContext.sql("SELECT * FROM TABLE1 CLUSTER BY JOINKEY1")
df2 = sqlContext.sql("SELECT * FROM TABLE2 CLUSTER BY JOINKEY2")
df1.registerTempTable("df1_tbl")
df2.registerTempTable("df2_tbl")
Now join df1_tbl and df2_tbl using joinkey1 and joinkey2.
04-25-2017
02:00 PM
Hi Amit, the HDPCD:Spark exam focuses on Spark Core, Spark SQL, spark-submit, and writing data into different formats. It does not test you on Spark Streaming, and I am not sure if it has any questions on Spark ML.
02-07-2017
03:58 PM
Are you using sqljdbc42.jar or jtds.jar for the driver? Also, is sqoop eval working with the integratedSecurity option, or are you passing a user ID and password?
02-07-2017
03:52 PM
Clustering algorithms in Spark 2.0 use the ML package, so the feature vectors need to be of type ML instead of MLlib. I had to convert an MLlib vector to an ML vector to make it work in Spark 2.0.