
Not able to execute Python-based Hadoop Streaming jobs

Expert Contributor

I have a 5-node Hadoop cluster on which I can execute the following streaming job successfully:

 

sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input /sample/apat63_99.txt -output /foo1 -mapper 'wc -l' -numReduceTasks 0

But when I try to execute a streaming job using Python:

sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input /sample/apat63_99.txt -output /foo5 -mapper 'AttributeMax.py 8' -file '/tmp/AttributeMax.py' -numReduceTasks 1

I get an error:

packageJobJar: [/tmp/AttributeMax.py, /tmp/hadoop-hdfs/hadoop-unjar2062240123197790813/] [] /tmp/streamjob4074525553604040275.jar tmpDir=null
14/08/29 11:22:58 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/08/29 11:22:58 INFO mapred.FileInputFormat: Total input paths to process : 1
14/08/29 11:22:59 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hdfs/mapred/local]
14/08/29 11:22:59 INFO streaming.StreamJob: Running job: job_201408272304_0030
14/08/29 11:22:59 INFO streaming.StreamJob: To kill this job, run:
14/08/29 11:22:59 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=jt1:8021 -kill job_201408272304_0030
14/08/29 11:22:59 INFO streaming.StreamJob: Tracking URL: http://jt1:50030/jobdetails.jsp?jobid=job_201408272304_0030
14/08/29 11:23:00 INFO streaming.StreamJob:  map 0%  reduce 0%
14/08/29 11:23:46 INFO streaming.StreamJob:  map 100%  reduce 100%
14/08/29 11:23:46 INFO streaming.StreamJob: To kill this job, run:
14/08/29 11:23:46 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=jt1:8021 -kill job_201408272304_0030
14/08/29 11:23:46 INFO streaming.StreamJob: Tracking URL: http://jt1:50030/jobdetails.jsp?jobid=job_201408272304_0030
14/08/29 11:23:46 ERROR streaming.StreamJob: Job not successful. Error: NA
14/08/29 11:23:46 INFO streaming.StreamJob: killJob...

In my JobTracker console I see these errors:

java.io.IOException: log:null
R/W/S=2359/0/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null
HOST=null
USER=mapred
HADOOP_USER=null
last Hadoop input: |null| last tool output: |null| Date: Fri Aug 29 11:22:43 CDT 2014
java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:110)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.Pipe

The Python code itself is pretty simple:

#!/usr/bin/env python
import sys

index = int(sys.argv[1])
max = 0
for line in sys.stdin:
    fields = line.strip().split(",")
    if fields[index].isdigit():
        val = int(fields[index])
        if val > max:
            max = val
else:
    # for/else: this runs once, after stdin is exhausted
    print max
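
A mapper like this can also be sanity-checked outside Hadoop by replaying the same stdin pipe the task would feed it; a minimal sketch, assuming a local copy of the input file at the path shown:

cat /tmp/apat63_99.txt | python AttributeMax.py 8
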
1 ACCEPTED SOLUTION

Expert Contributor

I was able to solve the problem. I had to specify "python" explicitly in the mapper command, like so:

 

sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input /sample/cite75_99.txt -output /foo -mapper 'python RandomSample.py 10' -file RandomSample.py -numReduceTasks 1
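
Applied to the original AttributeMax job from the question, the same fix would look like this (with a fresh output directory, since /foo5 may already exist from the failed run):

sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input /sample/apat63_99.txt -output /foo6 -mapper 'python AttributeMax.py 8' -file /tmp/AttributeMax.py -numReduceTasks 1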

 


4 REPLIES

Rising Star

The "Broken pipe" suggests that the child Python process failed. I'd add logging statements to the Python code that write to a local file, and a global catch-all that logs any fatal error before the process exits.

> if fields[index].isdigit():

That is not safe: what if your input line doesn't have that many fields? The same goes for:

> val = int(fields[index])

It will fail if that field cannot be converted to an int.
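
A minimal sketch of that hardening, where the log-file path and the exact guards are illustrative choices rather than anything from this thread:

#!/usr/bin/env python
import sys
import traceback

def main():
    index = int(sys.argv[1])
    best = 0
    for line in sys.stdin:
        fields = line.strip().split(",")
        # Guard against short lines before indexing into them.
        if index >= len(fields):
            continue
        # Guard against non-numeric values before converting.
        if not fields[index].isdigit():
            continue
        val = int(fields[index])
        if val > best:
            best = val
    print(best)

if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Global catch-all: record the failure locally, then re-raise
        # so the streaming task still reports a failure.
        with open("/tmp/mapper_error.log", "a") as log:
            log.write(traceback.format_exc())
        raise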

Expert Contributor

I changed my Python code to:

 

#!/usr/bin/env python
import sys, random

file = open("/tmp/log.txt", "w")
for line in sys.stdin:
    file.write("line: " + line + "\n")
file.close()

 

When I run my job, I see exactly the same error, and /tmp/log.txt is not created on any machine, so I suspect the script is not even being invoked.


New Contributor

sudo -u hdfs hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.8.0.jar -input /user/root/in/purchases.txt -output /foo2 -mapper 'python mapper.py' -file mapper.py -numReduceTasks 1 -reducer 'python reducer.py' -file reducer.py
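
The mapper.py and reducer.py referenced here are not shown in the post; a hypothetical minimal pair that would run under this command (a per-key count in the word-count style; the file names come from the command, but the comma-split and the counting logic are assumptions) might look like:

#!/usr/bin/env python
# mapper.py (hypothetical): emit <first CSV field, 1> for every input line.
import sys

for line in sys.stdin:
    line = line.strip()
    if line:
        # Streaming expects tab-separated key/value pairs on stdout.
        print("%s\t1" % line.split(",")[0])

#!/usr/bin/env python
# reducer.py (hypothetical): sum the counts for each key. Input arrives
# sorted by key, so a single pass with a running total is enough.
import sys

current_key, count = None, 0
for line in sys.stdin:
    key, _, value = line.rstrip("\n").partition("\t")
    if key != current_key:
        if current_key is not None:
            print("%s\t%d" % (current_key, count))
        current_key, count = key, 0
    count += int(value or 1)
if current_key is not None:
    print("%s\t%d" % (current_key, count))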