Member since
09-01-2019
3
Posts
1
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
5769 | 09-05-2019 11:17 AM |
09-05-2019
11:17 AM
1 Kudo
I was able to solve it myself.. The error is misleading.. The actual error is date conversion in Jython.. I used datetime library from python but for some reason datatype is being converted to java.sql.Timestamp underneath instead of creating datetime object.. So I was not able to use methods of datatime object. I was able to solve it using calendar and SimpleDateFormat libraries from java..
... View more
09-01-2019
07:24 PM
Hello All, First of all, I would like to thank community. I am trying my hand in NiFi and I came across a situation that confuses me. I am trying to read a csv file in NiFi using ExecuteScript(yes, I want to use this) processor. All the good records must be collected in one flowFile and the bad records are collected in one flowFile each (this way, I will easily know how many bad records exist.) and then route them to SUCCESS and FAILURE relations accordingly. The script works fine when all the records are good. I deliberately inserted one bad record to test exception and it is not working. As per my knowledge, NiFi should throw ValueError but it throws NullPointerException. Not sure what I am missing. I am new to this and learning. any guidance will be much appreciated. Input: 1,2,3,4 1,A,3,4 1,2,3,4 Code import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, OutputStreamCallback
import datetime
class PyOutputStreamCallback(OutputStreamCallback):
def __init__(self, obj):
self.obj = obj
def process(self, outputStream):
outputStream.write(self.obj+'\n')
flowFile = session.get()
goodflowFile = session.create(flowFile)
badrecords = []
if (flowFile != None):
inputStream = session.read(flowFile)
text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
for line in text:
try:
test = str(int(line.split(",")[1])) #VAlue error for 2nd line should happen here. But the script throws NullPointerException
goodflowFile = session.append(goodflowFile, PyOutputStreamCallback(test))
except Exception as e:
badrecord = session.create(flowFile)
badrecord = session.write(badrecord,PyOutputStreamCallback(e))
badrecords.append(badrecord)
session.transfer(goodflowFile, REL_SUCCESS)
session.transfer(badrecords, REL_FAILURE)
inputStream.close()
session.remove(flowFile)
session.commit() Also, I am trying to collect all bad records in a list and transfer the list to Failure relation. Let me know if this is right. Thanks in advance. @mburgess @Shu_ashu
... View more
Labels:
- Labels:
-
Apache NiFi