
Not able to execute Python-based Hadoop Streaming jobs

Expert Contributor

I have a 5-node Hadoop cluster on which I can execute the following streaming job successfully:

 

sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input /sample/apat63_99.txt -output /foo1 -mapper 'wc -l' -numReduceTasks 0

But when I try to execute a streaming job using Python:

sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input /sample/apat63_99.txt -output /foo5 -mapper 'AttributeMax.py 8' -file '/tmp/AttributeMax.py' -numReduceTasks 1

I get an error:

packageJobJar: [/tmp/AttributeMax.py, /tmp/hadoop-hdfs/hadoop-unjar2062240123197790813/] [] /tmp/streamjob4074525553604040275.jar tmpDir=null
14/08/29 11:22:58 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/08/29 11:22:58 INFO mapred.FileInputFormat: Total input paths to process : 1
14/08/29 11:22:59 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hdfs/mapred/local]
14/08/29 11:22:59 INFO streaming.StreamJob: Running job: job_201408272304_0030
14/08/29 11:22:59 INFO streaming.StreamJob: To kill this job, run:
14/08/29 11:22:59 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=jt1:8021 -kill job_201408272304_0030
14/08/29 11:22:59 INFO streaming.StreamJob: Tracking URL: http://jt1:50030/jobdetails.jsp?jobid=job_201408272304_0030
14/08/29 11:23:00 INFO streaming.StreamJob:  map 0%  reduce 0%
14/08/29 11:23:46 INFO streaming.StreamJob:  map 100%  reduce 100%
14/08/29 11:23:46 INFO streaming.StreamJob: To kill this job, run:
14/08/29 11:23:46 INFO streaming.StreamJob: UNDEF/bin/hadoop job -Dmapred.job.tracker=jt1:8021 -kill job_201408272304_0030
14/08/29 11:23:46 INFO streaming.StreamJob: Tracking URL: http://jt1:50030/jobdetails.jsp?jobid=job_201408272304_0030
14/08/29 11:23:46 ERROR streaming.StreamJob: Job not successful. Error: NA
14/08/29 11:23:46 INFO streaming.StreamJob: killJob...

In my JobTracker console I see errors like:

java.io.IOException: log:null
R/W/S=2359/0/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null
HOST=null
USER=mapred
HADOOP_USER=null
last Hadoop input: |null| last tool output: |null| Date: Fri Aug 29 11:22:43 CDT 2014
java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:110)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.Pipe

The Python code itself is pretty simple:

#!/usr/bin/env python
import sys

index = int(sys.argv[1])
max = 0
for line in sys.stdin:
    fields = line.strip().split(",")
    if fields[index].isdigit():
        val = int(fields[index])
        if val > max:
            max = val
else:
    print max
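
A quick way to reproduce a failure like this outside Hadoop is to run the mapper locally against a slice of the input; the command below assumes the HDFS path from the question:

hadoop fs -cat /sample/apat63_99.txt | head -100 | python AttributeMax.py 8

If the script dies here, the streaming job will show the same "Broken pipe" symptom, since the framework is writing records to a child process that has already exited.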
1 ACCEPTED SOLUTION

Expert Contributor

I was able to solve the problem. I had to specify "python" explicitly in the mapper as well, like this:

 

sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input /sample/cite75_99.txt -output /foo -mapper 'python RandomSample.py 10' -file RandomSample.py -numReduceTasks 1
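
A likely reason the explicit interpreter helps: when the mapper is just 'RandomSample.py 10', the task attempts to execute the shipped file directly, which requires both a valid shebang line and the execute bit, and the execute bit is not always preserved on files shipped with -file. Prefixing the command with python sidesteps both requirements. Making the local copy executable before submitting may also work:

chmod +x RandomSample.py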

 


4 REPLIES

Rising Star
The "Broken pipe" suggests that the child Python process failed. I'd add logging statements in the Python code to log to a local file. You should also add a global catch-all, to log any fatal errors before exit.

> if fields[index].isdigit():

That is not safe: what if your input line doesn't have that many fields? The same goes for:

> val = int(fields[index])

It will fail if that field cannot be converted to an int.
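
A minimal sketch of that advice, assuming the same comma-delimited input and the column index passed as argv[1] (the log path and variable names are illustrative, not from the thread):

#!/usr/bin/env python
# Defensive version of the streaming mapper: logs to a local file and
# guards against short lines and non-numeric fields.
import sys
import traceback

log = open("/tmp/mapper-debug.log", "w")  # illustrative path
try:
    index = int(sys.argv[1])
    max_val = 0
    for line in sys.stdin:
        fields = line.strip().split(",")
        if len(fields) <= index:       # skip lines with too few fields
            log.write("short line: " + line)
            continue
        if fields[index].isdigit():    # skip non-numeric fields
            val = int(fields[index])
            if val > max_val:
                max_val = val
    print max_val
except Exception:
    # Global catch-all: record the fatal error before the process exits,
    # so a "Broken pipe" on the Hadoop side can be traced back here.
    log.write(traceback.format_exc())
    raise
finally:
    log.close()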

Expert Contributor

I changed my Python code to:

 

#!/usr/bin/env python
import sys, random

file = open("/tmp/log.txt", "w")
for line in sys.stdin:
    file.write("line: " + line + "\n")
file.close()

 

When I run my job, I see exactly the same error, and /tmp/log.txt is not created on any machine, so I suspect the script is not even being invoked.
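
Worth noting, as a general point about streaming rather than a diagnosis specific to this cluster: the mapper runs on the task nodes as the task user, so a local log file would appear in /tmp on whichever worker ran the attempt, not on the machine that submitted the job, and the task user needs write permission there. A variant that tags the log with host and PID makes it easier to find and keeps concurrent attempts from clobbering each other:

#!/usr/bin/env python
import os
import socket
import sys

# One log file per host/attempt.
log = open("/tmp/log-%s-%d.txt" % (socket.gethostname(), os.getpid()), "w")
for line in sys.stdin:
    log.write("line: " + line)   # stdin lines already end with "\n"
log.close()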

Expert Contributor

I was able to solve the problem. I had to specify "python" explicitly in the mapper as well, like this:

 

sudo -u hdfs hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar -input /sample/cite75_99.txt -output /foo -mapper 'python RandomSample.py 10' -file RandomSample.py -numReduceTasks 1

 

New Contributor

sudo -u hdfs hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.8.0.jar -input /user/root/in/purchases.txt -output /foo2 -mapper 'python mapper.py' -file mapper.py -numReduceTasks 1 -reducer 'python reducer.py' -file reducer.py
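
For context, that command ships two scripts and wires one as the mapper and one as the reducer. A minimal word-count-style pair that would fit this invocation (mapper.py and reducer.py here are illustrative sketches, not the poster's actual scripts) might look like:

#!/usr/bin/env python
# mapper.py: emit one tab-separated key/value pair per input token.
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print "%s\t1" % word

#!/usr/bin/env python
# reducer.py: sum counts per key; streaming delivers keys to the
# reducer already sorted, so a running total per key is enough.
import sys

current, total = None, 0
for line in sys.stdin:
    key, value = line.rstrip("\n").split("\t", 1)
    if key != current:
        if current is not None:
            print "%s\t%d" % (current, total)
        current, total = key, 0
    total += int(value)
if current is not None:
    print "%s\t%d" % (current, total)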