Created on 09-01-2019 07:24 PM - edited 09-01-2019 07:26 PM
Hello All,
First of all, I would like to thank community. I am trying my hand in NiFi and I came across a situation that confuses me. I am trying to read a csv file in NiFi using ExecuteScript(yes, I want to use this) processor. All the good records must be collected in one flowFile and the bad records are collected in one flowFile each (this way, I will easily know how many bad records exist.) and then route them to SUCCESS and FAILURE relations accordingly.
The script works fine when all the records are good. I deliberately inserted one bad record to test exception and it is not working. As per my knowledge, NiFi should throw ValueError but it throws NullPointerException. Not sure what I am missing. I am new to this and learning. any guidance will be much appreciated.
Input:
1,2,3,4
1,A,3,4
1,2,3,4
Code
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, OutputStreamCallback
import datetime
class PyOutputStreamCallback(OutputStreamCallback):
def __init__(self, obj):
self.obj = obj
def process(self, outputStream):
outputStream.write(self.obj+'\n')
flowFile = session.get()
goodflowFile = session.create(flowFile)
badrecords = []
if (flowFile != None):
inputStream = session.read(flowFile)
text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
for line in text:
try:
test = str(int(line.split(",")[1])) #VAlue error for 2nd line should happen here. But the script throws NullPointerException
goodflowFile = session.append(goodflowFile, PyOutputStreamCallback(test))
except Exception as e:
badrecord = session.create(flowFile)
badrecord = session.write(badrecord,PyOutputStreamCallback(e))
badrecords.append(badrecord)
session.transfer(goodflowFile, REL_SUCCESS)
session.transfer(badrecords, REL_FAILURE)
inputStream.close()
session.remove(flowFile)
session.commit()
Also, I am trying to collect all bad records in a list and transfer the list to Failure relation. Let me know if this is right.
Thanks in advance.
Created 09-05-2019 11:17 AM
I was able to solve it myself.. The error is misleading.. The actual error is date conversion in Jython.. I used datetime library from python but for some reason datatype is being converted to java.sql.Timestamp underneath instead of creating datetime object.. So I was not able to use methods of datatime object. I was able to solve it using calendar and SimpleDateFormat libraries from java..
Created 09-05-2019 11:17 AM
I was able to solve it myself.. The error is misleading.. The actual error is date conversion in Jython.. I used datetime library from python but for some reason datatype is being converted to java.sql.Timestamp underneath instead of creating datetime object.. So I was not able to use methods of datatime object. I was able to solve it using calendar and SimpleDateFormat libraries from java..