Member since: 07-11-2017
Posts: 8
Kudos Received: 0
Solutions: 0
02-18-2019
01:25 PM
Also, I'm pretty sure I wasn't having this problem over the weekend.
02-18-2019
01:22 PM
Hey Matt, I think something is going wrong with the rendering of this page. Perhaps it's just on my side, but the article stops after the first code snippet, and it looks like some of the other articles' use cases are mixed in, so it might just be some bad markup somewhere? Could you confirm it's not just on my side?
02-18-2019
01:15 PM
Thank you! Also, just for interest's sake (hope you don't mind answering here): when will you guys be moving to JDK 11?
02-15-2019
08:58 AM
Hey Matt, great series of articles. I have learnt everything I know about ExecuteScript from here and would recommend it to anyone. I was just curious, though: do you guys still use Nashorn as the JS engine?
08-16-2017
08:42 AM
Thanks again. It does seem like it would make more sense as a controller service, since we will be using record processors like ConvertRecord, Consume- and PublishKafkaRecord.
08-15-2017
02:21 PM
Hi Brian, thank you for your response! That indeed made my match result in a success. It did increase the size of the flowfile by quite a lot, but at least I can use the AvroSchemaRegistry directly later on, and I think I understand the DatumReader slightly better now.
08-15-2017
10:50 AM
I am trying to build a custom processor. An Avro record set comes in from a ConvertRecord processor, which transforms my CSV to Avro. That processor reports that the conversion succeeded, but when I try to unpack the DataFileStream of GenericRecord into individual records, my processor throws an IOException that simply says 'Not a data file.' My Scala code is as follows:

override def onTrigger(context: ProcessContext, session: ProcessSession): Unit = {
  val sourceFlowFile = Option(session.get)
  sourceFlowFile match {
    case Some(f) =>
      // some property loading, processing and logging...
      val transferList = new ListBuffer[FlowFile]
      session.read(f, new InputStreamCallback {
        override def process(in: InputStream): Unit = {
          val stream: Try[DataFileStream[GenericRecord]] =
            Try(new DataFileStream(in, new GenericDatumReader[GenericRecord]()))
          stream match {
            case Failure(e) =>
              getLogger.error(s"${e.getMessage}: error reading from data file stream, passing to relationship:failure", e)
              failed = true
            case Success(dataStream) =>
              getLogger.info("Reading data from stream was a success, will proceed to writing")
              while (dataStream.hasNext) {
                // some processing
              }
          }
        }
      })
      if (failed) {
        session.transfer(f, RelFailure)
        return
      }
      for (recordFlowFile <- transferList) {
        writeToFlowFile(session, recordFlowFile)
      }
      getLogger.info("Writing was a success, passing list of generic records to success relationship")
      session.transfer(transferList, RelSuccess)
    case None =>
      // some error handling
  }
}

I'm not sure why this is happening, because the way I read the stream is the same way the writer is tested in the NiFi codebase. The failure happens when I try to create the DataFileStream from the InputStream; the match always results in Failure, so what Success does is irrelevant at present. Am I misunderstanding the way the stream callbacks work? I have been checking the codebase for people doing similar things in their processors, but couldn't find much except for the tests. Please pardon my formatting; I'm not really sure how to fix it.
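For context on that error message: Avro's DataFileStream throws IOException("Not a data file.") when the stream does not begin with the Avro object-container magic bytes ('O', 'b', 'j', 0x01), which suggests the bytes reaching the callback are not an Avro container file at all. Below is a minimal, dependency-free sketch of that header check; the object and helper names (AvroMagic, looksLikeAvroContainer) are hypothetical, not NiFi or Avro API.

```scala
import java.io.{ByteArrayInputStream, InputStream}

object AvroMagic {
  // Avro object-container files start with the 4-byte magic: 'O', 'b', 'j', 0x01.
  val Magic: Array[Byte] = Array('O'.toByte, 'b'.toByte, 'j'.toByte, 1.toByte)

  // Hypothetical helper: consume the first four bytes of a stream and
  // compare them against the Avro container-file magic.
  def looksLikeAvroContainer(in: InputStream): Boolean = {
    val header = new Array[Byte](4)
    var read = 0
    while (read < 4) {
      val n = in.read(header, read, 4 - read)
      if (n < 0) return false // stream too short to be a container file
      read += n
    }
    header.sameElements(Magic)
  }
}

// Example: a raw CSV payload fails the check; a proper Avro header passes.
val csv  = new ByteArrayInputStream("id,name\n1,foo".getBytes("UTF-8"))
val avro = new ByteArrayInputStream(Array[Byte]('O'.toByte, 'b'.toByte, 'j'.toByte, 1, 0))
println(AvroMagic.looksLikeAvroContainer(csv))  // false
println(AvroMagic.looksLikeAvroContainer(avro)) // true
```

Running a check like this inside the InputStreamCallback (before handing the stream to DataFileStream) would distinguish "the upstream content isn't Avro" from "the stream callback is being misused".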
Labels:
- Apache NiFi
07-11-2017
08:36 AM
@Jeff Storck I had the same issue and got NiFi running again just by deleting my custom processor. I generate the nar using Maven in IntelliJ and then move said nar to the lib directory. I downloaded the 1.3.0 sources yesterday, and before that I wasn't having any issues with custom processors in 1.2.0. I'm not sure what or where the work directory you're referring to is; I am running Arch and there is no nifi folder in /var/lib.