Created 09-01-2017 06:24 PM
I have two processors: `ListenHTTP` and `QueryElasticsearchHTTP`. The first processor triggers the execution of the second, so in the end I have two FlowFiles:
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}
{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
Both files contain the field `eid`, and they should be merged on that field. The expected result is:
{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
In other words, I want to end up with a single FlowFile.
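The transformation itself can be expressed independently of NiFi. Below is a minimal pure-Python sketch, assuming the record holding `dates` should be flattened so that `dates["event-date"]` becomes a top-level `event-date` list, each date gets a `T00:00:00` suffix, and `eid` becomes an integer, as in the expected result above. `merge_records` is a hypothetical helper name.

```python
import json

def merge_records(first, second):
    """Merge two records that share the same "eid" (hypothetical helper)."""
    if str(first["eid"]) != str(second["eid"]):
        raise ValueError("records have different eid values")
    merged = {}
    for record in (first, second):
        record = dict(record)  # shallow copy so the caller's dicts are not mutated
        dates = record.pop("dates", None)
        if dates is not None:
            # Flatten dates.event-date and append a midnight time component
            merged["event-date"] = [d + "T00:00:00" for d in dates.get("event-date", [])]
        merged.update(record)
    merged["eid"] = int(merged["eid"])  # "1" -> 1, as in the expected output
    return merged

location = json.loads('{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}')
events = json.loads('{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}')
print(json.dumps(merge_records(location, events)))
```

The printed keys may come out in a different order than in the expected result, but JSON objects are unordered, so the content is the same.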
I tried `MergeContent`, but it simply concatenates the two contents, which is not what I need: `{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}`
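That concatenated output is not a single JSON document, so a plain `json.loads` rejects it. If `MergeContent` is kept in the flow, one option is to split the content back into objects downstream. A sketch using the standard library's `json.JSONDecoder.raw_decode`, which returns each decoded object together with the offset where it ended (`split_concatenated` is a hypothetical helper name). Alternatively, `MergeContent`'s Header, Footer and Demarcator properties could wrap the output as a JSON array; check the processor documentation for your NiFi version.

```python
import json

def split_concatenated(text):
    """Decode back-to-back JSON objects such as '{...}{...}'."""
    decoder = json.JSONDecoder()
    objects = []
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos == len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        objects.append(obj)
    return objects

merged_content = ('{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}'
                  '{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}')
try:
    json.loads(merged_content)
except json.JSONDecodeError as err:
    print("json.loads fails:", err.msg)
print(split_concatenated(merged_content))
```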
Now I am trying to write a custom script in Python using `ExecuteScript`, but I am not sure how to perform the merge and how to handle multiple FlowFiles.
Any help would be highly appreciated, since I have been unable to find a solution. This is my script so far:
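```python
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback

# Reads a FlowFile's content and parses it as JSON
class ReadJSON(InputStreamCallback):
    def __init__(self):
        self.obj = None
    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        self.obj = json.loads(text)

# Writes a Python dict as the content of a FlowFile
class WriteJSON(OutputStreamCallback):
    def __init__(self, obj):
        self.obj = obj
    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj, indent=4).encode('utf-8')))

flowFiles = session.get(2)  # java.util.List with up to 2 FlowFiles
if flowFiles.size() == 2:
    merged = {}
    for ff in flowFiles:
        reader = ReadJSON()
        session.read(ff, reader)
        obj = reader.obj
        # Flatten "dates" into a top-level "event-date" list
        dates = obj.pop('dates', None)
        if dates is not None:
            merged['event-date'] = [d + 'T00:00:00' for d in dates.get('event-date', [])]
        merged.update(obj)
    merged['eid'] = int(merged['eid'])
    # New FlowFile with both inputs as parents (keeps provenance lineage)
    out = session.create(flowFiles)
    out = session.write(out, WriteJSON(merged))
    name = flowFiles[0].getAttribute('filename').split('.')[0] + '_translated.json'
    out = session.putAttribute(out, 'filename', name)
    session.remove(flowFiles)  # drop the two originals
    session.transfer(out, REL_SUCCESS)
elif flowFiles.size() == 1:
    # Only one half is available: return it to the queue for a later run
    session.transfer(flowFiles[0])
# ExecuteScript commits the session itself after the script runs
```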
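One caveat with `session.get(2)`: it returns up to two FlowFiles from the incoming queue, not necessarily two with the same `eid`. A minimal pure-Python sketch of pairing a batch of parsed records by `eid` before merging; `group_by_eid` is a hypothetical helper and the record with `eid` 2 is illustrative data only.

```python
from collections import defaultdict

def group_by_eid(records):
    """Split records into complete pairs and leftovers, keyed by eid."""
    groups = defaultdict(list)
    for record in records:
        # str() so that "1" and 1 land in the same group
        groups[str(record["eid"])].append(record)
    complete = {eid: recs for eid, recs in groups.items() if len(recs) == 2}
    incomplete = {eid: recs for eid, recs in groups.items() if len(recs) != 2}
    return complete, incomplete

batch = [
    {"eid": "1", "zid": 1, "latitude": 38.3, "longitude": 2.4},
    {"eid": "2", "zid": 5, "latitude": 40.1, "longitude": 3.0},  # illustrative
    {"eid": "1", "dates": {"event-date": ["2017-08-08", "2017-02-23"]}},
]
complete, incomplete = group_by_eid(batch)
print(sorted(complete), sorted(incomplete))  # ['1'] ['2']
```

In a script, complete pairs would be merged, while leftovers would be sent back to the queue to wait for their partner.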
Created 09-01-2017 06:47 PM
It seems like you want to merge the files in a particular order?
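If the order matters, one way to make it deterministic is to decide which record is which by its keys rather than by the order in which the FlowFiles were dequeued. A minimal sketch, assuming the record that contains `dates` is the "events" record; `ordered_merge` is a hypothetical helper. With compact separators it reproduces the expected string from the question exactly, whichever order the inputs arrive in.

```python
import json

def ordered_merge(a, b):
    """Merge so that the output key order matches the expected result."""
    events, location = (a, b) if "dates" in a else (b, a)
    merged = {"event-date": [d + "T00:00:00" for d in events["dates"]["event-date"]]}
    merged["eid"] = int(location["eid"])
    for key, value in location.items():
        if key != "eid":
            merged[key] = value  # zid, latitude, longitude in their original order
    return merged

location = {"eid": "1", "zid": 1, "latitude": 38.3, "longitude": 2.4}
events = {"eid": "1", "dates": {"event-date": ["2017-08-08", "2017-02-23"]}}
print(json.dumps(ordered_merge(events, location), separators=(",", ":")))
```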