Python streaming is not working

Hi,

 

I have a similar problem: my Python code works fine when I run it locally with cat, but it fails when I run it on Hadoop. Below are the permissions on my files, my code, the error log, and the command I use to run the job:

 

Permissions on my files:

 

-rwxrwxr-x 1 cloudera cloudera 690 Apr 20 14:15 flight_mapper.py
-rw-r--r-- 1 cloudera cloudera 2865221 Apr 19 08:21 flight_records.csv
-rwxrwxr-x 1 cloudera cloudera 501 Apr 19 13:39 flight_reducer.py
-rwxrwxrwx 1 cloudera cloudera 1349 Apr 21 06:20 framework.py

 

framework.py code:

 

#! /usr/bin/env python
import os
import sys

from itertools import groupby
from operator import itemgetter

separator = "\t"

class Streaming(object):

    @staticmethod
    def GetJobConf(name):
        # Streaming exports job configuration to the environment with "."
        # replaced by "_" and the case unchanged (mapreduce.job.id -> mapreduce_job_id)
        name = name.replace(".","_")
        return os.environ.get(name)


    def __init__(self,infile=sys.stdin,separator = separator):
        self.infile = infile
        self.sep = separator

    def Status(self,message):
        # "reporter:status:<message>" on stderr updates the task status
        sys.stderr.write("reporter:status:{0}\n".format(message))

    def Counter(self,counter,amount=1,group="Python Streaming"):
        # "reporter:counter:<group>,<counter>,<amount>" on stderr increments a counter
        msg = "reporter:counter:{0},{1},{2}\n".format(group,counter,amount)
        sys.stderr.write(msg)

    def Emit(self,key,value):
        # Streaming expects one "key<TAB>value" record per line on stdout
        sys.stdout.write("{0}{1}{2}\n".format(key,self.sep,value))

    def Read(self):
        for line in self.infile:
            yield line.rstrip()

    def __iter__(self):
        for line in self.Read():
            yield line


class Mapper(Streaming):

    def Map(self):
        raise NotImplementedError("Mapper must implement a Map method")

class Reducer(Streaming):

    def Reduce(self):
        raise NotImplementedError("Reducer must implement a Reduce method")

    def __iter__(self):
        # groupby only merges consecutive equal keys, so the input must be
        # sorted by key (Hadoop's shuffle guarantees this on the cluster)
        generator = (line.split(self.sep,1) for line in self.Read())
        for item in groupby(generator,itemgetter(0)):
            yield item
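
Reducer.__iter__ relies on itertools.groupby, which starts a new group every time the key changes. On the cluster, Hadoop sorts the map output by key before the reducer reads it, but a local test only behaves the same way with an explicit sort step (cat flight_records.csv | python flight_mapper.py | sort | python flight_reducer.py). The sketch below re-implements that iteration as a standalone function; group_lines is a hypothetical name and the "A"/"B" records are made-up sample data:

```python
from itertools import groupby
from operator import itemgetter


def group_lines(lines, sep="\t"):
    """Group 'key<TAB>value' lines the way Reducer.__iter__ does."""
    pairs = (line.rstrip().split(sep, 1) for line in lines)
    # groupby starts a new group every time the key changes
    return [(key, [value for _, value in group])
            for key, group in groupby(pairs, itemgetter(0))]


lines = ["A\t5", "B\t3", "A\t7"]
print(group_lines(lines))          # key A appears in two separate groups
print(group_lines(sorted(lines)))  # [('A', ['5', '7']), ('B', ['3'])]
```

Without the sort, key A would be averaged twice as two separate groups, so a local run without sort can produce different output from the cluster even when it does not crash.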

 

flight_mapper.py code:

#! /usr/bin/env python

import sys
import csv
from framework import Mapper

class FlightMapper(Mapper):
    def __init__(self,infile=sys.stdin,separator='\t'):
        super(FlightMapper,self).__init__(infile,separator)

    def Map(self):
        # csv.reader accepts any iterable of strings; iterating the mapper
        # yields the input lines with trailing whitespace stripped
        reader = csv.reader(self)
        for row in reader:
            # Skip blank or short lines (csv.reader yields [] for an empty line)
            if len(row) < 7:
                continue
            if len(row[3].strip()) == 0:
                continue
            if len(row[6].strip()) == 0:
                row[6] = 0
            # Emit() writes exactly one "key<TAB>value" line to stdout
            self.Emit(row[3],row[6])

if __name__ == '__main__':
    mapper = FlightMapper(sys.stdin)
    mapper.Map()
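
A likely reason for the mapper exiting with code 1: a call written as sys.stdout.write("{0}\t{1}\n").format(row[3],row[6]) writes the unformatted template and then calls .format() on whatever write() returned. In Python 2, file.write() returns None, and in Python 3 it returns an int, so either way the call raises AttributeError and the Python process dies. The sketch below reproduces this under Python 3; emit_buggy and emit_fixed are hypothetical helper names, and io.StringIO stands in for sys.stdout:

```python
import io


def emit_buggy(out, key, value):
    # .format() is applied to the value returned by write(), not to the string:
    # the raw template is written first, then AttributeError is raised
    out.write("{0}\t{1}\n").format(key, value)


def emit_fixed(out, key, value):
    # Build the formatted record first, then write it once
    out.write("{0}\t{1}\n".format(key, value))


buf = io.StringIO()  # stands in for sys.stdout
try:
    emit_buggy(buf, "A", "12")
except AttributeError as exc:
    print("AttributeError:", exc)
print(repr(buf.getvalue()))  # the unformatted template, not a record

buf = io.StringIO()
emit_fixed(buf, "A", "12")
print(repr(buf.getvalue()))  # 'A\t12\n'
```

Since Emit() in framework.py already formats and writes the record, a separate sys.stdout.write() before it is not needed at all; keeping both would write each record twice even if the formatting were correct.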

 

flight_reducer.py code:

 

#! /usr/bin/env python
import sys

from framework import Reducer
from itertools import groupby
from operator import itemgetter

class FlightReducer(Reducer):

    def Reduce(self):
        # Iterating a Reducer yields (key, group) pairs; each item in the
        # group is a [key, value] list split on the tab separator
        for key, val in self:
            total = 0.0
            count = 0
            for item in val:
                total += float(item[1])
                count += 1
            # Emit the mean value for this key
            self.Emit(key,float(total)/float(count))

if __name__ == '__main__':
    reducer = FlightReducer(sys.stdin)
    reducer.Reduce()
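
FlightReducer.Reduce() passes every value to float(), so a single non-numeric value reaching the reducer (for example a header row, if flight_records.csv starts with one, or a placeholder such as NA) raises ValueError and fails the reduce task. A defensive variant, written as a standalone function over plain lines, could look like this; average_by_key is a hypothetical name, and silently skipping unparseable values is an assumption about the desired behaviour:

```python
from itertools import groupby
from operator import itemgetter


def average_by_key(lines, sep="\t"):
    """Average the numeric values per key from key-sorted 'key<TAB>value' lines.

    Values that float() cannot parse are skipped instead of crashing.
    """
    pairs = (line.rstrip("\n").split(sep, 1) for line in lines)
    results = []
    for key, group in groupby(pairs, itemgetter(0)):
        total, count = 0.0, 0
        for pair in group:
            if len(pair) < 2:
                continue  # line without a separator
            try:
                total += float(pair[1])
            except ValueError:
                continue  # non-numeric value, e.g. a header field
            count += 1
        if count:  # avoid ZeroDivisionError when every value was skipped
            results.append((key, total / count))
    return results


print(average_by_key(["A\t10", "A\t20", "B\t5", "B\tNA"]))
# [('A', 15.0), ('B', 5.0)]
```

Counting skipped values with Streaming.Counter() from framework.py would make such records visible in the job counters instead of dropping them silently.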

 

Error log:

 

2017-04-21 06:34:14,341 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2017-04-21 06:34:14,411 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2017-04-21 06:34:14,411 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2017-04-21 06:34:14,420 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2017-04-21 06:34:14,420 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1492704251350_0012, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@492e5810)
2017-04-21 06:34:14,496 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2017-04-21 06:34:14,761 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /yarn/nm/usercache/cloudera/appcache/application_1492704251350_0012
2017-04-21 06:34:15,329 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2017-04-21 06:34:15,751 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: File Output Committer Algorithm version is 1
2017-04-21 06:34:15,765 INFO [main] org.apache.hadoop.mapred.Task: Using ResourceCalculatorProcessTree : [ ]
2017-04-21 06:34:15,955 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: hdfs://quickstart.cloudera:8020/user/cloudera/hadoop_practicals_input/flight_records.csv:1432610+1432611
2017-04-21 06:34:15,982 INFO [main] org.apache.hadoop.mapred.MapTask: numReduceTasks: 1
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 4194300(16777200)
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 16
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 13421773
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 16777216
2017-04-21 06:34:15,997 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 4194300; length = 1048576
2017-04-21 06:34:16,000 INFO [main] org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2017-04-21 06:34:16,010 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/usr/bin/python, ./flight_mapper.py]
2017-04-21 06:34:16,016 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
2017-04-21 06:34:16,016 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
2017-04-21 06:34:16,017 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2017-04-21 06:34:16,018 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.cache.localFiles is deprecated. Instead, use mapreduce.job.cache.local.files
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
2017-04-21 06:34:16,021 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
2017-04-21 06:34:16,022 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
2017-04-21 06:34:16,023 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
2017-04-21 06:34:16,024 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
2017-04-21 06:34:16,025 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: job.local.dir is deprecated. Instead, use mapreduce.job.local.dir
2017-04-21 06:34:16,025 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir
2017-04-21 06:34:16,049 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,049 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,050 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=100/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,063 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2017-04-21 06:34:16,068 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1000/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,076 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1751/0/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 HOST=null
USER=cloudera
HADOOP_USER=null
last tool output: |null|

java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:434)
at java.io.OutputStream.write(OutputStream.java:116)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Stream closed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed.waitOutputThreads(): subprocess exited with code 1 in org.apache.hadoop.streaming.PipeMapRed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: mapRedFinished
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Stream closed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed.waitOutputThreads(): subprocess exited with code 1 in org.apache.hadoop.streaming.PipeMapRed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: mapRedFinished
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:SIMPLE) cause:java.io.IOException: Stream closed
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:434)
at java.io.OutputStream.write(OutputStream.java:116)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

2017-04-21 06:34:16,085 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2017-04-21 06:34:16,090 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete hdfs://quickstart.cloudera:8020/user/cloudera/average_delay/_temporary/1/_temporary/attempt_1492704251350_0012_m_000000_0
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Stopping MapTask metrics system...
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system stopped.
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system shutdown complete.
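
Reading the log above: "Stream closed" is thrown from TextInputWriter while Hadoop is still writing input records to the mapper's stdin, and the following lines report "subprocess exited with code 1". The R/W/S=1751/0/0 line suggests 1751 records had been passed to the script and none had come back. The Python traceback that explains the exit is not in this log; the script's stderr typically ends up in the task attempt's stderr log, so that is the first place to look. A small wrapper can make the failure easier to spot; run_task is a hypothetical helper, and the reporter:status: prefix is the same Streaming stderr protocol that Streaming.Status() in framework.py uses:

```python
import sys
import traceback


def run_task(task, err=None):
    """Run a streaming task callable; report any exception and return an exit code."""
    err = err if err is not None else sys.stderr
    try:
        task()
    except Exception as exc:
        # The full traceback goes to stderr, i.e. the task attempt's stderr log
        traceback.print_exc(file=err)
        # "reporter:status:<message>" on stderr updates the task status in Hadoop
        err.write("reporter:status:failed with {0}: {1}\n".format(
            type(exc).__name__, exc))
        return 1
    return 0
```

In flight_mapper.py, the __main__ block could then end with sys.exit(run_task(mapper.Map)) instead of calling mapper.Map() directly.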

 

 

Command:

 

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -Dstream.non.zero.exit.is.failure=false -input 'hadoop_practicals_input/flight_records.csv' -output average_delay -mapper 'python ./flight_mapper.py' -reducer 'python ./flight_reducer.py' -file ./flight_mapper.py -file ./flight_reducer.py -file ./framework.py
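
This command runs python ./flight_mapper.py and python ./flight_reducer.py with Hadoop's sort-by-key shuffle in between. A local test with cat only matches the cluster if it has the same shape, i.e. cat flight_records.csv | python flight_mapper.py | sort | python flight_reducer.py, and running it that way also prints the mapper's Python traceback straight to the terminal. As a sketch of that contract, here is an in-process version; simulate_streaming, demo_mapper and demo_reducer are hypothetical names, and the comma-separated sample records are made up:

```python
def simulate_streaming(input_lines, mapper, reducer):
    """Approximate 'cat input | mapper | sort | reducer' in one process.

    mapper and reducer take an iterable of text lines and yield
    'key<TAB>value' output lines, like the scripts do on stdin/stdout.
    """
    map_output = list(mapper(input_lines))
    # Hadoop sorts map output by key before the reduce phase; sorting the
    # whole lines keeps equal keys adjacent, which is what groupby needs
    shuffled = sorted(map_output)
    return list(reducer(shuffled))


def demo_mapper(lines):
    # Hypothetical mapper: field 0 is the key, field 1 the value
    for line in lines:
        fields = line.rstrip("\n").split(",")
        if len(fields) >= 2 and fields[0]:
            yield "{0}\t{1}".format(fields[0], fields[1])


def demo_reducer(lines):
    # Hypothetical reducer: count the records for each key in sorted input
    current, count = None, 0
    for line in lines:
        key = line.split("\t", 1)[0]
        if current is not None and key != current:
            yield "{0}\t{1}".format(current, count)
            count = 0
        current = key
        count += 1
    if current is not None:
        yield "{0}\t{1}".format(current, count)


print(simulate_streaming(["B,3", "A,1", "A,2"], demo_mapper, demo_reducer))
# ['A\t2', 'B\t1']
```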
