Member since
06-02-2020
331
Posts
64
Kudos Received
49
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
831 | 07-11-2024 01:55 AM | |
2299 | 07-09-2024 11:18 PM | |
2137 | 07-09-2024 04:26 AM | |
1580 | 07-09-2024 03:38 AM | |
1807 | 06-05-2024 02:03 AM |
05-25-2023
09:37 PM
In this blog post, you can see multiple Spark3 Integration examples. Spark3 Installation Spark3 SparkPi Example - Coming soon Spark3 Streaming with Kafka Example - Coming soon Spark3 Structured Streaming with Kafka Example - Coming soon Spark3 HBase Connector Example Spark3 Phoenix Connector Example - Coming soon Spark3 Hive Warehouse Connector Example - Coming soon Spark3 Kudu Example Spark3 Ozone Example - Coming soon Spark3 Iceberg Example - Coming soon Spark3 Atals Example- Coming soon Spark3 CDE Example - Coming soon Spark3 GPUs Example - Coming soon Spark3 with Custom logging - Coming soon Spark3 Hue Integration Note: If you are looking any specific examples other than above you can comment here.
... View more
05-19-2023
01:50 AM
Hi @sonnh You can go ahead the raise the hdfs case by uploading all required logs like NM logs and DN logs.
... View more
05-16-2023
02:08 AM
Hi @Paarth Spark HBase Connector (SHC) is not supported in CDP. You need to use HBase Spark Connector to access the HBase data using Spark. You can find the sample reference: https://docs.cloudera.com/runtime/7.2.10/managing-hbase/topics/hbase-using-hbase-spark-connector.html
... View more
05-16-2023
02:06 AM
Hi @sonnh Based on the following exception, its looks like due to datanode issues it is causing the issue. You can do one thing disable event log and submit the spark application. Still if you see the below exception better you can create a Cloudera case with hdfs component we will look into this issue. spark-submit --conf spark.eventLog.enabled=false 23/05/15 14:39:04 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[hidden.
11:50010,DS-3f249b69-b437-47ca-8433-b8305db6ea7f,DISK], DatanodeInfoWithStorage[hidden.10:50010,DS-db35488d-9ea2-45c5-938c-2f78b0b9ad5a,DISK]], original=[DatanodeInfoWithStorage[10.
210.11.11:50010,DS-3f249b69-b437-47ca-8433-b8305db6ea7f,DISK], DatanodeInfoWithStorage[hidden.10:50010,DS-db35488d-9ea2-45c5-938c-2f78b0b9ad5a,DISK]]). The current failed datanode r
eplacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
at org.apache.hadoop.hdfs.DataStreamer.findNewDatanode(DataStreamer.java:1304)
at org.apache.hadoop.hdfs.DataStreamer.addDatanode2ExistingPipeline(DataStreamer.java:1372)
at org.apache.hadoop.hdfs.DataStreamer.handleDatanodeReplacement(DataStreamer.java:1598)
at org.apache.hadoop.hdfs.DataStreamer.setupPipelineInternal(DataStreamer.java:1499)
at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1481)
at org.apache.hadoop.hdfs.DataStreamer.processDatanodeOrExternalError(DataStreamer.java:1256)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:667)
... View more
05-14-2023
09:38 PM
Hi @ravitinku18 I will suggest to use 3rd and 4th approach. I will publish that article shortly.
... View more
05-08-2023
08:19 PM
Hi @davidebelvedere For Structured Streaming we need to implement in different way i.e using StreamingQueryListener. Part2 i will try to publish in May end.
... View more
04-26-2023
03:21 AM
Spark Python Integration Test Result Exceptions
In this article, just I talk about exceptions and their Python and Spark versions. Keep on watching this article where I will add some more exceptions and solutions.
1. TypeError: an integer is required (got type bytes)
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
File "<frozen zipimport>", line 259, in load_module
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 46, in <module>
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
File "<frozen zipimport>", line 259, in load_module
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 31, in <module>
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
File "<frozen zipimport>", line 259, in load_module
File "/opt/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 97, in <module>
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
File "<frozen zipimport>", line 259, in load_module
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 146, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 127, in _make_cell_set_template_code
TypeError: an integer is required (got type bytes)
Python Versions:
Python 3.8
Python 3.9
Spark Versions:
Spark Version: 2.3.0
Spark Version: 2.3.1
Spark Version: 2.3.2
Spark Version: 2.3.3
Spark Version: 2.3.4
Spark Version: 2.4.0
Spark Version: 2.4.1
Spark Version: 2.4.2
Spark Version: 2.4.3
Spark Version: 2.4.4
Spark Version: 2.4.5
Spark Version: 2.4.6
Spark Version: 2.4.7
Spark Version: 2.4.8
2. TypeError: 'bytes' object cannot be interpreted as an integer
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 46, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 31, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 97, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 146, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 127, in _make_cell_set_template_code
TypeError: 'bytes' object cannot be interpreted as an integer
Python Versions:
Python 3.10
Spark Versions:
Spark Version: 2.3.0
Spark Version: 2.3.1
Spark Version: 2.3.2
Spark Version: 2.3.3
Spark Version: 2.3.4
Spark Version: 2.4.0
Spark Version: 2.4.1
Spark Version: 2.4.2
Spark Version: 2.4.3
Spark Version: 2.4.4
Spark Version: 2.4.5
Spark Version: 2.4.6
Spark Version: 2.4.7
Spark Version: 2.4.8
3. TypeError: code expected at least 16 arguments, got 15
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 46, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 31, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 97, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 146, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 127, in _make_cell_set_template_code
TypeError: code expected at least 16 arguments, got 15
Python Versions:
Python 3.11
Spark Versions:
Spark Version: 2.3.0
Spark Version: 2.3.1
Spark Version: 2.3.2
Spark Version: 2.3.3
Spark Version: 2.3.4
Spark Version: 2.4.0
Spark Version: 2.4.1
Spark Version: 2.4.2
Spark Version: 2.4.3
Spark Version: 2.4.4
Spark Version: 2.4.5
Spark Version: 2.4.6
Spark Version: 2.4.7
Spark Version: 2.4.8
4. TypeError: code() argument 13 must be str, not int
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 51, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 30, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/accumulators.py", line 97, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 71, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 209, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 172, in _make_cell_set_template_code
TypeError: code() argument 13 must be str, not int
Python Versions:
Python 3.11
Spark Versions:
Spark Version: 3.0.0
Spark Version: 3.0.1
Spark Version: 3.0.3
Spark Version: 3.0.3
5. SyntaxError: invalid syntax
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 53, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 34, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 31, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/find_spark_home.py", line 68
print("Could not find valid SPARK_HOME while searching {0}".format(paths), file=sys.stderr)
^
SyntaxError: invalid syntax
Python Versions:
Python 2.7
Spark Versions:
Spark Version: 3.1.1
Spark Version: 3.1.2
Spark Version: 3.1.3
Spark Version: 3.2.0
Spark Version: 3.2.1
Spark Version: 3.2.2
Spark Version: 3.2.3
6. ImportError: No module named 'typing'
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 53, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 34, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 32, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 67, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle/__init__.py", line 4, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/cloudpickle/cloudpickle.py", line 54, in <module>
ImportError: No module named 'typing'
Python Versions:
Python 3.4
Spark Versions:
Spark Version: 3.1.1
Spark Version: 3.1.2
Spark Version: 3.1.3
Spark Version: 3.2.0
Spark Version: 3.2.1
Spark Version: 3.2.2
Spark Version: 3.2.3
Spark Version: 3.3.0
Spark Version: 3.3.1
7. AttributeError: 'NoneType' object has no attribute 'items'
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "<frozen importlib._bootstrap>", line 968, in _find_and_load
File "<frozen importlib._bootstrap>", line 957, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 664, in _load_unlocked
File "<frozen importlib._bootstrap>", line 634, in _load_backward_compatible
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 53, in <module>
File "<frozen importlib._bootstrap>", line 968, in _find_and_load
File "<frozen importlib._bootstrap>", line 957, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 664, in _load_unlocked
File "<frozen importlib._bootstrap>", line 634, in _load_backward_compatible
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 48, in <module>
File "<frozen importlib._bootstrap>", line 968, in _find_and_load
File "<frozen importlib._bootstrap>", line 957, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 664, in _load_unlocked
File "<frozen importlib._bootstrap>", line 634, in _load_backward_compatible
File "/opt/spark/python/lib/pyspark.zip/pyspark/traceback_utils.py", line 23, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in namedtuple
AttributeError: 'NoneType' object has no attribute 'items'
Python Versions:
Python 3.5
Spark Versions:
Spark Version: 3.1.1
Spark Version: 3.1.2
Spark Version: 3.1.3
Spark Version: 3.2.0
Spark Version: 3.2.1
Spark Version: 3.2.2
Spark Version: 3.2.3
Spark Version: 3.3.2
Spark Version: 3.4.0
8. SyntaxError: invalid syntax
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 71
def since(version: Union[str, float]) -> Callable[[F], F]:
^
SyntaxError: invalid syntax
Python Versions:
Python 2.7
Spark Versions:
Spark Version: 3.3.0
Spark Version: 3.3.1
Spark Version: 3.3.2
Spark Version: 3.4.0
9. SyntaxError: invalid syntax
Exception:
Traceback (most recent call last):
File "/opt/pyspark_udf_example.py", line 3, in <module>
from pyspark.sql import SparkSession
File "<frozen importlib._bootstrap>", line 968, in _find_and_load
File "<frozen importlib._bootstrap>", line 957, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 664, in _load_unlocked
File "<frozen importlib._bootstrap>", line 634, in _load_backward_compatible
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 53, in <module>
File "<frozen importlib._bootstrap>", line 968, in _find_and_load
File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 896, in _find_spec
File "<frozen importlib._bootstrap_external>", line 1171, in find_spec
File "<frozen importlib._bootstrap_external>", line 1147, in _get_spec
File "<frozen importlib._bootstrap_external>", line 1128, in _legacy_get_spec
File "<frozen importlib._bootstrap>", line 444, in spec_from_loader
File "<frozen importlib._bootstrap_external>", line 565, in spec_from_file_location
File "/opt/spark/python/lib/pyspark.zip/pyspark/conf.py", line 110
_jconf: Optional[JavaObject]
^
SyntaxError: invalid syntax
Python Versions:
Python 3.5
Spark Versions:
Spark Version: 3.3.0
Spark Version: 3.3.1
Spark Version: 3.3.2
Spark Version: 3.4.0
Note: The above all exceptions occurred while testing the pyspark code (sample udf) example with different Python versions.
... View more
Labels:
04-25-2023
02:31 AM
2 Kudos
Dynamic Allocation in Apache Spark
1. Introduction
In Apache Spark, resource allocation is a critical aspect of optimizing the performance of Spark applications.
Spark provides two mechanisms to allocate resources:
Static Resource Allocation (SRA)
Dynamic Resource Allocation (DRA)
1. Static Resource Allocation
In static resource allocation, the resources are pre-allocated to the Spark application before it starts running. The amount of resources is fixed and cannot be changed during runtime. It means that if the Spark application requires more resources than what was allocated, it will result in longer execution times or even failure of the job.
Static resource allocation is suitable for scenarios where the resource requirements of the Spark application are known in advance and the workload is consistent throughout the job.
Disadvantages of Static Resource Allocation
Inefficient Resource Utilization: Static resource allocation mode may lead to inefficient resource utilization if the allocated resources are not fully utilized by the Spark application. This can result in idle resources and suboptimal performance.
Limited Flexibility: Static allocation mode does not allow the Spark application to adjust the allocated resources during runtime based on the workload. This can result in out-of-memory errors or insufficient resources when the workload increases.
2. Dynamic Resource Allocation
In dynamic resource allocation, the resources are allocated to the Spark application on an as-needed basis during runtime. The allocation is adjusted dynamically based on the workload and usage patterns of the application. This allows for better resource utilization and can help avoid underutilization or overutilization of resources.
Dynamic resource allocation is suitable for scenarios where the resource requirements of the Spark application are unknown or variable, and the workload is not consistent throughout the job.
Dynamic Resource Allocation (DRA)
Dynamic allocation is a feature in Apache Spark that allows for automatic adjustment of the number of executors allocated to an application. This feature is particularly useful for applications that have varying workloads and need to scale up or down depending on the amount of data being processed. It can help optimize the use of cluster resources and improve application performance.
When dynamic allocation is enabled, Spark can dynamically allocate and deallocate executor nodes based on the application workload. If the workload increases, Spark can automatically allocate additional executor nodes to handle the additional load. Similarly, if the workload decreases, Spark can deallocate executor nodes to free up resources.
Note:
In the Cloudera Data Platform (CDP), dynamic allocation is enabled by default.
Dynamic allocation is available for all the supported cluster managers i.e. Spark Standalone, Hadoop YARN, Apache Mesos, and Kubernetes.
Advantages of Dynamic Allocation:
Resource efficiency: Dynamic allocation enables Spark to allocate resources (CPU, memory, etc.) to the application based on the actual workload, which can help reduce the waste of resources and improve the overall efficiency of the cluster.
Scalability: Dynamic allocation allows Spark to scale up or down the number of executors allocated to an application depending on the workload. This enables the application to handle spikes in traffic or data volumes without requiring manual intervention.
Cost savings: By efficiently allocating resources, dynamic allocation can help reduce the overall cost of running Spark applications.
Fairness: Dynamic allocation can help ensure that multiple applications running on the same cluster are allocated resources fairly based on their actual workload.
Disadvantages of Dynamic Allocation:
Overhead: Dynamic allocation requires Spark to continuously monitor the workload and adjust the allocation of resources accordingly, which can create additional overhead and potentially impact the performance of the application.
Latency: Because dynamic allocation involves monitoring and adjusting the allocation of resources, there may be a latency cost associated with switching the number of executors allocated to an application.
Configuration complexity: Enabling dynamic allocation requires configuring several properties in Spark, which can increase the complexity of managing and deploying Spark applications.
Unpredictability: Dynamic allocation can be unpredictable, and the number of executors allocated to an application may change frequently, which can make it difficult to monitor and optimize the application's performance.
Increased network traffic: Dynamic allocation can increase network traffic in the cluster, as Spark needs to communicate with the Node Manager to request and release resources. This can result in increased overhead and impact the overall performance of the application.
Yarn Node Manager Auxiliary Spark Shuffle Service Overload: When using Spark Dynamic Resource Allocation, the Yarn Node Manager Auxiliary Service responsible for handling the shuffle process can become overloaded if the allocated resources are insufficient for the Spark application's workload. This can result in reduced performance and instability in the cluster. It is essential to carefully monitor the resource allocation and adjust the configuration parameters to ensure that the Yarn Node Manager Auxiliary Spark Shuffle Service has enough resources to handle the workload efficiently.
Resource Allocation Policy of scaling executors up and down:
Dynamic Allocation comes with the policy of scaling executors up and down as follows:
a) Scale-Up Policy
Scale Up Policy requests new executors when there are pending tasks and increases the number of executors exponentially since executors start slow and the Spark application may need slightly more. The actual request is triggered when there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds, and then triggered again every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds thereafter if the queue of pending tasks persists. Additionally, the number of executors requested in each round increases exponentially from the previous round. For instance, an application will add 1 executor in the first round, and then 2, 4, 8 and so on executors in the subsequent rounds.
b) Scale Down Policy
Scale Down Policy removes executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds. Note that, under most circumstances, this condition is mutually exclusive with the request condition, in that an executor should not be idle if there are still pending tasks to be scheduled.
Different kinds of Spark Applications:
There are several kinds of Spark applications that can be developed and run on a Spark cluster.
Batch Applications
Streaming Applications
Structured Streaming Applications
Let's discuss how to enable dynamic allocation for each kind of application.
1. Enable Dynamic Allocation for Spark Batch Applications
To enable dynamic allocation for Spark batch applications, you need to use the following configuration properties:
Property Name
Default Value
Description
spark.shuffle.service.enabled
false
Enables the external shuffle service.
spark.dynamicAllocation.enabled
false
Set this to true to enable dynamic allocation.
spark.dynamicAllocation.minExecutors
0
Set this to the minimum number of executors that should be allocated to the application.
spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.minExecutors
The initial number of executors to run if dynamic allocation is enabled. If `--num-executors` (or `spark.executor.instances`) is set and larger than this value, it will be used as the initial number of executors.
spark.dynamicAllocation.maxExecutors
infinity
Set this to the maximum number of executors that should be allocated to the application.
In addition to the above parameters, there are some additional parameters available. In most of the scenarios, the additional parameter default values are sufficient.
spark-submit command:
spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=0 \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=20
Note: In the above spark-submit command, the minimum number of executors is 0 and the maximum number of executors is 20. Based on requirement we need to adjust the above parameter values.
2. Enable Dynamic Allocation for Spark Streaming Applications
To enable the dynamic allocation for streaming applications, mainly we need to use the following parameters.
Parameter
Default Value
Description
spark.streaming.dynamicAllocation.enabled
false
Set this to true to enable dynamic allocation for Spark Streaming applications.
spark.streaming.dynamicAllocation.minExecutors
Set this to the minimum number of executors that should be allocated to the application. Minimum Executors value must be > 0.
spark.streaming.dynamicAllocation.maxExecutors
infinity
Set this to the maximum number of executors that should be allocated to the application.
Note: Dynamic Allocation cannot be enabled for both streaming and batch at the same time. When we are running streaming applications first we need to disable the batch dynamic allocation.
spark-submit command:
spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.streaming.dynamicAllocation.enabled=true \
--conf spark.streaming.dynamicAllocation.minExecutors=1 \
--conf spark.streaming.dynamicAllocation.maxExecutors=20
Note: In the above Spark-submit command, the minimum number of executors is 1, and the maximum executors is 20.
3. Enable Dynamic Allocation for Spark Structured Streaming Applications
Unlike Spark Streaming, Spark Structured Streaming does not have separate spark parameters to enable the dynamic allocation and we need to use the Batch parameter to enable the Dynamic Allocation. There are an open SPARK-24815 Jira to pass the separate parameters for Spark Structured Streaming.
spark-submit command:
spark-submit \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=0 \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=20
Note: For Structured Streaming applications, using batch allocation parameters are not efficient way to allocate the resources because it is not designed for streaming job patterns and works poorly for few types of applications.
References:
https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation
https://docs.cloudera.com/runtime/latest/running-spark-applications/topics/spark-yarn-dynamic-allocation.html
https://issues.apache.org/jira/browse/SPARK-12133
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L62
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/Streaming.scala
https://issues.apache.org/jira/browse/SPARK-24815
https://issues.apache.org/jira/browse/SPARK-3174
https://issues.apache.org/jira/browse/SPARK-4922
https://www.slideshare.net/databricks/dynamic-allocation-in-spark
... View more
Labels:
04-24-2023
03:59 AM
5 Kudos
Spark Streaming Graceful shutdown Spark Streaming jobs are long-running jobs and their tasks need to be executed 7*24 hours, but there are cases like upgrading the code or updating the Spark configuration, where the program needs to be actively stopped. However, for distributed programs, there is no way to kill processes one by one. It is very important to shut down all configurations gracefully. There are two ways to shut down streaming applications: Non-graceful shutdown Graceful shutdown 1. Non-graceful shutdown A non-graceful shutdown is a violent shutdown process. Stopping Spark streaming violently may cause problems. For example, if your data source is Kafka, and a batch of data has been loaded into Spark streaming for processing. If it is forced to stop, consumption will be repeated or some data will be lost at the next startup. Disadvantages: Spark Streaming works based on the micro-batch mechanism, RDD is generated according to the interval time. If a violent shutdown is executed during the interval, it may cause data loss during this period or double calculation (assuming that the consumption data from Kafka has not been calculated yet, it is killed at this time, and double calculation or data loss may occur after the next program startup). Although a checkpoint mechanism is provided, it can be executed when the program starts recovery, but when there is a scene where the program changes, it must be deleted checkpoint, so there is a risk of loss. Non-graceful shutdown in two ways: kill -9 PID yarn application -kill applicationId The above command(s) will stop the streaming application but this could happen in the middle of the batch. Due to this, there is a possibility of data loss or data duplication. To solve this, we need to implement a Graceful Shutdown process to ensure that the Spark Streaming Application will only shut down between micro-batches, so we don’t lose any data. 2. Graceful shutdown A Graceful shutdown means completing the current data processing, stopping receiving new data, and terminating the application. It will ensure that Spark jobs shut down in a recoverable state and without any data loss or duplication. A graceful shutdown can improve application reliability since it guarantees the execution of pending tasks and reduces data loss that could be produced by the immediate context stop. Advantages of graceful shutdown: There are several advantages of using graceful shutdown in Spark Streaming: Prevent data loss: When a Spark Streaming application is shut down abruptly, it can lead to data loss as the application may not have processed all of the data in the current batch. Graceful shutdown ensures that the application completes the processing of the current batch before shutting down, reducing the risk of data loss. Reduce errors: Abrupt shutdown of a Spark Streaming application can result in errors such as lost partitions, incomplete batch processing, and other errors. Graceful shutdown ensures that the application stops processing new data in a controlled way, allowing it to complete its current processing and reduce the likelihood of errors. Better resource management: Graceful shutdown ensures that the Spark Streaming application releases all of its resources in a controlled manner, preventing resource leaks and allowing for better resource management. Improved performance: Graceful shutdown ensures that the Spark Streaming application completes its current batch processing before shutting down. This can lead to improved performance as the application has more time to optimize and complete its processing. Easier recovery: In case of errors or failures, a gracefully shut down Spark Streaming application can be recovered more easily than one that was abruptly shut down. The application can be restarted and continue processing from the last successfully processed batch, reducing the amount of data that needs to be reprocessed. In Spark Streaming, there are a few steps to follow for a graceful shutdown: Stop receiving data: The first step is to stop receiving data from the input sources. This can be done by calling the stop() method on the StreamingContext object. Wait for pending batches to complete: Once you have stopped receiving data, you should wait for all the pending batches to complete processing. You can use the awaitTerminationOrTimeout() method on the StreamingContext object to wait for a specified amount of time for the pending batches to complete. Stop the streaming context: If all the pending batches have completed processing within the specified timeout period, you can stop the StreamingContext object by calling the stop() method with the gracefulStop() flag set to true. This will ensure that all the output sinks are flushed before the context is stopped. Handle any exceptions: If there are any exceptions or errors during the shutdown process, you should handle them appropriately. You can use try-catch blocks to handle any exceptions that may occur. Different ways to shutdown/stop the Spark Streaming Application gracefully There are several ways to gracefully shutdown a Spark Streaming application: Explicitly calling the JVM Shutdown Hook in the driver program (Not Recommended) Using spark.streaming.stopGracefullyOnShutdown = true (Okay) Use an external approach to control internal program shutdown (Recommended) Expose the interface to the outside world and provide the shutdown function (Recommended) In this article, we will discuss the first two methods. 1. Explicitly calling the JVM Shutdown Hook in the driver program The easiest way to shut down the Spark Streaming application gracefully is by calling the streamingContext stop() method inside a JVM Shutdown Hook. A shutdown hook is a thread that is registered with the JVM to run when the JVM is about to shut down. JVM will shut down. For example, all non-daemon threads exited, System.exit was called or CTRL+C was typed. Using Java Use Runtime.getRuntime().addShutdownHook() to register the shutdown hook method, and call the stop method of spark before the JVM exits to perform an elegant shutdown. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("Gracefully stopping Spark Streaming Application.");
streamingContext.stop( true, true);
logger.info("The Spark Streaming Application has been successfully stopped.");
})); This can ensure that before the process is killed and before the driver ends, ssc.stop will be called to ensure that the data is processed. Using Scala sys.addShutdownHook {
logger.info("Gracefully stopping Spark Streaming Application.")
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
logger.info("The Spark Streaming Application has been successfully stopped.")
} Using Python import signal
import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
def stop_gracefully(ssc):
print("Gracefully stopping Spark Streaming Application.")
ssc.stop(stopSparkContext=True, stopGracefully=True)
print("The Spark Streaming Application has been successfully stopped.")
sys.exit(0)
if __name__ == '__main__':
# Create Spark Config
sparkConf = SparkConf().setAppName("SparkStreamingGracefulShutdownHookApp")
# Create Spark Streaming context
ssc = StreamingContext(sparkConf, 30 * 1000)
# Register the shutdown hook
signal.signal(signal.SIGTERM, lambda sig, frame: stop_gracefully(ssc))
signal.signal(signal.SIGINT, lambda sig, frame: stop_gracefully(ssc))
# Start processing data
ssc.start()
ssc.awaitTermination() Note: This approach is recommended only for Spark version < 1.4. Drawbacks: There are some drawbacks to the current approach: There is a possibility that a deadlock situation will occur. There is no guarantee that a shutdown hook will be called by the JVM at all. 2. Using spark.streaming.stopGracefullyOnShutdown = true Solution1 will have too much trouble adding such repetitive codes in each program. From Spark version 1.4 onwards, Spark has builtin spark.streaming.stopGracefullyOnShutdown parameters to decide whether to close the Streaming program in a graceful way (see SPARK-7776 for details). By default spark.streaming.stopGracefullyOnShutdown parameter value is false. When spark.streaming.stopGracefullyOnShutdown is set to true, the Spark Streaming application will be stopped gracefully when a shutdown signal is received. This means that the application will complete its current batches and clean up resources before shutting down. To use the spark.streaming.stopGracefullyOnShutdown parameter, it must be set in the Spark configuration either via command-line arguments or a properties file or programmatically using the SparkConf object. spark-submit: spark-submit --conf spark.streaming.stopGracefullyOnShutdown=true ... Scala: import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
val sparkConf = new SparkConf().setAppName("SparkStreamingGracefulShutdownSignalApp").set("spark.streaming.stopGracefullyOnShutdown", "true")
val sc = new SparkContext(sparkConf) Python: from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
sparkConf = SparkConf().setAppName("SparkStreamingGracefulShutdownSignalApp").set("spark.streaming.stopGracefullyOnShutdown", "true")
sc = SparkContext(conf=sparkConf) By using this approach, we should not use the first explicit shutdown hook approach or call the ssc.stop() method in the driver along with this parameter. We can just set this parameter and then call methods ssc.start() and ssc.awaitTermination(). No need to call ssc.stop() method. Otherwise, the application might hang during the shutdown. Matters needing attention: By default, the spark.yarn.maxAppAttempts parameter uses the default value from yarn.resourcemanager.am.max-attempts value in Yarn. The default value is 2. So, after the kill command stops the first AM, YARN will automatically start another AM/driver. You have to kill the second one. You can set --conf spark.yarn.maxAppAttempts=1 during the spark submission process, but if this parameter is set to 1, there will be a risk that AM will fail and not retry, and the disaster recovery effect will become worse. This method is not recommended to kill the application using the Yarn application -kill <applicationid> command. This command will send a SIGTERM signal to the container, but then a SIGTERM signal will be sent. This time interval is yarn.nodemanager.sleep-delay-before-sigkill.ms determined by (default 250ms). If the value is set to 60000ms, it still cannot work normally. The log only has these two lines, as follows: 23/03/21 17:03:36 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
23/03/21 17:03:36 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook
23/03/21 17:03:36 INFO scheduler.ReceiverTracker: Sent stop signal to all 1 receivers
23/03/21 17:03:36 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver The spark.streaming.stopGracefullyOnShutdown parameter will depend on this parameter: spark.streaming.gracefulStopTimeout (unit ms), the default is 10 times the interval between batches. When the processing is not completed after this time, it is forced to stop. Drawbacks: There are a few drawbacks to this approach: It can be run only on the same machine on which the driver program was run and not on any other node machine in the spark cluster. Make sure to set spark.yarn.maxAppAttempts parameter value will be 1. Implementation import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
// nc -lk 9999
object SparkStreamingGracefulShutdownSignalApp extends App with Serializable {
private val appName = getClass.getSimpleName.replace("$", "") // App Name
@transient private lazy val logger: Logger = Logger.getLogger(appName)
private val checkpointDirectory = s"/tmp/streaming/$appName/checkpoint" // Checkpoint Directory
private val batchInterval: Long = 30 * 1000 // 30 seconds batch interval
if (args.length < 2) {
logger.error(s"Usage\t: $appName <hostname> <port>")
logger.info(s"Example\t: $appName localhost 9999")
System.exit(1)
}
private def createContext(hostname: String, port: Int): StreamingContext = {
// Creating the SparkConf object
val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")
sparkConf.set("spark.yarn.maxAppAttempts", "1")
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
//sparkConf.set("spark.streaming.gracefulStopTimeout", (10 * batchInterval).toString)
// Creating the StreamingContext object
logger.info(s"Creating StreamingContext with duration $batchInterval milli seconds ...")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchInterval))
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
logger.info("StreamingContext created successfully ...")
// Create a socket stream on target hostname:port
val lines = ssc.socketTextStream(hostname, port)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc
}
// Get StreamingContext from checkpoint data or create a new one
private val Array(hostname, port) = args
logger.info(s"Hostname $hostname and Port $port ...")
private val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => createContext(hostname, port.toInt))
ssc.start()
logger.info("StreamingContext Started ...")
//Waiting for task termination
ssc.awaitTermination()
} Spark Submit Command cat run_spark_streaming_graceful_shutdown_app.sh #!/bin/bash
echo "Running <$0> script"
HOST_NAME=$(hostname -f)
PORT=9999
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 2g \
--num-executors 2 \
--executor-cores 2 \
--conf spark.dynamicAllocation.enabled=false \
--name SparkStreamingGracefulShutdownSignalApp \
--class SparkStreamingGracefulShutdownSignalApp \
/tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar "$HOST_NAME" "$PORT"
echo "Finished <$0> script" How to pass the shutdown signal: To shut down the Spark Streaming application, we need to pass the shutdown signal explicitly. (Note: Do not use -9 to force it to close; otherwise, the hook cannot capture.) There are multiple ways we can pass the shutdown signal: Use Ctrl + C command at the terminal (It will work only in client mode). Find the driver process and kill that process by sending the SIGTERM(15) signal. Go to Spark UI > Go to Executors tab > Find Driver host Log in to the above mentioned driver machine address and run the following command to find out the PID process. ps -ef | grep java | grep ApplicationMaster | grep <applicationId> Note: You need to replace your applicationId in the above command. For Example: ps -ef | grep java | grep ApplicationMaster | grep application_1679416741274_0004 Run the following command to kill the process. ps -ef | grep java | grep ApplicationMaster | grep <applicationId> | awk '{print $2}' | xargs kill -SIGTERM Note: We can use either SIGTERM or 15 while killing the process. For Example: ps -ef | grep java | grep ApplicationMaster | grep application_1679416741274_0004 | awk '{print $2}' | xargs kill -SIGTERM In Part2, remaining approaches to shut down the Spark streaming application will be covered.
... View more
Labels:
04-17-2023
08:30 AM
Hi @saivenkatg55 Could you please check where datalakedev host name is configured in your hadoop/hive configuration files? and also check you are able to ping the datalakedev hostname where you are running spark-sql command.
... View more