Why am I getting a 'Not a data file' exception when I try to read the input stream?
Labels: Apache NiFi
Created 08-15-2017 10:50 AM
I am trying to build a custom processor. It receives an Avro record set from a ConvertRecord processor, which transforms my CSV to Avro. ConvertRecord reports that the conversion succeeded, but when I try to unpack the DataFileStream of GenericRecord into individual records, my processor throws an IOException that simply says 'Not a data file.'
My Scala code is as follows:

```scala
import java.io.InputStream

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.{ProcessContext, ProcessSession}

override def onTrigger(context: ProcessContext, session: ProcessSession): Unit = {
  val sourceFlowFile = Option(session.get)
  sourceFlowFile match {
    case Some(f) =>
      // some property loading, processing and logging...
      val transferList = new ListBuffer[FlowFile]
      session.read(f, new InputStreamCallback {
        override def process(in: InputStream): Unit = {
          val stream: Try[DataFileStream[GenericRecord]] =
            Try(new DataFileStream(in, new GenericDatumReader[GenericRecord]()))
          stream match {
            case Failure(e) =>
              getLogger.error(s"${e.getMessage}: error reading from data file stream, passing to relationship:failure", e)
              failed = true // `failed` is a var on the processor
            case Success(dataStream) =>
              getLogger.info("Reading data from stream was a success, will proceed to writing")
              while (dataStream.hasNext) {
                // some processing
              }
          }
        }
      })
      if (failed) {
        session.transfer(f, RelFailure)
        return
      }
      for (recordFlowFile <- transferList) {
        writeToFlowFile(session, recordFlowFile)
      }
      getLogger.info("Writing was a success, passing list of generic records to success relationship")
      session.transfer(transferList.asJava, RelSuccess)
    case None =>
      // some error handling
  }
}
```
I'm not sure why this is happening, because the way I am trying to read the stream is the same way the writer is tested in the NiFi codebase. The failure occurs when I try to create the DataFileStream from the InputStream; the match always results in Failure, so whatever the Success branch does is irrelevant at present. Am I misunderstanding how the stream callbacks work? I have been checking the codebase for people doing similar things in their processors, but couldn't find much except for the tests.
Created 08-15-2017 01:53 PM
Can you show the configuration for the AvroRecordSetWriter being used by ConvertRecord? Specifically, what is the Schema Write Strategy set to? I believe it would have to be set to "Embedded Avro Schema" in order for it to be read as a data file.
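To illustrate why that setting matters, here is a minimal sketch using the plain Avro library (the tiny schema and all names are invented for the example, and are not from the original thread). DataFileStream only accepts the Avro object-container format, which starts with the "Obj" magic header; raw binary datums without an embedded schema trigger exactly the "Not a data file." exception:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.avro.Schema
import org.apache.avro.file.{DataFileStream, DataFileWriter}
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

object NotADataFileDemo extends App {
  // Tiny schema invented for the demo.
  val schema = new Schema.Parser().parse(
    """{"type":"record","name":"R","fields":[{"name":"x","type":"int"}]}""")
  val rec = new GenericData.Record(schema)
  rec.put("x", 1)

  // Raw binary datum: what the content looks like when the schema is NOT
  // embedded -- no "Obj" magic header, just the encoded fields.
  val raw = new ByteArrayOutputStream()
  val enc = EncoderFactory.get().binaryEncoder(raw, null)
  new GenericDatumWriter[GenericRecord](schema).write(rec, enc)
  enc.flush()

  // Uncommenting this throws java.io.IOException: Not a data file.
  // new DataFileStream(new ByteArrayInputStream(raw.toByteArray),
  //                    new GenericDatumReader[GenericRecord]())

  // Object container file: what an embedded-schema writer produces;
  // DataFileStream reads this one without complaint.
  val container = new ByteArrayOutputStream()
  val writer = new DataFileWriter(new GenericDatumWriter[GenericRecord](schema))
  writer.create(schema, container)
  writer.append(rec)
  writer.close()
  val stream = new DataFileStream(new ByteArrayInputStream(container.toByteArray),
                                  new GenericDatumReader[GenericRecord]())
  println(stream.next()) // prints {"x": 1}
}
```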
Created 08-15-2017 02:21 PM
Hi Brian, thank you for your response! That indeed made my match result in a Success, though it increased the size of the flowfile by quite a bit. At least I can use the AvroSchemaRegistry directly later on, and I think I understand the DatumReader slightly better now.
Created 08-15-2017 02:29 PM
No problem. Here is how the AvroReader in NiFi handles things...
If you choose a "Schema Access Strategy" of "Use Embedded Avro Schema", then it reads the content with a data file stream.
If you choose one of the other "Schema Access Strategy" options, like obtaining the schema from a schema registry, then it constructs the reader from that externally supplied schema instead.
So if you want your custom processor to support either option, you can model it after AvroReader, as in the sketch below.
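A rough sketch of that split at the Avro API level, assuming generic records; the object and method names here are mine, not NiFi's:

```scala
import java.io.InputStream

import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

object AvroReadSketch {

  // "Use Embedded Avro Schema": the container file carries its own schema,
  // so DataFileStream can discover it by itself.
  def embeddedRecords(in: InputStream): Iterator[GenericRecord] =
    new DataFileStream(in, new GenericDatumReader[GenericRecord]()).iterator().asScala

  // Explicit schema (e.g. fetched from a schema registry): the payload is
  // raw binary datums, so the reader must be handed the schema up front.
  def explicitRecords(in: InputStream, schema: Schema): Iterator[GenericRecord] = {
    val reader  = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(in, null)
    Iterator.continually(decoder)
      .takeWhile(!_.isEnd)
      .map(_ => reader.read(null, decoder))
  }
}
```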
Created 08-16-2017 08:42 AM
Thanks again, it does seem like it would make more sense as a controller service, since we will be using Record processors like ConvertRecord, ConsumeKafkaRecord, and PublishKafkaRecord.
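For reference, a hedged sketch of what wiring a record reader controller service into a custom processor might look like. The property and relationship names are illustrative, and the createRecordReader signature shown matches the early NiFi 1.x line and has changed in later releases, so check it against the version you build with:

```scala
import java.io.InputStream
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.{AbstractProcessor, ProcessContext, ProcessSession, Relationship}
import org.apache.nifi.serialization.RecordReaderFactory

class RecordAwareProcessor extends AbstractProcessor {

  // Property and relationship names are illustrative, not NiFi's.
  val RecordReaderProp: PropertyDescriptor = new PropertyDescriptor.Builder()
    .name("record-reader")
    .displayName("Record Reader")
    .identifiesControllerService(classOf[RecordReaderFactory])
    .required(true)
    .build()

  val RelSuccess: Relationship = new Relationship.Builder().name("success").build()

  override def getSupportedPropertyDescriptors: JList[PropertyDescriptor] =
    List(RecordReaderProp).asJava

  override def onTrigger(context: ProcessContext, session: ProcessSession): Unit = {
    val flowFile = session.get()
    if (flowFile == null) return
    val factory = context.getProperty(RecordReaderProp)
      .asControllerService(classOf[RecordReaderFactory])
    session.read(flowFile, new InputStreamCallback {
      override def process(in: InputStream): Unit = {
        // createRecordReader's signature has changed across NiFi releases;
        // the (flowFile, in, logger) form below is from the 1.x era.
        val reader = factory.createRecordReader(flowFile, in, getLogger)
        try {
          var record = reader.nextRecord()
          while (record != null) {
            // per-record processing goes here
            record = reader.nextRecord()
          }
        } finally reader.close()
      }
    })
    session.transfer(flowFile, RelSuccess)
  }
}
```

Built this way, the processor no longer cares whether the upstream data is Avro with an embedded schema, raw Avro plus a registry, CSV, or JSON; the configured reader service handles the format.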
