Registered: ‎06-07-2017

How to resolve broken pipe issue in Hive streaming with Python?

I am facing a Broken pipe issue in Hive while streaming a larger number of rows (33000) into Python from a single table. The same script works fine for up to 7656 rows.

 

 0: jdbc:hive2://xxx.xx.xxx.xxxx:10000> insert overwrite table test.transform_inner_temp select * from test.transform_inner_temp_view2 limit 7656;
INFO  : Table test.transform_inner_temp stats: [numFiles=1, numRows=7656, totalSize=4447368, rawDataSize=4439712]
No rows affected (19.867 seconds)

0: jdbc:hive2://xxx.xx.xxx.xxxx:10000> insert overwrite table test.transform_inner_temp select * from test.transform_inner_temp_view2;
INFO  : Status: Running (Executing on YARN cluster with App id application_xxxxxxxxxxxxx_xxxxx)

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.
        at org.apache.hadoop.hive.ql.exec.ScriptOperator.process(ScriptOperator.java:456)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:838)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:88)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:838)
        at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:133)
        at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:170)
        at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:555)
        ... 18 more
Caused by: java.io.IOException: Broken pipe
        at java.io.FileOutputStream.writeBytes(Native Method)
        at java.io.FileOutputStream.write(FileOutputStream.java:345)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.hadoop.hive.ql.exec.TextRecordWriter.write(TextRecordWriter.java:53)
        at org.apache.hadoop.hive.ql.exec.ScriptOperator.process(ScriptOperator.java:425)
        ... 24 more

]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_xxxxxxxxxxx_xxxxx_1_00 [Map 1] killed/failed due to:OWN_TASK_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0 (state=08S01,code=2)

I found the same kind of issue reported at the link below, but only for a query with a join condition.

https://community.hortonworks.com/questions/11025/hive-query-issue.html

I tried the solution suggested at the above link, setting hive.vectorized.execution.enabled=false, but the issue remains the same in my case.

I also found the same error at the link below, so I now select the columns from a single table, test.transform_inner_temp1, where the join results are already written.

 

https://stackoverflow.com/questions/13730119/hive-broken-pipe-error

 

If the script were wrong, it should not work for a smaller number of rows either. So I assume the script is fine and the issue lies in the Hive setup or the memory configuration. I have searched the internet extensively but could not find a similar issue. Please provide your suggestions/solutions.
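One way to test that assumption is sketched below. A row-dependent failure (for example, a row with fewer fields than the script indexes) would also pass on the first rows and only fail on a larger input. This is a minimal sketch, not the actual script: `split_row`, `process` and `EXPECTED_FIELDS` are hypothetical names, the value 35 is assumed only because the scripts index `TempList[34]` (the elided columns may need more), and the two output columns are placeholders. The `__future__` import is there so that Python 2.7 also accepts the `print(...)` calls.

```python
from __future__ import print_function

import traceback

# Assumption: the scripts read TempList[34], so at least 35 fields are needed.
EXPECTED_FIELDS = 35


def split_row(line):
    """Split one tab-delimited row, keeping empty trailing columns."""
    # Remove only the line ending; a bare strip() would also remove
    # trailing tabs and, with them, empty columns at the end of the row.
    return line.rstrip("\r\n").split("\t")


def process(stream_in, stream_out, stream_err):
    """Transform rows; on failure, report the offending row and re-raise."""
    for row_number, line in enumerate(stream_in, 1):
        try:
            fields = split_row(line)
            if len(fields) < EXPECTED_FIELDS:
                raise ValueError("expected at least %d fields, got %d"
                                 % (EXPECTED_FIELDS, len(fields)))
            # Placeholder for the real column logic (hypothetical).
            out = [fields[0], fields[34] + fields[32]]
            print("\t".join(out), file=stream_out)
        except Exception:
            # Diagnostics go to the error stream so the data stream
            # contains only transformed rows.
            print("row %d failed: %r" % (row_number, line), file=stream_err)
            traceback.print_exc(file=stream_err)
            raise
```

In a streaming script this would be called as `process(sys.stdin, sys.stdout, sys.stderr)`. Anything written to stderr typically ends up in the task logs, which may show the real Python error behind the Broken pipe message.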

 

Hive Code:

CREATE VIEW IF NOT EXISTS test.transform_inner_temp_view2
as
select * from 
(select transform (*)
USING "scl enable python27 'python TestPython.py'" 
as (Col_1     STRING,
col_2        STRING,
...
..
col_125 STRING
)
FROM
test.transform_inner_temp1 a) b;

I tried the Python script in three different ways, shown below, but the issue is not resolved.

Script1:

 

#!/usr/bin/env python
'''
Created on June 2, 2017

@author: test
'''
import sys
from datetime import datetime
import decimal
import string
D = decimal.Decimal

while True:
    line = sys.stdin.readline()

    if not line:
        break
    line = string.strip(line, "\n ")
    outList = []
    TempList = line.strip().split('\t')
    col_1 = TempList[0]
    ... 
    ....
    col_125 = TempList[34] + TempList[32]

    outList.extend((col_1,....col_125))
    outValue = "\t".join(map(str,outList))
    print "%s"%(outValue)

Script2:

 

#!/usr/bin/env python
'''
Created on June 2, 2017

@author: test
'''
import sys
from datetime import datetime
import decimal
import string
D = decimal.Decimal

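try:
    # Iterating over sys.stdin yields one input row per pass.
    for line in sys.stdin:
        # Start a fresh output list for every row.
        outList = []
        TempList = line.strip().split('\t')
        col_1 = TempList[0]
        ... 
        ....
        col_125 = TempList[34] + TempList[32]

        outList.extend((col_1,....col_125))
        outValue = "\t".join(map(str,outList))
        print "%s"%(outValue)
except:
    # Report errors on stderr so they do not end up in the output rows.
    print >> sys.stderr, sys.exc_info()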

 

Script 3:

 

#!/usr/bin/env python
'''
Created on June 2, 2017

@author: test
'''
import sys
from datetime import datetime
import decimal
import string
D = decimal.Decimal
# Iterating over sys.stdin yields one input row per pass.
for line in sys.stdin:
    # Start a fresh output list for every row.
    outList = []
    TempList = line.strip().split('\t')
    col_1 = TempList[0]
    ... 
    ....
    col_125 = TempList[34] + TempList[32]
    outList.extend((col_1,....col_125))
    outValue = "\t".join(map(str,outList))
    print "%s"%(outValue)

I am using Apache Hive (version 1.2.1000.2.5.3.0-37) with beeline. Thanks in advance.
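To separate a script problem from a Hive or memory problem, the script could also be run outside Hive on a sample of the input rows. The following is only a sketch under assumptions: `run_transform` is a hypothetical helper, and the command and the rows passed to it (for example rows exported from test.transform_inner_temp1) are up to the caller. It pipes the rows into the command the same way a streaming job would and returns the exit code together with both output streams.

```python
import subprocess


def run_transform(command, rows):
    """Pipe tab-delimited rows into a command and collect its result.

    `command` is an argument list, e.g. ["python", "TestPython.py"].
    Returns (exit_code, stdout_text, stderr_text).
    """
    payload = "".join(row + "\n" for row in rows).encode("utf-8")
    proc = subprocess.Popen(command,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    # communicate() sends the input, closes stdin and reads both output
    # streams until the child exits.
    stdout, stderr = proc.communicate(payload)
    return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8")
```

A non-zero exit code, or a traceback in the returned stderr text, would point at the script rather than at the Hive setup. If the script handles all 33000 rows locally, the cluster configuration becomes the more likely suspect.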

 
