Created 09-22-2017 01:36 PM
I am getting the following error while working with Apache Storm and HDFS:
Caused by: org.apache.hadoop.ipc.RemoteException: No lease on /user/test/xxx/ccc/bbb/value_fct_archive.csv (inode 5425306): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-247167534_42, pendingcreates: 1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3521)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3324)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3162)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3122)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:843)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:500)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1552) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1496) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1396) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233) ~[stormjar.jar:?]
at com.sun.proxy.$Proxy41.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:457) ~[stormjar.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_77]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_77]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_77]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_77]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278) ~[stormjar.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194) ~[stormjar.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176) ~[stormjar.jar:?]
at com.sun.proxy.$Proxy43.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1489) ~[stormjar.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1284) ~[stormjar.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463) ~[stormjar.jar:?]
I tried to resolve it in several ways, such as increasing the ulimit or setting the parallelism hint to 1, but nothing worked for me.
My topology code is as follows:
public class Topology {

    private static final Logger LOG = LoggerFactory.getLogger(Topology.class);

    public static StormTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();

        // read properties and send to MQTT spout
        PropertyReader propReader = new PropertyReader();
        Properties prop = propReader.getProperties();
        String mqttService = propReader.getValue(prop, "mqttservice").trim();
        String[] devicesTopics = propReader.getValue(prop, "devices_topics.MQTT").trim().split(",");

        for (String devicesTopicsId : devicesTopics) {
            LOG.info("devicesTopics Id is " + devicesTopicsId);
            String[] devices = devicesTopicsId.split(":");
            String deviceId = devices[0];
            LOG.info("device Id is " + deviceId);
            String mqtttopic = devices[1];
            LOG.info("Topic Name is " + mqtttopic);

            // start topology logic
            String mqttreader = "mqttreader-" + deviceId;
            LOG.info("Mqtt reader is " + mqttreader);
            String stream = "stream-" + deviceId;
            LOG.info("Stream Id is " + stream);
            builder.setSpout(mqttreader, new MQTTReader(mqttService, mqtttopic, deviceId, stream));

            // set mqttprocessor bolt
            String mqttprocesser = "mqttprocesser-" + deviceId;
            LOG.info("Mqtt processor is " + mqttprocesser);
            builder.setBolt(mqttprocesser, new MQTTProcesser(stream)).shuffleGrouping(mqttreader, stream);

            // set dateretriver bolt
            String dateretriver = "dateretriver-" + deviceId;
            LOG.info("date retriver is " + dateretriver);
            builder.setBolt(dateretriver, new DateRetriver(stream)).shuffleGrouping(mqttprocesser, stream);

            // set archival bolt fed from dateretriver
            String archive = "archive-" + deviceId;
            LOG.info("archival is " + archive);
            builder.setBolt(archive, new Archival(stream)).shuffleGrouping(dateretriver, stream);

            // build hive bolt
            MQTTHiveSinker sinker = new MQTTHiveSinker();
            String metaStoreURI = propReader.getValue(prop, "hiveurl").trim();
            String dbName = propReader.getValue(prop, "dbname.MQTT").trim();
            String tblName = propReader.getValue(prop, "tblname.MQTT").trim();
            String[] colNames = propReader.getValue(prop, "colNames.MQTT").trim().split(",");
            LOG.info("colName0 is " + colNames[0]);
            LOG.info("colName1 is " + colNames[1]);
            LOG.info("colName2 is " + colNames[2]);
            LOG.info("colName3 is " + colNames[3]);
            HiveBolt hiveBolt = sinker.buildHiveBolt(metaStoreURI, dbName, tblName, colNames);

            // set hive bolt
            String hivebolt = "hivebolt-" + deviceId;
            LOG.info("Hivebolt is " + hivebolt);
            builder.setBolt(hivebolt, hiveBolt).shuffleGrouping(archive);

            // set hdfs bolt
            MQTTHDFSSinker hdfssinker = new MQTTHDFSSinker();
            String hdfsboltId = "hdfsbolt-" + deviceId;
            LOG.info("hdfsbolt is " + hdfsboltId);
            HdfsBolt hdfbolt = hdfssinker.makeHDFSBolt();
            builder.setBolt(hdfsboltId, hdfbolt).shuffleGrouping(archive);
        }
        return builder.createTopology();
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(2);
        //conf.setMaxSpoutPending(1);
        StormSubmitter.submitTopology(args[0], conf, buildTopology());
    }
}
And the HDFS streaming code is as follows:
public class MQTTHDFSSinker {

    private static final Logger LOG = LoggerFactory.getLogger(MQTTHDFSSinker.class);

    public HdfsBolt makeHDFSBolt() {
        LOG.info("start of MQTTHDFSSinker.makeHDFSBolt");
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
        FileNameFormat fileNameFormat = new CustomHDFSFileNameFormat("/user/test/xxx/ccc/bbb/", "value_fct_archive", "csv");
        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://namenode:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
        LOG.info("end of MQTTHDFSSinker.makeHDFSBolt");
        return bolt;
    }
}
Is this an issue of multiple threads trying to write to the same file? Can anybody please help me with this error?
Created 09-27-2017 03:59 PM
Answer to your question “what should I do in my custom hdfs file name format so that it can also work…”:
DefaultFileNameFormat#getName() uses the provided 'timestamp' to form the file name, so each call with a different timestamp returns a new filename for each thread.
CustomHDFSFileNameFormat, on the other hand, does not use the 'timestamp' to form a unique file name for multiple threads. So when the same CustomHDFSFileNameFormat is used from multiple threads, it returns the same filename, resulting in the collision above. That is most likely why it succeeds with DefaultFileNameFormat in your case. Please generate a unique name for each thread in your CustomHDFSFileNameFormat.
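For illustration, here is a minimal sketch of a filename format that folds the timestamp and the task id into the name. The class name, the taskId field captured in prepare(), and the exact name layout are assumptions for this example (written against Storm 1.x package names), not code from this thread:

import java.util.Map;

import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;

// Sketch only: every task gets its own file name, and every rotation a new one.
public class UniqueHDFSFileNameFormat implements FileNameFormat {

    private static final long serialVersionUID = 1L;

    private final String path;
    private final String filename;
    private final String extension;
    private int taskId; // hypothetical: captured in prepare() to distinguish tasks

    public UniqueHDFSFileNameFormat(String path, String filename, String extension) {
        this.path = path;
        this.filename = filename;
        this.extension = extension;
    }

    @Override
    public void prepare(Map map, TopologyContext context) {
        // each bolt task records its own id so file names never collide
        this.taskId = context.getThisTaskId();
    }

    @Override
    public String getName(long rotation, long timeStamp) {
        // e.g. value_fct_archive-7-1506350000000-0.csv
        return this.filename + "-" + this.taskId + "-" + timeStamp + "-" + rotation + "." + this.extension;
    }

    @Override
    public String getPath() {
        return this.path;
    }
}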
Created 09-24-2017 02:53 PM
It could be an issue of multiple threads trying to write the same file, if the overwrite flag is set to true while creating the file.
If the file created by the first thread is overwritten by the second thread, then the first thread will experience the above exception.
Solution 1: If your case does involve multiple threads, setting the 'overwrite' flag to false will resolve the issue.
Solution 2: If your case is not about creating files from multiple threads, please check whether some other client is deleting the file or its parent directory.
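To illustrate the difference between the two flags, a small standalone sketch (the path and configuration here are placeholders, not values from this thread). FileSystem#create(Path) defaults to overwrite=true, while create(Path, false) refuses to replace an existing file:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OverwriteFlagExample {
    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path("/tmp/overwrite-demo/part-0000.csv"); // placeholder path

        fs.delete(path, false); // start clean so the demo is rerunnable

        // first writer creates the file
        FSDataOutputStream first = fs.create(path, false);

        try {
            // a second create() on the same path: with overwrite=true (the
            // default) it would silently replace the file and invalidate the
            // first writer's lease; with overwrite=false it fails fast here
            fs.create(path, false);
        } catch (IOException expected) {
            System.out.println("collision detected immediately: " + expected.getMessage());
        }

        first.close();
    }
}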
Created 09-25-2017 04:37 AM
Hi @ajoseph
Thanks. But can you please tell me how I can set the overwrite flag?
Created 09-25-2017 06:37 AM
What I have observed is that using DefaultFileNameFormat works fine for me.
So can anybody please tell me what I should do in my custom HDFS file name format so that it also works without the "no lease" error?
public class CustomHDFSFileNameFormat implements FileNameFormat {

    private static final long serialVersionUID = 1L;

    private String filename = null;
    private String extension = null;
    private String path = null;

    public CustomHDFSFileNameFormat(String path, String filename, String extension) {
        this.filename = filename;
        this.extension = extension;
        this.path = path;
    }

    @Override
    public String getName(long rotation, long timeStamp) {
        // returns a fixed name; rotation and timeStamp are not used
        return this.filename + "." + this.extension;
    }

    @Override
    public String getPath() {
        return this.path;
    }

    @Override
    public void prepare(Map map, TopologyContext context) {
        // no-op
    }
}
Created 09-27-2017 04:01 PM
Please find the reply above for the question “what should I do in my custom hdfs file name format so that it can also work…”
Created 09-27-2017 03:56 PM
Making overwrite=false requires changes to HdfsBolt.
HdfsBolt creates the file with the simple API FileSystem#create(path), for which overwrite=true by default.
HdfsBolt would have to be changed to use the API FileSystem#create(Path path, boolean overwrite).
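A sketch of what that change amounts to at the point where the bolt creates its output file. The method and class names here are paraphrased for illustration, not a drop-in patch for storm-hdfs:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: create the bolt's output file with an explicit overwrite=false,
// so a file-name collision surfaces as an immediate IOException rather
// than one writer silently taking over the other's lease.
final class SafeOutputFileCreation {
    static FSDataOutputStream createOutputFile(FileSystem fs, Path path) throws IOException {
        // FileSystem#create(Path) is shorthand for create(path, /* overwrite = */ true)
        return fs.create(path, false);
    }
}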
Created 09-28-2017 08:35 AM
Thanks, it worked.