Created 12-11-2017 12:48 AM
I am trying to copy data from one directory in HDFS to another directory in HDFS, but I am running into a few issues.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
LOGGER.info("Connected");

Path source = new Path("/data_dev/deepak/src/raw/epic/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00/");
Path target = new Path("/data_dev/deepak/dest/raw/epics/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00/");
System.out.println(source);
System.out.println(target);
System.out.println("source" + fs.exists(source));
System.out.println("source" + fs.exists(target));

FileSystem srcfs = FileSystem.get(conf);
FileSystem dstFS = FileSystem.get(conf);

RemoteIterator<LocatedFileStatus> sourceFiles = srcfs.listFiles(source, false);
LOGGER.info(sourceFiles.toString());
LOGGER.info("source File System " + fs.toString());
LOGGER.info("destniation File System" + dstFS.toString());

if (!fs.exists(target)) {
    fs.create(target);
    LOGGER.info("created thr path");
}

if (sourceFiles != null) {
    while (sourceFiles.hasNext()) {
        System.out.println(sourceFiles.toString());
        Path srcfilepath = sourceFiles.next().getPath();
        System.out.println(srcfilepath);
        if (FileUtil.copy(srcfs, srcfilepath, dstFS, target, false, true, conf)) {
            System.out.println("Copied Successfully");
        } else {
            System.out.println("Copy Failed");
        }
    }
}

srcfs.close();
dstFS.close();
fs.close();
}
If the destination directory doesn't exist, the code above creates it, and the error below occurs only in that case, i.e. only when the destination directory did not already exist.
If the destination directory is already there, the program runs without errors, but my source files (.lzo format) are still not copied to the destination directory.
The same program works fine when I copy from other directories that contain plain text files.
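One way to narrow this kind of problem down is to check what actually exists at the destination path before copying. Here is a minimal diagnostic sketch, reusing the fs and target variables from the snippet above; it only reports whether the path is a directory, a plain file, or missing:

// Minimal diagnostic sketch (assumes the fs and target variables from
// the snippet above): is the destination a directory, a file, or absent?
if (fs.exists(target)) {
    org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(target);
    System.out.println(target + " exists, isDirectory=" + status.isDirectory()
            + ", length=" + status.getLen());
} else {
    System.out.println(target + " does not exist");
}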
I have posted the code snippet above and the error log below.
Any help would be appreciated. Thanks!
Log:

hadoop jar Moving.jar
Dec 10, 2017 6:07:30 PM com.ghs.misc.Moving main
INFO: Connected
/data_dev/deepak/src/raw/epic/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00
/data_dev/deepak/dest/raw/epics/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00
sourcetrue
sourcefalse
Dec 10, 2017 6:07:30 PM com.ghs.misc.Moving main
INFO: org.apache.hadoop.fs.FileSystem$6@29a1c0b7
Dec 10, 2017 6:07:30 PM com.ghs.misc.Moving main
INFO: source File System DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_61931562_1(auth:KERBEROS)]]
Dec 10, 2017 6:07:30 PM com.ghs.misc.Moving main
INFO: destniation File SystemDFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_61931562_1(auth:KERBEROS)]]
Dec 10, 2017 6:07:30 PM com.ghs.misc.Moving main
INFO: created thr path
org.apache.hadoop.fs.FileSystem$6@29a1c0b7
/data_dev/deepak/src/raw/epic/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00/HQAQA.lzo
Copied Successfully
org.apache.hadoop.fs.FileSystem$6@29a1c0b7
/data_dev/deepak/src/raw/epic/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00/HQAQA.lzo.index
Copied Successfully
org.apache.hadoop.fs.FileSystem$6@29a1c0b7
/data_dev/deepak/src/raw/epic/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00/Test1.txt
Copied Successfully
org.apache.hadoop.fs.FileSystem$6@29a1c0b7
/data_dev/deepak/src/raw/epic/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00/Test2.txt
Copied Successfully
org.apache.hadoop.fs.FileSystem$6@29a1c0b7
/data_dev/deepak/src/raw/epic/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00/Test3.txt
Copied Successfully
17/12/10 18:07:34 ERROR hdfs.DFSClient: Failed to close inode 364006128
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /data_dev/deepak/dest/raw/epics/cl_qanswer_qa/hdp_process_date=2017-07-25/hour=00/minute=00 (inode 364006128): File does not exist. Holder DFSClient_NONMAPREDUCE_61931562_1 does not have any open files.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3693)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3781)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3748)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:912)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:549)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
    at org.apache.hadoop.ipc.Client.call(Client.java:1498)
    at org.apache.hadoop.ipc.Client.call(Client.java:1398)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
    at com.sun.proxy.$Proxy10.complete(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:503)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:291)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:203)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:185)
    at com.sun.proxy.$Proxy11.complete(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2496)
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2472)
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2437)
    at org.apache.hadoop.hdfs.DFSClient.closeAllFilesBeingWritten(DFSClient.java:949)
    at org.apache.hadoop.hdfs.DFSClient.closeOutputStreams(DFSClient.java:981)
    at org.apache.hadoop.hdfs.DistributedFileSystem.close(DistributedFileSystem.java:1211)
    at com.ghs.misc.Moving.main(Moving.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:233)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
Created 12-11-2017 04:02 PM
I found the mistake. While creating the directory I was using FileSystem.create(), but that actually creates an FSDataOutputStream (i.e. a file) at the given path, not a directory. I changed it to FileSystem.mkdirs(target), which resolved the error, and now my code works fine. This also explains the LeaseExpiredException: fs.create(target) had opened an output stream that was never closed, so when fs.close() later tried to complete that dangling file, the NameNode no longer held a lease on the path. Sorry for the silly mistake and for wasting your time, guys.

if (!fs.exists(target)) {
    fs.mkdirs(target); // previously: fs.create(target);
    LOGGER.info("created the path");
}
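For anyone hitting the same thing, here is a minimal sketch of the difference between the two calls (standard Hadoop FileSystem API; the /tmp paths are only illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateVsMkdirs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // create() makes a zero-length FILE and returns an open output
        // stream. The stream must be closed, otherwise fs.close() will
        // later try to complete it and can fail (e.g. with a
        // LeaseExpiredException if the path was replaced in the meantime).
        FSDataOutputStream out = fs.create(new Path("/tmp/example-file"));
        out.close();
        System.out.println(
            fs.getFileStatus(new Path("/tmp/example-file")).isDirectory()); // false

        // mkdirs() makes a DIRECTORY (including any missing parents) and
        // returns a boolean; there is no stream to close.
        fs.mkdirs(new Path("/tmp/example-dir"));
        System.out.println(
            fs.getFileStatus(new Path("/tmp/example-dir")).isDirectory()); // true

        fs.close();
    }
}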