Member since: 04-18-2024
Posts: 4
Kudos Received: 1
Solutions: 0
07-15-2024
08:20 PM
Hi @RangaReddy,

1. Exception stack trace: we are currently running our Spark jobs on YARN with the same code and we never see this issue there. Could it be caused by a lack of memory?
2. We didn't hard-code client mode anywhere. It was working fine on YARN, just not with Kubernetes.
3. We tried providing the following, but it didn't work. We also downloaded these jars and placed them in the jars folder, but no luck.

--packages org.apache.hadoop:hadoop-aws:3.3.4 \
--packages com.amazonaws:aws-java-sdk-bundle:1.12.262 \
--packages org.apache.spark:spark-hadoop-cloud_2.12:3.5.1 \
--packages org.apache.hadoop:hadoop-client-api:3.3.4 \
--packages org.apache.hadoop:hadoop-client-runtime:3.3.4
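In case it helps to see it spelled out, this is the shape we are thinking of trying next, folding everything into one flag (I'm assuming repeated --packages flags don't accumulate and only the last one is kept, which I haven't confirmed):

# Single --packages flag with comma-separated Maven coordinates (same versions we
# tried above). Assumes $SPARK_HOME and $K8S_SERVER are set as in our submit command.
$SPARK_HOME/bin/spark-submit \
  --master k8s://$K8S_SERVER \
  --deploy-mode cluster \
  --name testing \
  --packages org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262,org.apache.spark:spark-hadoop-cloud_2.12:3.5.1,org.apache.hadoop:hadoop-client-api:3.3.4,org.apache.hadoop:hadoop-client-runtime:3.3.4 \
  --class dt.cerebrum.iotengine.sparkjobs.streaming \
  s3a://cb-spark/iot_engine.jar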
06-17-2024
10:38 PM
Hi,

We are in the process of migrating from YARN to Kubernetes for its benefits and upgrading our Spark version from 2.4.4 to 3.5.1. As part of this transition, we have decided to use Scala version 2.12.18 and have upgraded Java from version 8 to 11. Currently, I am encountering three main issues:

1. I am experiencing an ArithmeticException due to long overflow. Could the switch from Java 8 to 11 be causing this issue?
2. The deployment mode specified as cluster in the spark-submit command is being overridden to client.
3. I am unable to use AWS Hadoop package classes in spark-submit, despite including the jars in the container.

$SPARK_HOME/bin/spark-submit \
  --master k8s://$K8S_SERVER \
  --deploy-mode cluster \
  --name testing \
  --class dt.cerebrum.iotengine.sparkjobs.streaming \
  --conf spark.kubernetes.file.upload.path=s3a://cb-spark/path \
  --conf spark.hadoop.fs.s3a.endpoint="http://xxxxxxx.xxx" \
  --conf spark.hadoop.fs.s3a.access.key="xxxx" \
  --conf spark.hadoop.fs.s3a.secret.key="xxxxxxxxx" \
  --conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.hadoop.fs.s3a.fast.upload=true \
  --conf spark.hadoop.fs.s3a.path.style.access="true" \
  s3a://cb-spark/iot_engine.jar

Any assistance you could provide on these issues would be greatly appreciated. Thank you.
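One more detail I noticed while re-reading the command above: at least as pasted here, the value of spark.driver.extraJavaOptions contains a space and is not quoted, so the shell would split it and pass -Divy.home=/tmp to spark-submit as a separate argument. I am not sure whether this is related to issue 2, but a quoted variant (everything else unchanged) would look like:

# Same submit, with the Java options value quoted so the shell keeps both -D options
# together as a single spark-submit argument.
$SPARK_HOME/bin/spark-submit \
  --master k8s://$K8S_SERVER \
  --deploy-mode cluster \
  --name testing \
  --class dt.cerebrum.iotengine.sparkjobs.streaming \
  --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
  s3a://cb-spark/iot_engine.jar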
Labels:
Apache Spark
06-17-2024
10:10 PM
Hi @RangaReddy, thank you very much for your response and suggestions. I tried the steps you recommended, and while they were helpful, I found that the issue was ultimately resolved by increasing the executor memory and by setting spark.file.transferTo=false. I appreciate your assistance.
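For anyone who runs into the same "Stream is corrupted" fetch failure, this is roughly how we pass those two settings on the command line. The memory value below is only an illustrative number, not our actual setting, and the jar path is a placeholder:

# Sketch of the two changes that resolved it for us: more executor memory
# (example value only) and spark.file.transferTo=false.
spark-submit \
  --master yarn \
  --conf spark.executor.memory=8g \
  --conf spark.file.transferTo=false \
  --class dt.cerebrum.iotengine.sparkjobs.streaming \
  iot_engine.jar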
04-18-2024
03:39 AM
1 Kudo
While executing the Spark application on YARN, I have been facing the following issue two or three times a day recently.

ERROR MicroBatchExecution:91 - Query [id = 0ec965b7-5dda-43be-940c-3ec8672bcd5c, runId = 17c15719-ab20-4488-ba37-ccf6a6ca27e1] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 185575 (start at DeviceLocationDataListener.scala:148) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:470)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(generated.java:184)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:206)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
  at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
  at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111)
  at scala.collection.AbstractIterator.nonEmpty(Iterator.scala:1334)
  at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117)
  at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
  at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
  at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
  at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202)
  at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
  at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:170)
  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:361)
  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:348)
  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:348)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.util.Utils$.copyStream(Utils.scala:369)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:462)
  ... 32 more
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)

I suppose it was caused when executing the following code:

df.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
// ------ mongo insert 1 ---------
// ------ mongo insert 2 ---------
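// Per micro-batch: count rows per tenant group / worker / task and append the counts to MongoDB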
batchDF
.select(DeviceIOIDS.TENANTGROUPUID.value, DeviceIOIDS.WORKERID.value, DeviceIOIDS.TASKUID.value)
.groupBy(DeviceIOIDS.TENANTGROUPUID.value, DeviceIOIDS.WORKERID.value, DeviceIOIDS.TASKUID.value)
.count()
.withColumnRenamed("count", "recordcount")
.withColumn(DeviceIOIDS.ISTRANSFERRED.value, lit(0))
.withColumn(DeviceIOIDS.INSERTDATETIME.value, current_timestamp())
.withColumn(DeviceIOIDS.INSERTDATE.value, current_date())
.write
.mode("Append")
.mongo(
WriteConfig(
"mongo.dbname".getConfigValue,
"mongo.devicelocationtransferredstatus".getConfigValue
)
)
batchDF.unpersist()
}

Thanks for the help.
Labels:
Apache Spark