Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

No lease on file (inode 5425306): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_-247167534_42, pendingcreates: 1]

avatar
Contributor

I am getting below error while working with Apache storm and HDFS.

Caused by: org.apache.hadoop.ipc.RemoteException: No lease on /user/test/xxx/ccc/bbb/value_fct_archive.csv (inode 5425306): File does not exist. [Lease.  Holder: DFSClient_NONMAPREDUCE_-247167534_42, pendingcreates: 1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3521)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3324)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3162)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3122)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:843)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:500)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1552) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1496) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.Client.call(Client.java:1396) ~[stormjar.jar:?]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233) ~[stormjar.jar:?]
at com.sun.proxy.$Proxy41.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:457) ~[stormjar.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_77]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_77]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_77]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_77]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278) ~[stormjar.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194) ~[stormjar.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176) ~[stormjar.jar:?]
at com.sun.proxy.$Proxy43.addBlock(Unknown Source) ~[?:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1489) ~[stormjar.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1284) ~[stormjar.jar:?]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:463) ~[stormjar.jar:?]

I tried to resolve it by many ways like increasing ulimit or setting parallelism hint to 1 etc. but nothing worked for me.

My topology code is as below,

public class Topology {

private static final Logger LOG = LoggerFactory.getLogger(Topology.class);

public static StormTopology buildTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    // read properties and send to MQTT spout
    PropertyReader propReader = new PropertyReader();
    Properties prop = propReader.getProperties();
    String mqttService = (propReader.getValue(prop, "mqttservice")).trim();

    String[] devicesTopics = ((propReader.getValue(prop, "devices_topics.MQTT")).trim()).split(",");
    for(String devicesTopicsId:devicesTopics){
        LOG.info("devicesTopics Id  is "+devicesTopicsId);
        String[] devices = devicesTopicsId.split(":");
        String deviceId = devices[0];
        LOG.info("device Id  is "+deviceId);
        String mqtttopic = devices[1];
        LOG.info("Topic Name  is "+mqtttopic);
        //start topology logic
        String mqttreader="mqttreader-"+deviceId;
        LOG.info("Mqtt reader  is "+mqttreader);
        String stream="stream-"+deviceId;
        LOG.info("Stream Id  is "+stream);
        builder.setSpout(mqttreader, new MQTTReader(mqttService,mqtttopic,deviceId,stream));

        //set mqttprocessor bolt
        String mqttprocesser="mqttprocesser-"+deviceId;
        LOG.info("Mqtt processor  is "+mqttprocesser);
        builder.setBolt(mqttprocesser, new MQTTProcesser(stream)).shuffleGrouping(mqttreader,stream);

        //set dateretriver bolt
        String dateretriver="dateretriver-"+deviceId;
        LOG.info("date retriver  is "+dateretriver);
        builder.setBolt(dateretriver, new DateRetriver(stream)).shuffleGrouping(mqttprocesser,stream);

        //set archival bolt from dateretriver
        String archive="archive-"+deviceId;
        LOG.info("archival  is "+archive);
        builder.setBolt(archive, new Archival(stream)).shuffleGrouping(dateretriver,stream);

        //get hive bolt
        MQTTHiveSinker sinker = new MQTTHiveSinker();
        String metaStoreURI = (propReader.getValue(prop, "hiveurl")).trim();
        String dbName = (propReader.getValue(prop, "dbname.MQTT")).trim();
        String tblName = (propReader.getValue(prop, "tblname.MQTT")).trim();
        String[] colNames = ((propReader.getValue(prop, "colNames.MQTT")).trim()).split(",");
        LOG.info("colName0  is "+colNames[0]);
        LOG.info("colName1  is "+colNames[1]);
        LOG.info("colName2  is "+colNames[2]);
        LOG.info("colName3  is "+colNames[3]);
        HiveBolt hiveBolt = sinker.buildHiveBolt(metaStoreURI, dbName, tblName, colNames);

        //set hive bolt
        String hivebolt="hivebolt-"+deviceId;
        LOG.info("Hivbolt  is "+hivebolt);
        builder.setBolt(hivebolt, hiveBolt).shuffleGrouping(archive);

        //set hdfs bolt
        MQTTHDFSSinker hdfssinker = new MQTTHDFSSinker();
        String hdfsboltId="hdfsbolt-"+deviceId;
        LOG.info("hdfsbolt  is "+hdfsboltId);
        HdfsBolt hdfbolt = hdfssinker.makeHDFSBolt();
        builder.setBolt(hdfsboltId,hdfbolt).shuffleGrouping(archive);
    }   
    return builder.createTopology();
}

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {     
    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(2);
    //conf.setMaxSpoutPending(1);
    StormSubmitter.submitTopology(args[0], conf, buildTopology());
} 

