How to merge two FlowFiles using a custom script?


I have two processors, `ListenHTTP` and `QueryElasticsearchHTTP`. The first processor triggers the execution of the second, so in the end I have two FlowFiles:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}
{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}

Both FlowFiles contain the field `eid`, and they should be merged on this field. The expected result is:

{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
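The transformation implied by the expected output can be prototyped outside NiFi before wiring it into a script. This is a minimal sketch, assuming both records are already parsed into Python dicts; `merge_records` is a hypothetical helper name, and the date handling assumes every `event-date` value is a plain `YYYY-MM-DD` string that should become midnight in ISO 8601 form.

```python
import json
from datetime import datetime


def merge_records(location, dates_record):
    """Merge two records that share the same "eid" into one flat dict.

    Hypothetical helper: copies the first record, pulls the fields nested
    under "dates" in the second record up to the top level (converting
    "YYYY-MM-DD" to "YYYY-MM-DDT00:00:00"), and casts "eid" to int.
    """
    if location["eid"] != dates_record["eid"]:
        raise ValueError("records have different eid values")

    merged = dict(location)  # copy so the input dict is not modified
    for key, values in dates_record.get("dates", {}).items():
        # strptime yields a datetime at 00:00:00; isoformat adds the time part
        merged[key] = [
            datetime.strptime(v, "%Y-%m-%d").isoformat() for v in values
        ]
    merged["eid"] = int(merged["eid"])
    return merged


a = json.loads('{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}')
b = json.loads('{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}')
print(json.dumps(merge_records(a, b)))
# {"eid": 1, "zid": 1, "latitude": 38.3, "longitude": 2.4, "event-date": ["2017-08-08T00:00:00", "2017-02-23T00:00:00"]}
```

Key order differs from the question's expected output, but JSON objects are unordered, so the two are equivalent.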

In other words, I want to end up with a single FlowFile.

I tried `MergeContent`, but it just concatenates the two contents, which is not what I want:

{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
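That output is two JSON documents written back to back, which is what a byte-level concatenation produces. If `MergeContent` stays in the flow, a downstream script could still recover the individual records and group them by `eid`. A minimal sketch in plain Python, using the standard library's `json.JSONDecoder.raw_decode` to walk the concatenated text; `split_concatenated_json` and `group_by_eid` are hypothetical names, not part of NiFi.

```python
import json
from collections import defaultdict


def split_concatenated_json(text):
    """Yield each JSON document from text like '{...}{...}'.

    Whitespace (e.g. a newline demarcator) between documents is skipped.
    """
    decoder = json.JSONDecoder()
    idx = 0
    while idx < len(text):
        # raw_decode does not skip leading whitespace, so do it here
        if text[idx].isspace():
            idx += 1
            continue
        # raw_decode returns the object and the index where it stopped
        obj, end = decoder.raw_decode(text[idx:])
        idx += end
        yield obj


def group_by_eid(records):
    """Collect records into lists keyed by their "eid" value."""
    groups = defaultdict(list)
    for record in records:
        groups[record["eid"]].append(record)
    return dict(groups)


merged_content = (
    '{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}'
    '{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}'
)
groups = group_by_eid(split_concatenated_json(merged_content))
print(len(groups["1"]))  # 2
```

Each list in `groups` can then be combined field by field into a single record.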

Now I am trying to write a custom Python script for `ExecuteScript`, but I am not sure how to perform the merge or how to handle multiple FlowFiles in one script.

Any help would be highly appreciated, since I have not been able to find a solution. This is what I have so far:

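import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback

# Reads the whole content of a FlowFile into self.text
class ReadContent(InputStreamCallback):
    def __init__(self):
        self.text = None
    def process(self, inputStream):
        self.text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

# Overwrites the content of a FlowFile with the given object serialized as JSON
class WriteJSON(OutputStreamCallback):
    def __init__(self, obj):
        self.obj = obj
    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))

def read_json(flowFile):
    reader = ReadContent()
    session.read(flowFile, reader)
    return json.loads(reader.text)

flowFileList = session.get(2)
if flowFileList.size() < 2:
    # Fewer than two FlowFiles queued: return them to the incoming queue for now
    for ff in flowFileList:
        session.transfer(ff)
else:
    first, second = flowFileList.get(0), flowFileList.get(1)
    a, b = read_json(first), read_json(second)
    if a['eid'] != b['eid']:
        # session.get(2) does not guarantee that the two FlowFiles belong together
        session.transfer(first, REL_FAILURE)
        session.transfer(second, REL_FAILURE)
    else:
        merged = {}
        for obj in (a, b):
            for key, value in obj.items():
                if key == 'dates':
                    merged.update(value)  # flatten the nested "dates" object
                else:
                    merged[key] = value
        merged['eid'] = int(merged['eid'])
        if 'event-date' in merged:
            merged['event-date'] = [d + 'T00:00:00' for d in merged['event-date']]
        # Reuse the first FlowFile for the merged record and drop the second
        flowFile = session.write(first, WriteJSON(merged))
        newName = flowFile.getAttribute('filename').split('.')[0] + '_translated.json'
        flowFile = session.putAttribute(flowFile, 'filename', newName)
        session.remove(second)
        session.transfer(flowFile, REL_SUCCESS)
# No session.commit() here: ExecuteScript commits the session after the script runs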

Re: How to merge two FlowFiles using a custom script?

@Jorge Blanc

It seems like you want to merge the files in a particular order. Is that correct?
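If the two FlowFiles cannot be guaranteed to arrive in a fixed order, or next to each other, pairing them by `eid` avoids depending on order at all. The sketch below models that idea in plain Python: a hypothetical `PairBuffer` keeps each record until its partner with the same `eid` shows up, then emits the combined record. It does not address where such a buffer would live inside NiFi; as an alternative, `MergeContent` has a `Correlation Attribute Name` property that bins FlowFiles sharing an attribute value, which could play a similar role if `eid` were first extracted into an attribute.

```python
class PairBuffer:
    """Hold records until two with the same "eid" have arrived, in any order.

    Hypothetical helper for illustration: fields nested under a "dates"
    key are flattened into the merged result.
    """

    def __init__(self):
        self._pending = {}  # eid -> first record seen for that eid

    def add(self, record):
        """Return the merged record once a pair is complete, else None."""
        eid = record["eid"]
        partner = self._pending.pop(eid, None)
        if partner is None:
            # First half of the pair: wait for the matching record
            self._pending[eid] = record
            return None
        merged = {}
        for part in (partner, record):
            for key, value in part.items():
                if key == "dates":
                    merged.update(value)  # flatten nested "dates" fields
                else:
                    merged[key] = value
        return merged


pairs = PairBuffer()
# The dates record arrives first, followed by an unrelated eid
print(pairs.add({"eid": "1", "dates": {"event-date": ["2017-08-08"]}}))  # None
print(pairs.add({"eid": "2", "zid": 7}))  # None
print(pairs.add({"eid": "1", "zid": 1, "latitude": 38.3, "longitude": 2.4}))
```

The third call completes the pair for `eid` "1" and returns the combined record, while the record for `eid` "2" stays pending.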