FlowFile incomingAvro = session.get(); if (incomingAvro == null) { System.out.println("incoming avro is null"); return; } URI uri=null; String url=context.getProperty(INPUT_SCHEMA).getValue(); StringBuilder out = new StringBuilder(); try { uri = new URI(url); } catch (URISyntaxException e) { e.printStackTrace(); } Path schemaPath = new Path(uri); FileSystem fs; InputStream in=null; try { fs = schemaPath.getFileSystem(DefaultConfiguration.get()); in= fs.open(schemaPath); BufferedReader reader = new BufferedReader(new InputStreamReader(in)); String line; while ((line = reader.readLine()) != null) { out.append(line); out.append("\n"); } System.out.println("csv file data"+out.toString()); //Prints the string content read from input stream reader.close(); } catch (IOException e) { // TODO Auto-generated catch block System.out.println("error occured"+e.getMessage()); e.printStackTrace(); } FlowFile flowFile=null; try{ //Writing csv data inside flow file for infer avro schema converter final byte[] dataByte=String.valueOf(out).getBytes(StandardCharsets.UTF_8); flowFile = session.create(); flowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(final OutputStream output) throws IOException { output.write(dataByte); } }); //flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "text/plain"); //flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), "vikas"); final AtomicReference avroSchema = new AtomicReference<>(); avroSchema.set(inferAvroSchemaFromCSV(flowFile, context, session)); FlowFile avroSchemaFF = null; avroSchemaFF = session.write(session.create(), new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { out.write(avroSchema.get().getBytes()); } }); avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE); avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.FILENAME.key(), "newFlowFile"); session.transfer(avroSchemaFF, SUCCESS); session.commit();