Support Questions

Find answers, ask questions, and share your expertise

How to debug a SQL query that works using a spark Jupyter Notebook, but fails when executed from Livy?

avatar
Explorer

I have a Spark sql query that works when I execute from inside a Jupyter Notebook that has a a PySpark kernel but fails when I execute it submitting to a Livy session. Usually there's no difference when I execute my queries both ways.

 

I tried to get the spark session parameters with the command below and to guarantee that they are both the same:

spark.sparkContext.getConf().getAll() 

 

I'm using spark 2.3. How can I debug this problem? I know that the query works using spark, but I can't make it work submitting with Livy.

 

Here is the query:

INSERT INTO sbx_xxxxx.dados_auxiliares_mediana_preco

    select

        p.ano, p.cod_cfi, p.preco, p.repeticoes, sum(p2.repeticoes) as qtd_antes

    from sbx_operacoes_digitais.dados_auxiliares_moda_preco p

    left join sbx_operacoes_digitais.dados_auxiliares_moda_preco p2

        on p.ano = p2.ano and p.cod_cfi = p2.cod_cfi and p2.preco <= p.preco

    group by p.ano, p.cod_cfi, p.preco, p.repeticoes

 

Here are the stack trace of the query returned by Livy:

An error occurred while calling o99.sql.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:115)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:75)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange(coordinator id: 1868588554) hashpartitioning(ano#100, cod_cfi#101, 2001), coordinator[target post-shuffle partition size: 67108864]
+- *(1) FileScan orc sbx_operacoes_digitais.dados_auxiliares_moda_preco[ano#100,cod_cfi#101,preco#103,repeticoes#104] Batched: true, Format: ORC, Location: InMemoryFileIndex[hdfs://BNDOOP03/corporativo/sbx_operacoes_digitais/dados_auxiliares_moda_preco_..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<ano:int,cod_cfi:int,preco:decimal(14,2),repeticoes:decimal(14,2)>

at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:150)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
... 23 more
Caused by: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 58 more

It looks like that the important part is a failed assertion, but It does not gives me any useful info:

Caused by: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:156)
at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)

 

Any help to teach me how to debut it is greatly appreciated

1 ACCEPTED SOLUTION

avatar
Explorer

I solved the problem turning off this option

("spark.sql.adaptive.enabled", "true")

View solution in original post

2 REPLIES 2

avatar
Explorer

Now I'm trying to check the session configuration setting the property:

'spark.logConf' = "true"

 

I believe that setting it "true" would make the session properties be saved in spark log and that I'd be able to check them issuing the command


yarn logs -applicationId application_1660776720083_9876 > yarn.log

 

but I can't find the session values in my yarn.log file. How would it be displayed? Am I doing something wrong? I'm using spark 2.3.4.

avatar
Explorer

I solved the problem turning off this option

("spark.sql.adaptive.enabled", "true")