Creating and transferring new flowFile ends with 'transfer relationship not specified'
Labels: Apache NiFi
Created 11-28-2018 05:55 PM
I'm trying to read JSON from `flowFile` and add its contents as attributes on a new, empty `updated_flowFile`, but I get `transfer relationship not specified` even though I'm specifying one.
    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import InputStreamCallback
    from org.apache.nifi.processor.io import OutputStreamCallback
    import json

    data = {}

    # Read contents of flowFile and write contents to data{}
    class PyInputStreamCallback(InputStreamCallback):
        def __init__(self):
            pass

        def process(self, inputStream):
            text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            global data
            data = json.loads(text)

    # Get incoming flowFile and call PyInputStreamCallback
    flowFile = session.get()
    if (flowFile != None):
        try:
            session.read(flowFile, PyInputStreamCallback())
            global data
            # Create a blank flowfile, update the attributes with contents of data{} and write it to session
            updated_flowFile = session.create()
            updated_flowFile = session.putAttribute(updated_flowFile, 'left', data['left'])
            updated_flowFile = session.putAttribute(updated_flowFile, 'top', data['top'])
            session.close(flowFile)
            session.transfer(updated_flowFile, REL_SUCCESS)
        except:
            session.close(updated_flowFile)
            session.transfer(flowFile, REL_FAILURE)
    else:
        session.transfer(flowFile, REL_FAILURE)
Alternatively, if there's a way to reuse the same flowFile object and wipe its JSON contents, that would work too. I'm running MergeContent later in my pipeline, so I need the content to be completely empty, leaving only the attributes I'm adding.
Created 11-28-2018 06:52 PM
A couple of notes:
1) There is no close() method on the session; I think you want remove().
2) In your try block you are creating and transferring updated_flowFile, but you don't transfer or remove flowFile. If you replace close() with remove(), it should fix the "transfer relationship not specified" issue.
3) In your except block, you might need to check that updated_flowFile exists (for example, set it to None before the try and test for != None) before you remove it. If the exception occurs before updated_flowFile is created, the remove() might fail.
4) Consider using EvaluateJsonPath to extract the "left" and "top" attributes, then ReplaceText to clear the content. That way your provenance/lineage chain remains intact. Jython is fairly slow, so you may also see a performance improvement by using EvaluateJsonPath -> ReplaceText.
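Putting notes 1 to 3 together, a minimal sketch of the corrected logic might look like the following. It is written as a plain function so it can be exercised outside NiFi. The names `handle_flowfile` and `read_json` are hypothetical, not part of the NiFi API; `read_json` stands in for the `session.read(...)` plus `InputStreamCallback` code from the question. The `str(...)` conversion is an assumption that the JSON values may be numbers, since `putAttribute` takes string values.

```python
def handle_flowfile(session, read_json, rel_success, rel_failure):
    """Copy 'left' and 'top' from the incoming JSON onto a new, empty flowfile.

    read_json(session, flow_file) is a hypothetical helper that returns the
    parsed JSON content of flow_file as a dict.
    """
    flow_file = session.get()
    if flow_file is None:
        # Nothing queued: there is no flowfile to transfer or remove.
        return

    updated = None  # lets the except block tell whether create() already ran
    try:
        data = read_json(session, flow_file)
        updated = session.create()
        # Attribute values are strings, so convert explicitly (assumption:
        # the JSON values may be numeric).
        updated = session.putAttribute(updated, 'left', str(data['left']))
        updated = session.putAttribute(updated, 'top', str(data['top']))
    except Exception:
        # Note 3: only remove the new flowfile if it was actually created.
        if updated is not None:
            session.remove(updated)
        session.transfer(flow_file, rel_failure)
        return

    # Notes 1 and 2: the original flowfile must be removed (or transferred),
    # otherwise the session reports that no transfer relationship was given.
    session.remove(flow_file)
    session.transfer(updated, rel_success)
```

Inside ExecuteScript this would be called as `handle_flowfile(session, read_json, REL_SUCCESS, REL_FAILURE)`, with `read_json` wrapping the callback class from the question.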
Created 11-29-2018 06:57 PM
I figured there had to be a better way to do that, thanks @Matt Burgess!
Is there documentation for the scripting APIs in the different programming languages that I'm missing? I've been working from your excellent ExecuteScript cookbooks posted here, but beyond those I couldn't find anywhere in the documentation where I could have looked up something like session.remove().
Created 11-30-2018 06:23 PM
We don't publish the Javadocs for NiFi (yet), but you can get at them via javadoc.io (for example, the 1.8.0 nifi-api Javadocs are here). I've been thinking about adding a Part 4 to my cookbook, collecting examples from HCC articles and mailing list questions, to provide more coverage of the available APIs. Also check my blog. Those articles usually cover left-of-center usages of the API, perhaps doing things for which it wasn't necessarily intended, or provide external tools to help develop scripts (my script tester, for example).
