Created 08-15-2017 10:50 AM
I am trying to build a custom processor and have an Avro record set incoming from a ConvertRecord processor, which transforms my CSV to Avro. The processor says that the conversion was done correctly, but when I try to unpack the DataFileStream of GenericRecord type into individual records, the processor gives me an IOException that simply says 'Not a data file.'
My scala code is as follows:
import java.io.InputStream

import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.{ProcessContext, ProcessSession}
import org.apache.nifi.processor.io.InputStreamCallback

import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

override def onTrigger(context: ProcessContext, session: ProcessSession): Unit = {
  val sourceFlowFile = Option(session.get)
  sourceFlowFile match {
    case Some(f) =>
      // some property loading, processing and logging..
      val transferList = new ListBuffer[FlowFile]
      session.read(f, new InputStreamCallback {
        override def process(in: InputStream): Unit = {
          // This is the point where the 'Not a data file' IOException is thrown.
          val stream: Try[DataFileStream[GenericRecord]] =
            Try(new DataFileStream(in, new GenericDatumReader[GenericRecord]()))
          stream match {
            case Failure(e) =>
              getLogger.error(s"${e.getMessage}: error reading from data file stream, passing to relationship:failure", e)
              failed = true // `failed` is a var on the processor, declared elsewhere
            case Success(dataStream) =>
              getLogger.info("Reading data from stream was a success, will proceed to writing")
              while (dataStream.hasNext) {
                // some processing
              }
          }
        }
      })
      if (failed) {
        session.transfer(f, RelFailure)
        return
      }
      for (recordFlowFile <- transferList) {
        writeToFlowFile(session, recordFlowFile)
      }
      getLogger.info("Writing was a success, passing list of generic records to success relationship")
      session.transfer(transferList, RelSuccess)
    case None =>
      // some error handling
  }
}
Not sure why this is happening, because the way I am trying to read the stream is the same way the writer is tested in the NiFi codebase. The failure happens when I try to create the DataFileStream from the InputStream: the match always results in Failure, so what the Success branch does is irrelevant at present. Am I misunderstanding the way the stream callbacks work? I have been checking the codebase for people doing similar things in their processors, but couldn't find much except for the tests. Please pardon my formatting; I'm not really sure how to fix it.
Created 08-15-2017 01:53 PM
Can you show the configuration for the AvroRecordSetWriter being used by ConvertRecord? Specifically, what is the Schema Write Strategy set to? I believe it would have to be set to "Embedded Avro Schema" in order for it to be read as a data file.
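(Side note, a sketch of my own rather than part of the original reply: DataFileStream only understands the Avro object container format, which begins with the four-byte magic header "Obj" followed by 0x01 and carries the writer schema in the file header. A record set written without an embedded schema lacks that header, and the DataFileStream constructor fails with exactly this "Not a data file" IOException. A minimal way to peek at the header, using Avro's DataFileConstants.MAGIC; the helper name is hypothetical:)

import java.io.BufferedInputStream
import org.apache.avro.file.DataFileConstants

// Peek at the first four bytes to see whether the stream is an Avro object
// container file before handing it to DataFileStream. mark/reset leaves the
// stream positioned at the start for the real reader.
def looksLikeAvroDataFile(in: BufferedInputStream): Boolean = {
  val magic = DataFileConstants.MAGIC
  val header = new Array[Byte](magic.length)
  in.mark(magic.length)
  val read = in.read(header)
  in.reset()
  read == magic.length && header.sameElements(magic)
}

(If this returns false, the upstream writer did not embed the schema, and the content has to be read with an explicitly supplied schema instead.)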
Created 08-15-2017 02:21 PM
Hi Brian, thank you for your response! That indeed made my match result in a success. It increased the size of the flowfile by quite a lot, but at least I can use the AvroSchemaRegistry directly later on, and I think I understand the DatumReader slightly better now.
Created 08-15-2017 02:29 PM
No problem. Here is how the AvroReader in NiFi handles things...
If you choose a "Schema Access Strategy" of "Use Embedded Avro Schema", then it uses the AvroReaderWithEmbeddedSchema class, which reads the content through a DataFileStream.
If you choose one of the other "Schema Access Strategy" options, like getting the schema from a schema registry, then it uses the AvroReaderWithExplicitSchema class.
So if you want your custom processor to support either option, you can model it after the AvroReader controller service.
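(A rough Scala sketch of those two paths, my own illustration rather than code lifted from AvroReader: the embedded-schema path lets DataFileStream pull the writer schema out of the file header, while the explicit-schema path pairs a GenericDatumReader with a binary decoder and a Schema obtained elsewhere, e.g. from a schema registry.)

import java.io.InputStream
import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Embedded schema: the flowfile is a full Avro data file, so DataFileStream
// can recover the writer schema from the header itself.
def readEmbedded(in: InputStream)(handle: GenericRecord => Unit): Unit = {
  val stream = new DataFileStream[GenericRecord](in, new GenericDatumReader[GenericRecord]())
  try {
    while (stream.hasNext) handle(stream.next())
  } finally stream.close()
}

// Explicit schema: the flowfile holds raw binary-encoded datums with no
// header, so the schema must be supplied by the caller.
def readExplicit(in: InputStream, schema: Schema)(handle: GenericRecord => Unit): Unit = {
  val reader  = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(in, null)
  while (!decoder.isEnd) handle(reader.read(null, decoder))
}

(The trade-off mirrors the size increase noted above: embedding the schema makes every flowfile self-describing, but repeats the schema in each one.)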
Created 08-16-2017 08:42 AM
Thanks again, it does seem like it would make more sense as a controller service, since we will be using Record processors like ConvertRecord, ConsumeKafkaRecord, and PublishKafkaRecord.
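(For illustration, a hedged sketch of what the controller-service route can look like from the processor side. The property wiring below is my own, and createRecordReader's exact signature has changed between NiFi versions, so check the API for your release.)

import java.io.InputStream

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.{ProcessContext, ProcessSession}
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.serialization.RecordReaderFactory

// Hypothetical property: lets the flow plug in any record reader
// (AvroReader, CSVReader, ...) as a controller service.
val ReaderProperty: PropertyDescriptor = new PropertyDescriptor.Builder()
  .name("record-reader")
  .displayName("Record Reader")
  .description("Controller service used to parse incoming flowfiles")
  .identifiesControllerService(classOf[RecordReaderFactory])
  .required(true)
  .build()

// Assumes this lives inside an AbstractProcessor subclass (getLogger etc.).
def onTriggerSketch(context: ProcessContext, session: ProcessSession): Unit = {
  val flowFile = session.get()
  if (flowFile == null) return
  val factory = context.getProperty(ReaderProperty)
    .asControllerService(classOf[RecordReaderFactory])
  session.read(flowFile, new InputStreamCallback {
    override def process(in: InputStream): Unit = {
      // Signature as of the NiFi 1.2/1.3 era; newer releases differ.
      val reader = factory.createRecordReader(flowFile, in, getLogger)
      try {
        var record = reader.nextRecord()
        while (record != null) {
          // per-record processing here
          record = reader.nextRecord()
        }
      } finally reader.close()
    }
  })
  // route the flowfile afterwards, e.g. session.transfer(flowFile, RelSuccess)
}

(The payoff of this design is that the custom processor no longer cares whether the data arrives as CSV, JSON, or Avro; the configured reader service handles the parsing.)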