NiFi custom processor shows error “the local variable flowfile cannot be assigned” when writing an OutputStreamCallback

I am trying to pass the original flow file on as input to the next processor, but I keep getting this error. Could someone please help me resolve it?
// Needs nifi-api, nifi-dbcp-service-api, Apache POI (poi-ooxml) and commons-io on the classpath.
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile flowfile = session.get();
    if (flowfile == null) {
        return;
    }

    ArrayList<String> headData = new ArrayList<String>();
    try {
        session.read(flowfile, new InputStreamCallback() {

            final DBCPService dbcpService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class);
            String query = "CREATE TABLE MODEL (";

            @SuppressWarnings("deprecation")
            public void process(InputStream inputStream) throws IOException {
                try {
                    OPCPackage pkg = OPCPackage.open(inputStream);
                    XSSFWorkbook workbook = new XSSFWorkbook(pkg);
                    workbook.getAllNames();
                    String dateheader = "date";
                    XSSFSheet sheetName = workbook.getSheetAt(0);
                    Row row = sheetName.getRow(0);
                    // Build one column definition per cell of the header row.
                    for (Cell cell : row) {
                        switch (cell.getCellType()) {
                        case NUMERIC:
                            if (HSSFDateUtil.isCellDateFormatted(cell)) {
                                DataFormatter dataFormatter = new DataFormatter();
                                headData.add(dataFormatter.formatCellValue(cell));
                                query += dataFormatter.formatCellValue(cell) + " " + "INT,";
                            } else {
                                headData.add(String.valueOf(cell.getNumericCellValue()));
                            }
                            break;
                        case STRING:
                            headData.add(cell.getStringCellValue());
                            if (cell.getStringCellValue().toLowerCase().contains(dateheader))
                                query += cell.getStringCellValue() + " " + "TIMESTAMP,";
                            else
                                query += cell.getStringCellValue() + " " + "VARCHAR(50),";
                            break;
                        case BOOLEAN:
                            headData.add(String.valueOf(cell.getBooleanCellValue()));
                            break;
                        default:
                            headData.add("");
                            break;
                        }
                    }
                    // Drop the trailing comma and close the column list.
                    query = query.substring(0, query.length() - 1);
                    query += ")";
                    workbook.close();
                    final Connection con = dbcpService.getConnection();
                    try {
                        java.sql.PreparedStatement statement = con.prepareStatement(query);
                        statement.execute();
                        con.commit();
                        session.transfer(flowfile, REL_SUCCESS);
                    } catch (SQLException e) {
                        e.printStackTrace();
                        session.transfer(flowfile, REL_FAILURE);
                    }
                } catch (InvalidFormatException ife) {
                    getLogger().error("Only .xlsx Excel files are supported", ife);
                    throw new UnsupportedOperationException("Only .xlsx OOXML files are supported", ife);
                }
            }

        });
    } catch (RuntimeException ex) {
        getLogger().error("Failed to process incoming Excel document. " + ex.getMessage(), ex);
        FlowFile failedFlowFile = session.putAttribute(flowfile, testxlsqlProcessor.class.getName() + ".error", ex.getMessage());
    }
    final StringBuilder stringBuilder = new StringBuilder();
    // Problematic part: flowfile is reassigned here.
    flowfile = session.write(flowfile, new StreamCallback() {
        public void process(InputStream in, OutputStream out) throws IOException {
            stringBuilder.append(IOUtils.copy(in, out));
        }
    });
}

The problematic code:
**********************************************************
final StringBuilder stringBuilder = new StringBuilder();
flowfile = session.write(flowfile, new StreamCallback() {
    public void process(InputStream in, OutputStream out) throws IOException {
        stringBuilder.append(IOUtils.copy(in, out));
    }
});

*****************************************************

If I don't add the output stream, I get the exception "transfer relationship not specified".
1 REPLY

Could you post clean code, without the confusing lines and the missing brackets?
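
For what it's worth, the error in the question looks like Java's rule for anonymous classes rather than anything NiFi-specific: a local variable used inside an anonymous class must be final or effectively final, so a variable that is referenced inside a callback cannot be reassigned afterwards. In the posted code, flowfile is used inside the InputStreamCallback (in the session.transfer calls) and then reassigned by session.write. Below is a minimal sketch of one way around that, assuming the callback only collects data and the result of the write goes into a second variable. The types Doc, ReadCallback and CopyCallback and the helpers read and write are hypothetical stand-ins that only mimic the shape of the NiFi calls, so the example runs without NiFi on the classpath (Java 9 or later).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class CaptureSketch {

    // Hypothetical stand-ins that only mimic the shape of NiFi's callbacks.
    interface ReadCallback { void process(InputStream in) throws IOException; }
    interface CopyCallback { void process(InputStream in, OutputStream out) throws IOException; }

    // Immutable content holder: every write returns a NEW instance.
    static final class Doc {
        final byte[] content;
        Doc(byte[] content) { this.content = content; }
    }

    // Hypothetical helper: hands the content to the callback as a stream.
    static void read(Doc doc, ReadCallback cb) throws IOException {
        try (InputStream in = new ByteArrayInputStream(doc.content)) {
            cb.process(in);
        }
    }

    // Hypothetical helper: lets the callback copy or rewrite the content.
    static Doc write(Doc doc, CopyCallback cb) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream in = new ByteArrayInputStream(doc.content)) {
            cb.process(in, out);
        }
        return new Doc(out.toByteArray());
    }

    static String demo() {
        try {
            final Doc original = new Doc("id,name".getBytes(StandardCharsets.UTF_8));
            final List<String> headers = new ArrayList<>();

            // The callback only collects data. It does not touch any
            // variable that gets reassigned later in the method.
            read(original, new ReadCallback() {
                @Override
                public void process(InputStream in) throws IOException {
                    String line = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                    for (String header : line.split(",")) {
                        headers.add(header);
                    }
                }
            });

            // The result goes into a second variable, so 'original' is never reassigned.
            Doc copy = write(original, new CopyCallback() {
                @Override
                public void process(InputStream in, OutputStream out) throws IOException {
                    in.transferTo(out);
                }
            });

            return headers + " " + new String(copy.content, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[id, name] id,name"
    }
}
```

In the processor itself, the equivalent would presumably be to keep the session.transfer calls out of the callbacks and to call session.transfer once, after the callbacks have finished, on the FlowFile reference returned by the last session call. As far as I know, a FlowFile that is never transferred or removed before the session commits is what produces the "transfer relationship not specified" exception, and the catch branch in the posted code never transfers failedFlowFile.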