And HDFS streaming code as below,

public class MQTTHDFSSinker {

private static final Logger LOG = LoggerFactory.getLogger(MQTTHDFSSinker.class);
public HdfsBolt makeHDFSBolt(){
    LOG.info("start of MQTTHDFSSinker.makeHDFSBolt");
    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");

    // sync the filesystem after every 1k tuples
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);

    // rotate files when they reach 5MB
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

    FileNameFormat fileNameFormat = new CustomHDFSFileNameFormat("/user/test/xxx/ccc/bbb/","value_fct_archive","csv");

    HdfsBolt bolt = new HdfsBolt().withFsUrl("hdfs://namenode:8020").withFileNameFormat(fileNameFormat).withRecordFormat(format).withRotationPolicy(rotationPolicy).withSyncPolicy(syncPolicy);
    LOG.info("end of MQTTHDFSSinker.makeHDFSBolt");
    return bolt;
}

}

Is it issue of multiple threads trying to write the file. Can anybody please help me for the error.

1 ACCEPTED SOLUTION

avatar
Explorer

Hi @parag dharmadhikari

Answer to your question “what should I do in my custom hdfs file name format so that it can also work…”

DefaultFileNameFormat#getName() uses the provided 'timestamp' to form the file name. So each time this is called with different timestamp, a new filename will be returned for different threads.

Where as CustomFileNameFormat is not using the 'timestamp' to form the unique file name for multiple threads. So when same CustomFileFormat is used to from multiple threads, it returns same filename, resulting in above collision. So this might be the reason for your case, where it succeeds with DefaultFileNameFormat. Please provide some unique name for each thread in the CustomFileFormat

View solution in original post

7 REPLIES 7

avatar
Explorer

Hi @parag dharmadhikari

It could be issue of Multiple threads trying to write the same file, if the overwrite flag is set to true while creating the file

If the file created by the first thread is overwritten by thesecond thread, then first thread will experience the above exception.

Solution1 : If your case is multiple threads, then setting ‘overwrite’ flag to false, will resolve the issue,

Solution 2: If your case is not about creating files in multiple threads, please check whether some other client is deleting the file/parent directory.

avatar
Contributor

Hi @ajoseph

Thanks . But can you please tell me how could I set overwrite flag.

avatar
Contributor

What I have observed that using DefaultFileNameFormat works fine for me.

So can any body please tell me what should I do in my custom hdfs file name format so that it can also work without no lease file error.

public class CustomHDFSFileNameFormat implements FileNameFormat{

private static final long serialVersionUID = 1L;
private String filename=null;
private String extension=null;
private String path=null;

public CustomHDFSFileNameFormat(String path,String filename,String extension){
    this.filename = filename;
    this.extension = extension;
    this.path = path;
}
public String getName(long rotation, long timeStamp) {
    return (this.filename+"."+this.extension);
}

public String getPath() {
    return this.path;
}

public void prepare(Map map, TopologyContext context) {
    // TODO Auto-generated method stub

}

}

avatar
Explorer

Please find the reply above for the question “what should I do in my custom hdfs file name format so that it can also work…”

avatar
Explorer

Hi @parag dharmadhikari

  1. Answer to your question “how could I set overwrite flag.”

Making the overwrite=false needs HdfsBolt changes.

HdfsBolt calls simple API to create the file FileSystem#create(path), for which overwrite=true by default.

HdfsBolt has to be changed to use the api FileSystem#create(Path, boolean overwrite)


avatar
Explorer

Hi @parag dharmadhikari

Answer to your question “what should I do in my custom hdfs file name format so that it can also work…”

DefaultFileNameFormat#getName() uses the provided 'timestamp' to form the file name. So each time this is called with different timestamp, a new filename will be returned for different threads.

Where as CustomFileNameFormat is not using the 'timestamp' to form the unique file name for multiple threads. So when same CustomFileFormat is used to from multiple threads, it returns same filename, resulting in above collision. So this might be the reason for your case, where it succeeds with DefaultFileNameFormat. Please provide some unique name for each thread in the CustomFileFormat

avatar
Contributor

Thanks, it worked