Error closing output stream Warning

Hello,
we are performing Hive queries with PySpark using the HWC in JDBC_CLUSTER mode.

Everything runs fine and the queries return their results, but we also get a warning saying that the connection has been closed:

23/08/16 09:59:05 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
23/08/16 09:59:05 WARN transport.TIOStreamTransport: Error closing output stream.
java.net.SocketException: Connection or outbound has closed
at sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1181)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
at org.apache.thrift.transport.TSocket.close(TSocket.java:235)
at org.apache.thrift.transport.TSaslTransport.close(TSaslTransport.java:400)
at org.apache.thrift.transport.TSaslClientTransport.close(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.metastore.security.TFilterTransport.close(TFilterTransport.java:52)
at org.apache.hive.jdbc.HiveConnection.close(HiveConnection.java:1153)
at org.apache.commons.dbcp2.DelegatingConnection.closeInternal(DelegatingConnection.java:239)
at org.apache.commons.dbcp2.PoolableConnection.reallyClose(PoolableConnection.java:232)
at org.apache.commons.dbcp2.PoolableConnectionFactory.destroyObject(PoolableConnectionFactory.java:367)
at org.apache.commons.pool2.impl.GenericObjectPool.destroy(GenericObjectPool.java:921)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:468)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:365)
at org.apache.commons.dbcp2.PoolingDataSource.getConnection(PoolingDataSource.java:134)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1563)
at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.getConnector(HS2JDBCWrapper.scala:481)
at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.getConnector(HS2JDBCWrapper.scala)
at com.hortonworks.spark.sql.hive.llap.util.QueryExecutionUtil.getConnection(QueryExecutionUtil.java:96)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.getTableSchema(JdbcDataSourceReader.java:116)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.readSchema(JdbcDataSourceReader.java:128)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.<init>(JdbcDataSourceReader.java:72)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector.getDataSourceReader(HiveWarehouseConnector.java:72)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector.createReader(HiveWarehouseConnector.java:40)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:161)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:224)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:187)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl.executeJdbcInternal(HiveWarehouseSessionImpl.java:295)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl.sql(HiveWarehouseSessionImpl.java:159)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)

After the warning is logged, execution continues and finishes with no errors or missing data.

The spark-submit command is the following:

spark-submit --master yarn --driver-memory 1g --queue <queue_name> \
  --conf spark.pyspark.python=/opt/venv/pdr/bin/python3.6 \
  --conf spark.pyspark.driver.python=/opt/venv/pdr/bin/python3.6 \
  --jars /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-1.0.0.7.1.7.1000-141.jar \
  --py-files /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-1.0.0.7.1.7.1000-141.zip \
  /home/<path_to_Python_script>/script.py

Configuration settings inside the Python script (script.py) are the following:

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .appName(appname) \
        .config("spark.yarn.queue","<queue_name>") \
        .config("spark.datasource.hive.warehouse.read.via.llap","false") \
        .config("spark.sql.hive.hiveserver2.jdbc.url.principal","hive/_HOST@<domain>") \
        .config("spark.datasource.hive.warehouse.read.mode","JDBC_CLUSTER") \
        .config("spark.sql.extensions","com.hortonworks.spark.sql.rule.Extensions") \
        .config("hive.support.concurrency", "true") \
        .config("hive.enforce.bucketing","true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("hive.txn.manager","org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") \
        .config("hive.compactor.initiator.on", "true") \
        .config("hive.compactor.worker.threads","1") \
        .config("hive.tez.container.size", "12288") \
        .config("tez.queue.name","<queue_name>") \
        .config("mapred.job.queuename","<queue_name>") \
        .config("spark.executor.cores",3) \
        .config("spark.executor.memory","6g") \
        .config("spark.shuffle.service.enabled","true") \
        .config("spark.dynamicAllocation.enabled","true") \
        .config("spark.dynamicAllocation.minExecutors",0) \
        .config("spark.dynamicAllocation.initialExecutors",1) \
        .config("spark.dynamicAllocation.maxExecutors",20) \
        .config("spark.kryo.registrator","com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator") \
        .config("spark.kryoserializer.buffer.max", "128m") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .config("spark.sql.hive.hiveserver2.jdbc.url","jdbc:hive2://<hive2_jdbc_URL>:10000/default;tez.queue.name=<queue_name>;ssl=true") \
        .getOrCreate()

As noted, the script executes correctly and the results are returned.
Changing --driver-memory or the executor core and memory settings makes no difference: the warning is still logged.

Any idea?

Thank you,
Andrea

6 REPLIES

Master Collaborator (accepted solution)

@AndreaCavenago Does this warning appear every time you run this spark-submit command?

Since this is only a warning message and has no real impact, you can suppress it by changing the log level.

In the script.py file, add the following two lines after the SparkSession has been created:

sc = spark.sparkContext
sc.setLogLevel("ERROR")

This will suppress the WARN message, but it would still be good to address the underlying issue.
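
Raising the level on the Spark context hides every other warning as well. A narrower option, sketched below, is to raise the threshold only for the logger that emits this message. This is a sketch under assumptions: it presumes the driver logs through log4j 1.x (the `org.apache.log4j` classes), that the logger is named after the class shown in the stack trace, and it reaches the JVM through `spark.sparkContext._jvm`, which is an internal py4j attribute. The helper name `silence_logger` is made up for illustration.

```python
def silence_logger(jvm, logger_name, level="ERROR"):
    """Raise the threshold of one JVM logger through a py4j JVM view.

    Assumption: log4j 1.x classes (org.apache.log4j.*) are on the
    driver classpath; other logging backends need a different call.
    """
    log4j = jvm.org.apache.log4j
    logger = log4j.LogManager.getLogger(logger_name)
    # Level.toLevel converts a name such as "ERROR" into a log4j Level object.
    logger.setLevel(log4j.Level.toLevel(level))
    return logger


# Usage inside script.py, after the SparkSession has been created:
#   silence_logger(spark.sparkContext._jvm,
#                  "org.apache.thrift.transport.TIOStreamTransport")
```

Other WARN messages stay visible this way, which helps if the connection problem later shows up in a different form.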

Thank you @smruti. Yes, it happens each time I run the spark-submit.

I will test with the log level set to ERROR, and keep looking for a solution.

Thank you,

Andrea

Community Manager

@AndreaCavenago, If @smruti's reply has helped resolve your issue, can you kindly mark the appropriate reply as the solution? It will make it easier for others to find the answer in the future. 



Regards,

Vidya Sargur,
Community Manager


Hello @VidyaSargur, the solution provided by @smruti hides the warning message, and I'm fine with that, but it does not solve the issue that generates the message.

Thank you,

Andrea

Master Collaborator (accepted solution)

@AndreaCavenago For that, you will need to check whether the connection between the client and HiveServer2 is being interrupted or closed. Without a thorough log analysis, that is difficult to answer. Could you open a support case for it?
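
One hedged reading of the stack trace in the question: `HiveConnection.close` is reached from `GenericObjectPool.borrowObject` by way of `GenericObjectPool.destroy`, which suggests the DBCP pool was discarding a pooled connection while handing one out, and that the underlying socket was already closed by then. The sketch below only automates that reading by finding the first frame that is not part of the close/flush chain. The function names and the `TEARDOWN` set are illustrative, and the sample is an excerpt of the trace above.

```python
import re

# Matches "at package.Class.method(" and captures class and method.
FRAME_RE = re.compile(r"^\s*at\s+([\w.$<>]+)\.([\w$<>]+)\(")

# Method names treated as part of the close/flush chain (an assumption
# chosen to fit this particular trace).
TEARDOWN = {"write", "flushBuffer", "flush", "close", "closeInternal",
            "reallyClose", "destroyObject", "destroy"}


def parse_frames(trace_text):
    """Return (class, method) pairs for each 'at ...' line, innermost first."""
    frames = []
    for line in trace_text.splitlines():
        match = FRAME_RE.match(line)
        if match:
            frames.append((match.group(1), match.group(2)))
    return frames


def teardown_trigger(frames):
    """Return the first frame that is not a close/flush call, or None."""
    for cls, method in frames:
        if method not in TEARDOWN:
            return cls, method
    return None


SAMPLE = """\
java.net.SocketException: Connection or outbound has closed
at sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1181)
at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
at org.apache.hive.jdbc.HiveConnection.close(HiveConnection.java:1153)
at org.apache.commons.dbcp2.PoolableConnectionFactory.destroyObject(PoolableConnectionFactory.java:367)
at org.apache.commons.pool2.impl.GenericObjectPool.destroy(GenericObjectPool.java:921)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:468)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1563)
"""

print(teardown_trigger(parse_frames(SAMPLE)))
# ('org.apache.commons.pool2.impl.GenericObjectPool', 'borrowObject')
```

If that reading holds, the warning would point at pooled connections that had already been dropped before HWC tried to reuse them. Why they were dropped is a guess that only the HiveServer2 or network logs could confirm.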

Hello @smruti, unfortunately I have limited access to the cluster (no CM access) and am not able to create a support case at the moment. I suppose the log analysis should be performed on the HS2 log only, correct?

Thank you,

Andrea
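
Whichever logs turn out to be available, a first step that needs only the driver output is to collect the timestamps of each `Error closing output stream` warning, so they can be lined up against HiveServer2 entries from the same moments. The sketch below assumes the `yy/MM/dd HH:mm:ss` prefix shown in the question. The function name `warning_times` and the sample log are illustrative.

```python
import re
from datetime import datetime

# Timestamp prefix and message as they appear in the driver log above.
WARN_RE = re.compile(
    r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) WARN \S+: Error closing output stream"
)


def warning_times(log_text):
    """Return a datetime for every 'Error closing output stream' warning."""
    times = []
    for line in log_text.splitlines():
        match = WARN_RE.match(line)
        if match:
            times.append(datetime.strptime(match.group(1), "%y/%m/%d %H:%M:%S"))
    return times


SAMPLE_LOG = """\
23/08/16 09:59:05 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
23/08/16 09:59:05 WARN transport.TIOStreamTransport: Error closing output stream.
java.net.SocketException: Connection or outbound has closed
"""

for moment in warning_times(SAMPLE_LOG):
    print(moment.isoformat())
# 2023-08-16T09:59:05
```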