Member since
06-02-2020
331
Posts
63
Kudos Received
49
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 624 | 07-11-2024 01:55 AM |
 | 1450 | 07-09-2024 11:18 PM |
 | 1512 | 07-09-2024 04:26 AM |
 | 1153 | 07-09-2024 03:38 AM |
 | 1372 | 06-05-2024 02:03 AM |
07-28-2023
02:12 AM
Hi @MrPoseido With SHS, we can only see the list of applications and their status. Even if SHS is down, you can still copy the event log files to another cluster where the SHS service is running and visualize them there. The Cloudera team is working on a new feature to enable Spark History Server HA in future releases.
... View more
07-26-2023
05:28 AM
Hi @fazlulhaque / @davidebelvedere / @ravitinku18 Please find Part 2: https://community.cloudera.com/t5/Community-Articles/Spark-Streaming-Graceful-Shutdown-Part2/ta-p/373348
... View more
07-19-2023
02:27 AM
Spark Streaming Graceful shutdown
1. Introduction
In the first part of the blog post, we discussed the importance of graceful shutdown in Spark Streaming applications.
We also saw different ways to shut down a Spark Streaming application gracefully, including:
Explicitly calling the JVM shutdown hook in the driver program.
Using the spark.streaming.stopGracefullyOnShutdown = true setting.
In this second part, we will discuss better ways to gracefully shut down the streaming application using other approaches.
2. Use an external approach to control internal program shutdown
This approach allows an external process to control the shutdown of the internal program by sending a marker record, which is detected by the internal program. The internal program can then initiate a graceful shutdown procedure, ensuring that any unsaved data is saved and any resources are properly released before exiting.
Benefits:
It is more reliable than relying on the internal application to shut down gracefully on its own.
It allows the external application to have more control over the shutdown process.
It can be used to shut down the internal application even if it is not responding to requests.
Drawbacks:
It requires the external application to be aware of the internal application's shutdown process.
It can add complexity to the shutdown process.
2.1 Different Implementations:
There are different ways to implement a graceful Spark Streaming shutdown using this approach.
Using a File System: In this approach, the marker record is a file that is written to a file system (e.g., HDFS, NFS, AWS S3, Google Cloud Storage etc.). The marker file is created in a specified directory, and its presence indicates that the streaming application should be stopped. The Spark Streaming application periodically checks for the existence of the file and if it is found, the application stops processing new data and shuts down gracefully. This approach is straightforward to implement and can work well in most scenarios.
Using a Database: In this approach, a marker record, i.e., a row in a database table, is stored in a relational database (e.g., MySQL, PostgreSQL, Oracle), indicating that the application should stop processing new data. The application checks the value of the marker record at regular intervals and stops when it indicates that the application should be stopped. This approach provides a centralized way of managing the marker record without introducing a new marker record into the input stream (a minimal sketch of this variant follows below).
Using a Messaging System: In this approach, a marker message is sent to a message queue (e.g., Kafka, RabbitMQ, Pulsar) when the application should be stopped. The Spark Streaming application subscribes to the message topic and listens for the marker message. When the marker message is received, the application stops processing new data and shuts down gracefully. This approach provides a more flexible and scalable way of managing the marker record, as multiple instances of the application can listen to the same message queue.
Using a Distributed Cache: In this approach, the marker record is stored in a distributed cache (e.g., Redis) and periodically checked by the streaming application. If the marker record is present in the cache, the application stops processing new data and shuts down gracefully after processing the current batch.
Each of these approaches has its advantages and disadvantages, and the best approach depends on the specific requirements of the use case.
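To make the database variant concrete, here is a minimal sketch (not from the original article; the shutdown_markers table, its columns, and the 10-second poll interval are hypothetical) of a polling loop that checks for a marker row over plain JDBC:
import java.sql.DriverManager
import org.apache.spark.streaming.StreamingContext
// Hedged sketch: poll a marker table and stop the StreamingContext when a row appears.
def stopByMarkerRow(ssc: StreamingContext, jdbcUrl: String, appName: String): Unit = {
  var stopped = false
  while (!stopped) {
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      // Hypothetical table: one row per application that should be stopped
      val stmt = conn.prepareStatement("SELECT COUNT(*) FROM shutdown_markers WHERE app_name = ?")
      stmt.setString(1, appName)
      val rs = stmt.executeQuery()
      if (rs.next() && rs.getInt(1) > 0) {
        // Marker row found: stop gracefully after the in-flight batches complete
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        stopped = true
      }
    } finally {
      conn.close()
    }
    if (!stopped) Thread.sleep(10000) // poll interval between checks
  }
}
The file-system variant of the same idea is implemented in full in the next section.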
2.2 Using a File System:
We can use any file system to create a marker file. In this example, I used the HDFS file system to create the marker file.
Here are the steps to achieve graceful shutdown using the Marker Approach with HDFS:
Define a marker file: The marker file is a simple text file that is created when the application should be stopped. It should have a unique name that is unlikely to appear in the normal data flow.
Periodically check for the presence of the marker file: The streaming application can use the Hadoop FileSystem API to periodically check for the presence of the marker file in the HDFS directory. This can be done every few seconds or minutes, depending on the application's requirements.
If the marker file is detected, call ssc.stop() to shut down the application gracefully: once the file is found, the streaming application should call ssc.stop(stopSparkContext = true, stopGracefully = true).
Implementation:
SparkStreamingKafkaGracefulShutdownMarkerApp.scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object SparkStreamingKafkaGracefulShutdownMarkerApp extends App with Serializable {
// Create a logger instance for logging messages
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
// Define AppName
private val appName = getClass.getSimpleName.replace("$", "")
if (args.length < 4) {
logger.error(s"Usage\t: $appName <bootstrapServers> <groupId> <topics> <markerFile>")
logger.info(s"Example\t: $appName <localhost:9092> <my_group> <test_topic> </tmp/graceful_shutdown>")
System.exit(1)
}
// Consume command line arguments
private val Array(bootstrapServers, groupId, topic, markerFile) = args
// Create a SparkConf object
val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")
// Create StreamingContext, with batch duration in seconds
private val duration = if (args.length > 4) args(4).toInt else 30
logger.info(s"Creating StreamingContext with batch duration $duration seconds...")
private val batchDuration = Seconds(duration)
val streamingContext = new StreamingContext(sparkConf, batchDuration)
logger.info("StreamingContext created successfully ...")
// Define Kafka Parameters
private val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
// Create a set of topics from a string
private val topics: Set[String] = topic.split(",").map(_.trim).toSet
// Create a streaming context from Kafka
private val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// Get the lines of text from Kafka
val lines = stream.map(record => record.value())
// Split lines into words
val words = lines.flatMap(_.split(" "))
// Map every word to a tuple
private val wordMap = words.map(word => (word, 1))
// Count occurrences of each word
val wordCounts = wordMap.reduceByKey(_ + _)
// Print the word count
wordCounts.print()
// Start stream processing
streamingContext.start()
logger.info("StreamingContext Started ...")
// Gracefully shutdown the Spark Streaming application when the termination marker is received
StopByMarkerFileSystem.stopByMarkerFile(streamingContext, markerFile)
// Wait for the computation to terminate
streamingContext.awaitTermination()
}
StopByMarkerFileSystem.scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.log4j.Logger
import org.apache.spark.streaming.StreamingContext
object StopByMarkerFileSystem {
// Create a logger instance for logging messages
@transient private lazy val logger: Logger = Logger.getLogger(getClass.getSimpleName.replace("$", ""))
/**
* Stops the Spark Streaming context gracefully based on the presence of a marker file.
*
* @param ssc The StreamingContext to stop
* @param markerFile The path of the marker file indicating when to stop
*/
def stopByMarkerFile(ssc: StreamingContext, markerFile: String): Unit = {
  var isStop = false
  // This loop blocks the calling thread until the marker file appears,
  // which is why it is invoked before awaitTermination() in the driver.
  while (!isStop) {
    if (checkMarkerFileExists(markerFile)) {
      logger.info("Marker file exists, stopping the Spark Streaming Context gracefully ...")
      // Stop the streaming context if it has not already terminated
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      // Delete the marker file
      deleteMarkerFile(markerFile)
      logger.info("Spark Streaming Context is Stopped ...")
      isStop = true
    } else {
      // Sleep between checks so we don't poll the file system in a tight busy loop
      Thread.sleep(10000)
    }
  }
}
/**
* Checks if a marker file exists in the file system.
*
* @param markerFile The path of the marker file
* @return True if the marker file exists, false otherwise
*/
private def checkMarkerFileExists(markerFile: String): Boolean = {
// Create a new Path object with the provided marker file path
val path = new Path(markerFile)
// Get the FileSystem associated with the path
val fs = path.getFileSystem(new Configuration())
// Check if the marker file exists in the file system
val markerFileExists = fs.exists(path)
if (markerFileExists)
logger.info(s"MarkerFile $markerFile exists")
markerFileExists
}
/**
* Deletes a marker file from the file system.
*
* @param markerFile The path of the marker file to delete
*/
private def deleteMarkerFile(markerFile: String): Unit = {
logger.info(s"Deleting marker file: $markerFile")
// Create a new Path object with the provided marker file path
val path = new Path(markerFile)
// Get the FileSystem associated with the path
val fs = path.getFileSystem(new Configuration())
// Check if the marker file exists
if (fs.exists(path)) {
// Delete the marker file
fs.delete(path, true)
// Log a message indicating the successful deletion of the marker file
logger.info(s"MarkerFile $markerFile successfully deleted")
}
}
}
Spark Submit Command:
Before running the script below, you need to update the BOOTSTRAP_SERVERS value.
#!/bin/bash
echo "Running <$0> script"
BOOTSTRAP_SERVERS="localhost:9092"
GROUP_ID="test-group"
TOPICS="test-topic"
MARKER_FILE="/apps/spark/streaming/shutdown/spark_streaming_kafka_marker_file"
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 2g \
--num-executors 2 \
--executor-cores 2 \
--conf spark.dynamicAllocation.enabled=false \
--name SparkStreamingKafkaGracefulShutdownMarkerApp \
--class SparkStreamingKafkaGracefulShutdownMarkerApp \
/tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar $BOOTSTRAP_SERVERS $GROUP_ID $TOPICS $MARKER_FILE
echo "Finished <$0> script"
How to pass the shutdown signal:
hdfs dfs -mkdir -p /apps/spark/streaming/shutdown
hdfs dfs -touch /apps/spark/streaming/shutdown/spark_streaming_kafka_marker_file
3. Expose the interface to the outside world and provide the shutdown function
This approach exposes a socket thread or an HTTP service inside the driver to receive requests and waits for the trigger to shut down the streaming program. The socket thread or HTTP service needs to be started in the driver.
Using an HTTP service is recommended: a socket is rather low-level to work with and a little more complicated. With an HTTP service, we can directly use the embedded Jetty to expose an HTTP interface to the outside world. A further advantage of the HTTP service is that the embedded Jetty libraries are available within Spark, so we don't need to introduce additional dependencies.
In the driver program, start an embedded Jetty server or any other HTTP server of your choice. The server should listen on a specific port to accept incoming requests. When a shutdown request is received, the HTTP service should send a signal or command to the Spark Streaming application to initiate the shutdown process.
To initiate the shutdown, you can use various methods such as curl commands, web browsers, or other HTTP client tools. Send an HTTP request to the driver's IP address (which can be found in the application startup log or on the Spark master UI page) and the configured port for the shutdown endpoint. This request triggers the shutdown process and gracefully stops the Spark Streaming application.
Benefits:
Here are some of the benefits of using an HTTP service to gracefully shutdown a Spark Streaming application:
It is easy to implement.
It does not rely on any external storage system.
Drawbacks:
Here are some of the drawbacks of using an HTTP service to gracefully shutdown a Spark Streaming application:
It requires an additional HTTP service to be running.
The driver application must be reachable from the outside world.
Each application must use a unique port number.
Implementation:
Here are some steps on how to implement this approach:
Create a simple HTTP server that listens for a shutdown request.
When the shutdown request is received, the HTTP server can then send a signal to the Spark Streaming application to shut down gracefully.
Find the IP address where the driver is located. This can be seen in the log of the program startup, or on the Spark master UI page.
Once you have the IP address, you can use curl or a web browser to send the shutdown request to the HTTP server.
SparkStreamingKafkaGracefulShutdownHttpApp.scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object SparkStreamingKafkaGracefulShutdownHttpApp extends App with Serializable {
// Create a logger instance for logging messages
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
// Define AppName
private val appName = getClass.getSimpleName.replace("$", "")
if (args.length < 4) {
logger.error(s"Usage\t: $appName <bootstrapServers> <groupId> <topics> <jettyPort>")
logger.info(s"Example\t: $appName <localhost:9092> <my_group> <test_topic> <3443>")
System.exit(1)
}
// Consume command line arguments
private val Array(bootstrapServers, groupId, topic, jettyPort) = args
// Create a SparkConf object
val sparkConf = new SparkConf().setAppName(appName).setIfMissing("spark.master", "local[2]")
// Create StreamingContext, with batch duration in seconds
private val duration = if (args.length > 4) args(4).toInt else 30
private val batchDuration = Seconds(duration)
logger.info(s"Creating StreamingContext with batch duration $duration seconds...")
val streamingContext = new StreamingContext(sparkConf, batchDuration)
logger.info("StreamingContext created successfully ...")
// Define Kafka Parameters
private val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
// Create a set of topics from a string
private val topics: Set[String] = topic.split(",").map(_.trim).toSet
// Create a streaming context from Kafka
private val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// Get the lines of text from Kafka
val lines = stream.map(record => record.value())
// Split lines into words
val words = lines.flatMap(_.split(" "))
// Map every word to a tuple
private val wordMap = words.map(word => (word, 1))
// Count occurrences of each word
val wordCounts = wordMap.reduceByKey(_ + _)
// Print the word count
wordCounts.print()
// Start stream processing
streamingContext.start()
logger.info("StreamingContext Started ...")
// Start the HTTP server that accepts the stop request
StopByHttpHandler.httpServer(jettyPort.toInt, streamingContext, appName)
// Wait for the computation to terminate
streamingContext.awaitTermination()
}
StopByHttpHandler.scala
import org.apache.log4j.Logger
import org.apache.spark.streaming._
// Note: Spark 2.x shades Jetty under org.spark_project.jetty; in Spark 3.x the shaded package is org.sparkproject.jetty.
import org.spark_project.jetty.server.handler.{AbstractHandler, ContextHandler}
import org.spark_project.jetty.server.{Request, Server}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
object StopByHttpHandler {
// Create a logger instance for logging messages
@transient private lazy val logger: Logger = Logger.getLogger(getClass.getSimpleName.replace("$", ""))
/**
* Responsible for starting the guardian jetty service
*
* @param port - The port number exposed externally
* @param ssc - Streaming Context
* @param appName - The name of the Spark application
*/
def httpServer(port: Int, ssc: StreamingContext, appName: String): Unit = {
// Log a debug message indicating the start of the HTTP server
logger.debug("Starting the HTTP Server with the port " + port)
// Create a new Jetty server instance
val server = new Server(port)
// Create a new context handler
val context = new ContextHandler()
// Set the context path for the HTTP server
val contextPath = "/shutdown/" + appName
logger.info(s"Server Context Path $contextPath")
context.setContextPath(contextPath)
// Set the handler for the context to a custom handler for stopping the StreamingContext
context.setHandler(new StopStreamingContextHandler(ssc))
// Set the handler for the server
server.setHandler(context)
// Start the Jetty server
server.start()
// Log a message indicating the successful start of the HTTP server
logger.info("HttpServer started successfully")
}
/**
* Responsible for accepting http requests to gracefully shutdown the streaming application
*
* @param ssc - Streaming Context
*/
private class StopStreamingContextHandler(ssc: StreamingContext) extends AbstractHandler {
override def handle(target: String, baseRequest: Request, request: HttpServletRequest,
response: HttpServletResponse): Unit = {
// Log the target of the request
logger.info("Serving target: " + target)
// Log a message indicating the graceful stopping of the Spark Streaming application
logger.info("Gracefully stopping Spark Streaming Application")
// Stop the StreamingContext, including the underlying SparkContext, gracefully
ssc.stop(stopSparkContext = true, stopGracefully = true)
// Log a message indicating the successful stopping of the application
logger.info("Spark Streaming Application successfully stopped")
// Set the content type and status of the HTTP response
response.setContentType("text/html; charset=utf-8")
response.setStatus(HttpServletResponse.SC_OK)
// Write a response message to the HTTP response
response.getWriter.println("The Spark Streaming application has been successfully stopped.")
// Mark the base request as handled
baseRequest.setHandled(true)
}
}
}
Spark Submit Command:
Before running the script below, you need to update the BOOTSTRAP_SERVERS value.
#!/bin/bash
echo "Running <$0> script"
BOOTSTRAP_SERVERS="localhost:9092"
GROUP_ID="test-group"
TOPICS="test-topic"
HTTP_PORT=3443
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 2g \
--executor-memory 2g \
--num-executors 2 \
--executor-cores 2 \
--conf spark.dynamicAllocation.enabled=false \
--name SparkStreamingKafkaGracefulShutdownHttpApp \
--class SparkStreamingKafkaGracefulShutdownHttpApp \
/tmp/spark-streaming-graceful-shutdown-1.0.0-SNAPSHOT.jar $BOOTSTRAP_SERVERS $GROUP_ID $TOPICS $HTTP_PORT
echo "Finished <$0> script"
How to pass the shutdown signal:
Step 1: Navigate to the Spark UI > Executors and find the driver host. In this example, the driver host is localhost.
Step 2: Open the following URL in a browser (or send the request with curl) to stop the application.
http://localhost:3443/shutdown/SparkStreamingKafkaGracefulShutdownHttpApp
Replace localhost with your Spark application's driver host.
We will get the following message after a successful shutdown:
The Spark Streaming application has been successfully stopped.
4. Conclusion:
In this article, we discussed the importance of graceful shutdown in Spark Streaming applications. We saw that a graceful shutdown is required to ensure that all data is processed and none is lost. We also saw different ways to shut down a streaming application gracefully. Finally, we discussed the recommended ways to use graceful shutdown in production.
5. References
Spark Streaming Graceful Shutdown - Part1
Source Code
... View more
07-17-2023
03:04 AM
1 Kudo
Spark3 HBase Integration
This blog post will guide you through the process of integrating Spark 3 with HBase, providing you with valuable insights and step-by-step instructions.
Note: This blog post covers the case where the Spark3 and HBase services exist in the same cluster. If HBase is in a remote cluster, you can go through the hbase-configure-spark-connector-remote-cluster tutorial.
Prerequisites
To use the HBase-Spark3 connector, we need to ensure that the CDP version is 7.1.7 SP1 with the Spark3 parcel, or above.
Ensure that every Spark node has the HBase Master, Region Server, or Gateway role assigned to it. If no HBase role is assigned to a Spark node, add the HBase Gateway role to it, which ensures that the HBase configuration files are available on the Spark node.
HBase Integration with Spark3
HBase integration with Spark can be achieved using the HBase-Spark connector, which provides a seamless way to interact with HBase from within Spark applications. Here's how you can integrate HBase with Spark3 using the HBase-Spark connector:
1. Configure the HBase-Spark connector
Step 1: Add the hbase-spark connector jars to the classpath.
Go to Cloudera Manager > Spark3 > Configuration.
Ensure that the HBase service is selected in Spark Service as a dependency.
Locate the Spark 3 Client Advanced Configuration Snippet (Safety Valve) for spark3-conf/spark-defaults.conf property, or search for it by typing its name in the Search box.
Add the following properties to ensure that all required HBase platform dependencies are available on the classpath for the Spark executors and drivers:
spark.driver.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar
Note: Before adding the above parameters, you need to find out the VERSION-NUMBER for your cluster and substitute it. For example, if the version number is 1.0.0.3.2.7171000.2-1:
spark.driver.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar
spark.executor.extraClassPath=/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar
Save the changes and restart the Spark3 service.
Step 2: This step is optional when you are not using any Spark SQL filters.
Go to Cloudera Manager > HBase > Configuration.
Locate the RegionServer Environment Advanced Configuration Snippet (Safety Valve) property, or search for "regionserver environment".
Click the plus icon to add the following property:
Key: HBASE_CLASSPATH
Value: /opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-***VERSION-NUMBER***.jar:/opt/cloudera/parcels/CDH/jars/scala-library-***SCALA-VERSION***.jar
Note: Ensure that the listed jars have the correct version number in their name. Example:
/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-1.0.0.3.2.7171000.2-1.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.12.10.jar
Save the changes and restart the Region Servers.
2. HBase-Spark connector Example
2.1 Schema:
In order to use the HBase-Spark connector, you need to define a schema that maps the attributes/columns between HBase and Spark. Let's assume you have an employee table with columns id (Long), name (String), age (Short), and salary (Float). The employee's personal information, i.e., the id, name, and age, is stored in the per column family, and the professional information, i.e., the salary, is stored in the prof column family. The key of the HBase table is the id attribute.
 | Spark | HBase |
---|---|---|
Type/Table | Employee | employees |
Id | id: Long | key |
Name | name: String | per:name |
Age | age: Short | per:age |
Salary | salary: Float | prof:salary |
There are two ways we can specify the schema:
Using the hbase.columns.mapping property: This is a very simple way to map the HBase columns to Spark data types. For example:
val hbase_table = "employees"
val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", hbase_column_mapping).option("hbase.table", hbase_table).option("hbase.spark.use.hbasecontext", false).load()
Using the HBaseTableCatalog.tableCatalog property: This is useful if you want to specify the schema in JSON format, and also when you migrate your code from SHC to the HBase-Spark connector. For example:
val catalog = s"""{
|"table":{"namespace":"my_nm", "name":"employees"},
|"rowkey":"key",
|"columns":{
|"id":{"cf":"rowkey", "col":"key", "type":"long"},
|"name":{"cf":"per", "col":"name", "type":"string"},
|"age":{"cf":"per", "col":"age", "type":"short"},
|"salary":{"cf":"prof", "col":"salary", "type":"float"}
|}
|}""".stripMargin
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
val df = spark.read.format("org.apache.hadoop.hbase.spark").options(Map(HBaseTableCatalog.tableCatalog->catalog)).load()
df.show()
In the catalog definition above, you can see the namespace is configured as my_nm.
2.2 HBase:
Step 1: Launch the hbase shell.
hbase shell
Step 2: Create the HBase table.
create 'employees', 'per', 'prof'
Step 3: Quit the HBase shell.
quit
2.3 Spark:
Step 1: Launch the spark3-shell.
If you have configured the HBase-Spark connector at the cluster level, you can run the following command to launch the spark3-shell:
spark3-shell
If you have not configured the HBase-Spark connector at the cluster level, you can use the following command to launch the spark3-shell:
spark3-shell --jars /opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-<VERSION-NUMBER>.jar,/opt/cloudera/parcels/SPARK3/lib/spark3/hbase_connectors/lib/hbase-spark3-protocol-shaded-<VERSION-NUMBER>.jar
Note: You need to replace the VERSION-NUMBER according to your cluster.
Step 2: Run the following Spark code to write the data to HBase.
// Create a sample dataset to insert
case class Employee(id:Long, name: String, age: Short, salary: Float)
val employeeDF = Seq(
Employee(1L, "Ranga", 34, 15000.5f),
Employee(2L, "Nishanth", 5, 35000.5f),
Employee(3L, "Meena", 30, 25000.5f)
).toDF()
// Save the data to HBase
val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
val hbase_table = "employees"
employeeDF.write.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping",hbase_column_mapping).option("hbase.table",hbase_table).option("hbase.spark.use.hbasecontext",false).save()
Step 3: Run the following Spark code to read the data back from HBase.
val hbase_column_mapping = "id LONG :key, name STRING per:name,age SHORT per:age, salary FLOAT prof:salary"
val hbase_table = "employees"
// Load the data from HBase
val df = spark.read.format("org.apache.hadoop.hbase.spark").option("hbase.columns.mapping", hbase_column_mapping).option("hbase.table", hbase_table).option("hbase.spark.use.hbasecontext", false).load()
// Display the data
df.show()
// Filter the data
df.filter("age>10").show()
PySpark code:
/tmp/hbase_spark_connector_app.py:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, ShortType, FloatType
import json
def main():
spark = SparkSession.builder.appName("HBase Spark Connector App").getOrCreate()
data = [(1, "Ranga", 34, 15000.5), (2, "Nishanth", 5, 35000.5),(3, "Meena", 30, 25000.5)]
schema = StructType([ \
StructField("id",LongType(),True), \
StructField("name",StringType(),True), \
StructField("age",ShortType(),True), \
StructField("salary", FloatType(), True)
])
employeeDF = spark.createDataFrame(data=data,schema=schema)
catalog = json.dumps({
"table":{"namespace":"default", "name":"employees"},
"rowkey":"key",
"columns":{
"id":{"cf":"rowkey", "col":"key", "type":"long"},
"name":{"cf":"per", "col":"name", "type":"string"},
"age":{"cf":"per", "col":"age", "type":"short"},
"salary":{"cf":"prof", "col":"salary", "type":"float"}
}
})
employeeDF.write.format("org.apache.hadoop.hbase.spark").options(catalog=catalog).option("hbase.spark.use.hbasecontext", False).save()
df = spark.read.format("org.apache.hadoop.hbase.spark").options(catalog=catalog).option("hbase.spark.use.hbasecontext", False).load()
df.show()
spark.stop()
if __name__ == "__main__":
main()
Spark-Submit:
spark3-submit --master yarn \
--deploy-mode client \
/tmp/hbase_spark_connector_app.py
3. Running in a Secure Cluster
For running in a Kerberos-enabled cluster, the user has to include the HBase-related jars on the classpath, because HBase token retrieval and renewal is done by Spark and is independent of the connector. In other words, the user needs to initialize the environment in the normal way, either through kinit or by providing a principal/keytab.
4. Tuning Parameters:
There are several tuning parameters in the HBase-Spark connector. For example:
hbase.spark.query.batchsize - Sets the maximum number of values to return for each call to next() in a scan.
hbase.spark.query.cachedrows - The number of rows for caching that will be passed to the scan.
Refer to HBaseSparkConf.scala (see References) for more configurations.
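As an illustration (a minimal sketch, not from the original post; it assumes the HBaseSparkConf keys above are passed as DataFrame reader options, alongside the mapping options used earlier):
// Hedged sketch: pass tuning parameters as reader options on a per-query basis.
val tunedDF = spark.read.format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", hbase_column_mapping) // mapping defined earlier
  .option("hbase.table", "employees")
  .option("hbase.spark.query.batchsize", "1000")  // max values per call to next() in a scan
  .option("hbase.spark.query.cachedrows", "1000") // rows passed to scan caching
  .option("hbase.spark.use.hbasecontext", false)
  .load()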
5. Troubleshooting:
1. org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for user
org.apache.hadoop.hbase.security.AccessDeniedException: Insufficient permissions for user 'rangareddy@HADOOP.COM', action: scannerOpen, tableName:employees, family:prof, column: salary
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.authorizeAccess(RangerAuthorizationCoprocessor.java:568)
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.preScannerOpen(RangerAuthorizationCoprocessor.java:1013)
at org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor.preScannerOpen(RangerAuthorizationCoprocessor.java:710)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$40.call(RegionCoprocessorHost.java:1231)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost$40.call(RegionCoprocessorHost.java:1228)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost$ObserverOperationWithoutResult.callObserver(CoprocessorHost.java:558)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.execOperation(CoprocessorHost.java:631)
Problem: The user doesn't have the proper permission(s) to access the table/namespace. To access the HBase data, we need to grant Ranger permission(s) to the user.
Solution: Go to Ranger UI > Access Manager > HBASE > click on cm_hbase > create a new policy or use an existing policy and grant the permissions necessary to access the HBase table.
2. java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity of the array: 2
at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:789)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:943)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:929)
at org.apache.hadoop.hbase.spark.datasources.Utils$.hbaseFieldToScalaType(Utils.scala:53)
at org.apache.hadoop.hbase.spark.HBaseRelation.$anonfun$buildRow$2(DefaultSource.scala:289)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.hadoop.hbase.spark.HBaseRelation.buildRow(DefaultSource.scala:280)
at org.apache.hadoop.hbase.spark.HBaseRelation.$anonfun$buildScan$9(DefaultSource.scala:366)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
Problem: The java.lang.IllegalArgumentException occurs due to a schema mismatch between Spark and HBase. For example, the data type on the Spark side is Int while the data type on the HBase side is String.
Solution: There are two solutions for this issue:
1. Provide the correct schema mapping on both the Spark and HBase sides.
2. On the Spark side, convert the data type to String.
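For example, a hypothetical illustration of the first fix: if per:age was actually written to HBase as a string, declare it as STRING in the mapping so the connector decodes the stored bytes correctly:
// Hypothetical fix: match the Spark-side type to the bytes actually stored in HBase.
val hbase_column_mapping = "id LONG :key, name STRING per:name, age STRING per:age, salary FLOAT prof:salary"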
3. java.lang.NullPointerException at org.apache.hadoop.hbase.spark.HBaseRelation
java.lang.NullPointerException
at org.apache.hadoop.hbase.spark.HBaseRelation.<init>(DefaultSource.scala:138)
at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(DefaultSource.scala:78)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
Problem: When trying to read/write data using the HBase-Spark connector, the connector tries to get/initialize an HBaseContext. While getting the HBaseContext value, it first checks the hbase.spark.use.hbasecontext parameter (true by default); if it is true, it gets the HBaseContext from a cache, and if no HBaseContext exists in the cache, you get the NullPointerException.
HBaseSparkConf configuration:
val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
val DEFAULT_USE_HBASECONTEXT = true
Code that gets the HBaseContext:
// create or get latest HBaseContext
val hbaseContext: HBaseContext = if (useHBaseContext) {
LatestHBaseContextCache.latest
} else {
val config = HBaseConfiguration.create()
configResources.map(resource => resource.split(",").foreach(r => config.addResource(r)))
new HBaseContext(sqlContext.sparkContext, config)
}
There is already an open Jira for this issue: https://issues.apache.org/jira/browse/HBASE-18570
Solution: There are multiple workarounds for this issue:
Set hbase.spark.use.hbasecontext to false.
Initialize the HBaseContext yourself and then load the data using the HBase-Spark connector.
Otherwise, wait until the HBASE-18570 Jira is fixed.
4. org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: employee: 1 time, servers with issues: null
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:163)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: employee: 1 time, servers with issues: null
at org.apache.hadoop.hbase.client.BatchErrors.makeException(BatchErrors.java:54)
at org.apache.hadoop.hbase.client.AsyncRequestFutureImpl.getErrors(AsyncRequestFutureImpl.java:1196)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doFlush(BufferedMutatorImpl.java:309)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.close(BufferedMutatorImpl.java:241)
at org.apache.hadoop.hbase.mapred.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:91)
at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.closeWriter(SparkHadoopWriter.scala:251)
at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
... 9 more
Problem: While saving data to HBase, if the table doesn't exist, we get the above exception.
Solution: Create the HBase table with the proper column families before writing.
6. References
Configuring HBase-Spark connector using Cloudera Manager when HBase and Spark are on the same cluster
hbase-connectors/spark/
hbase-connectors/spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
HBASE-18570
... View more
07-11-2023
11:03 PM
1 Kudo
Spark3 Kudu Integration
This blog post will guide you through the process of integrating Spark 3 with Kudu, providing you with valuable insights and step-by-step instructions.
Apache Kudu
Kudu is a distributed columnar storage engine optimized for OLAP workloads. Kudu runs on commodity hardware, is horizontally scalable, and supports highly available operations.
Kudu Integration with Spark
Apache Kudu can be integrated with Apache Spark using the built-in Spark-SQL-Kudu library. This integration allows us to read from and write to Kudu tables directly from Spark applications using Spark SQL.
Kudu integrates with Spark through the Data Source API as of version 1.0.0. To integrate Kudu with Spark3, we need to use the kudu-spark3_2.12 library.
Below are the step-by-step instructions to seamlessly integrate Kudu with Spark3:
Impala/Kudu:
Step 1: Launch the impala-shell
Go to the Cloudera Manager > Impala > Status > Copy the Impala Shell Command and run the command from the shell.
For example,
impala-shell -i node1 -d default -k --ssl --ca_cert=/var/lib/cloudera-scm-agent/agent-cert/cm-auto-global_cacerts.pem
Step 2: Create the Kudu table.
CREATE TABLE employees
(
id BIGINT,
name STRING,
age SMALLINT,
salary FLOAT,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
Step 3: Insert the data into the Kudu table
INSERT INTO employees VALUES (1, "Ranga", 28, 10000), (2, "Nishanth", 5, 40000), (3, "Meena", 30, 24000);
Step 4: Verify the data from the Kudu table
SELECT * FROM employees;
Step 5: Quit the Impala shell
quit;
Spark:
Pre-requisites:
Verify that the user has the proper permission(s) to access the table/database, to avoid the following exception:
org.apache.kudu.client.NonRecoverableException: Unauthorized action
at org.apache.kudu.client.KuduException.transformException(KuduException.java:110)
at org.apache.kudu.client.KuduClient.joinAndHandleException(KuduClient.java:470)
at org.apache.kudu.client.KuduClient.openTable(KuduClient.java:288)
at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:327)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
To grant the right permissions to the user, go to Ranger UI > Access Manager > KUDU, click on cm_kudu, then use an existing policy or create a new policy and grant the required permissions.
You need to provide the exact set of Kudu master addresses, otherwise exceptions similar to the following will occur.
To collect the Kudu master values, go to Cloudera Manager > Kudu > Instances and copy the hostnames whose Role Type is Master.
23/05/15 12:59:38 WARN client.ConnectToCluster: [kudu-nio-0]: Could not connect to a leader master. Client configured with 1 master(s) (node1:7051) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Could not connect to a leader master. Client configured with 1 master(s) (node1) but cluster indicates it expects 2 master(s) (node1:7051,node2:7051)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.kudu.spark.kudu.KuduContext.<init>(KuduContext.scala:169)
at org.apache.kudu.spark.kudu.KuduRelation.<init>(DefaultSource.scala:325)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:132)
at org.apache.kudu.spark.kudu.DefaultSource.createRelation(DefaultSource.scala:97)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
... 47 elided
Step 6: Launch the spark3-shell by passing the kudu-spark3 jar.
There are two ways to pass the kudu-spark3 connector jar to spark3-shell:
Using Jars: spark3-shell --jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar
Using Packages: spark3-shell --packages org.apache.kudu:kudu-spark3_2.12:<kudu-cdp-version> --repositories https://repository.cloudera.com/artifactory/cloudera-repos/
You can use any of the above options.
For <kudu-cdp-version>, check Cloudera Runtime component versions in Release Notes.
Step 7: Run the spark code to retrieve data from kudu.
Note: Before running the following code, you must replace the kudu.master value with the one collected in the prerequisites.
val kudu_table = "default.employees"
val kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
// Load the data from kudu
val df = spark.read.options(Map("kudu.master" -> kudu_master, "kudu.table" -> kudu_table)).format("kudu").load()
// Display the data
df.show()
// Create sample dataset to insert
case class Employee(id:Long, name: String, age: Short, salary: Float)
val employeeDF = Seq(
Employee(4L, "Employee6", 56, 1500.5f),
Employee(5L, "Employee7", 30, 15000.5f)
).toDF()
// Save the data to kudu
employeeDF.write.options(Map("kudu.master"-> kudu_master, "kudu.table"-> kudu_table)).mode("append").format("kudu").save()
Note: After running the above code, if you get an error like "table not found", you need to prefix the kudu_table with impala::. For example:
val kudu_table = "impala::default.employees"
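Besides the DataFrame read/write API shown above, the kudu-spark library also exposes a KuduContext for row-level operations. A minimal sketch (assuming the spark session, kudu_master, and employeeDF values from the examples above):
import org.apache.kudu.spark.kudu.KuduContext
// Create a KuduContext from the Kudu master addresses and the SparkContext.
val kuduContext = new KuduContext(kudu_master, spark.sparkContext)
// Insert rows directly, guarding on table existence first.
if (kuduContext.tableExists("impala::default.employees")) {
  kuduContext.insertRows(employeeDF, "impala::default.employees")
}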
PySpark Code:
filename: kudu_pyspark3_example.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Kudu Integration with Spark3").getOrCreate()
kudu_table = "default.employees"
kudu_master = "kudu.master1:7051,kudu.master2:7051,kudu.master3:7051"
df=spark.read.option('kudu.master', kudu_master).option('kudu.table', kudu_table).format("kudu").load()
df.show()
spark.stop()
Using Spark with a Secure Kudu Cluster
The Kudu Spark integration is able to operate on secure Kudu clusters which have authentication and encryption enabled, but the submitter of the Spark job must provide the proper credentials.
Client mode:
To submit the Spark application in client mode, the submitting user must have an active Kerberos ticket granted through kinit.
kinit
spark3-submit \
--master yarn \
--deploy-mode client \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
kudu_pyspark3_example.py
Note: In client mode, the user needs to be authenticated using kinit only. If you specify a keytab and principal, it will throw the following exception.
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
Cluster mode:
To submit the Spark application in cluster mode, the Kerberos principal name and keytab location must be provided through the --principal and --keytab arguments to spark3-submit.
spark3-submit \
--master yarn \
--deploy-mode cluster \
--jars /opt/cloudera/parcels/CDH/lib/kudu/kudu-spark3_2.12.jar \
--principal user/node1.hadoop.com@HADOOP.COM \
--keytab user.keytab \
kudu_pyspark3_example.py
Note: In cluster mode, the user needs to be authenticated using a keytab and principal only. If we authenticate using kinit instead, it will throw the following exception.
java.security.PrivilegedActionException: org.apache.kudu.client.NonRecoverableException: Couldn't find a valid master in (node1:7051). Exceptions received: [org.apache.kudu.client.NonRecoverableException: server requires authentication, but client does not have Kerberos credentials (tgt). Authentication tokens were not used because no token is available]
Resources
1. Developing Applications With Apache Kudu
2. Kudu integration with Spark
... View more
06-29-2023
08:41 PM
Hi @hightek2699 Don't install PySpark manually using the pip install command. Use the Cloudera-provided PySpark.
... View more
06-14-2023
01:25 AM
1 Kudo
Hi @cirrus You can find the optimized code below.
/tmp/test_pyspark.py
from pyspark.sql.functions import col, expr
from pyspark.sql import SparkSession
from datetime import datetime
import math
spark = SparkSession.builder \
.appName('Test App') \
.getOrCreate()
num_rows = 2350000
num_columns = 2500
records_per_file=5000
num_partitions = int(math.ceil(num_rows/records_per_file))
data = spark.range(num_rows).repartition(num_partitions)
print("Number of Partitions: " + str(data.rdd.getNumPartitions()))
start_time = datetime.now()
data = data.select(*[expr('rand() * 2 - 1 as col'+str(i)) for i in range(num_columns)])
#data = data.select("*",*[expr('rand() * 2 - 1 as col'+str(i)) for i in range(num_columns)])
end_time = datetime.now()
delta = end_time - start_time
# time difference in seconds
print("Time difference to select the columns is "+ str(delta.total_seconds()) +" seconds")
start_time = datetime.now()
data.write.format("parquet").mode("overwrite").save("/tmp/test")
end_time = datetime.now()
delta = end_time - start_time
# time difference in seconds
print("Time difference for writing the data to HDFS is "+ str(delta.total_seconds()) +" seconds")
spark.stop()
Spark-submit command:
spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.memory=16G \
--conf spark.driver.memoryOverhead=1g \
--conf spark.executor.memory=16G \
--conf spark.executor.memoryOverhead=1g \
--conf spark.memory.fraction=0.8 \
--conf spark.memory.storageFraction=0.4 \
--conf spark.executor.cores=5 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.driver.extraJavaOptions="-Xss1024m" \
--conf spark.executor.extraJavaOptions="-Xss1024m" \
/tmp/test_pyspark.py
... View more
06-12-2023
10:48 PM
1 Kudo
Hi @cirrus If you have more questions, I suggest opening a Cloudera case so we can provide all the required information. After creating the Cloudera case, please attach the application logs, the event logs, and, if possible, the code as well.
... View more
06-12-2023
02:12 AM
Hi @hightek2699 Could you please share the exact issue you are seeing when running inside the virtual environment, and also provide the steps you have followed?
... View more
06-09-2023
11:40 AM
1 Kudo
Hi @cirrus The application failed with a StackOverflowError. To resolve the StackOverflowError, you need to increase the stack size. You can refer to the following article on how to increase the stack size: https://rangareddy.github.io/SparkStackOverflow/
... View more