Why am I getting a Not a data file exception when I try to read input stream?

Explorer

I am trying to build a custom processor and have an Avro record set incoming from a ConvertRecord processor that transforms my CSV to Avro. ConvertRecord reports that the conversion succeeded, but when I try to unpack the DataFileStream of GenericRecord type into individual records, my processor throws an IOException that simply says 'Not a data file.'

My Scala code is as follows:

// Imports used by this snippet:
import java.io.InputStream

import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.{ProcessContext, ProcessSession}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

// `failed`, `RelFailure`, `RelSuccess` and `writeToFlowFile` are defined elsewhere on the processor class.
override def onTrigger(context: ProcessContext, session: ProcessSession): Unit = {
  val sourceFlowFile = Option(session.get)
  sourceFlowFile match {
    case Some(f) =>
      // some property loading, processing and logging..
      val transferList = new ListBuffer[FlowFile]
      session.read(f, new InputStreamCallback {
        override def process(in: InputStream): Unit = {
          // This is where the "Not a data file." IOException is thrown
          val stream: Try[DataFileStream[GenericRecord]] =
            Try(new DataFileStream(in, new GenericDatumReader[GenericRecord]()))
          stream match {
            case Failure(e) =>
              getLogger.error(s"${e.getMessage}: error reading from data file stream, passing to relationship:failure", e)
              failed = true
            case Success(dataStream) =>
              getLogger.info("Reading data from stream was a success, will proceed to writing")
              while (dataStream.hasNext) {
                // some processing
              }
          }
        }
      })

      if (failed) {
        session.transfer(f, RelFailure)
        return
      }
      for (recordFlowFile <- transferList) {
        writeToFlowFile(session, recordFlowFile)
      }
      getLogger.info("Writing was a success, passing list of generic records to success relationship")
      session.transfer(transferList.asJava, RelSuccess)
    case None =>
      // some error handling
  }
}

I'm not sure why this is happening, because the way I am trying to read the stream is the same way the writer is tested in the NiFi codebase. The exception occurs when I try to create the DataFileStream from the InputStream; the match always results in Failure, so what the Success case does is irrelevant at present. Am I misunderstanding the way the stream callbacks work? I have been searching the codebase for people doing similar things in their processors, but couldn't find much except for the tests. Please pardon my formatting; I'm not really sure how to fix it.


4 REPLIES

Master Guru

Can you show the configuration for the AvroRecordSetWriter being used by ConvertRecord? Specifically, what is the Schema Write Strategy set to? I believe it would have to be set to "Embedded Avro Schema" in order for it to be read as a data file.
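
To see why that setting matters, here is a minimal sketch contrasting the two cases (the AvroReadPaths object and its readEmbedded/readBare helpers are just illustrative names, not NiFi or Avro API). DataFileStream only understands the Avro object container format, i.e. a magic header followed by the embedded writer schema, which is what "Embedded Avro Schema" produces; content written as bare datums has no such header, which is exactly what triggers "Not a data file.", and has to be decoded with an explicitly supplied schema instead.

import java.io.InputStream

import org.apache.avro.Schema
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

object AvroReadPaths {

  // Path A: the flowfile content is in the Avro object container format
  // (magic header + embedded writer schema), which is what an
  // AvroRecordSetWriter with "Embedded Avro Schema" produces.
  // DataFileStream can only read this format; anything else fails
  // with "Not a data file."
  def readEmbedded(in: InputStream): Unit = {
    val reader = new DataFileStream[GenericRecord](in, new GenericDatumReader[GenericRecord]())
    try {
      while (reader.hasNext) {
        val record: GenericRecord = reader.next()
        // ... process record ...
      }
    } finally reader.close()
  }

  // Path B: the content is a stream of bare Avro datums with no header,
  // so the schema must be supplied explicitly (e.g. fetched from a
  // schema registry) and the bytes decoded with a BinaryDecoder.
  def readBare(bytes: Array[Byte], schema: Schema): Unit = {
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    while (!decoder.isEnd) {
      val record: GenericRecord = datumReader.read(null, decoder)
      // ... process record ...
    }
  }
}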

Explorer

Hi Brian, thank you for your response! That indeed made my match result in a success. It increased the size of the flowfile quite a bit, but at least I can use the AvroSchemaRegistry directly later on, and I think I understand the DatumReader slightly better now.

Master Guru

No problem. Here is how the AvroReader in NiFi handles things...

If you choose "Schema Access Strategy" of "Use Embedded Avro Schema" then it uses this class which uses a datafile stream:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-seria...

If you choose one of the other "Schema Access Strategy" options, like from a schema registry, then it uses this class:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-seria...

So if you want your custom processor to support either option, then you can model it after AvroReader:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-seria...
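
As a rough sketch of that idea (this is not the actual AvroReader code; the property name, its allowable values, and the AvroReadPaths helpers from the earlier sketch are assumptions made for illustration), a custom processor could expose a similar strategy property and branch between the data-file path and the explicit-schema path:

import java.io.InputStream

import org.apache.avro.Schema
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.processor.ProcessContext

object SchemaAccessSketch {

  // Hypothetical property mirroring AvroReader's "Schema Access Strategy".
  val SchemaAccessStrategy: PropertyDescriptor = new PropertyDescriptor.Builder()
    .name("schema-access-strategy")
    .displayName("Schema Access Strategy")
    .description("How the schema for incoming Avro content is obtained")
    .allowableValues("embedded-avro-schema", "schema-registry")
    .defaultValue("embedded-avro-schema")
    .required(true)
    .build()

  // Dispatches to the matching read path from the earlier sketch.
  // `registrySchema` is only evaluated when the registry strategy is chosen,
  // e.g. a lookup against an AvroSchemaRegistry controller service.
  def readRecords(context: ProcessContext, in: InputStream,
                  bytes: Array[Byte], registrySchema: => Schema): Unit = {
    context.getProperty(SchemaAccessStrategy).getValue match {
      case "embedded-avro-schema" => AvroReadPaths.readEmbedded(in)              // data file path
      case "schema-registry"      => AvroReadPaths.readBare(bytes, registrySchema) // explicit schema
    }
  }
}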

Explorer

Thanks again. It does seem like it would make more sense as a controller service, since we will be using Record processors like ConvertRecord, ConsumeKafkaRecord, and PublishKafkaRecord.
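
For the controller-service route, the common pattern is to expose a Record Reader property that identifies a RecordReaderFactory, the same way the standard record processors do. The sketch below is illustrative only: the property name and helper are made up, and the exact createRecordReader signature differs between NiFi versions (this assumes the older FlowFile/InputStream/ComponentLog form).

import java.io.InputStream

import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.record.Record

object RecordReaderSketch {

  // Lets the flow author plug in any record reader controller service
  // (AvroReader, CSVReader, ...) instead of hard-coding Avro handling.
  val RecordReader: PropertyDescriptor = new PropertyDescriptor.Builder()
    .name("record-reader")
    .displayName("Record Reader")
    .description("Controller service used to parse incoming flowfile content into records")
    .identifiesControllerService(classOf[RecordReaderFactory])
    .required(true)
    .build()

  // Reads every record from the flowfile content via the configured reader.
  // Note: createRecordReader's signature varies across NiFi versions.
  def readAll(context: ProcessContext, flowFile: FlowFile, in: InputStream, logger: ComponentLog): Unit = {
    val factory = context.getProperty(RecordReader).asControllerService(classOf[RecordReaderFactory])
    val reader = factory.createRecordReader(flowFile, in, logger)
    try {
      var record: Record = reader.nextRecord()
      while (record != null) {
        // ... process record ...
        record = reader.nextRecord()
      }
    } finally reader.close()
  }
}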