Created 06-28-2017 02:43 PM
I am using hadoop-2.7.2 on CentOS-7 (64-bit) and trying to extract text from a PDF file using Apache Tika. When I run this code from Eclipse, the file is extracted successfully. I then built a jar and ran it on Hadoop on an Ubuntu machine, and it also works fine. But when I run the same jar, with the same code, on an Ambari-installed Hadoop cluster, I get this error: Error: java.net.MalformedURLException: unknown protocol: hdfs. I don't understand why it is not working. I have tried a lot to resolve this issue but failed. Please, can someone help me?
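For reference, java.net.URL throws this exception for any URL scheme that has no stream handler registered in the current JVM. Below is a tiny standalone sketch (hypothetical class and file names, not part of the job) that reproduces the same message outside of MapReduce:

import java.net.MalformedURLException;
import java.net.URL;

// Minimal sketch, hypothetical: an hdfs:// URL cannot be built unless a
// URLStreamHandler for the "hdfs" scheme has been registered in this JVM,
// e.g. via URL.setURLStreamHandlerFactory(new org.apache.hadoop.fs.FsUrlStreamHandlerFactory()).
public class HdfsUrlCheck {
    public static void main(String[] args) {
        try {
            new URL("hdfs://hostname:8020/input-data/sample.pdf");
            System.out.println("hdfs:// URLs are resolvable in this JVM");
        } catch (MalformedURLException e) {
            System.out.println(e.getMessage()); // prints: unknown protocol: hdfs
        }
    }
}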
This is my code:
package tikka.com;

import java.io.IOException;
import java.net.URL;
import java.util.Date;

import org.apache.commons.compress.utils.IOUtils.*;
import org.tukaani.xz.ARMOptions.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TikaMapreduce extends Configured implements Tool {

    public static class TikaMapper extends Mapper<Text, Text, Text, Text> {
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        int exit = ToolRunner.run(new Configuration(), new TikaMapreduce(), args);
        System.exit(exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

        Job job = new Job(conf, "TikaMapreduce");
        job.setJarByClass(getClass());
        job.setJobName("TikRead");
        job.setInputFormatClass(TikaFileInputFormat.class);
        System.out.println("read input file");
        FileInputFormat.addInputPath(job,
                new Path("hdfs://hostname:8020/input-data/pwc-canada-issues-reporting-form.pdf"));
        job.setMapperClass(TikaMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TikaOutPutFormt.class);
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://hostname:8020/output-data/pdfoutput" + (new Date().getTime())));
        System.out.println("output pdf");
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
package tikka.com;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TikaFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new TikaRecordReader();
    }
}
package tikka.com;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TikaOutPutFormt extends FileOutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path path = FileOutputFormat.getOutputPath(context);
        Path fullapth = new Path(path, "PDF.txt");
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        FSDataOutputStream output = fs.create(fullapth, context);
        return new TikaRecordWrite(output);
    }
}
package tikka.com;

import java.io.IOException;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.tika.Tika;
import org.apache.tika.exception.TikaException;

public class TikaRecordReader extends RecordReader<Text, Text> {

    private Text key = new Text();
    private Text value = new Text();
    private FileSplit fileSplit;
    private Configuration conf;
    private boolean processed = false;

    @Override
    public void close() throws IOException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            Path path = fileSplit.getPath();
            key.set(path.toString());
            @SuppressWarnings("unused")
            FileSystem fs = path.getFileSystem(conf);
            @SuppressWarnings("unused")
            FSDataInputStream fin = null;
            try {
                String con = new Tika().parseToString(new URL(path.toString()));
                String string = con.replaceAll("[$%&+,:;=?#|']", " ");
                String string2 = string.replaceAll("\\s+", " ");
                String lo = string2.toLowerCase();
                value.set(lo);
            } catch (TikaException e) {
                e.printStackTrace();
            }
            processed = true;
            return true;
        } else {
            return false;
        }
    }
}
package tikka.com;

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class TikaRecordWrite extends RecordWriter<Text, Text> {

    private DataOutputStream out;

    public TikaRecordWrite(DataOutputStream output) {
        out = output;
        try {
            out.writeBytes("result:\r\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        out.close();
    }

    @Override
    public void write(Text key, Text value) throws IOException, InterruptedException {
        out.writeBytes(key.toString());
        out.writeBytes(",");
        out.writeBytes(value.toString());
        out.writeBytes("\r\n");
    }
}
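The stack trace below points at the new URL(path.toString()) call in TikaRecordReader.nextKeyValue(). The URL.setURLStreamHandlerFactory() call in the driver's run() only registers the hdfs handler in the client JVM, while the map tasks run in separate JVMs where it was never called. One common pattern is to register FsUrlStreamHandlerFactory in a static initializer of the class that actually builds the URL; another is to avoid java.net.URL entirely and open the file through the Hadoop FileSystem API. A minimal, untested sketch of the second approach (reusing the imports and fields of the record reader above):

// Sketch only, assuming the same fields (fileSplit, conf, key, value, processed)
// and imports as TikaRecordReader above. It opens the split's path via the Hadoop
// FileSystem API and hands the stream to Tika, so no "hdfs" URL handler is needed
// in the task JVM.
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (processed) {
        return false;
    }
    Path path = fileSplit.getPath();
    key.set(path.toString());
    FileSystem fs = path.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(path)) {
        // Tika.parseToString(InputStream) reads the whole document and closes the stream
        String text = new Tika().parseToString(in);
        value.set(text.replaceAll("[$%&+,:;=?#|']", " ")
                      .replaceAll("\\s+", " ")
                      .toLowerCase());
    } catch (TikaException e) {
        throw new IOException("Tika failed to parse " + path, e);
    }
    processed = true;
    return true;
}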
Output after executing the jar file:
read input file
output pdf
17/06/28 02:29:10 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
17/06/28 02:29:11 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/06/28 02:29:12 INFO input.FileInputFormat: Total input paths to process : 1
17/06/28 02:29:12 INFO mapreduce.JobSubmitter: number of splits:1
17/06/28 02:29:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1498630846170_0002
17/06/28 02:29:13 INFO impl.YarnClientImpl: Submitted application application_1498630846170_0002
17/06/28 02:29:13 INFO mapreduce.Job: The url to track the job: https://hostname:8088/proxy/application_1498630846170_0002/
17/06/28 02:29:13 INFO mapreduce.Job: Running job: job_1498630846170_0002
17/06/28 02:29:23 INFO mapreduce.Job: Job job_1498630846170_0002 running in uber mode : false
17/06/28 02:29:23 INFO mapreduce.Job: map 0% reduce 0%
17/06/28 02:29:51 INFO mapreduce.Job: Task Id : attempt_1498630846170_0002_m_000000_0, Status : FAILED
Error: java.net.MalformedURLException: unknown protocol: hdfs
    at java.net.URL.<init>(URL.java:600)
    at java.net.URL.<init>(URL.java:490)
    at java.net.URL.<init>(URL.java:439)
    at tikka.com.TikaRecordReader.nextKeyValue(TikaRecordReader.java:77)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
17/06/28 02:30:09 INFO mapreduce.Job: Task Id : attempt_1498630846170_0002_m_000000_1, Status : FAILED
Error: java.net.MalformedURLException: unknown protocol: hdfs
    at java.net.URL.<init>(URL.java:600)
    at java.net.URL.<init>(URL.java:490)
    at java.net.URL.<init>(URL.java:439)
    at tikka.com.TikaRecordReader.nextKeyValue(TikaRecordReader.java:77)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
17/06/28 02:30:24 INFO mapreduce.Job: Task Id : attempt_1498630846170_0002_m_000000_2, Status : FAILED
Error: java.net.MalformedURLException: unknown protocol: hdfs
    at java.net.URL.<init>(URL.java:600)
    at java.net.URL.<init>(URL.java:490)
    at java.net.URL.<init>(URL.java:439)
    at tikka.com.TikaRecordReader.nextKeyValue(TikaRecordReader.java:77)
    at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:556)
    at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
17/06/28 02:30:39 INFO mapreduce.Job: map 100% reduce 0%
17/06/28 02:30:40 INFO mapreduce.Job: map 100% reduce 100%
17/06/28 02:30:44 INFO mapreduce.Job: Job job_1498630846170_0002 failed with state FAILED due to: Task failed task_1498630846170_0002_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0
17/06/28 02:30:44 INFO mapreduce.Job: Counters: 13
    Job Counters
        Failed map tasks=4
        Killed reduce tasks=1
        Launched map tasks=4
        Other local map tasks=3
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=67789
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=67789
        Total time spent by all reduce tasks (ms)=0
        Total vcore-milliseconds taken by all map tasks=67789
        Total vcore-milliseconds taken by all reduce tasks=0
        Total megabyte-milliseconds taken by all map tasks=69415936
        Total megabyte-milliseconds taken by all reduce tasks=0
Thanks in advance.
Created 06-29-2017 06:41 AM
Is the Hadoop distribution/version the same on the CentOS and Ubuntu machines?
Created 06-30-2017 01:22 AM
On Ubuntu I used hadoop-2.7.3, and on CentOS I used hadoop-2.7.2.
Created 06-29-2017 06:59 AM
It might be an HDFS client jar / classpath issue.
Can you please check whether your classpath is pointing to the correct Hadoop jars?
You can also use the output of the "hadoop classpath" command to find the classpath to be used.
Example:
# javac -cp `hadoop classpath` -d . TikaMapreduce.java
# java -cp `hadoop classpath`:. tikka.com.TikaMapreduce
You can also add your own JAR directories in the classpath like:
-cp `hadoop classpath`:$YOUR/LIB/PATH:.:
Created 06-30-2017 02:53 AM
I have set my classpath this way. Is this the right way? If I am wrong, please guide me to the right way.
export HADOOP_CLASSPATH=/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/*::/opt/hadoop/contrib/capacity-scheduler/*.jar
Created 06-30-2017 03:01 AM
Can you please try like this:
#### First check if the `hadoop classpath` command returns proper results.
# hadoop classpath

#### If yes, then set the CLASSPATH as follows.
#### Please NOTE the value is in backticks ``, not single quotes ''.
# export CLASSPATH=`hadoop classpath`
Also, please note that the "HADOOP_CLASSPATH" variable is normally used by the standard Hadoop scripts, but for your standalone Java code you should set the value in the CLASSPATH variable instead.
Created 06-30-2017 04:18 AM
Yes, I got the result, as you can see:
[hdusr@vfarm01d hadoop]$ hadoop classpath
/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/contrib/capacity-scheduler/*.jar
And I have set it as you said:
export CLASSPATH=`hadoop classpath`
But I am still getting the same error.