Member since: 06-21-2018
Posts: 9
Kudos Received: 6
Solutions: 0
01-07-2021
05:35 PM
1 Kudo
In this article, we’ll walk through the steps required to connect a Spark Structured Streaming application to Kafka in CDP Data Engineering Experience (DEX). This article extends the official CDP documentation, Connecting Kafka clients to Data Hub provisioned clusters, to cover Spark applications run in Cloudera Data Engineering Experience.
Steps
Obtain the FreeIPA certificate of your environment:
From the CDP home page, navigate to Management Console > Environments
Locate and select your environment from the list of available environments
Click Actions
Select Get FreeIPA Certificate from the drop-down menu. The FreeIPA certificate downloads.
Add the FreeIPA certificate to a Java keystore: All clients that you want to connect to the Data Hub provisioned cluster will need to use the certificate to communicate over TLS. The exact steps for adding the certificate to the truststore depend on the platform and key management software used. For example, you can use the Java keytool command line tool: keytool -import -keystore [CLIENT_TRUSTSTORE.JKS] -alias [ALIAS] -file [FREEIPA_CERT]
In this case, we will replace [CLIENT_TRUSTSTORE.JKS] with keystore.jks, since we want to name our newly created keystore file keystore.jks.
Obtain CDP workload credentials: A valid workload username and password have to be provided to the client; otherwise, it cannot connect to the cluster. Credentials can be obtained from the Management Console.
From the CDP Home Page, navigate to Management Console > User Management.
Locate and select the user account you want to use from the list of available accounts. The user details page displays information about the user.
Find the username in the Workload Username entry and note it down.
Find the Workload Password entry and click Set Workload Password.
In the dialog box that appears, enter a new workload password, confirm the password and note it down.
Fill out the Environment text box.
Click Set Workload Password and wait for the process to finish.
Click Close.
Create a PySpark script, script.py, to stream from a Kafka topic:
from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
.option("subscribe", "yourtopic") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.ssl.truststore.location", "/app/mount/keystore.jks") \
.option("kafka.ssl.truststore.password", "mypassword") \
.option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="mypassword";') \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
query = df.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()

Note: In the above code, we have specified our keystore location as /app/mount/keystore.jks in the kafka.ssl.truststore.location option. This is because when we upload our keystore.jks file to Cloudera Data Engineering Experience later, it will be placed in the /app/mount directory.

Note: We have specified our keystore password in the kafka.ssl.truststore.password option. This is the password that we provided for our keystore at the time of its creation. Finally, note that we have specified our workload username and password in the kafka.sasl.jaas.config option.
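If you would rather not hard-code the workload password and truststore password in script.py, one option is to read them from environment variables and build the JAAS string in Python. This is a minimal sketch, not part of the official steps; the variable names KAFKA_USER, KAFKA_PASS, and TRUSTSTORE_PASS are illustrative, and it assumes those variables have been made available to the job's environment:

import os

# Illustrative only: the environment variable names below are assumptions,
# not CDP or CDE conventions.
kafka_user = os.environ["KAFKA_USER"]
kafka_pass = os.environ["KAFKA_PASS"]
truststore_pass = os.environ["TRUSTSTORE_PASS"]

# Build the JAAS configuration string from the credentials.
jaas_config = (
    'org.apache.kafka.common.security.plain.PlainLoginModule required '
    'username="{}" password="{}";'.format(kafka_user, kafka_pass)
)

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
    .option("subscribe", "yourtopic") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.ssl.truststore.location", "/app/mount/keystore.jks") \
    .option("kafka.ssl.truststore.password", truststore_pass) \
    .option("kafka.sasl.jaas.config", jaas_config) \
    .load()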
Create a Cloudera Data Engineering Experience Job
From the CDP Home Page navigate to CDP Cloudera Data Engineering > Jobs > Create Job
Upload the script.py file to Cloudera Data Engineering Experience (via the Job Details > Upload File Textbox), and the keystore.jks file (via Advanced Options > Other Dependencies Textbox)
Name the job using the Job details > Name Textbox
Unselect Schedule
Click Run now
Validate that the Job is running successfully
Navigate to CDP Cloudera Data Engineering > Jobs Runs
Drill down into the Run ID/ Job
Navigate to Actions > Logs
Select the stdout tab
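As an additional sanity check beyond reading the stdout logs, the Structured Streaming query object itself exposes status information that script.py could print before blocking on awaitTermination(). This is an optional sketch using the standard PySpark StreamingQuery attributes query.status and query.lastProgress; the 30-second sleep is an arbitrary choice to let the first micro-batch run:

import time

query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

time.sleep(30)               # give the first micro-batch time to run
print(query.status)          # whether the query is active and what it is doing
print(query.lastProgress)    # metrics for the most recent micro-batch, if any

query.awaitTermination()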
01-07-2021
01:20 PM
In this article, we will walk through the steps required to connect a Spark Structured Streaming application to Kafka in CDP Data Hub. We use two Data Hubs, one with a Data Engineering template and another with a Streams Messaging template. Both Data Hubs were created in the same environment. This article extends the official CDP documentation, Connecting Kafka clients to Data Hub provisioned clusters, to include Spark applications.
Steps
1. Obtain the FreeIPA certificate of your environment:
From the CDP Home Page, navigate to Management Console > Environments
Locate and select your environment from the list of available environments
Click Actions
Select Get FreeIPA Certificate from the drop-down menu. The FreeIPA certificate downloads.
2. Add the FreeIPA certificate to the truststore of the client: The certificate needs to be added for all clients that you want to connect to the Data Hub provisioned cluster. The exact steps for adding the certificate to the truststore depend on the platform and key management software used. For example, you can use the Java keytool command line tool: keytool -import -keystore [CLIENT_TRUSTSTORE.JKS] -alias [ALIAS] -file [FREEIPA_CERT]
3. Obtain CDP workload credentials: A valid workload username and password have to be provided to the client; otherwise, it cannot connect to the cluster. Credentials can be obtained from the Management Console.
From the CDP Home Page, navigate to Management Console > User Management
Locate and select the user account you want to use from the list of available accounts. (The user details page displays information about the user.)
Find the username in the Workload Username entry and note it down
Find the Workload Password entry and click Set Workload Password
In the dialog box that appears, enter a new workload password, confirm the password and note it down
Fill out the Environment text box
Click Set Workload Password and wait for the process to finish
Click Close
4. Create a PySpark script, script.py, to connect to Kafka:
from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
spark = SparkSession\
.builder\
.appName("PythonSQL")\
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
.option("subscribe", "yourtopic") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.ssl.truststore.location", "/path/to/keystore.jks") \
.option("kafka.ssl.truststore.password", "mypassword") \
.option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="mypassword";') \
.load()
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
query = df.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()

Note: In the above code, we have specified our keystore location in the kafka.ssl.truststore.location option, and our keystore password in the kafka.ssl.truststore.password option. The password we provide here is the password that we provided for our keystore at the time of its creation.
Note: We have specified our workload username and password in the kafka.sasl.jaas.config option.
5. Kinit as a user with permissions to the Kafka topic:
kinit username
6. Submit the Spark job.
To run the job on a local machine:
spark-submit --master=local ./script.py
To run the job on the YARN cluster (see the sketch after step 7 for referencing the keystore shipped with --files):
spark-submit --master=yarn \
--files "mykeystore.jks" \
--executor-cores 1 \
./script.py
7. Validate that the Job is running successfully. From the CDP Home Page, navigate to Data Hub Clusters > (Drill down to the Data Engineering Data Hub) > Resource Manager > Applications > (Drill down to the stdout logs for your Job)
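One note on the YARN submission above: when the keystore is distributed with --files "mykeystore.jks", YARN localizes the file into each container's working directory. A common pattern, sketched below under that assumption (this is not shown in the original steps), is to reference the truststore by its file name in the Kafka options so that executors resolve it relative to their working directory:

# Sketch: assumes the keystore was shipped with --files "mykeystore.jks",
# so it is available in each YARN container's working directory under that name.
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka_broker_hostname:9093") \
    .option("subscribe", "yourtopic") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.ssl.truststore.location", "mykeystore.jks") \
    .option("kafka.ssl.truststore.password", "mypassword") \
    .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="yourusername" password="mypassword";') \
    .load()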
01-17-2020
07:48 PM
In this article, we’ll take an end-to-end look at the execution of a Hive query. We will focus on query execution in the context of both Hive on Tez and Hive on Map Reduce. We will assume that our data includes customer data, so we will want to apply granular access control down to the column level to secure PCI and PII data. This means we will be using Hive in conjunction with Apache Ranger (the standard open source Apache project for securing data in Hive). Let's begin:

1) Client issues the SQL Query
The query will be issued from a client such as a JDBC client or Beeline. JDBC clients might include any of the standard BI tools, such as Tableau. Beeline is the supported CLI for Hive 2 and 3, and it also uses a JDBC connection to connect to the remote Hive Server. The JDBC URL for these connections can be easily retrieved from the CDP Data Warehouse dashboard. For HDP, this URL is available from within Ambari, in the Hive service summary.

2) Query Parsing and Access Control
Once the query has been issued by the client and received by HiveServer2 (HS2), it is parsed and validated against Ranger policies to enforce access control, based on the user’s authorization to the data queried. For this, HS2 references the Hive Ranger plugin (hosted on the HS2 server) to determine whether the authenticated user has permissions to the columns, tables, etc. referenced in the query. If the user does not have permissions to some of the columns, the query is rewritten to strip the unauthorized resources and preserve the authorized resources. For example, if an authenticated user ran the simple query:
SELECT FirstName, LastName, SSN, CC FROM customer
and they were unauthorized to access the SSN and CC columns, their query would be rewritten as:
SELECT FirstName, LastName FROM customer
If the user has permissions for all the columns, there is no need to rewrite the query.

3) Query Planning and Cost Based Optimization
Once the query is parsed, a logical query plan is generated for use by the query execution engine (in this case, either Tez or the traditional Map Reduce engine). Hive performs both logical and physical optimizations, including partition pruning, projection pruning, and predicate pushdown. Hive uses Apache Calcite to provide Cost Based Optimization (CBO).

Cost Based Optimization (CBO)
Query performance can be improved through the use of Cost Based Optimization (CBO) techniques, including join reordering. Knowing the number of rows in each table and the cardinality of columns, the optimizer can order joins in a way that minimizes the total number of rows joined. The simplest example of this is to first run the tasks that produce the smallest number of rows on which a join will be performed. This can reduce the number of Tez (map/reduce) tasks that need to be spawned for a given query. In order for Cost Based Optimization (Calcite) to be used, column-level stats should be enabled in Hive. This can be achieved through the ANALYZE TABLE statement for existing tables, and by setting three configuration parameters in Hive (detailed below):
hive.stats.autogather
for collection of table-level statistics.
hive.stats.fetch.column.stats
for collection of column-level statistics
hive.compute.query.using.stats
to use statistics when generating query plans
Further details on the configuration can be found here: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/performance-tuning/content/hive_set_up_cbo_and_stats.html
In CDP Public Cloud, CBO is enabled by default, and the above configurations are set to true (with the option of disabling them). Column-level stats are stored in the HMS (Hive Metastore), which is backed by an RDBMS such as MySQL or PostgreSQL. (The previous HBase-backed Metastore support is discontinued in Hive 3.) Once a query plan has been generated, the plan is delegated to Tez (or the traditional Map Reduce execution engine) for execution.

4) Query Coordination with Map Reduce or Tez
Map Reduce and Tez are distributed execution frameworks created for Hive (and other projects), which can take a query plan and coordinate the execution of that plan across different (map/reduce) tasks. These tasks can be represented as a directed acyclic graph (DAG). The query coordination is performed by the application master (AM), and this is the case both for queries executed with the traditional Hive Map Reduce engine and with the Tez engine.

Predicate (and projection) pushdown:
Once a query plan is generated, that plan is delegated to the Application Master (AM) for coordination of execution. Based on the query plan, the AM must first identify the folders in which the underlying ORC files relating to the query are stored. Any predicates in the query on the partitioned column will limit the partitions, and therefore the folders, which Tez needs to read from. For the remaining partitions, the AM must next request from the NameNode an enumeration of the ORC files in those folders, and the block locations for the footers of those ORC files. ORC footers contain file- and stripe-level statistics which the AM can use to determine which stripes need to be read by mappers for each ORC file. Min, max, and null statistics, and bloom filters, can be used to eliminate unnecessary stripe reads based on predicates. This is the first mechanism by which predicate pushdown is performed.

Further elimination of disk IO can be achieved by eliminating the read of unnecessary columns. For this, the SELECT clause is used to determine which columns need to be read (projection pushdown). Predicate and projection pushdown improve query performance by reducing disk scans and reducing the number of mappers required to execute a query. By avoiding full table scans, and avoiding the need to read all columns, a single mapper can perform the task of many mappers. To provide a simple example, consider the query:
SELECT first_name, last_name
FROM customer
WHERE zip_code = 94611
AND customer_segment = 'champion'
AND lifestyle = 'active'
AND household = 'empty_nester'
run against a denormalized customer table with 20 columns. Applying projection pushdown (only reading the streams for first and last name) will reduce the disk reads from 20 columns (streams) to 2 columns per stripe. Applying predicate pushdown with bloom filters will eliminate disk reads for all stripes in which there are no customers who meet all of the criteria of being in the 94611 zip code and being empty nesters in the champion customer segment. If these predicates eliminate 50% of the stripe reads, then only half the mappers are required. The combination of projection and predicate pushdown will allow a single mapper to perform the work of 10 mappers.

The impact of predicate and projection pushdown is significant both for standard HDDs and for NVMe drives in the traditional model of local storage. This raises the question of how predicate and projection pushdown work in the context of cloud-based object storage. Because Tez can apply offsets to file reads in storage layers such as S3, predicate pushdown is possible for these storage layers also. To summarize, it is the Tez AM which performs the first step in predicate pushdown, by determining which stripes must be read based on ORC file footer statistics. Based on this, the Tez AM delegates the stripe reads to mapper tasks. More detail on the ORC file format can be found at the links below:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC
https://orc.apache.org/docs/indexes.html

The Mappers:
Mappers are responsible for executing the stripe reads. They retrieve the stripe footer for each of the stripes they will read, which contains the offsets for the column data they must read in each stripe. Column data is stored as streams in the ORC file format, where one column (e.g. first name) is written first in the stripe, followed by the next column, and so on. In uncompressed format this might look like:
Stripe Index (Header)
-----------------------------
Andrew, John, Sandy, Steve
Miller, Smith, White, Carpenter
------------------------------
Stripe Footer
In reality, there are multiple streams per column, both for the indexes and the data. These differ according to data type, but generally include, at a minimum, a stream indicating whether a value is present, and a stream for its value. In the index, a bloom filter stream can also be present. Leveraging index streams allows further predicate pushdown at the level of the mappers. Statistics are also present for strides (groupings of 10,000 rows), which offer additional predicate pushdown. Finally, it's worth bearing in mind that column data is compressed based on the data type of the column, which further reduces the IO overhead.

Tez vs. Map Reduce
Tez offers performance gains over traditional Hive on Map Reduce through the elimination of disk IO between tasks in the query execution DAG. With the traditional Map Reduce execution engine, the output of each task is written to storage and then read from storage by downstream tasks. Tez avoids this by facilitating access to shared memory across tasks, including RDMA (where relevant).

Shared Memory:
The ability to share memory between tasks improves data throughput by several orders of magnitude for tasks located on the same node. Current DDR4 memory can transfer data at around 90 GB/s, versus even locally attached storage such as local NVMe drives at 3.6 GB/s and HDDs at 120 MB/s. Tez DAG tasks will be distributed across many nodes in a cluster, so data throughput gains are moderated because not all tasks requiring shared memory will be running on the same nodes. This is partially mitigated by support for RDMA.

Additional optimizations in Tez:
Tez further optimizes query execution by defining the DAG at runtime. The DAG is generated based on block locations and the location of YARN containers allocated by the Resource Manager. Finally, Tez makes possible the reuse of containers across tasks and queries, avoiding the overhead of container spin-up. There are many additional optimizations performed by Tez, the details of which can be found here: https://web.eecs.umich.edu/~mosharaf/Readings/Tez.pdf
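To make the stripe-elimination step described above concrete, here is a toy Python sketch. It is not ORC or Tez code, and the stripe statistics and predicate value are invented for illustration; it only shows how stripe-level min/max statistics let a reader skip stripes that cannot contain matching rows:

# Toy illustration: stripe-level min/max statistics for a zip_code column.
# Only stripes whose [min, max] range can contain the predicate value need
# to be read; the rest are eliminated before any mapper is launched.
stripes = [
    {"id": 0, "zip_min": 90001, "zip_max": 90210},
    {"id": 1, "zip_min": 94501, "zip_max": 94699},
    {"id": 2, "zip_min": 95001, "zip_max": 96162},
]

predicate_zip = 94611

stripes_to_read = [
    s for s in stripes
    if s["zip_min"] <= predicate_zip <= s["zip_max"]
]

print(stripes_to_read)  # only stripe 1 survives the elimination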
11-14-2018
05:02 AM
2 Kudos
In part 1 of this series, we looked at how we could use deep learning to classify melanoma, using transfer learning with a pre-trained VGG16 convolutional neural network. Here we take a look at how to implement this using TensorFlow and Keras. First, we import the packages we need:
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential, Model
from keras.layers import Dropout, Flatten, Dense
from keras import applications, optimizers

These packages include the network layers we will be using, the applications module (which includes the pretrained VGG16 model), and some image pre-processing utilities. Next, we “strip” our model of its top layers. This is as simple as one line of code in Keras:
base_model = applications.VGG16(include_top=False,
weights='imagenet')

Next, we take our images of skin and transform them using the Keras image pre-processing utility:
datagen = ImageDataGenerator(
rotation_range=40,
width_shift_range=0.2,
height_shift_range=0.2,
rescale=1./255,
shear_range=0.2,
zoom_range=0.2,
horizontal_flip=True,
fill_mode='nearest')

This will provide us with extra variation in our training image data set, and will help prevent overfitting in our model. Next, we feed our processed images through our “stripped” VGG16 network and generate a feature set:
generator = datagen.flow_from_directory(
validation_data_dir,
target_size=(img_width, img_height),
batch_size=batch_size,
class_mode=None,
shuffle=False)
bottleneck_features_validation = base_model.predict_generator(
generator, nb_validation_samples // batch_size)

Next, we build a fully connected classifier network:
top_model = Sequential()
top_model.add(Flatten(input_shape=train_data.shape[1:]))
top_model.add(Dense(256, activation='relu'))
top_model.add(Dropout(0.5))
top_model.add(Dense(1, activation='sigmoid'))
top_model.compile(optimizer='rmsprop',
loss='binary_crossentropy', metrics=['accuracy'])

We train this classifier by feeding the feature set generated from our stripped VGG16 network through it. We'll also save the weights of the classifier once we've trained it:
top_model.fit(train_data, train_labels,
epochs=epochs,
batch_size=batch_size,
validation_data=(validation_data, validation_labels))
top_model.save_weights(top_model_weights_path)

Finally, we plug our classifier model back into our stripped VGG16 model, and we re-train the convolutional weights of the VGG16 network:
model = Model(inputs=base_model.input, outputs=top_model(base_model.output))
model.compile(loss='binary_crossentropy',
optimizer=optimizers.SGD(lr=1e-4, momentum=0.9),
metrics=['accuracy'])
model.fit_generator(
train_generator,
steps_per_epoch=nb_train_samples // batch_size,
epochs=epochs,
validation_data=validation_generator,
validation_steps=nb_validation_samples // batch_size,
verbose=2)
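A common variation at this fine-tuning stage, sketched below, is to freeze the earlier VGG16 layers and re-train only the last convolutional block, which reduces the risk of destroying the pretrained features. This is a standard transfer-learning practice rather than something shown above, and the cut-off index is an assumption based on the usual VGG16 layer ordering in Keras; it would go before the call to model.compile:

# Sketch: freeze everything up to the last convolutional block of VGG16.
# The index 15 assumes the standard layer ordering of the no-top VGG16 model.
for layer in base_model.layers[:15]:
    layer.trainable = False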
In this case, our train_generator and validation_generator are created in the same way we created the generator for our classifier. You can see the full example, and build this for yourself, on GitHub: https://github.com/hortonworks-sk/HDP-3.0-classifying-melanoma
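For completeness, here is a minimal sketch of how those generators might be created with flow_from_directory. The directory name train_data_dir and the use of class_mode='binary' are assumptions consistent with the binary melanoma classifier above, not code taken from the repository:

# Sketch: build the training and validation generators used by fit_generator above.
# train_data_dir and validation_data_dir are assumed to contain one sub-folder
# per class (e.g. melanoma / benign).
train_datagen = ImageDataGenerator(
    rotation_range=40,
    width_shift_range=0.2,
    height_shift_range=0.2,
    rescale=1./255,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest')

validation_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
    train_data_dir,
    target_size=(img_width, img_height),
    batch_size=batch_size,
    class_mode='binary')

validation_generator = validation_datagen.flow_from_directory(
    validation_data_dir,
    target_size=(img_width, img_height),
    batch_size=batch_size,
    class_mode='binary')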
10-23-2018
07:54 PM
Very cool! Great diagrams too 🙂
07-02-2018
11:52 AM
1 Kudo
@Steve Kiaie Yes, it should be checked. If not, I believe you can add it later on the server of your choice.
07-06-2018
08:37 PM
Thanks for the detailed response, Geoffrey. This is very helpful.