Posted 09-01-2017, 06:24 PM
I have two processors, `ListenHTTP` and `QueryElasticsearchHTTP`. The first one triggers the execution of the second, so in the end I have two FlowFiles:

```json
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}
```

```json
{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
```

Both FlowFiles contain the field `eid`, and they should be merged on that field. The expected result is:

```json
{"event-date":["2017-08-08T00:00:00","2017-02-23T00:00:00"],"eid":1,"zid":1,"latitude":38.3,"longitude":2.4}
```

So I want to end up with a single FlowFile. I tried `MergeContent`, but it simply concatenates the two contents, which is not what I need:

```
{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}
```

Now I am trying to write a custom Python script with `ExecuteScript`, but I am not sure how to perform the merge and handle multiple FlowFiles. Any help would be highly appreciated, since I was unable to find a solution. This is what I have so far:

```python
import json
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Rewrites the content of a FlowFile into a new JSON object.
class ModJSON(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        obj = json.loads(text)
        newObj = {
            "eid": obj['eid'],
            "zid": obj['zid'],
            # ...
            "dates": obj.get('dates')  # None unless this content has 'dates'
        }
        outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8')))

# `session` and `REL_SUCCESS` are bound by ExecuteScript.
flowFileList = session.get(2)
if flowFileList.size() == 2:
    flowFile = flowFileList.get(0)
    other = flowFileList.get(1)
    # Copy the 'dates' attribute (if set) from the second FlowFile to the first.
    myAttr = other.getAttribute('dates')
    if myAttr is not None:
        flowFile = session.putAttribute(flowFile, 'dates', myAttr)
    flowFile = session.write(flowFile, ModJSON())
    flowFile = session.putAttribute(flowFile, 'filename', flowFile.getAttribute('filename').split('.')[0] + '_translated.json')
    # Every FlowFile taken from the session must be transferred or removed.
    session.remove(other)
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()
elif not flowFileList.isEmpty():
    # Only one FlowFile is available; return it to the incoming queue.
    session.transfer(flowFileList)
```
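For reference, this is the transformation I am after, written as plain Python outside NiFi. It is only a rough sketch: `merge_by_eid` is my own hypothetical helper, not a NiFi API, and I am assuming that `eid` should become an integer, that `dates.event-date` should be lifted to the top level, and that each date just gets a midnight time (`T00:00:00`) appended, which is what the expected output above shows.

```python
import json
from collections import defaultdict


def merge_by_eid(records):
    """Fold parsed JSON records that share an 'eid' into one dict per eid.

    Hypothetical helper; assumes the record shapes from this question.
    """
    merged = defaultdict(dict)
    for rec in records:
        eid = int(rec["eid"])  # "1" -> 1, as in the expected output
        out = merged[eid]
        out["eid"] = eid
        for key, value in rec.items():
            if key == "eid":
                continue
            if key == "dates":
                # Lift dates.event-date to the top level, adding a midnight time part.
                out["event-date"] = [d + "T00:00:00" for d in value.get("event-date", [])]
            else:
                out[key] = value
    return dict(merged)


if __name__ == "__main__":
    a = json.loads('{"eid":"1","zid":1,"latitude":38.3,"longitude":2.4}')
    b = json.loads('{"eid":"1","dates":{"event-date":["2017-08-08","2017-02-23"]}}')
    print(json.dumps(merge_by_eid([a, b])[1]))
```

Inside `ExecuteScript`, I imagine this would mean reading both contents (for example with `session.read` and `IOUtils.toString`), passing the parsed objects to a function like this, writing the result to one FlowFile created with `session.create(flowFileList)`, and removing the two originals, but I have not verified that.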