Support Questions
Find answers, ask questions, and share your expertise

I want to convert CSV To AvroSchema using Custom Processor

Hello...I am trying to convert my CSV file to get an AvroSchema by using my own custom processor which will get the file from the disk and converts it into flowfile and then passes it to the method named "inferAvroSchemaFromCSV" of the processor inferAvroSchema.After that i am getting my required schema successfully, which i am trying to transfer it into the success relationship so as to get on the disk, but i am getting following error -

"DBColumnMapper[id=3836e9a0-015b-1000-9d9b-3d14d4066516] Failed to infer Avro schema for StandardFlowFileRecord[uuid=785128f9-acea-4d1c-a3dc-ec1313c900a0,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1491380551546-1, container=default, section=1], offset=15934, length=797],offset=0,name=vikas,size=797] due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=785128f9-acea-4d1c-a3dc-ec1313c900a0,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1491380551546-1, container=default, section=1], offset=15934, length=797],offset=0,name=vikas,size=797] is not the most recent version of this FlowFile within this session (StandardProcessSession[id=12]): org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=785128f9-acea-4d1c-a3dc-ec1313c900a0,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1491380551546-1, container=default, section=1], offset=15934, length=797],offset=0,name=vikas,size=797] is not the most recent version of this FlowFile within this session (StandardProcessSession[id=12]) 14:20:53 ISTERROR3836e9a0-015b-1000-9d9b-3d14d4066516"

Following is my workflow template.png and my configuration of custom processor dbcolumnmapper-configuration.png

Here is the Java Code--

@Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { URI uri=null; String url=context.getProperty(INPUT_SCHEMA).getValue(); StringBuilder out = new StringBuilder(); try { uri = new URI(url); } catch (URISyntaxException e) { e.printStackTrace(); } Path schemaPath = new Path(uri); FileSystem fs; InputStream in=null; try { fs = schemaPath.getFileSystem(DefaultConfiguration.get()); in= fs.open(schemaPath); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line; while ((line = reader.readLine()) != null) { out.append(line); } System.out.println("csv file data"+out.toString()); //Prints the string content read from input stream reader.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } FlowFile flowFile=null; try{ //Writing csv data inside flow file for infer avro schema converter final byte[] dataByte=String.valueOf(out).getBytes(StandardCharsets.UTF_8); flowFile = session.create(); flowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(final OutputStream output) throws IOException { output.write(dataByte); } }); //flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "text/plain"); //flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), "vikas"); final AtomicReference<String> avroSchema = new AtomicReference<>(); avroSchema.set(inferAvroSchemaFromCSV(flowFile, context, session)); FlowFile avroSchemaFF = null; avroSchemaFF = session.write(session.create(), new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { out.write(avroSchema.get().getBytes()); } }); avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE); avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.FILENAME.key(), "newFlowFile"); session.transfer(avroSchemaFF, REL_ORIGINAL); session.transfer(avroSchemaFF, SUCCESS); //session.transfer(avroSchemaFF, FAILURE); session.commit(); }catch (Exception ex) { getLogger().error("Failed to infer Avro schema for {} due to {}", new Object[] {flowFile, ex}); } }

3 REPLIES 3

It would be helpful if you edited the post and formatted the code portion using the code style, it is unreadable as is.

The error "is not the most recent version of this FlowFile within this session" means that somewhere in your processor you transferred a FlowFile reference, but that reference is no longer the most recent version of the FlowFile.

Here is an example that would produce this error:

FlowFile flowFile = session.get();
session.putAttribute(flowFile, "key", "value");
session.transfer(flowFile, MY_RELATIONSHIP); 

Because the result of session.putAttribute was not assigned back to flowFile, the transfer here is trying to transfer the reference from line 1 which is no longer the latest reference. The code should be:

FlowFile flowFile = session.get();
flowFile = session.putAttribute(flowFile, "key", "value");
session.transfer(flowFile, MY_RELATIONSHIP); 

Thanks @Bryan Bende , it worked for me. Now i am getting the following error after the line session.commit() in the following code ontriggermethod.txt -------

DBColumnMapper[id=43376208-015b-1000-8176-8a62b92462aa] DBColumnMapper[id=43376208-015b-1000-8176-8a62b92462aa] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=357d0770-3aea-4def-b607-e93f574593ea,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1491495241588-2, container=default, section=2], offset=10858, length=496],offset=0,name=187456790845166,size=496] transfer relationship not specified; rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=357d0770-3aea-4def-b607-e93f574593ea,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1491495241588-2, container=default, section=2], offset=10858, length=496],offset=0,name=187456790845166,size=496] transfer relationship not specified....

Thanks in Advance.

This means somewhere you created a flow file, or obtained on from the session, and the flow file was never transferred to a relationship or removed. Every flow file either needs to be sent somewhere via session.transfer() or needs to be deleted via session.remove().

; ;