Support Questions
Find answers, ask questions, and share your expertise

how to use threads in nifi?

Highlighted

how to use threads in nifi?

I have one config file inside my NiFi environment in which I need to check, change, and update values, but I have to perform these operations in independent threads so that the threads don't clash with each other.

Here are several things I am interested in:
1. Can I use synchronized code blocks inside the ExecuteScript processor?
2. How can I use thread locks inside NiFi?

Here is my code, but it doesn't work properly. What should I change to make my logic work?

 static String content = "";
static File file = new File("C:/Users/Desktop/test/conf.xml");
static BufferedReader s;
static BufferedWriter w;
static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
static FileLock lock= ini.getChannel().lock();
public static synchronized void read() {
    try {
        String sCurrentLine;
        s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
        while ((sCurrentLine = s.readLine()) != null) {
            content += sCurrentLine;
        }

        ini.seek(0);
        /* def flowFile1 = session.create()
               flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml");
               session.write(flowFile1, new StreamCallback() {
                   @Override
                   public void process(InputStream inputStream1, OutputStream outputStream) throws IOException {

                       outputStream.write(content.getBytes(StandardCharsets.UTF_8))
                   }

               });
               session.transfer(flowFile1, REL_SUCCESS);*/

        def xml = new XmlParser().parseText(content);
        //xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody 'false'};
        def newxml1 = XmlUtil.serialize(xml);
        String data = newxml1;
        if (!data.isEmpty()) {
            ini.setLength(0);
            w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
            w.write(data);
            lock.release();
            w.close();

        }
        println data;

    }catch (FileNotFoundException e) {
        TimeUnit.SECONDS.sleep(50000);
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();

    } catch(OverlappingFileLockException e){
        TimeUnit.SECONDS.sleep(50000);
        lock.release();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        w.close();
        ini.close();
    }
}

public static void write() {
    synchronized (Main1.class) {
        try {

            String sCurrentLine;
            s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
            while ((sCurrentLine = s.readLine()) != null) {
                content += sCurrentLine;
            }
            // println content;
            ini.seek(0);

            /* def flowFile = session.get();
             if (flowFile != null) return;
             def serviceName = flowFile.getAttribute('serviceName');
             def date = flowFile.getAttribute('filename').substring(0, 10);
             def xml = new XmlParser().parseText(content)
             if (serviceName == 'borderCrossDecl') {

                 xml.RS.borderCrossDecl.details.findAll({ p ->
                     p.runAs[0].text() == "false" && p.start[0].text() == date.toString();
                 }).each({ p ->
                     p.start[0].value = addDays(p.start[0].text())
                     p.runAs[0].value = "true"
                 })
             }*/

            def newXml = groovy.xml.XmlUtil.serialize(content)

            String data = newXml.toString();
            if (!data.isEmpty()) {
                ini.setLength(0);
                w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                w.write(data);
                lock.release();
                w.close();

            }
        }catch (FileNotFoundException e) {
            TimeUnit.SECONDS.sleep(50000);
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();

        } catch(OverlappingFileLockException e){
            TimeUnit.SECONDS.sleep(50000);
            lock.release();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            w.close();
            ini.close();
        }
    }
}
public static void main(String  [] args){
    read();
    write();
}