Member since: 06-02-2020
Posts: 331
Kudos Received: 67
Solutions: 49
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2801 | 07-11-2024 01:55 AM |
| | 7866 | 07-09-2024 11:18 PM |
| | 6576 | 07-09-2024 04:26 AM |
| | 5912 | 07-09-2024 03:38 AM |
| | 5614 | 06-05-2024 02:03 AM |
07-11-2023
11:03 PM
2 Kudos
Spark3 Kudu Integration
This blog post will guide you through the process of integrating Spark 3 with Kudu, providing you with valuable insights and step-by-step instructions.
Apache Kudu
Kudu is a distributed columnar storage engine optimized for OLAP workloads. Kudu runs on commodity hardware, is horizontally scalable, and supports highly available operations.
Kudu Integration with Spark
Apache Kudu can be integrated with Apache Spark using the built-in Spark-SQL-Kudu library. This integration allows us to read from and write to Kudu tables directly from Spark applications using Spark SQL.
Kudu integrates with Spark through the Data Source API as of version 1.0.0. To integrate Kudu with Spark3, we need to use the kudu-spark3_2.12 library.
Below are the step-by-step instructions to seamlessly integrate Kudu with Spark3:
Impala/Kudu:
Step 1: Launch the impala-shell
Go to Cloudera Manager > Impala > Status, copy the Impala Shell Command, and run it from the shell.
For example,
impala-shell -i node1 -d default -k --ssl --ca_cert=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_cacerts.pem
Step 2: Create the Kudu table.
CREATE TABLE employees
(
id BIGINT,
name STRING,
age SMALLINT,
salary FLOAT,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
Step 3: Insert the data into the Kudu table
INSERT INTO employees VALUES (1, "Ranga", 28, 10000), (2, "Nishanth", 5, 40000), (3, "Meena", 30, 24000);
Step 4: Verify the data from the Kudu table
SELECT * FROM employees;
Step 5: Quit the Impala shell
quit;
Spark:
Pre-requisites:
Verify that the user has the proper permission(s) to access the table/database, to avoid the following exception:
org.apache.kudu.client.NonRecoverableException: Unauthorized action
at org.apache.kudu.client.KuduException.transformException(KuduException.java:110)
at org.apache.kudu.client.KuduClient.joinAndHandleException(KuduClient.java:470)
at org.apache.kudu.client.KuduClient.openTable(KuduClient.java:288)
at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:327)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
To grant the right permissions to the user, go to Ranger UI > Access Manager > KUDU, click cm_kudu, and either use an existing policy or create a new policy that provides the required permissions.
You need to provide the exact Kudu master values; otherwise, exceptions similar to the following will occur.
To collect the Kudu master values, go to Cloudera Manager > Kudu > Instances and copy the Hostname of each instance whose Role Type is Master.
23/05/15 12:59:38 WARN client.ConnectToCluster: [kudu-nio-0]: Could not connect to a leader master. Client configured with 1 master(s) (node1:7051) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Could not connect to a leader master. Client configured with 1 master(s) (node1) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.kudu.spark.kudu.KuduContext.<init>(KuduContext.scala:169)
at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:325)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
... 47 elided
Step 6: Launch the spark3-shell by passing the kudu-spark3 jar.
There are two ways to pass the kudu-spark3 connector jar to spark3-shell:
Using Jars: spark3-shell --jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar
Using Packages: spark3-shell --packages org.apache.kudu:kudu-spark3_2.12:<kudu-cdp-version> --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
You can use either of the above options.
For <kudu-cdp-version>, check Cloudera Runtime component versions in Release Notes.
Step 7: Run the Spark code to retrieve data from Kudu.
Note: Before running the following code, you must replace the kudu.master value with the master addresses collected in the prerequisites.
val kudu_table = "default.employees"
val kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
// Load the data from kudu
val df = spark.read.options(Map("kudu.master" -> kudu_master, "kudu.table" -> kudu_table)).format("kudu").load()
// Display the data
df.show()
// Create sample dataset to insert
case class Employee(id:Long, name: String, age: Short, salary: Float)
val employeeDF = Seq(
Employee(4L, "Employee6", 56, 1500.5f),
Employee(5L, "Employee7", 30, 15000.5f)
).toDF()
// Save the data to kudu
employeeDF.write.options(Map("kudu.master"-> kudu_master, "kudu.table"-> kudu_table)).mode("append").format("kudu").save()
Note: After running the above code, if you get an error such as "table not found", you need to prepend impala:: to the kudu_table value. For example, val kudu_table = "impala::default.employees"
PySpark Code:
filename: kudu_pyspark3_example.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kudu Integration with Spark3").getOrCreate()
kudu_table = "default.employees"
kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
df = spark.read.option("kudu.master", kudu_master).option("kudu.table", kudu_table).format("kudu").load()
df.show()
spark.stop()
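The same kudu-spark3 data source can also be used to write from PySpark. Below is a minimal sketch (not part of the original example) that appends two rows to the employees table created above. It assumes the session is launched with the kudu-spark3 jar, reuses the illustrative kudu_master placeholder, and uses an explicit schema so the column types match the Impala table (BIGINT, STRING, SMALLINT, FLOAT); the filename is hypothetical.
filename: kudu_pyspark3_write_example.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, ShortType, FloatType
spark = SparkSession.builder.appName("Kudu Write Example with Spark3").getOrCreate()
kudu_table = "default.employees"  # or "impala::default.employees" if the plain name is not found
kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
# Schema chosen to match the Kudu/Impala column types.
schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), True),
    StructField("age", ShortType(), True),
    StructField("salary", FloatType(), True),
])
new_employees = spark.createDataFrame(
    [(6, "Employee8", 45, 20000.0), (7, "Employee9", 33, 30000.5)],
    schema,
)
# Append the rows to the existing Kudu table.
new_employees.write \
    .option("kudu.master", kudu_master) \
    .option("kudu.table", kudu_table) \
    .mode("append") \
    .format("kudu") \
    .save()
spark.stop()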
Using Spark with a Secure Kudu Cluster
The Kudu Spark integration is able to operate on secure Kudu clusters which have authentication and encryption enabled, but the submitter of the Spark job must provide the proper credentials.
Client mode:
To submit the Spark application in client mode, the submitting user must have an active Kerberos ticket granted through kinit.
kinit
spark3-submit \
--master yarn \
--deploy-mode client \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
kudu_pyspark3_example.py
Note: In client mode, the user needs to be authenticated using kinit only. If you specify keytab and principal, it will throw the following exception.
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
Cluster mode:
To submit the Spark application in cluster mode, the Kerberos principal name and keytab location must be provided through the --principal and --keytab arguments to spark3-submit.
spark3-submit \
--master yarn \
--deploy-mode cluster \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
--principal user/node1.hadoop.com@HADOOP.COM \
--keytab user.keytab \
kudu_pyspark3_example.py
Note: In cluster mode, the user needs to be authenticated using a keytab and principal only. If you authenticate using kinit instead, it will throw the following exception.
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
Resources
1. Developing Applications With Apache Kudu
2. Kudu integration with Spark
06-29-2023
08:41 PM
Hi @hightek2699 Don't install PySpark manually using the pip install command. Use the Cloudera-provided PySpark.
06-14-2023
01:25 AM
1 Kudo
Hi @cirrus You can find the optimized code below.
filename: /tmp/test_pyspark.py
from pyspark.sql.functions import col, expr
from pyspark.sql import SparkSession
from datetime import datetime
import math
spark = SparkSession.builder \
.appName('Test App') \
.getOrCreate()
num_rows = 2350000
num_columns = 2500
records_per_file=5000
num_partitions = int(math.ceil(num_rows/records_per_file))
data = spark.range(num_rows).repartition(num_partitions)
print("Number of Partitions: " + str(data.rdd.getNumPartitions()))
start_time = datetime.now()
data = data.select(*[expr('rand() * 2 - 1 as col'+str(i)) for i in range(num_columns)])
#data = data.select("*",*[expr('rand() * 2 - 1 as col'+str(i)) for i in range(num_columns)])
end_time = datetime.now()
delta = end_time - start_time
# time difference in seconds
print("Time difference to select the columns is "+ str(delta.total_seconds()) +" seconds")
start_time = datetime.now()
data.write.format("parquet").mode("overwrite").save("/tmp/test")
end_time = datetime.now()
delta = end_time - start_time
# time difference in seconds
print("Time difference for writing the data to HDFS is "+ str(delta.total_seconds()) +" seconds")
spark.stop()
Spark-submit command:
spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.memory=16G \
--conf spark.driver.memoryOverhead=1g \
--conf spark.executor.memory=16G \
--conf spark.executor.memoryOverhead=1g \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.4 \
--conf spark.executor.cores=5 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.driver.extraJavaOptions="-Xss1024m" \
--conf spark.executor.extraJavaOptions="-Xss1024m" \
/tmp/test_pyspark.py
06-14-2023
01:06 AM
Fun fact for those interested: in order to have 8 cores running, you need in this example a minimum of 64m for the Xss option. If you choose 32m, it will not give a StackOverflowError, but only 4 cores will be running 😲
05-25-2023
09:37 PM
In this blog post, you can see multiple Spark3 integration examples.
Spark3 Installation
Spark3 SparkPi Example - Coming soon
Spark3 Streaming with Kafka Example - Coming soon
Spark3 Structured Streaming with Kafka Example - Coming soon
Spark3 HBase Connector Example
Spark3 Phoenix Connector Example - Coming soon
Spark3 Hive Warehouse Connector Example - Coming soon
Spark3 Kudu Example
Spark3 Ozone Example - Coming soon
Spark3 Iceberg Example - Coming soon
Spark3 Atlas Example - Coming soon
Spark3 CDE Example - Coming soon
Spark3 GPUs Example - Coming soon
Spark3 with Custom logging - Coming soon
Spark3 Hue Integration
Note: If you are looking for any specific examples other than the above, you can comment here.
05-19-2023
01:50 AM
Hi @sonnh You can go ahead and raise the HDFS case by uploading all the required logs, such as the NodeManager and DataNode logs.
05-16-2023
02:08 AM
Hi @Paarth Spark HBase Connector (SHC) is not supported in CDP. You need to use HBase Spark Connector to access the HBase data using Spark. You can find the sample reference: https://docs.cloudera.com/runtime/7.2.10/managing-hbase/topics/hbase-using-hbase-spark-connector.html
04-25-2023
02:31 AM
2 Kudos
Dynamic Allocation in Apache Spark
1. Introduction
In Apache Spark, resource allocation is a critical aspect of optimizing the performance of Spark applications.
Spark provides two mechanisms to allocate resources:
Static Resource Allocation (SRA)
Dynamic Resource Allocation (DRA)
1. Static Resource Allocation
In static resource allocation, the resources are pre-allocated to the Spark application before it starts running. The amount of resources is fixed and cannot be changed during runtime. It means that if the Spark application requires more resources than what was allocated, it will result in longer execution times or even failure of the job.
Static resource allocation is suitable for scenarios where the resource requirements of the Spark application are known in advance and the workload is consistent throughout the job.
Disadvantages of Static Resource Allocation
Inefficient Resource Utilization: Static resource allocation mode may lead to inefficient resource utilization if the allocated resources are not fully utilized by the Spark application. This can result in idle resources and suboptimal performance.
Limited Flexibility: Static allocation mode does not allow the Spark application to adjust the allocated resources during runtime based on the workload. This can result in out-of-memory errors or insufficient resources when the workload increases.
2. Dynamic Resource Allocation
In dynamic resource allocation, the resources are allocated to the Spark application on an as-needed basis during runtime. The allocation is adjusted dynamically based on the workload and usage patterns of the application. This allows for better resource utilization and can help avoid underutilization or overutilization of resources.
Dynamic resource allocation is suitable for scenarios where the resource requirements of the Spark application are unknown or variable, and the workload is not consistent throughout the job.
Dynamic Resource Allocation (DRA)
Dynamic allocation is a feature in Apache Spark that allows for automatic adjustment of the number of executors allocated to an application. This feature is particularly useful for applications that have varying workloads and need to scale up or down depending on the amount of data being processed. It can help optimize the use of cluster resources and improve application performance.
When dynamic allocation is enabled, Spark can dynamically allocate and deallocate executor nodes based on the application workload. If the workload increases, Spark can automatically allocate additional executor nodes to handle the additional load. Similarly, if the workload decreases, Spark can deallocate executor nodes to free up resources.
Note:
In the Cloudera Data Platform (CDP), dynamic allocation is enabled by default.
Dynamic allocation is available for all the supported cluster managers, i.e., Spark Standalone, Hadoop YARN, Apache Mesos, and Kubernetes.
Advantages of Dynamic Allocation:
Resource efficiency: Dynamic allocation enables Spark to allocate resources (CPU, memory, etc.) to the application based on the actual workload, which can help reduce the waste of resources and improve the overall efficiency of the cluster.
Scalability: Dynamic allocation allows Spark to scale up or down the number of executors allocated to an application depending on the workload. This enables the application to handle spikes in traffic or data volumes without requiring manual intervention.
Cost savings: By efficiently allocating resources, dynamic allocation can help reduce the overall cost of running Spark applications.
Fairness: Dynamic allocation can help ensure that multiple applications running on the same cluster are allocated resources fairly based on their actual workload.
Disadvantages of Dynamic Allocation:
Overhead: Dynamic allocation requires Spark to continuously monitor the workload and adjust the allocation of resources accordingly, which can create additional overhead and potentially impact the performance of the application.
Latency: Because dynamic allocation involves monitoring and adjusting the allocation of resources, there may be a latency cost associated with switching the number of executors allocated to an application.
Configuration complexity: Enabling dynamic allocation requires configuring several properties in Spark, which can increase the complexity of managing and deploying Spark applications.
Unpredictability: Dynamic allocation can be unpredictable, and the number of executors allocated to an application may change frequently, which can make it difficult to monitor and optimize the application's performance.
Increased network traffic: Dynamic allocation can increase network traffic in the cluster, as Spark needs to communicate with the Node Manager to request and release resources. This can result in increased overhead and impact the overall performance of the application.
Yarn Node Manager Auxiliary Spark Shuffle Service Overload: When using Spark Dynamic Resource Allocation, the Yarn Node Manager Auxiliary Service responsible for handling the shuffle process can become overloaded if the allocated resources are insufficient for the Spark application's workload. This can result in reduced performance and instability in the cluster. It is essential to carefully monitor the resource allocation and adjust the configuration parameters to ensure that the Yarn Node Manager Auxiliary Spark Shuffle Service has enough resources to handle the workload efficiently.
Resource Allocation Policy of scaling executors up and down:
Dynamic Allocation comes with the policy of scaling executors up and down as follows:
a) Scale-Up Policy
Scale Up Policy requests new executors when there are pending tasks and increases the number of executors exponentially since executors start slow and the Spark application may need slightly more. The actual request is triggered when there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds, and then triggered again every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds thereafter if the queue of pending tasks persists. Additionally, the number of executors requested in each round increases exponentially from the previous round. For instance, an application will add 1 executor in the first round, and then 2, 4, 8 and so on executors in the subsequent rounds.
b) Scale Down Policy
Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds. Note that, under most circumstances, this condition is mutually exclusive with the request condition, in that an executor should not be idle if there are still pending tasks to be scheduled.
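To make these policy knobs concrete, the following is a minimal, hedged PySpark sketch of setting the relevant properties when building a SparkSession. The timeout values shown are illustrative (they mirror the Spark defaults), not tuning recommendations, and these properties must be set before the SparkContext starts.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Dynamic Allocation Policy Tuning Example")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    # Scale up: request executors after tasks have been pending for this long ...
    .config("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
    # ... and keep requesting at this interval while the backlog persists.
    .config("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")
    # Scale down: release executors that have been idle for this long.
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .getOrCreate()
)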
Different kinds of Spark Applications:
There are several kinds of Spark applications that can be developed and run on a Spark cluster.
Batch Applications
Streaming Applications
Structured Streaming Applications
Let's discuss how to enable dynamic allocation for each kind of application.
1. Enable Dynamic Allocation for Spark Batch Applications
To enable dynamic allocation for Spark batch applications, you need to use the following configuration properties:
| Property Name | Default Value | Description |
|---|---|---|
| spark.shuffle.service.enabled | false | Enables the external shuffle service. |
| spark.dynamicAllocation.enabled | false | Set this to true to enable dynamic allocation. |
| spark.dynamicAllocation.minExecutors | 0 | Set this to the minimum number of executors that should be allocated to the application. |
| spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | The initial number of executors to run if dynamic allocation is enabled. If `--num-executors` (or `spark.executor.instances`) is set and larger than this value, it will be used as the initial number of executors. |
| spark.dynamicAllocation.maxExecutors | infinity | Set this to the maximum number of executors that should be allocated to the application. |
In addition to the above parameters, there are some additional parameters available. In most scenarios, the default values of these additional parameters are sufficient.
spark-submit command:
spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=0 \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=20
Note: In the above spark-submit command, the minimum number of executors is 0 and the maximum number of executors is 20. Adjust the above parameter values based on your requirements.
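For reference, the same batch dynamic allocation properties can also be set programmatically when the SparkSession is created; a minimal sketch using the same illustrative values as the spark-submit command above (the properties must be set before the SparkContext starts):
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("Batch Dynamic Allocation Example")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "0")
    .config("spark.dynamicAllocation.initialExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .getOrCreate()
)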
2. Enable Dynamic Allocation for Spark Streaming Applications
To enable dynamic allocation for Spark Streaming applications, we mainly need to use the following parameters.
| Parameter | Default Value | Description |
|---|---|---|
| spark.streaming.dynamicAllocation.enabled | false | Set this to true to enable dynamic allocation for Spark Streaming applications. |
| spark.streaming.dynamicAllocation.minExecutors | | Set this to the minimum number of executors that should be allocated to the application. The minimum executors value must be > 0. |
| spark.streaming.dynamicAllocation.maxExecutors | infinity | Set this to the maximum number of executors that should be allocated to the application. |
Note: Dynamic allocation cannot be enabled for both streaming and batch at the same time. When running streaming applications, first disable batch dynamic allocation.
spark-submit command:
spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.streaming.dynamicAllocation.enabled=true \
--conf spark.streaming.dynamicAllocation.minExecutors=1 \
--conf spark.streaming.dynamicAllocation.maxExecutors=20
Note: In the above spark-submit command, the minimum number of executors is 1, and the maximum number of executors is 20.
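For illustration, these streaming-specific properties can also be set on the SparkConf inside a DStream application instead of on the command line; a minimal, hedged sketch assuming a hypothetical socket source on localhost:9999:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
conf = (
    SparkConf()
    .setAppName("Streaming Dynamic Allocation Example")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.enabled", "false")  # batch dynamic allocation must be disabled
    .set("spark.streaming.dynamicAllocation.enabled", "true")
    .set("spark.streaming.dynamicAllocation.minExecutors", "1")
    .set("spark.streaming.dynamicAllocation.maxExecutors", "20")
)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, batchDuration=10)  # 10-second micro-batches
# Hypothetical socket source, used only for illustration.
lines = ssc.socketTextStream("localhost", 9999)
lines.count().pprint()
ssc.start()
ssc.awaitTermination()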
3. Enable Dynamic Allocation for Spark Structured Streaming Applications
Unlike Spark Streaming, Spark Structured Streaming does not have separate Spark parameters to enable dynamic allocation, so we need to use the batch parameters to enable it. There is an open Jira, SPARK-24815, to add separate parameters for Spark Structured Streaming.
spark-submit command:
spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=0 \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=20
Note: For Structured Streaming applications, using the batch allocation parameters is not an efficient way to allocate resources, because batch dynamic allocation is not designed for streaming job patterns and works poorly for some types of applications.
References:
https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
https://docs.cloudera.com/runtime/latest/running-spark-applications/topics/spark-yarn-dynamic-allocation.html
https://issues.apache.org/jira/browse/SPARK-12133
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L62
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala
https://issues.apache.org/jira/browse/SPARK-24815
https://issues.apache.org/jira/browse/SPARK-3174
https://issues.apache.org/jira/browse/SPARK-4922
https://www.slideshare.net/databricks/dynamic-allocation-in-spark
04-17-2023
08:30 AM
Hi @saivenkatg55 Could you please check where the datalakedev hostname is configured in your Hadoop/Hive configuration files? Also check that you are able to ping the datalakedev hostname from the host where you are running the spark-sql command.
04-09-2023
09:49 PM
Hi @mrTao It is not a good idea to use the entire cluster YARN memory. You can tune the memory from the Spark side by adjusting the following parameters:
--conf spark.executor.instances=5
--conf spark.driver.memory=10g
--conf spark.driver.memoryOverhead=1g
--conf spark.executor.memory=10g
--conf spark.executor.memoryOverhead=1g
With the above memory configuration, YARN will allocate 66 GB (executor memory 11 GB * executor instances (5) + driver memory 11 GB = 55 GB + 11 GB = 66 GB). Check your spark-submit command once again and tune the above parameters according to your requirements.
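A quick, illustrative sanity check of that arithmetic (the values mirror the example above; adjust them to your own configuration):
executor_instances = 5
executor_memory_gb = 10
executor_overhead_gb = 1
driver_memory_gb = 10
driver_overhead_gb = 1
executor_total_gb = executor_instances * (executor_memory_gb + executor_overhead_gb)  # 5 * 11 = 55
driver_total_gb = driver_memory_gb + driver_overhead_gb                               # 11
print(executor_total_gb + driver_total_gb)  # 66 GB requested from YARN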