
Error closing output stream Warning


Hello,
we are running Hive queries from PySpark using the Hive Warehouse Connector (HWC) in JDBC_CLUSTER mode.

Everything runs fine and we get the results of the queries, but we also receive a warning message saying that the connection has been closed:

23/08/16 09:59:05 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
23/08/16 09:59:05 WARN transport.TIOStreamTransport: Error closing output stream.
java.net.SocketException: Connection or outbound has closed
at sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1181)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.FilterOutputStream.close(FilterOutputStream.java:158)
at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110)
at org.apache.thrift.transport.TSocket.close(TSocket.java:235)
at org.apache.thrift.transport.TSaslTransport.close(TSaslTransport.java:400)
at org.apache.thrift.transport.TSaslClientTransport.close(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.metastore.security.TFilterTransport.close(TFilterTransport.java:52)
at org.apache.hive.jdbc.HiveConnection.close(HiveConnection.java:1153)
at org.apache.commons.dbcp2.DelegatingConnection.closeInternal(DelegatingConnection.java:239)
at org.apache.commons.dbcp2.PoolableConnection.reallyClose(PoolableConnection.java:232)
at org.apache.commons.dbcp2.PoolableConnectionFactory.destroyObject(PoolableConnectionFactory.java:367)
at org.apache.commons.pool2.impl.GenericObjectPool.destroy(GenericObjectPool.java:921)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:468)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:365)
at org.apache.commons.dbcp2.PoolingDataSource.getConnection(PoolingDataSource.java:134)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1563)
at com.hortonworks.spark.sql.hive.llap.JDBCWrapper.getConnector(HS2JDBCWrapper.scala:481)
at com.hortonworks.spark.sql.hive.llap.DefaultJDBCWrapper.getConnector(HS2JDBCWrapper.scala)
at com.hortonworks.spark.sql.hive.llap.util.QueryExecutionUtil.getConnection(QueryExecutionUtil.java:96)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.getTableSchema(JdbcDataSourceReader.java:116)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.readSchema(JdbcDataSourceReader.java:128)
at com.hortonworks.spark.sql.hive.llap.JdbcDataSourceReader.<init>(JdbcDataSourceReader.java:72)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector.getDataSourceReader(HiveWarehouseConnector.java:72)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector.createReader(HiveWarehouseConnector.java:40)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$SourceHelpers.createReader(DataSourceV2Relation.scala:161)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.create(DataSourceV2Relation.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:224)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:187)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl.executeJdbcInternal(HiveWarehouseSessionImpl.java:295)
at com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl.sql(HiveWarehouseSessionImpl.java:159)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)

Once the warning is thrown, execution continues and completes with no errors or missing data.

The spark-submit command is the following:

spark-submit --master yarn --driver-memory 1g --queue <queue_name> --conf spark.pyspark.python=/opt/venv/pdr/bin/python3.6 --conf spark.pyspark.driver.python=/opt/venv/pdr/bin/python3.6 --jars /opt/cloudera/parcels/CDH/jars/hive-warehouse-connector-assembly-1.0.0.7.1.7.1000-141.jar --py-files /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/pyspark_hwc-1.0.0.7.1.7.1000-141.zip /home/<path_to_Python_script>/script.py

 

 

Configuration settings inside the Python script (script.py) are the following:

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .enableHiveSupport() \
        .appName(appname) \
        .config("spark.yarn.queue", "<queue_name>") \
        .config("spark.datasource.hive.warehouse.read.via.llap", "false") \
        .config("spark.sql.hive.hiveserver2.jdbc.url.principal", "hive/_HOST@<domain>") \
        .config("spark.datasource.hive.warehouse.read.mode", "JDBC_CLUSTER") \
        .config("spark.sql.extensions", "com.hortonworks.spark.sql.rule.Extensions") \
        .config("hive.support.concurrency", "true") \
        .config("hive.enforce.bucketing", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager") \
        .config("hive.compactor.initiator.on", "true") \
        .config("hive.compactor.worker.threads", "1") \
        .config("hive.tez.container.size", "12288") \
        .config("tez.queue.name", "<queue_name>") \
        .config("mapred.job.queuename", "<queue_name>") \
        .config("spark.executor.cores", 3) \
        .config("spark.executor.memory", "6g") \
        .config("spark.shuffle.service.enabled", "true") \
        .config("spark.dynamicAllocation.enabled", "true") \
        .config("spark.dynamicAllocation.minExecutors", 0) \
        .config("spark.dynamicAllocation.initialExecutors", 1) \
        .config("spark.dynamicAllocation.maxExecutors", 20) \
        .config("spark.kryo.registrator", "com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator") \
        .config("spark.kryoserializer.buffer.max", "128m") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .config("spark.sql.hive.hiveserver2.jdbc.url", "jdbc:hive2://<hive2_jdbc_URL>:10000/default;tez.queue.name=<queue_name>;ssl=true") \
        .getOrCreate()
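For context, the queries themselves are executed through the HWC session built on top of this SparkSession. A minimal sketch, assuming the standard pyspark_llap API shipped in the pyspark_hwc zip passed via --py-files; database and table names are placeholders:

```python
# Minimal HWC usage sketch (names in <...> are placeholders).
from pyspark_llap import HiveWarehouseSession

# Build the HWC session on top of the existing SparkSession; it picks up
# the spark.sql.hive.hiveserver2.jdbc.url settings configured above.
hive = HiveWarehouseSession.session(spark).build()

# In JDBC_CLUSTER mode the query runs through HiveServer2 over JDBC.
# The TIOStreamTransport warning in the stack trace is emitted when the
# pooled JDBC connection is closed (HiveWarehouseSessionImpl.sql -> ...
# -> HiveConnection.close), after the results have already been fetched.
df = hive.sql("SELECT * FROM <database>.<table> LIMIT 10")
df.show()
```

This matches the stack trace above, which originates from `HiveWarehouseSessionImpl.sql` while the dbcp2 connection pool destroys a stale SSL connection, so the warning is cosmetic rather than a query failure.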
As mentioned, the script executes correctly and the results are returned.
Changing driver-memory and/or spark.executor.cores / spark.executor.memory does not stop the warning from being thrown.

Any idea?

 

Thank you,
Andrea

2 ACCEPTED SOLUTIONS
6 REPLIES

Master Collaborator

This problem has been solved!

To view the detailed solution, log in or register on the community.


Thank you @smruti. Yes, it happens each time I run spark-submit.

I will run a test with the log level set to ERROR, and keep looking for a solution.
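For reference, rather than raising the whole application's log level to ERROR, the warning can be silenced selectively with a targeted log4j override. A sketch, assuming the default log4j.properties mechanism on this CDP version, with logger names taken from the stack trace above:

```properties
# Silence only the Thrift transport close warning, keeping other WARNs:
log4j.logger.org.apache.thrift.transport.TIOStreamTransport=ERROR

# Optionally quiet the "HiveConf of name ... does not exist" warnings too:
log4j.logger.org.apache.hadoop.hive.conf.HiveConf=ERROR
```

This hides the message without addressing why the pooled JDBC connection's SSL socket is already closed when it is destroyed.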

 

Thank you,

Andrea

Community Manager

@AndreaCavenago, if @smruti's reply has helped resolve your issue, can you kindly mark it as the solution? It will make it easier for others to find the answer in the future.



Regards,

Vidya Sargur,
Community Manager




Hello @VidyaSargur, the solution provided by @smruti hides the warning message, and I'm fine with that, but it did not solve the issue that generates the message.

 

Thank you,

Andrea

Master Collaborator

This problem has been solved!

To view the detailed solution, log in or register on the community.


Hello @smruti, unfortunately I have limited access to the cluster (no CM access) and am not able to create a support case at the moment. I suppose the log analysis should be performed on the HS2 log only, correct?

 

Thank you,

Andrea