Member since 04-19-2017 | 4 Posts | 0 Kudos Received | 0 Solutions
04-21-2017
06:50 AM
Hi, I have a similar problem: my Python code works fine when I run it locally using the cat command, but it does not work when I run it on Hadoop. Please find below my code, the error log, the command used to run the program, and the permissions on my files.

Permissions on my files:

-rwxrwxr-x 1 cloudera cloudera     690 Apr 20 14:15 flight_mapper.py
-rw-r--r-- 1 cloudera cloudera 2865221 Apr 19 08:21 flight_records.csv
-rwxrwxr-x 1 cloudera cloudera     501 Apr 19 13:39 flight_reducer.py
-rwxrwxrwx 1 cloudera cloudera    1349 Apr 21 06:20 framework.py

framework.py code:

#! /usr/bin/env python

import os
import sys

from itertools import groupby
from operator import itemgetter

separator = "\t"

class Streaming(object):

    @staticmethod
    def GetJobConf(name):
        name = name.replace(".","_").upper()
        return os.environ.get(name)

    def __init__(self,infile=sys.stdin,separator = separator):
        self.infile = infile
        self.sep = separator

    def Status(self,message):
        sys.stderr.write("reporter:status:{}\n".format(message))

    def Counter(self,counter,amount=1,group="Python Streaming"):
        msg = "reporter:counter:{0},{1},{2}\n".format(group,counter,amount)
        sys.stderr.write(msg)

    def Emit(self,key,value):
        sys.stdout.write("{0}{1}{2}\n".format(key,self.sep,value))

    def Read(self):
        for line in self.infile:
            yield line.rstrip()

    def __iter__(self):
        for line in self.Read():
            yield line

class Mapper(Streaming):

    def Map(self):
        raise NotImplementedError("Mapper must implement a Map method")

class Reducer(Streaming):

    def Reduce(self):
        raise NotImplementedError("Reducer must implement a Reduce method")

    def __iter__(self):
        generator = (line.split(self.sep,1) for line in self.Read())
        for item in groupby(generator,itemgetter(0)):
            yield item

flight_mapper.py:

#! /usr/bin/env python

import sys
import csv

from framework import Mapper

class FlightMapper(Mapper):

    def __init__(self,infile=sys.stdin,separator='\t'):
        super(FlightMapper,self).__init__(infile,separator)

    def Map(self):
        reader = csv.reader(self)
        for row in reader:
            if len(row[3].strip()) == 0:
                continue
            if len(row[6].strip()) == 0:
                row[6] = 0
                self.Emit(row[3],row[6])
            else:
                sys.stdout.write("{0}\t{1}\n").format(row[3],row[6])
                self.Emit(row[3],row[6])

if __name__ == '__main__':
    mapper = FlightMapper(sys.stdin)
    mapper.Map()

flight_reducer.py:

#! /usr/bin/env python

import sys

from framework import Reducer
from itertools import groupby
from operator import itemgetter

class FlightReducer(Reducer):

    def Reduce(self):
        for key, val in self:
            total = 0.0
            count = 0
            for item in val:
                total += float(item[1])
                count += 1
            self.Emit(key,float(total)/float(count))

if __name__ == '__main__':
    reducer = FlightReducer(sys.stdin)
    reducer.Reduce()

Error log:

2017-04-21 06:34:14,341 INFO [main] org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2017-04-21 06:34:14,411 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at 10 second(s).
2017-04-21 06:34:14,411 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system started
2017-04-21 06:34:14,420 INFO [main] org.apache.hadoop.mapred.YarnChild: Executing with tokens:
2017-04-21 06:34:14,420 INFO [main] org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service: job_1492704251350_0012, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@492e5810)
2017-04-21 06:34:14,496 INFO [main] org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Got null now.
2017-04-21 06:34:14,761 INFO [main] org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child: /yarn/nm/usercache/cloudera/appcache/application_1492704251350_0012
2017-04-21 06:34:15,329 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
2017-04-21 06:34:15,751 INFO [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: File Output Committer Algorithm version is 1
2017-04-21 06:34:15,765 INFO [main] org.apache.hadoop.mapred.Task: Using ResourceCalculatorProcessTree : [ ]
2017-04-21 06:34:15,955 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: hdfs://quickstart.cloudera:8020/user/cloudera/hadoop_practicals_input/flight_records.csv:1432610+1432611
2017-04-21 06:34:15,982 INFO [main] org.apache.hadoop.mapred.MapTask: numReduceTasks: 1
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 4194300(16777200)
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 16
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 13421773
2017-04-21 06:34:15,996 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 16777216
2017-04-21 06:34:15,997 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 4194300; length = 1048576
2017-04-21 06:34:16,000 INFO [main] org.apache.hadoop.mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2017-04-21 06:34:16,010 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed exec [/usr/bin/python, ./flight_mapper.py]
2017-04-21 06:34:16,016 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
2017-04-21 06:34:16,016 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
2017-04-21 06:34:16,017 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
2017-04-21 06:34:16,018 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.cache.localFiles is deprecated. Instead, use mapreduce.job.cache.local.files
2017-04-21 06:34:16,020 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
2017-04-21 06:34:16,021 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
2017-04-21 06:34:16,022 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
2017-04-21 06:34:16,023 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
2017-04-21 06:34:16,024 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
2017-04-21 06:34:16,025 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: job.local.dir is deprecated. Instead, use mapreduce.job.local.dir
2017-04-21 06:34:16,025 INFO [main] org.apache.hadoop.conf.Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir
2017-04-21 06:34:16,049 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,049 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,050 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=100/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,063 INFO [Thread-14] org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done
2017-04-21 06:34:16,068 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1000/0/0 in:NA [rec/s] out:NA [rec/s]
2017-04-21 06:34:16,076 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=1751/0/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 HOST=null
USER=cloudera
HADOOP_USER=null
last tool output: |null|
java.io.IOException: Stream closed
    at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:434)
    at java.io.OutputStream.write(OutputStream.java:116)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Stream closed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed.waitOutputThreads(): subprocess exited with code 1 in org.apache.hadoop.streaming.PipeMapRed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: mapRedFinished
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Stream closed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: PipeMapRed.waitOutputThreads(): subprocess exited with code 1 in org.apache.hadoop.streaming.PipeMapRed
2017-04-21 06:34:16,079 INFO [main] org.apache.hadoop.streaming.PipeMapRed: mapRedFinished
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:SIMPLE) cause:java.io.IOException: Stream closed
2017-04-21 06:34:16,079 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: Stream closed
    at java.lang.ProcessBuilder$NullOutputStream.write(ProcessBuilder.java:434)
    at java.io.OutputStream.write(OutputStream.java:116)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
2017-04-21 06:34:16,085 INFO [main] org.apache.hadoop.mapred.Task: Runnning cleanup for the task
2017-04-21 06:34:16,090 WARN [main] org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter: Could not delete hdfs://quickstart.cloudera:8020/user/cloudera/average_delay/_temporary/1/_temporary/attempt_1492704251350_0012_m_000000_0
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Stopping MapTask metrics system...
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system stopped.
2017-04-21 06:34:16,094 INFO [main] org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics system shutdown complete.

Command:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -Dstream.non.zero.exit.is.failure=false \
    -input 'hadoop_practicals_input/flight_records.csv' \
    -output average_delay \
    -mapper 'python ./flight_mapper.py' \
    -reducer 'python ./flight_reducer.py' \
    -file ./flight_mapper.py \
    -file ./flight_reducer.py \
    -file ./framework.py
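
A note on the log: "subprocess exited with code 1" followed by "Stream closed" usually means the Python mapper itself crashed, and Hadoop then failed while writing more input to the dead process. One plausible culprit in the posted mapper is the line that calls .format() on the return value of sys.stdout.write() rather than on the string, which raises AttributeError. The sketch below is only an illustration under that assumption, not a confirmed diagnosis. The names emit, map_flights and buggy_write are hypothetical, the sample rows are made up, and the column positions (3 and 6) are taken from the posted code. It also emits each record once, whereas the posted else branch would write it twice if the format call were fixed in place.

```python
import csv
import io


def emit(out, key, value, sep="\t"):
    """Write one key/value record; the string is formatted BEFORE writing."""
    out.write("{0}{1}{2}\n".format(key, sep, value))


def map_flights(lines, out):
    """Emit (column 3, column 6) for each CSV row, using 0 for a blank value."""
    for row in csv.reader(lines):
        # Skip rows that are too short or have an empty key instead of crashing.
        if len(row) < 7 or not row[3].strip():
            continue
        value = row[6] if row[6].strip() else 0
        emit(out, row[3], value)


def buggy_write(out, key, value):
    """Mirror of the posted line, kept only to show the failure mode."""
    # .format() is applied to what write() returns (None or an int),
    # not to the string, so this raises AttributeError.
    out.write("{0}\t{1}\n").format(key, value)


# Made-up sample rows, only to exercise the sketch.
sample = ["a,b,c,AA,d,e,12", "a,b,c,,d,e,5", "a,b,c,UA,d,e, ", "x,y"]
result = io.StringIO()
map_flights(sample, result)
print(result.getvalue(), end="")
```

In a real streaming job the same function would be called with sys.stdin and sys.stdout. The Python traceback, if any, should appear in the stderr log of the failed task attempt, which is the quickest way to confirm or rule out this guess.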
04-20-2017
06:26 AM
I found the location; it was the same one dr3x mentioned. My bad, I didn't notice it properly.
04-19-2017
01:33 PM
Hi, I recently downloaded the Cloudera QuickStart VM for VirtualBox and I am looking for the Hadoop streaming JAR file so that I can do some Python streaming, but I am not able to find it. Please help me find this JAR file. Thanks