Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Using threads inside nifi ExecuteScript processos

Highlighted

Using threads inside nifi ExecuteScript processos

New Contributor

I want to use natural threads for read - write operation in config file , i have used threads implemented in groovy, but my code neither throws exception nor makes anything else , here are several subjects i am interested in:

  • Is it possible to implement natural Threads inside nifi executeScript processor groovy code?
  • How should i implement this logic inside nifi environment without threads?

    here is my code example, WHAT SHOULD I CHANGE TO MAKE THIS CODE WORK?

static String content = "";
static File file = new File("C:/Users/Desktop/test/conf.xml");
static BufferedReader s;
static BufferedWriter w;
static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
static FileLock lock= ini.getChannel().lock();
def static thread=Thread.start{
try {
    String sCurrentLine;
    s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
    while ((sCurrentLine = s.readLine()) != null) {
        content += sCurrentLine;
    }

    ini.seek(0);
     def flowFile1 = session.create()
           flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml");
           session.write(flowFile1, new StreamCallback() {
               @Override
               public void process(InputStream inputStream1, OutputStream outputStream) throws IOException {

                   outputStream.write(content.getBytes(StandardCharsets.UTF_8))
               }

           });
           session.transfer(flowFile1, REL_SUCCESS);

    def xml = new XmlParser().parseText(content);
    xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody 'false'};
    def newxml1 = XmlUtil.serialize(xml);
    String data = newxml1;
    if (!data.isEmpty()) {
        ini.setLength(0);
        w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
        w.write(data);
        lock.release();
        w.close();

    }


}catch (FileNotFoundException e) {
    TimeUnit.SECONDS.sleep(50000);
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();

} catch(OverlappingFileLockException e){
    TimeUnit.SECONDS.sleep(50000);
    lock.release();
    e.printStackTrace();
} catch (Exception e) {
    e.printStackTrace();
} finally {

    lock.release()
    thread.stop();
}


}

 def static thread2=Thread.start{
    try {

        String sCurrentLine;
        s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
        while ((sCurrentLine = s.readLine()) != null) {
            content += sCurrentLine;
        }
        ini.seek(0);

        def flowFile = session.get();
         if (flowFile != null) return;
         def serviceName = flowFile.getAttribute('serviceName');
         def date = flowFile.getAttribute('filename').substring(0, 10);
         def xml = new XmlParser().parseText(content)
         if (serviceName == 'borderCrossDecl') {

             xml.RS.borderCrossDecl.details.findAll({ p ->
                 p.runAs[0].text() == "false" && p.start[0].text() == date.toString();
             }).each({ p ->
                 p.start[0].value = addDays(p.start[0].text())
                 p.runAs[0].value = "true"
             })
         }
        def xml1 = new XmlParser().parseText(content);
        def newXml = XmlUtil.serialize(xml1)
        String data = newXml.toString();
        if (!data.isEmpty()) {
            ini.setLength(0);
            w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
            w.write(data);
            lock.release();
            w.close();

        }
    }catch (FileNotFoundException e) {
        TimeUnit.SECONDS.sleep(50000);
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();

    } catch(OverlappingFileLockException e){
        TimeUnit.SECONDS.sleep(50000);
        lock.release();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lock.release();
        thread2.stop();
    }

};

 thread.join();
 thread2.join();