NiFi java.lang.NullPointerException when reading csv file

Hello All,

First of all, I would like to thank the community. I am trying my hand at NiFi and came across a situation that confuses me. I am trying to read a CSV file in NiFi using the ExecuteScript processor (yes, I want to use this one). All the good records must be collected in one flowFile, and each bad record in a flowFile of its own (this way I will easily know how many bad records exist). They are then routed to the SUCCESS and FAILURE relationships accordingly.

The script works fine when all the records are good. I deliberately inserted one bad record to test the exception handling, and it does not work. As far as I know, the script should raise ValueError, but it throws NullPointerException instead. I am not sure what I am missing. I am new to this and still learning, so any guidance will be much appreciated.

Input:

1,2,3,4
1,A,3,4
1,2,3,4
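
As a sanity check outside NiFi, the parsing step can be tried on this input in plain Python. This is only a minimal sketch: the function name split_records is made up for illustration and is not part of the script in this post. It shows that int() on the second column raises ValueError for the 1,A,3,4 line, so the expected exception type is right as far as the parsing itself goes.

```python
def split_records(lines):
    """Return (good_values, bad_records) for comma-separated lines."""
    good, bad = [], []
    for line in lines:
        try:
            # Second column must be an integer; int("A") raises ValueError
            good.append(str(int(line.split(",")[1])))
        except (ValueError, IndexError) as e:
            # Keep the offending line together with the exception type
            bad.append((line, type(e).__name__))
    return good, bad


good, bad = split_records(["1,2,3,4", "1,A,3,4", "1,2,3,4"])
print(good)  # ['2', '2']
print(bad)   # [('1,A,3,4', 'ValueError')]
```

If this behaves as expected, the NullPointerException most likely comes from somewhere other than the int() conversion.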

 

Code:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, OutputStreamCallback
import datetime


class PyOutputStreamCallback(OutputStreamCallback):
    def __init__(self, obj):
        self.obj = obj

    def process(self, outputStream):
        # str() so exception objects can be written too; encode to bytes for the Java stream
        outputStream.write(bytearray((str(self.obj) + '\n').encode('utf-8')))


flowFile = session.get()

if flowFile is not None:
    # Read all lines up front and close the stream before touching any flowFile
    inputStream = session.read(flowFile)
    text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    inputStream.close()

    # Create the child flowFile only after confirming the parent exists
    goodflowFile = session.create(flowFile)
    badrecords = []

    for line in text:
        try:
            # ValueError for the 2nd line should happen here, but the script throws NullPointerException
            test = str(int(line.split(",")[1]))
            goodflowFile = session.append(goodflowFile, PyOutputStreamCallback(test))
        except Exception as e:
            # One flowFile per bad record, holding the error message
            badrecord = session.create(flowFile)
            badrecord = session.write(badrecord, PyOutputStreamCallback(e))
            badrecords.append(badrecord)

    session.transfer(goodflowFile, REL_SUCCESS)
    session.transfer(badrecords, REL_FAILURE)

    session.remove(flowFile)
    session.commit()

Also, I am trying to collect all bad records in a list and transfer the list to the FAILURE relationship. Let me know if this is the right approach.

Thanks in advance.

@mburgess 

@Shu_ashu 

1 ACCEPTED SOLUTION

I was able to solve it myself. The error is misleading. The actual error is a date conversion in Jython. I used the datetime library from Python, but for some reason the data type is being converted to java.sql.Timestamp underneath instead of a datetime object being created, so I was not able to use the methods of the datetime object. I was able to solve it using the Calendar and SimpleDateFormat classes from Java.
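
Since the reported exception turned out to be misleading, one thing that may help when debugging a script like this is to capture the full traceback instead of only the exception object, so the failing line is visible. Below is a minimal sketch in plain Python. The helper name describe_failure is hypothetical, and sending the text to the log object that ExecuteScript exposes is an assumption about how you would wire it into the processor.

```python
import traceback


def describe_failure(func, *args):
    """Run func(*args); return None on success, else the full traceback text."""
    try:
        func(*args)
        return None
    except Exception:
        # format_exc() includes the file, line number and exception type
        return traceback.format_exc()


report = describe_failure(int, "A")
print(report)
```

Inside ExecuteScript the returned text could be written to the failure flowFile or logged, which makes it easier to see which statement actually failed.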
