Created 06-19-2018 11:15 AM
I am trying to fetch a big full table with 10M+ records from Hive using the SelectHiveQL processor, and I found that the convertToCsvStream() method in the source code takes longer than fetching the result set itself. Looking at the code: the result set is iterated row by row and each row is then written to the output stream.
When the table is small the process completes in seconds, but with larger data it takes much longer. Is there any way we can optimize the conversion? I have tried fetch sizes of 100000/1000/10000/1000. Here is the conversion loop from the source, with my debug logging commented out:
while (rs.next()) {
    //logger.info("+++++++++++++Inside the While loop+++++++++++++++++");
    if (callback != null) {
        callback.processRow(rs);
    }
    List<String> rowValues = new ArrayList<>(nrOfColumns);
    for (int i = 1; i <= nrOfColumns; i++) {
        final int javaSqlType = meta.getColumnType(i);
        final Object value = rs.getObject(i);
        //logger.info("+++++++++++++Entering the Switch at +++++++++++++++++");
        switch (javaSqlType) {
            case CHAR:
            case LONGNVARCHAR:
            case LONGVARCHAR:
            case NCHAR:
            case NVARCHAR:
            case VARCHAR:
                String valueString = rs.getString(i);
                if (valueString != null) {
                    // Removed extra quotes as those are a part of the escapeCsv when required.
                    StringBuilder sb = new StringBuilder();
                    if (outputOptions.isQuote()) {
                        sb.append("\"");
                        if (outputOptions.isEscape()) {
                            sb.append(StringEscapeUtils.escapeCsv(valueString));
                        } else {
                            sb.append(valueString);
                        }
                        sb.append("\"");
                        rowValues.add(sb.toString());
                    } else {
                        if (outputOptions.isEscape()) {
                            rowValues.add(StringEscapeUtils.escapeCsv(valueString));
                        } else {
                            rowValues.add(valueString);
                        }
                    }
                } else {
                    rowValues.add("");
                }
                break;
            case ARRAY:
            case STRUCT:
            case JAVA_OBJECT:
                String complexValueString = rs.getString(i);
                if (complexValueString != null) {
                    rowValues.add(StringEscapeUtils.escapeCsv(complexValueString));
                } else {
                    rowValues.add("");
                }
                break;
            default:
                if (value != null) {
                    rowValues.add(value.toString());
                } else {
                    rowValues.add("");
                }
        }
        //logger.info("+++++++++++++Exiting the Switch at +++++++++++++++++" + System.currentTimeMillis());
    }
    //List<String> rowValues1 = new ArrayList<>();
    //rowBuffer.add(StringUtils.join(rowValues, outputOptions.getDelimiter()));
    //rowBuffer.add("\n");
    // Write row values
    //logger.info("+++++++++++++Writing Row value at+++++++++++++++++" + System.currentTimeMillis());
    outStream.write(StringUtils.join(rowValues, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8));
    outStream.write("\n".getBytes(StandardCharsets.UTF_8));
    nrOfRows++;
    /*
    if (rowBuffer.size() == BATCH_SIZE) {
        writeToOutStream(outStream, rowBuffer);
    }
    */
    if (maxRows > 0 && nrOfRows == maxRows) {
        break;
    }
}
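One thing I am experimenting with on my side (just a sketch, not part of NiFi) is cutting down the number of tiny writes to the output stream by buffering rows and writing them in chunks, similar to the commented-out rowBuffer idea above. Assuming the same outStream, rowValues and outputOptions as in the loop, and a hypothetical BATCH_SIZE constant:

// Accumulate rows and write them to the stream in larger chunks,
// instead of two small outStream.write() calls per row.
StringBuilder rowBuffer = new StringBuilder(8192);
int buffered = 0;

// inside while (rs.next()), after rowValues has been filled:
rowBuffer.append(StringUtils.join(rowValues, outputOptions.getDelimiter())).append('\n');
if (++buffered >= BATCH_SIZE) {
    outStream.write(rowBuffer.toString().getBytes(StandardCharsets.UTF_8));
    rowBuffer.setLength(0);
    buffered = 0;
}

// after the loop, flush whatever is left:
if (rowBuffer.length() > 0) {
    outStream.write(rowBuffer.toString().getBytes(StandardCharsets.UTF_8));
}

Wrapping outStream in a java.io.BufferedOutputStream would achieve much the same effect with less code, though I have not measured how much either approach actually helps here.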
Created 06-19-2018 04:52 PM
Improvements to SelectHiveQL's CSV output are covered by NIFI-5307 but have not yet been implemented. There has also been discussion (but no Jira that I know of) about allowing incremental commits, i.e. sending out partial results as soon as they have been processed rather than sending everything once the result set has been fully processed, as has been done for QueryDatabaseTable (NIFI-4836).
Created 06-20-2018 04:18 AM
@Matt Burgess I did check the code in the Jira you mentioned, NIFI-4836.
I believe batching in QueryDatabaseTable is different from the SelectHiveQL processor, where the bottleneck is breaking the result set down into batches.
Here is the code from NIFI-4836, which only releases the flow files once the batch size is reached.
This is an interesting problem statement for tuning the HiveQL processing.
if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
    session.transfer(resultSetFlowFiles, REL_SUCCESS);
    session.commit();
    resultSetFlowFiles.clear();
}
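For what it's worth, here is a rough sketch of how that incremental-release idea might look if applied to the Hive result set conversion. The helpers hasMoreRows/writeCsvRows and the outputBatchSize / maxRowsPerFlowFile values are hypothetical; only the ProcessSession calls (create, write, transfer, commit) are the standard NiFi API, and this is not the processor's actual code:

// Emit FlowFiles in batches while the result set is still being read,
// instead of holding everything until the whole conversion is done.
List<FlowFile> resultSetFlowFiles = new ArrayList<>();
while (hasMoreRows(rs)) {                               // hypothetical helper
    FlowFile flowFile = session.create();
    // convert up to maxRowsPerFlowFile rows to CSV into this FlowFile (hypothetical helper)
    flowFile = session.write(flowFile, out -> writeCsvRows(rs, out, maxRowsPerFlowFile));
    resultSetFlowFiles.add(flowFile);

    // release the accumulated FlowFiles as soon as the batch is full
    if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
        session.transfer(resultSetFlowFiles, REL_SUCCESS);
        session.commit();
        resultSetFlowFiles.clear();
    }
}
// transfer whatever is left over
session.transfer(resultSetFlowFiles, REL_SUCCESS);

That would let downstream processors start working on partial results while the rest of the 10M+ rows are still being converted, which is really what I am after.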