
Convert Hive result Set to Multi-Character delimited CSV - SelectHiveQl Processor NIFI

Explorer

I am trying to fetch a large full table (10M+ records) from Hive using the SelectHiveQL processor, and I found that the convertToCsvStream() method in the source code takes longer than fetching the result set itself. Looking at the code, the result set is iterated row by row and each row is appended to the output stream.

When the table is small the process completes in seconds, but with large data it takes much longer. Is there any way we can optimize the conversion? I have tried fetch sizes of 1000, 10000, and 100000.

while (rs.next()) {
    if (callback != null) {
        callback.processRow(rs);
    }

    List<String> rowValues = new ArrayList<>(nrOfColumns);

    for (int i = 1; i <= nrOfColumns; i++) {
        final int javaSqlType = meta.getColumnType(i);
        final Object value = rs.getObject(i);

        switch (javaSqlType) {
            case CHAR:
            case LONGNVARCHAR:
            case LONGVARCHAR:
            case NCHAR:
            case NVARCHAR:
            case VARCHAR:
                String valueString = rs.getString(i);
                if (valueString != null) {
                    // Removed extra quotes as those are a part of the escapeCsv when required.
                    StringBuilder sb = new StringBuilder();
                    if (outputOptions.isQuote()) {
                        sb.append("\"");
                        if (outputOptions.isEscape()) {
                            sb.append(StringEscapeUtils.escapeCsv(valueString));
                        } else {
                            sb.append(valueString);
                        }
                        sb.append("\"");
                        rowValues.add(sb.toString());
                    } else if (outputOptions.isEscape()) {
                        rowValues.add(StringEscapeUtils.escapeCsv(valueString));
                    } else {
                        rowValues.add(valueString);
                    }
                } else {
                    rowValues.add("");
                }
                break;
            case ARRAY:
            case STRUCT:
            case JAVA_OBJECT:
                String complexValueString = rs.getString(i);
                rowValues.add(complexValueString == null ? "" : StringEscapeUtils.escapeCsv(complexValueString));
                break;
            default:
                rowValues.add(value == null ? "" : value.toString());
        }
    }

    // Write the row values followed by a newline -- two write calls per row.
    outStream.write(StringUtils.join(rowValues, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8));
    outStream.write("\n".getBytes(StandardCharsets.UTF_8));
    nrOfRows++;

    // Experiment: buffer rows and flush in batches instead of writing each row.
    // if (rowBuffer.size() == BATCH_SIZE) {
    //     writeToOutStream(outStream, rowBuffer);
    // }

    if (maxRows > 0 && nrOfRows == maxRows) {
        break;
    }
}
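One direction I have been experimenting with (this is my own sketch, not code from the NiFi source; the class name, BATCH_SIZE value, and delimiter are all made up for illustration) is to accumulate rows in a StringBuilder and flush them to the output stream in batches, so each row does not cost its own OutputStream.write calls:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class BatchedCsvWriter {
    private static final int BATCH_SIZE = 10_000; // flush every N rows; tune as needed

    private final OutputStream outStream;
    private final String delimiter;
    private final StringBuilder buffer = new StringBuilder();
    private int buffered = 0;

    public BatchedCsvWriter(OutputStream outStream, String delimiter) {
        this.outStream = outStream;
        this.delimiter = delimiter;
    }

    // Append one row to the in-memory buffer; flush once BATCH_SIZE rows accumulate.
    public void writeRow(List<String> rowValues) throws IOException {
        buffer.append(String.join(delimiter, rowValues)).append('\n');
        if (++buffered >= BATCH_SIZE) {
            flush();
        }
    }

    // Write all buffered rows in a single OutputStream.write call.
    public void flush() throws IOException {
        if (buffered > 0) {
            outStream.write(buffer.toString().getBytes(StandardCharsets.UTF_8));
            buffer.setLength(0);
            buffered = 0;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BatchedCsvWriter writer = new BatchedCsvWriter(out, "|~|");
        writer.writeRow(List.of("1", "alice"));
        writer.writeRow(List.of("2", "bob"));
        writer.flush(); // flush the tail of the result set
        System.out.print(out.toString(StandardCharsets.UTF_8));
    }
}
```

Whether this actually moves the needle at 10M rows I cannot say yet; it simply replaces per-row writes with per-batch writes, which is the same idea as the commented-out rowBuffer experiment above.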
2 REPLIES

Re: Convert Hive result Set to Multi-Character delimited CSV - SelectHiveQl Processor NIFI

Super Guru

Improvements to SelectHiveQL's CSV output are covered in NIFI-5307 but have not yet been implemented. There has also been discussion (but no Jira that I know of) to allow incremental commits (i.e. sending out partial results as soon as they have been processed rather than sending all when the result set has been fully processed) such as has been done in QueryDatabaseTable (NIFI-4836).


Re: Convert Hive result Set to Multi-Character delimited CSV - SelectHiveQl Processor NIFI

Explorer

@Matt Burgess I did check the code in the PR you mentioned, NIFI-4836.

I believe batching in QueryDatabaseTable is different from the SelectHiveQL processor's case, where the bottleneck is breaking the result set itself down into batches.

Here is the code from NIFI-4836, which only releases the flow files once the batch size is reached.

Tuning the HiveQL processing is an interesting problem statement.

if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
    session.transfer(resultSetFlowFiles, REL_SUCCESS);
    session.commit();
    resultSetFlowFiles.clear();
}