- select * from table
- preprocess the results line by line
- return a single new flow file

So I decided to change the above approach to the one below:
- get the batch row count from the user (let's assume the user gives 1000)
- execute the select * query and get a ResultSet
- read the results row by row (rs.next())
- when the row count reaches 1000, return that flow file and continue with the remaining rows (see the sketch after this list)
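To make the batching idea concrete, here is a minimal sketch in plain JDBC (outside NiFi). BatchedResultSetReader and emitBatch are hypothetical names for illustration only, and the single rs.getString(1) column read just stands in for the real per-row preprocessing:

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchedResultSetReader {

    // Reads the ResultSet row by row and hands the rows off in chunks of batchSize.
    // emitBatch is a hypothetical stand-in for "create and transfer one flow file per batch".
    static void readInBatches(ResultSet rs, int batchSize, Consumer<List<String>> emitBatch) throws SQLException {
        List<String> batch = new ArrayList<>(batchSize);
        while (rs.next()) {
            batch.add(rs.getString(1));             // per-row preprocessing would happen here
            if (batch.size() == batchSize) {
                emitBatch.accept(batch);            // a batch of 1000 (say) is complete: emit it
                batch = new ArrayList<>(batchSize); // start the next batch
            }
        }
        if (!batch.isEmpty()) {
            emitBatch.accept(batch);                // emit the trailing partial batch
        }
    }
}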
So my onTrigger implementation is as below:
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    logger = getLogger();
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    try {
        final Long rowLimit = context.getProperty(ProcessorUtils.MAX_RECORD).evaluateAttributeExpressions(flowFile).asLong();
        Connection conn = DriverManager.getConnection(
                // db connection properties
        );
        Statement stm = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rs = stm.executeQuery("sql query");
        Map<String, String> flowFileAttributes = flowFile.getAttributes();
        process(rs, session, flowFileAttributes, rowLimit);
        FlowFile stateFlowFile = session.create();
        stateFlowFile = session.putAttribute(stateFlowFile, "processing_status", "end"); // putAttribute returns the updated FlowFile
        stateFlowFile = session.putAttribute(stateFlowFile, "record_count", "0");
        session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // working line
        session.remove(flowFile); // the incoming FlowFile only triggers the query, so it is dropped here
    } catch (Exception e) {
        logger.warn("conn " + e);
        session.transfer(flowFile, GPReaderProcessorUtils.FAILURE);
    }
}
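Side note on the code above: the Connection, Statement and ResultSet are never closed. A rough try-with-resources variant of that block, with the connection URL and query left as placeholders, could look like this:

try (Connection conn = DriverManager.getConnection("jdbc: ..." /* db connection properties */);
     Statement stm = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
     ResultSet rs = stm.executeQuery("sql query")) {
    process(rs, session, flowFile.getAttributes(), rowLimit);
    // ... create and transfer the "end" status flow file, as in onTrigger above
} catch (Exception e) {
    logger.error("query execution failed", e);
    session.transfer(flowFile, GPReaderProcessorUtils.FAILURE);
}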
Recursion approach, terminating based on the row count:
private void process(ResultSet rs, ProcessSession session, Map<String, String> flowFileAttributes, Long rowLimit) throws SQLException {
    try {
        logger.info("-> start processing with row limit = " + rowLimit);
        AtomicInteger mainI = new AtomicInteger(0);
        FlowFile flowFile =
                session.write(session.putAllAttributes(session.create(), flowFileAttributes), (OutputStream out) -> {
                    int i = 0;
                    Map<String, String> preProcessResults = null;
                    try {
                        while (i < rowLimit && rs.next()) {
                            // preprocessing happens here and fills preProcessResults for the current row
                            i++;
                            mainI.set(i);
                            out.write(preProcessResults.toString().getBytes(StandardCharsets.UTF_8));
                        }
                    } catch (SQLException e) {
                        logger.error("error while reading the result set", e);
                    }
                    logger.info("gp-log -> " + i);
                });
        FlowFile stateFlowFile = session.create();
        stateFlowFile = session.putAttribute(stateFlowFile, "processing_status", "processing");
        stateFlowFile = session.putAttribute(stateFlowFile, "record_count", String.valueOf(mainI.get()));
        session.transfer(stateFlowFile, GPReaderProcessorUtils.STATUS); // state relationship
        session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); // preprocessed flow file is returned
        if (!rs.isAfterLast() && mainI.get() != 0 && !rs.isLast()) { // recursion call
            logger.info("gp-log -> recursion call");
            process(rs, session, flowFileAttributes, rowLimit);
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        session.transfer(session.putAllAttributes(session.create(), flowFileAttributes), GPReaderProcessorUtils.FAILURE);
    }
}
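For clarity, the recursion above is intended to behave like a simple loop over the batches. A rough iterative sketch, assuming it lives in the same processor class (processIteratively is a hypothetical name, and the per-row preprocessing/write is elided just as above):

// Iterative equivalent of the recursive process(...): keep emitting batches
// until a batch comes back smaller than rowLimit, i.e. the result set is drained.
private void processIteratively(ResultSet rs, ProcessSession session,
                                Map<String, String> flowFileAttributes, long rowLimit) {
    int written;
    do {
        final AtomicInteger count = new AtomicInteger(0);
        FlowFile flowFile = session.write(
                session.putAllAttributes(session.create(), flowFileAttributes),
                out -> {
                    try {
                        int i = 0;
                        while (i < rowLimit && rs.next()) {
                            // preprocessing + out.write(...) for the current row goes here,
                            // exactly as in process(...)
                            i++;
                            count.set(i);
                        }
                    } catch (SQLException e) {
                        throw new ProcessException(e); // surface JDBC errors to the session
                    }
                });
        written = count.get();
        session.transfer(flowFile, GPReaderProcessorUtils.SUCCESS); // one flow file per batch
    } while (written == rowLimit); // a short (or empty) batch means no rows are left
}

Per the JDBC javadoc, support for isLast()/isAfterLast() is optional for TYPE_FORWARD_ONLY result sets, so this sketch terminates on the batch size instead; the trade-off is that a row count that is an exact multiple of rowLimit produces one trailing empty flow file.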
Expected behaviour -> while processing, each completed batch of rows is returned as a flow file as soon as it is ready.
Current behaviour -> all the flow files (generated in the recursion) are returned at once, only after everything has finished.
Please advise on this.