Support Questions

Find answers, ask questions, and share your expertise

NiFi java.lang.NullPointerException when reading csv file

avatar
New Contributor

image.pngHello All,

 

First of all, I would like to thank community. I am trying my hand in NiFi and I came across a situation that confuses me. I am trying to read a csv file in NiFi using ExecuteScript(yes, I want to use this) processor. All the good records must be collected in one flowFile and the bad records are collected in one flowFile each (this way, I will easily know how many bad records exist.) and then route them to SUCCESS and FAILURE relations accordingly.

 

The script works fine when all the records are good. I deliberately inserted one bad record to test exception and it is not working. As per my knowledge, NiFi should throw ValueError but it throws NullPointerException. Not sure what I am missing. I am new to this and learning. any guidance will be much appreciated.

 

Input:

1,2,3,4
1,A,3,4
1,2,3,4

 

Code

 

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback, OutputStreamCallback
import datetime


class PyOutputStreamCallback(OutputStreamCallback):
  def __init__(self, obj):
        self.obj = obj
  def process(self, outputStream):
    outputStream.write(self.obj+'\n')


flowFile = session.get()
goodflowFile = session.create(flowFile)
badrecords = []

if (flowFile != None):
	inputStream = session.read(flowFile)
	text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
	
	for line in text:
	
		try:
			test = str(int(line.split(",")[1])) #VAlue error for 2nd line should happen here. But the script throws NullPointerException
			goodflowFile = session.append(goodflowFile, PyOutputStreamCallback(test))
		
		except Exception as e:
			badrecord = session.create(flowFile)
			badrecord = session.write(badrecord,PyOutputStreamCallback(e))
			badrecords.append(badrecord)
	
	session.transfer(goodflowFile, REL_SUCCESS)
	session.transfer(badrecords, REL_FAILURE)

		
	inputStream.close()
	session.remove(flowFile)
	session.commit()

 

 

Also, I am trying to collect all bad records in a list and transfer the list to Failure relation. Let me know if this is right.

 

Thanks in advance.

@mburgess 

@Shu_ashu 

1 ACCEPTED SOLUTION

avatar
New Contributor

I was able to solve it myself.. The error is misleading.. The actual error is date conversion in Jython.. I used datetime library from python but for some reason datatype is being converted to java.sql.Timestamp underneath instead of creating datetime object.. So I was not able to use methods of datatime object. I was able to solve it using calendar and SimpleDateFormat libraries from java.. 

View solution in original post

1 REPLY 1

avatar
New Contributor

I was able to solve it myself.. The error is misleading.. The actual error is date conversion in Jython.. I used datetime library from python but for some reason datatype is being converted to java.sql.Timestamp underneath instead of creating datetime object.. So I was not able to use methods of datatime object. I was able to solve it using calendar and SimpleDateFormat libraries from java..