Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

NiFi: How to implement appropriate logic for custom NiFi processors

Highlighted

NiFi: How to implement appropriate logic for custom NiFi processors

I want to write custom NiFi processors, and here are several topics I am interested in:

  1. I want to get an XML file from the processor, then parse it, extract text values, and put them as attributes on the newly created flowfile. I also want to update the file (I mean set a new value for one of its tag values) and roll it back to the folder. How can I roll back this flowfile?
  2. If I want this file to be used by several processors, should I use a file lock, or set "keep source file" to false when getting the flowfile? Which one is best practice?

Now I want onTrigger code like this:

public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
            final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
            final ComponentLog logger = getLogger();
            if (fileQueue.size() < 100) {
                final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
                if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) {
                    try {
                        final Set<File> listing = performListing(directory, fileFilterRef.get(), context.getProperty(RECURSE).asBoolean().booleanValue());
    
                        queueLock.lock();
                        try {
                            listing.removeAll(inProcess);
                            if (!keepingSourceFile) {
                                listing.removeAll(recentlyProcessed);
                            }
    
                            fileQueue.clear();
                            fileQueue.addAll(listing);
    
                            queueLastUpdated.set(System.currentTimeMillis());
                            recentlyProcessed.clear();
    
                            if (listing.isEmpty()) {
                                context.yield();
                            }
                        } finally {
                            queueLock.unlock();
                        }
                    } finally {
                        listingLock.unlock();
                    }
                }
            }
            final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
            final List<File> files = new ArrayList<>(batchSize);
            queueLock.lock();
            try {
                fileQueue.drainTo(files, batchSize);
                if (files.isEmpty()) {
                    return;
                } else {
                    inProcess.addAll(files);
                }
            } finally {
                queueLock.unlock();
            }
    
            //make  xml parsing
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            try {
                dBuilder = dbFactory.newDocumentBuilder();
            } catch (ParserConfigurationException e) {
                e.printStackTrace();
            }
            Document doc = null;
            try {
                File f=  files.get(0);
                doc = dBuilder.parse(f);
            } catch (SAXException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            NodeList nList = doc.getElementsByTagName("localAttributes");
            for (int temp = 0; temp < nList.getLength(); temp++) {
    
                Node nNode = nList.item(temp);
    
    
                if (nNode.getNodeType() == Node.ELEMENT_NODE) {
    
                    Element eElement = (Element) nNode;
    
    
                    start = eElement.getElementsByTagName("start").item(0).getTextContent();
                    startDate = eElement.getElementsByTagName("startDate").item(0).getTextContent();
                    endDate = eElement.getElementsByTagName("endDate").item(0).getTextContent();
                    patch = eElement.getElementsByTagName("patch").item(0).getTextContent();
                    runAs = eElement.getElementsByTagName("runAs").item(0).getTextContent();
    
                }
            }


            final ListIterator<File> itr = files.listIterator();
    
            FlowFile flowFile = null;
            try {
                final Path directoryPath = directory.toPath();
                while (itr.hasNext()) {
                    final File file = itr.next();
                    final Path filePath = file.toPath();
                    final Path relativePath = directoryPath.relativize(filePath.getParent());
                    String relativePathString = relativePath.toString() + "/";
                    if (relativePathString.isEmpty()) {
                        relativePathString = "./";
                    }
                    final Path absPath = filePath.toAbsolutePath();
                    final String absPathString = absPath.getParent().toString() + "/";
    
                    flowFile = session.create();
                    final long importStart = System.nanoTime();
                    flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
                    final long importNanos = System.nanoTime() - importStart;
                    final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
    
                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
                    flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
                    flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                        /*flowFile = session.putAttribute(flowFile, "start", start);
                        flowFile = session.putAttribute(flowFile, "startDate", startDate);
                        flowFile = session.putAttribute(flowFile, "endDate", endDate);
                        flowFile = session.putAttribute(flowFile, "runAs", runAs);
                        flowFile = session.putAttribute(flowFile, "patch", patch);*/
    
                    Map<String, String> attributes = getAttributesFromFile(filePath);
                    if (attributes.size() > 0) {
                        flowFile = session.putAllAttributes(flowFile, attributes);
                    }
    
                    FlowFile flowFile1= session.create();
                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
                    flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
                    flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                    flowFile = session.putAttribute(flowFile, "start", start);
                    flowFile = session.putAttribute(flowFile, "startDate", startDate);
                    flowFile = session.putAttribute(flowFile, "endDate", endDate);
                    flowFile = session.putAttribute(flowFile, "runAs", runAs);
                    flowFile = session.putAttribute(flowFile, "patch", patch);
    
                    session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
                    session.transfer(flowFile1, REL_SUCCESS);
    
                    FlowFile flowFile3=session.create();
                    flowFile3=session.importFrom(filePath, keepingSourceFile, flowFile);
    
                    NodeList run = doc.getElementsByTagName("runAs");
                    run.item(0).setNodeValue("false");
                     session.transfer(flowFile3,REL_ROLLBACK);
                    session.remove(flowFile);


 
 
 
 
 
 
 
 



Don't have an account?
Coming from Hortonworks? Activate your account here