Python streaming is not working

Hi,

I have a similar problem: my Python code works fine when I run it locally using the cat command, but it fails when I run it on Hadoop. Below are the permissions on my files, my code, the error log, and the command I use to run the job.

Permissions on my files:

-rwxrwxr-x 1 cloudera cloudera 690 Apr 20 14:15 flight_mapper.py
-rw-r--r-- 1 cloudera cloudera 2865221 Apr 19 08:21 flight_records.csv
-rwxrwxr-x 1 cloudera cloudera 501 Apr 19 13:39 flight_reducer.py
-rwxrwxrwx 1 cloudera cloudera 1349 Apr 21 06:20 framework.py

framework.py:

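#! /usr/bin/env python
import os
import sys

from itertools import groupby
from operator import itemgetter

separator = "\t"

class Streaming(object):

    @staticmethod
    def GetJobConf(name):
        # Look up a job property from the environment of the streaming task.
        name = name.replace(".","_").upper()
        return os.environ.get(name)


    def __init__(self,infile=sys.stdin,separator = separator):
        self.infile = infile
        self.sep = separator

    def Status(self,message):
        # Hadoop Streaming treats "reporter:status:<message>" lines on stderr as status updates.
        sys.stderr.write("reporter:status:{0}\n".format(message))

    def Counter(self,counter,amount=1,group="Python Streaming"):
        # "reporter:counter:<group>,<counter>,<amount>" on stderr increments a job counter.
        msg = "reporter:counter:{0},{1},{2}\n".format(group,counter,amount)
        sys.stderr.write(msg)

    def Emit(self,key,value):
        # Write one "key<separator>value" record to stdout.
        sys.stdout.write("{0}{1}{2}\n".format(key,self.sep,value))

    def Read(self):
        # Yield input lines with trailing whitespace (including the newline) removed.
        for line in self.infile:
            yield line.rstrip()

    def __iter__(self):
        for line in self.Read():
            yield line


class Mapper(Streaming):

    def Map(self):
        raise NotImplementedError("Mapper must implement a Map method")

class Reducer(Streaming):

    def Reduce(self):
        raise NotImplementedError("Reducer must implement a Reduce method")

    def __iter__(self):
        # Yield (key, group) pairs, where group iterates over [key, value] lists.
        # groupby only merges consecutive lines, so the input must be sorted by key
        # (Hadoop sorts the map output before the reduce phase).
        generator = (line.split(self.sep,1) for line in self.Read())
        for item in groupby(generator,itemgetter(0)):
            yield item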
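Neither script calls GetJobConf, so it is not involved in this failure, but its lookup may not match what Hadoop exports. The Hadoop Streaming documentation gives examples such as mapreduce.job.id becoming mapreduce_job_id, i.e. dots replaced by underscores with the case unchanged; because GetJobConf also upper-cases the name, and Linux environment variable names are case-sensitive, it may return None. A minimal sketch of a lookup that tries the documented form first and the upper-cased form as a fallback; get_job_conf is a hypothetical helper, and the environ argument exists only so it can be tried without a cluster:

```python
import os


def get_job_conf(name, environ=None):
    """Return a job configuration value exported by Hadoop Streaming, or None.

    Hypothetical helper. It assumes the documented mapping of property names
    to environment variables (dots become underscores, case unchanged) and
    falls back to the upper-cased name that framework.py's GetJobConf uses.
    """
    env = os.environ if environ is None else environ
    key = name.replace(".", "_")
    value = env.get(key)
    if value is None:
        value = env.get(key.upper())
    return value


# A plain dict standing in for os.environ inside a streaming task.
fake_env = {"mapreduce_job_id": "job_1492704251350_0012"}
print(get_job_conf("mapreduce.job.id", fake_env))  # job_1492704251350_0012
```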

 

flight_mapper.py:

#! /usr/bin/env python

import sys
import csv
from framework import Mapper

class FlightMapper(Mapper):
    def __init__(self,infile=sys.stdin,separator='\t'):
        super(FlightMapper,self).__init__(infile,separator)

    def Map(self):
        reader = csv.reader(self)
        for row in reader:
            if len(row[3].strip()) == 0:
                continue
            if len(row[6].strip()) == 0:
                row[6] = 0
                self.Emit(row[3],row[6])
            else:
                sys.stdout.write("{0}\t{1}\n").format(row[3],row[6])
                self.Emit(row[3],row[6])

if __name__ == '__main__':
    mapper = FlightMapper(sys.stdin)
    mapper.Map()
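One line in Map looks like the likely culprit. In sys.stdout.write("{0}\t{1}\n").format(row[3],row[6]), .format(...) is applied to the return value of write() rather than to the string; write() returns None on Python 2 (an int on Python 3), so the line raises AttributeError as soon as it reaches a row whose column 6 is non-empty. That would end the mapper with exit code 1, which is what the log below reports. The following self.Emit(row[3],row[6]) already writes the pair, so the extra write can simply be dropped. A sketch of the same loop as a plain function over lines, so it can be checked without Hadoop; map_flights is hypothetical, columns 3 and 6 come from the post, and skipping rows shorter than seven fields is an added assumption:

```python
import csv


def map_flights(lines, sep="\t"):
    """Roughly mirror FlightMapper.Map, returning output lines instead of printing."""
    out = []
    for row in csv.reader(lines):
        # Assumption: skip rows too short to have columns 3 and 6.
        if len(row) < 7 or len(row[3].strip()) == 0:
            continue
        # As in the original mapper, an empty delay is emitted as 0.
        delay = row[6] if row[6].strip() else 0
        # Format the string first, then write it once (this is what Emit does).
        out.append("{0}{1}{2}\n".format(row[3], sep, delay))
    return out


sample = ["a,b,c,ORD,e,f,12", "a,b,c,JFK,e,f,", "a,b,c,,e,f,9"]
print(map_flights(sample))  # ['ORD\t12\n', 'JFK\t0\n']
```

In FlightMapper.Map itself, the equivalent change is to delete the sys.stdout.write(...) line and keep the self.Emit(row[3],row[6]) call in both branches.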

 

flight_reducer.py:

#! /usr/bin/env python
import sys

from framework import Reducer
from itertools import groupby
from operator import itemgetter

class FlightReducer(Reducer):

    def Reduce(self):
        for key, val in self:
            total = 0.0
            count = 0
            for item in val:
                total += float(item[1])
                count += 1
            self.Emit(key,float(total)/float(count))

if __name__ == '__main__':
    reducer = FlightReducer(sys.stdin)
    reducer.Reduce()
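FlightReducer.Reduce passes every value to float(). If flight_records.csv begins with a header row (its first line isn't shown in the post), the header's column-6 text would reach the reducer and float() would raise ValueError; a line without a tab would likewise raise IndexError at item[1]. A minimal sketch of the same per-key average that skips such values instead of crashing; average_by_key is hypothetical, the skip policy is an assumption rather than part of the original code, and the sample lines, including the header-like ORIGIN/DEP_DELAY pair, are made up:

```python
from itertools import groupby
from operator import itemgetter


def average_by_key(sorted_lines, sep="\t"):
    """Average the numeric values per key from key-sorted 'key<TAB>value' lines."""
    pairs = (line.rstrip("\n").split(sep, 1) for line in sorted_lines)
    results = []
    for key, group in groupby(pairs, itemgetter(0)):
        total, count = 0.0, 0
        for pair in group:
            try:
                value = float(pair[1])
            except (IndexError, ValueError):
                # Assumption: ignore lines with no value or a non-numeric value.
                continue
            total += value
            count += 1
        # Keys whose values were all skipped produce no output line.
        if count:
            results.append((key, total / count))
    return results


lines = ["JFK\t3", "ORD\t5", "ORD\t7", "ORIGIN\tDEP_DELAY"]
print(average_by_key(lines))  # [('JFK', 3.0), ('ORD', 6.0)]
```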

 

Error log:

2017-04-21 06:34:14,341 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2017-04-21 06:34:14,411 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2017-04-21 06:34:14,411 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2017-04-21 06:34:14,420 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2017-04-21 06:34:14,420 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1492704251350_0012, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@492e5810)
2017-04-21 06:34:14,496 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2017-04-21 06:34:14,761 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /yarn/nm/usercache/cloudera/appcache/application_1492704251350_0012
2017-04-21 06:34:15,329 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2017-04-21 06:34:15,751 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: File Output Committer Algorithm version is 1
2017-04-21 06:34:15,765 INFO [main] org.apache.hadoop.mapred.Task: Using ResourceCalculatorProcessTree : [ ]
2017-04-21 06:34:15,955 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: hdfs://quickstart.cloudera:8020/user/cloudera/hadoop_practicals_input/flight_records.csv:1432610+1432611
2017-04-21 06:34:15,982 INFO [main] org.apache.hadoop.mapred.MapTask: numReduceTasks: 1
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 4194300(16777200)
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 16
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 13421773
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 16777216
2017-04-21 06:34:15,997 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 4194300; length = 1048576
2017-04-21 06:34:16,000 INFO [main] org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2017-04-21 06:34:16,010 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/usr/bin/python, ./flight_mapper.py]
2017-04-21 06:34:16,016 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
2017-04-21 06:34:16,016 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
2017-04-21 06:34:16,017 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2017-04-21 06:34:16,018 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.cache.localFiles is deprecated. Instead, use mapreduce.job.cache.local.files
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
2017-04-21 06:34:16,021 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
2017-04-21 06:34:16,022 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
2017-04-21 06:34:16,023 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
2017-04-21 06:34:16,024 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
2017-04-21 06:34:16,025 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: job.local.dir is deprecated. Instead, use mapreduce.job.local.dir
2017-04-21 06:34:16,025 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir
2017-04-21 06:34:16,049 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,049 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,050 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=100/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,063 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2017-04-21 06:34:16,068 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1000/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,076 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1751/0/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 HOST=null
USER=cloudera
HADOOP_USER=null
last tool output: |null|

java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:434)
at java.io.OutputStream.write(OutputStream.java:116)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Stream closed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed.waitOutputThreads(): subprocess exited with code 1 in org.apache.hadoop.streaming.PipeMapRed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: mapRedFinished
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Stream closed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed.waitOutputThreads(): subprocess exited with code 1 in org.apache.hadoop.streaming.PipeMapRed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: mapRedFinished
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:SIMPLE) cause:java.io.IOException: Stream closed
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: Stream closed
at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:434)
at java.io.OutputStream.write(OutputStream.java:116)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

2017-04-21 06:34:16,085 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2017-04-21 06:34:16,090 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete hdfs://quickstart.cloudera:8020/user/cloudera/average_delay/_temporary/1/_temporary/attempt_1492704251350_0012_m_000000_0
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Stopping MapTask metrics system...
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system stopped.
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system shutdown complete.
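Reading the log: Hadoop had sent the mapper 1751 records and received no output records (R/W/S=1751/0/0), and PipeMapRed then reports "subprocess exited with code 1". The Stream closed exception comes from Hadoop still trying to write input records to a child process that had already exited, so it is a symptom rather than the cause. The Python traceback is normally not in this syslog; Hadoop Streaming passes the script's stderr through to the task attempt's stderr log. A small local check of that behaviour, assuming Python 3.5+ and using a hypothetical stand-in script that repeats the sys.stdout.write(...).format(...) pattern from flight_mapper.py:

```python
import subprocess
import sys

# Hypothetical stand-in for the mapper: it repeats the
# sys.stdout.write(...).format(...) pattern from flight_mapper.py.
CHILD = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write('{0}\\t{1}\\n').format('key', line.strip())\n"
)

# Feed it input the way Hadoop Streaming feeds a mapper: lines on stdin.
# run() tolerates the child exiting before it has read all of its input.
proc = subprocess.run(
    [sys.executable, "-c", CHILD],
    input="".join("row{0}\n".format(i) for i in range(2000)),
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    universal_newlines=True,
)

print(proc.returncode)                  # 1: the uncaught exception ends the process
print("AttributeError" in proc.stderr)  # True: the traceback went to stderr
```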

 

 

Command:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -Dstream.non.zero.exit.is.failure=false -input 'hadoop_practicals_input/flight_records.csv' -output average_delay -mapper 'python ./flight_mapper.py' -reducer 'python ./flight_reducer.py' -file ./flight_mapper.py -file ./flight_reducer.py -file ./framework.py
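When testing locally, the reducer only sees input shaped like the cluster's if the mapper output is sorted by key first, because Hadoop sorts between the map and reduce phases and the Reducer class relies on itertools.groupby over consecutive keys. On the command line that is roughly cat flight_records.csv | python flight_mapper.py | sort | python flight_reducer.py. The same idea as a self-contained Python sketch; run_local_job, toy_mapper and toy_reducer are hypothetical stand-ins, not part of Hadoop or of the scripts above:

```python
from itertools import groupby


def run_local_job(lines, mapper, reducer):
    """Roughly imitate a one-reducer streaming job: map, sort by key, reduce."""
    mapped = [out for line in lines for out in mapper(line)]
    # Hadoop sorts map output by key before the reduce phase; a stable sort on
    # the text before the first tab approximates that here.
    mapped.sort(key=lambda kv: kv.split("\t", 1)[0])
    return list(reducer(mapped))


def toy_mapper(line):
    # Hypothetical mapper: emit "<first column>\t<second column>".
    fields = line.rstrip("\n").split(",")
    yield "{0}\t{1}".format(fields[0], fields[1])


def toy_reducer(sorted_lines):
    # Hypothetical reducer: average the values per key; relies on sorted input.
    pairs = (kv.split("\t", 1) for kv in sorted_lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        values = [float(kv[1]) for kv in group]
        yield "{0}\t{1}".format(key, sum(values) / len(values))


print(run_local_job(["ORD,5", "JFK,3", "ORD,7"], toy_mapper, toy_reducer))
# ['JFK\t3.0', 'ORD\t6.0']
```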
