Created 04-19-2023 03:58 AM
Hi,
I am trying to run an "alter table drop partition" command using Spark SQL, which prints the following metastore lost-connection stack trace:
23/04/19 06:47:53 WARN metastore.RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect.
org.apache.thrift.transport.TTransportException: SASL authentication not complete
at org.apache.thrift.transport.TSaslTransport.write(TSaslTransport.java:472)
at org.apache.thrift.transport.TSaslClientTransport.write(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.thrift.TFilterTransport.write(TFilterTransport.java:72)
at org.apache.thrift.protocol.TBinaryProtocol.writeI32(TBinaryProtocol.java:178)
at org.apache.thrift.protocol.TBinaryProtocol.writeMessageBegin(TBinaryProtocol.java:106)
at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:70)
at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:62)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.send_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2444)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2431)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1427)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:154)
at com.sun.proxy.$Proxy35.listPartitionsWithAuthInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2562)
at com.sun.proxy.$Proxy35.listPartitionsWithAuthInfo(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2700)
at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2726)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$dropPartitions$1$$anonfun$16.apply(HiveClientImpl.scala:568)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$dropPartitions$1$$anonfun$16.apply(HiveClientImpl.scala:563)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$dropPartitions$1.apply$mcV$sp(HiveClientImpl.scala:563)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$dropPartitions$1.apply(HiveClientImpl.scala:558)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$dropPartitions$1.apply(HiveClientImpl.scala:558)
at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:283)
at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:221)
at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:220)
at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:266)
at org.apache.spark.sql.hive.client.HiveClientImpl.dropPartitions(HiveClientImpl.scala:558)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$dropPartitions$1.apply$mcV$sp(HiveExternalCatalog.scala:1009)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$dropPartitions$1.apply(HiveExternalCatalog.scala:1007)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$dropPartitions$1.apply(HiveExternalCatalog.scala:1007)
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
at org.apache.spark.sql.hive.HiveExternalCatalog.dropPartitions(HiveExternalCatalog.scala:1007)
at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.dropPartitions(ExternalCatalogWithListener.scala:211)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.dropPartitions(SessionCatalog.scala:846)
at org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand.run(ddl.scala:545)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:651)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Below is the command:
spark.sql("alter table <table name> drop if exists partition(year= 2023)")
Created 04-19-2023 11:39 AM
@DataEngAa Welcome to the Cloudera Community!
To help you get the best possible solution, I have tagged our Hive experts @Shmoo and @mszurap who may be able to assist you further.
Please keep us updated on your post, and we hope you find a satisfactory solution to your query.
Regards,
Diana Torres
Created 04-20-2023 12:18 AM
Hi @DataEngAa ,
The stack trace shows that Spark SQL was trying to list the partitions first ("HiveMetaStoreClient.listPartitionsWithAuthInfo") when the connection was lost. The attached snippet does not show timing information, but most likely the request simply timed out after the predefined timeout. It is also likely that the table you are trying to manipulate has a lot of partitions in the Hive metastore (the partition directory count on HDFS is a separate question).
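As a quick sanity check, you could first count how many partitions the metastore has registered for the table. This is only a hypothetical sketch ("orders" is a placeholder table name, and SHOW PARTITIONS also goes through the metastore, so it may be slow for the same reason):
# Hypothetical diagnostic: count the partitions registered in the metastore
# for the table ("orders" is a placeholder table name).
partition_count = spark.sql("SHOW PARTITIONS orders").count()
print("Partitions registered in the metastore:", partition_count)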
The timeout is defined both on the client (Spark) side and on the server (Hive Metastore) side. To increase the timeout and let the operation run for longer:
1. Set "hive.metastore.client.socket.timeout=1800" in hive-site.xml, both service-wide for Hive AND in the Hive gateway safety valves.
2. If this is CDP, set it on the Hive on Tez service side too, so that HS2 picks up the value as well.
3. Start your Spark application with:
--conf spark.hadoop.hive.metastore.client.socket.timeout=1800
The above increases the timeout from the default 5 minutes, which is usually too low for big tables, to 30 minutes.
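If you prefer to set it programmatically, here is a minimal sketch (assuming PySpark; the application name is a placeholder) that applies the same property when the SparkSession is built, as an alternative to passing --conf on the command line:
from pyspark.sql import SparkSession

# Minimal sketch: raise the metastore client socket timeout to 1800 seconds
# (30 minutes) at session creation time, instead of via --conf.
spark = (
    SparkSession.builder
    .appName("drop-partition-job")  # placeholder application name
    .config("spark.hadoop.hive.metastore.client.socket.timeout", "1800")
    .enableHiveSupport()
    .getOrCreate()
)
spark.sql("alter table <table name> drop if exists partition(year=2023)")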
Best regards
Miklos
Created on 04-20-2023 01:10 AM - edited 04-20-2023 01:23 AM
Thank you @mszurap for your response. I have already tried the suggested workaround, and it seems the issue still persists.
I agree the table has a lot of partitions, but I am pretty sure the code times out before 5 minutes.
I have also tried enforcing the hive-site.xml with the updated timeout, which also did not help much.
The only thing that worked was adding spark.catalog.recoverPartitions(table) before issuing the drop partition command. I am really not sure why recovering the partitions in the catalog eliminated the metastore warning.
Below is the updated code, which is working without any warning:
spark.catalog.recoverPartitions("orders")
spark.sql("alter table orders drop if exists partition(year=2023)")
data.write.mode('Overwrite').parquet(hdfsPath)
Any help in understanding the problem would be much appreciated.