Nifi: How to make write and check operations in file in one thread (processor)?

How can I create a single point (a processor or a service) that writes a file and runs in a single thread? My workflow looks like this: executescript1 (single-threaded processor with write operations) -> updateAttribute -> InvokeHttpProcessor -> executescript1 (single-threaded processor with check operations; it is the same processor as the first one). I have tried the code below, but it neither completes successfully nor throws an exception.

What should I change?

Here is my code:

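  import groovy.xml.XmlUtil
  import java.nio.channels.Channels
  import java.nio.channels.FileLock
  import java.nio.channels.OverlappingFileLockException
  import java.nio.charset.StandardCharsets
  import java.util.concurrent.TimeUnit
  import org.apache.nifi.processor.io.StreamCallback

  File file = new File("C:/Users/Desktop/test/conf.xml");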
            String content = "";
            BufferedReader s;
            BufferedWriter w;
            RandomAccessFile ini= new RandomAccessFile(file, "rwd");
            FileLock lock= ini.getChannel().lock();
    
            try {
             def flowFile=session.get();
    		 if(flowFile==null){
            
                    String sCurrentLine;
                s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
    
    
                while ((sCurrentLine = s.readLine()) != null) {
                    content += sCurrentLine;
                }
                ini.seek(0);
    			
    			  def flowFile1=session.create()
                    flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml");
                    // session.write returns the updated FlowFile; keep that reference for the transfer.
                    flowFile1 = session.write(flowFile1, new StreamCallback() {
                        @Override
                        public void process(InputStream inputStream1, OutputStream outputStream) throws IOException {
                            outputStream.write(content.getBytes(StandardCharsets.UTF_8))
                        }
                    });

                    session.transfer(flowFile1,REL_SUCCESS);
    			
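    			 // Set every <run> element to 'false'. XmlParser yields groovy.util.Node objects,
    			 // which are updated through their value property.
    			 def xml = new XmlParser().parseText(content);
    			 xml.'**'.findAll{it.name() == 'run'}.each{ it.value = 'false' }
                def newxml=XmlUtil.serialize(xml);

                String data =newxml;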
    			
                if (!data.isEmpty()) {
                    ini.setLength(0);
                    w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                    w.write(data);
                    // Flush while the lock is still held; closing the writer also closes the channel.
                    w.flush();
                    lock.release();
                    w.close();
                }
    			}
    			
    			
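    			else{
    			// NOTE: content is only read from the file in the branch above, so it is still an empty string here.
    			// names and addDays are not defined anywhere in this snippet.
    			// flowFile is never transferred or removed in this branch; NiFi requires one of the two before the session commits.
    			def   serviceName=flowFile.getAttribute('serviceName');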
    			def date=flowFile.getAttribute('filename').substring(0,10);
                if(serviceName=='Decl'){
              def xml = new XmlParser().parseText(content)
                for(int i=0;i<names.size();i++) {
                    date = names.get(i).substring(0, 10);
                    xml.RS.Decl.details.findAll({ p ->
                        p.runAs[0].text() == "false" && p.start[0].text() == date.toString()
                    }).each({ p ->
                        p.start[0].value = addDays( p.start[0].text())
                        p.runAs[0].value = "true"
                    })
                }
                def newXml=  groovy.xml.XmlUtil.serialize( xml )
               
                data = newXml.toString()
                if (!data.isEmpty()) {
                    ini.setLength(0);
                    w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                    w.write(data);
                    // Flush while the lock is still held; closing the writer also closes the channel.
                    w.flush();
                    lock.release();
                    w.close();
                }
    			}
    			else  if(serviceName=='TaxyFee'){
    			  def xml = new XmlParser().parseText(content)
                for(int i=0;i<names.size();i++) {
                    date = names.get(i).substring(0, 10);
                    xml.RS.TaxyFee.details.findAll({ p ->
                        p.runAs[0].text() == "false" && p.start[0].text() == date.toString()
                    }).each({ p ->
                        p.start[0].value = addDays( p.start[0].text())
                        p.runAs[0].value = "true"
                    })
                }
                def newXml=  groovy.xml.XmlUtil.serialize( xml )
               
                data = newXml.toString()
                if (!data.isEmpty()) {
                    ini.setLength(0);
                    w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                    w.write(data);
                    // Flush while the lock is still held; closing the writer also closes the channel.
                    w.flush();
                    lock.release();
                    w.close();
                }
    			}
    			}
    
    			
    
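            } catch (FileNotFoundException e) {
                log.error("conf.xml was not found", e);
                // NOTE: 50000 seconds is roughly 14 hours and blocks this processor thread for that long.
                TimeUnit.SECONDS.sleep(50000);
            } catch (IOException e) {
                // Log through the ExecuteScript 'log' binding instead of printStackTrace, so the error is visible in NiFi.
                log.error("I/O error while updating conf.xml", e);
            } catch(OverlappingFileLockException e){
                // NOTE: lock() is called before this try block, so a failure to acquire the lock is not caught here.
                TimeUnit.SECONDS.sleep(50000);
            } catch (Exception e) {
                log.error("Unexpected error while updating conf.xml", e);
            } finally {
                // Release the lock only if it is still held, then close the file.
                if (lock.isValid()) {
                    lock.release();
                }
                ini.close();
            }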
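
For comparison, here is a minimal sketch in plain Java (not the script above, and not a NiFi API) of one way to keep the read, modify, and write steps on a single file together. It assumes the whole file fits in memory. The names `LockedFileUpdate`, `update`, and `JVM_LOCK` are hypothetical. A `FileLock` is held on behalf of the whole JVM, so it does not keep two threads of the same NiFi instance apart; the sketch therefore adds an in-process `ReentrantLock` around it.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;

/** Hypothetical helper: serializes read-modify-write cycles on one file. */
final class LockedFileUpdate {
    // Keeps threads of the same JVM apart; FileLock alone only guards against other processes.
    private static final ReentrantLock JVM_LOCK = new ReentrantLock();

    private LockedFileUpdate() {}

    /** Reads the whole file, applies the edit, writes the result back, and returns it. */
    static String update(Path file, UnaryOperator<String> edit) throws IOException {
        JVM_LOCK.lock();
        try (FileChannel channel = FileChannel.open(file,
                     StandardOpenOption.READ, StandardOpenOption.WRITE);
             FileLock lock = channel.lock()) {
            // Read the current content while holding both locks.
            ByteBuffer in = ByteBuffer.allocate((int) channel.size());
            while (in.hasRemaining() && channel.read(in) != -1) { }
            in.flip();
            String updated = edit.apply(StandardCharsets.UTF_8.decode(in).toString());

            // Replace the content in place, then force it to disk before the locks are released.
            channel.truncate(0);
            channel.position(0);
            ByteBuffer out = ByteBuffer.wrap(updated.getBytes(StandardCharsets.UTF_8));
            while (out.hasRemaining()) {
                channel.write(out);
            }
            channel.force(true);
            return updated;
        } finally {
            // Runs after the file lock and the channel have been closed.
            JVM_LOCK.unlock();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("conf", ".xml");
        Files.write(file, "<conf><run>true</run></conf>".getBytes(StandardCharsets.UTF_8));
        System.out.println(update(file, xml -> xml.replace("<run>true</run>", "<run>false</run>")));
        Files.delete(file);
    }
}
```

Because try-with-resources closes resources in reverse order, the file lock is released only after the data has been written and forced to disk, and the in-process lock is released last.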