Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

how to use threads in nifi?

Highlighted

how to use threads in nifi?

New Contributor

I have a config file inside my NiFi environment in which I need to check, change, and update values, but these operations must run in independent threads without the threads interfering with each other.

Here are the things I am interested in:
1. Can I use synchronized code blocks inside the ExecuteScript processor?
2. How can I use thread locks inside NiFi?

Here is my code, but it doesn't work properly. What should I change to make my logic work?

 static String content = "";
static File file = new File("C:/Users/Desktop/test/conf.xml");
static BufferedReader s;
static BufferedWriter w;
static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
static FileLock lock= ini.getChannel().lock();
public static synchronized void read() {
    try {
        String sCurrentLine;
        s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
        while ((sCurrentLine = s.readLine()) != null) {
            content += sCurrentLine;
        }

        ini.seek(0);
        /* def flowFile1 = session.create()
               flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml");
               session.write(flowFile1, new StreamCallback() {
                   @Override
                   public void process(InputStream inputStream1, OutputStream outputStream) throws IOException {

                       outputStream.write(content.getBytes(StandardCharsets.UTF_8))
                   }

               });
               session.transfer(flowFile1, REL_SUCCESS);*/

        def xml = new XmlParser().parseText(content);
        //xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody 'false'};
        def newxml1 = XmlUtil.serialize(xml);
        String data = newxml1;
        if (!data.isEmpty()) {
            ini.setLength(0);
            w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
            w.write(data);
            lock.release();
            w.close();

        }
        println data;

    }catch (FileNotFoundException e) {
        TimeUnit.SECONDS.sleep(50000);
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();

    } catch(OverlappingFileLockException e){
        TimeUnit.SECONDS.sleep(50000);
        lock.release();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        w.close();
        ini.close();
    }
}

public static void write() {
    synchronized (Main1.class) {
        try {

            String sCurrentLine;
            s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8"));
            while ((sCurrentLine = s.readLine()) != null) {
                content += sCurrentLine;
            }
            // println content;
            ini.seek(0);

            /* def flowFile = session.get();
             if (flowFile != null) return;
             def serviceName = flowFile.getAttribute('serviceName');
             def date = flowFile.getAttribute('filename').substring(0, 10);
             def xml = new XmlParser().parseText(content)
             if (serviceName == 'borderCrossDecl') {

                 xml.RS.borderCrossDecl.details.findAll({ p ->
                     p.runAs[0].text() == "false" && p.start[0].text() == date.toString();
                 }).each({ p ->
                     p.start[0].value = addDays(p.start[0].text())
                     p.runAs[0].value = "true"
                 })
             }*/

            def newXml = groovy.xml.XmlUtil.serialize(content)

            String data = newXml.toString();
            if (!data.isEmpty()) {
                ini.setLength(0);
                w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8"));
                w.write(data);
                lock.release();
                w.close();

            }
        }catch (FileNotFoundException e) {
            TimeUnit.SECONDS.sleep(50000);
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();

        } catch(OverlappingFileLockException e){
            TimeUnit.SECONDS.sleep(50000);
            lock.release();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            w.close();
            ini.close();
        }
    }
}
public static void main(String  [] args){
    read();
    write();
}