Created 08-16-2023 02:08 AM
Hello,
We are performing Hive queries with PySpark using the HWC (Hive Warehouse Connector) in JDBC_CLUSTER mode.
Everything runs fine and we get the results of the queries, but we also receive a warning message saying that a connection has been closed:
23/08/16 09:59:05 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
23/08/16 09:59:05 WARN transport.TIOStreamTransport: Error closing output stream.
java.net.SocketException: Connection or outbound has closed
at sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1181)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
at org.apache.thrift.transport.TSocket.close(TSocket.java:235)
at org.apache.thrift.transport.TSaslTransport.close(TSaslTransport.java:400)
at org.apache.thrift.transport.TSaslClientTransport.close(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.metastore.security.TFilterTransport.close(TFilterTransport.java:52)
at org.apache.hive.jdbc.HiveConnection.close(HiveConnection.java:1153)
at org.apache.commons.dbcp2.DelegatingConnection.closeInternal(DelegatingConnection.java:239)
at org.apache.commons.dbcp2.PoolableConnection.reallyClose(PoolableConnection.java:232)
at org.apache.commons.dbcp2.PoolableConnectionFactory.destroyObject(PoolableConnectionFactory.java:367)
at org.apache.commons.pool2.impl.GenericObjectPool.destroy(GenericObjectPool.java:921)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:468)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:365)
at org.apache.commons.dbcp2.PoolingDataSource.getConnection(PoolingDataSource.java:134)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1563)
at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.getConnector(HS2JDBCWrapper.scala:481)
at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.getConnector(HS2JDBCWrapper.scala)
at com.hortonworks.spark.sql.hive.llap.util.QueryExecutionUtil.getConnection(QueryExecutionUtil.java:96)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.getTableSchema(JdbcDataSourceReader.java:116)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.readSchema(JdbcDataSourceReader.java:128)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.<init>(JdbcDataSourceReader.java:72)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector.getDataSourceReader(HiveWarehouseConnector.java:72)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector.createReader(HiveWarehouseConnector.java:40)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:161)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:224)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:187)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl.executeJdbcInternal(HiveWarehouseSessionImpl.java:295)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl.sql(HiveWarehouseSessionImpl.java:159)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Once the warning is thrown, execution continues and completes with no errors and no missing data.
The spark-submit command is the following:
spark-submit \
  --master yarn \
  --driver-memory 1g \
  --queue <queue_name> \
  --conf spark.pyspark.python=/opt/venv/pdr/bin/python3.6 \
  --conf spark.pyspark.driver.python=/opt/venv/pdr/bin/python3.6 \
  --jars /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-1.0.0.7.1.7.1000-141.jar \
  --py-files /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-1.0.0.7.1.7.1000-141.zip \
  /home/<path_to_Python_script>/script.py
Configuration settings inside the Python script (script.py) are the following:
from pyspark.sql import SparkSession

# appname is set elsewhere in the script
spark = SparkSession \
    .builder \
    .enableHiveSupport() \
    .appName(appname) \
    .config("spark.yarn.queue", "<queue_name>") \
    .config("spark.datasource.hive.warehouse.read.via.llap", "false") \
    .config("spark.sql.hive.hiveserver2.jdbc.url.principal", "hive/_HOST@<domain>") \
    .config("spark.datasource.hive.warehouse.read.mode", "JDBC_CLUSTER") \
    .config("spark.sql.extensions", "com.hortonworks.spark.sql.rule.Extensions") \
    .config("hive.support.concurrency", "true") \
    .config("hive.enforce.bucketing", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") \
    .config("hive.compactor.initiator.on", "true") \
    .config("hive.compactor.worker.threads", "1") \
    .config("hive.tez.container.size", "12288") \
    .config("tez.queue.name", "<queue_name>") \
    .config("mapred.job.queuename", "<queue_name>") \
    .config("spark.executor.cores", 3) \
    .config("spark.executor.memory", "6g") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", 0) \
    .config("spark.dynamicAllocation.initialExecutors", 1) \
    .config("spark.dynamicAllocation.maxExecutors", 20) \
    .config("spark.kryo.registrator", "com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator") \
    .config("spark.kryoserializer.buffer.max", "128m") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.sql.hive.hiveserver2.jdbc.url", "jdbc:hive2://<hive2_jdbc_URL>:10000/default;tez.queue.name=<queue_name>;ssl=true") \
    .getOrCreate()
As said, the script executes correctly and results are returned.
Changing driver-memory and/or spark.executor.cores / spark.executor.memory does not stop the warning from being thrown.
Any idea?
Thank you,
Andrea
Created 08-16-2023 01:06 PM
@AndreaCavenago Does this error appear every time you run this spark-submit command?
As this is a warning message with no real impact, it can be suppressed by changing the log level.
In the script.py file, add the following line after the SparkSession is created (setLogLevel is an instance method, so it is called on the active SparkContext):

spark.sparkContext.setLogLevel("ERROR")

This will suppress the WARN message. But it will still be good to address the actual issue.
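If setting the whole application to ERROR is too broad, a more targeted option is to mute only the logger that emits this message. A sketch, assuming the log4j 1.x backend that ships with Spark 2.x, accessed through the JVM gateway on the driver:

# Mute only org.apache.thrift.transport.TIOStreamTransport instead of
# suppressing every WARN message in the application
log4j = spark.sparkContext._jvm.org.apache.log4j
log4j.LogManager.getLogger("org.apache.thrift.transport.TIOStreamTransport").setLevel(log4j.Level.ERROR)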
Created 08-17-2023 03:29 AM
Thank you @smruti. Yes, it happens every time I run the spark-submit command.
I will run a test with the log level set to ERROR and keep looking for a solution.
Thank you,
Andrea
Created 08-21-2023 03:22 AM
@AndreaCavenago, If @smruti's reply has helped resolve your issue, can you kindly mark the appropriate reply as the solution? It will make it easier for others to find the answer in the future.
Regards,
Vidya Sargur
Created 08-29-2023 12:34 AM
Hello @VidyaSargur, the solution provided by @smruti hides the warning message, and I'm fine with that, but it did not solve the issue that generates the message.
Thank you,
Andrea
Created 08-29-2023 03:56 AM
@AndreaCavenago For that you will have to check if the connection is getting interrupted/closed between the client and hiveserver2. Without thorough log analysis, it will be difficult to answer that. Could you open a support case for the same?
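In the meantime, one client-side way to gather evidence is to raise the log level for the classes that appear in the stack trace, so the connection open/close events become visible in the driver log. A sketch, again assuming Spark's log4j 1.x backend:

# Turn up driver-side logging for the packages seen in the stack trace to
# observe when the pooled JDBC connection is created, borrowed and destroyed
log4j = spark.sparkContext._jvm.org.apache.log4j
for name in ("org.apache.thrift.transport",
             "org.apache.hive.jdbc",
             "com.hortonworks.spark.sql.hive.llap"):
    log4j.LogManager.getLogger(name).setLevel(log4j.Level.DEBUG)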
Created 08-31-2023 02:11 AM
Hello @smruti, unfortunately I have limited access to the cluster (no CM access) and I am not able to open a support case at the moment. I suppose the log analysis should be performed on the HS2 log only, correct?
Thank you,
Andrea