Support Questions

Find answers, ask questions, and share your expertise

NiFi: Write to custom delimitted file

avatar
Expert Contributor

After querying from Hive, I wanted to write into a file with a custom delimiter say |, is there a way to achieve that? I cannot use Replacetext to replace, with the custom delimiter as the column values may have comma in it.

One other option I see is have query producing single string with custom delimiter. But it encloses the column values with the double quotes for the string type columns and that voids the required fileformat.

1 ACCEPTED SOLUTION

avatar
Master Guru

Here is what i suggest. the code is simple.

The processor calls HiveJdbcCommon.convertToCsvStream which is a custom class built

public void process(final OutputStream out) throws IOException {
                    try {
                        logger.debug("Executing query {}", new Object[]{selectQuery});
                        final ResultSet resultSet = st.executeQuery(selectQuery);
                        if (AVRO.equals(outputFormat)) {
                            nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out));
                        } else if (CSV.equals(outputFormat)) {
                            nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out));
                        } else {
                            nrOfRows.set(0L);
                            throw new ProcessException("Unsupported output format: " + outputFormat);
                        }

The custom class is here

public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
            throws SQLException, IOException {


        final ResultSetMetaData meta = rs.getMetaData();
        final int nrOfColumns = meta.getColumnCount();
        List<String> columnNames = new ArrayList<>(nrOfColumns);


        for (int i = 1; i <= nrOfColumns; i++) {
            String columnNameFromMeta = meta.getColumnName(i);
            // Hive returns table.column for column name. Grab the column name as the string after the last period
            int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
            columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
        }


        // Write column names as header row
        outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8));
        outStream.write("\n".getBytes(StandardCharsets.UTF_8));


        // Iterate over the rows
        long nrOfRows = 0;
        while (rs.next()) {
            if (callback != null) {
                callback.processRow(rs);
            }
            List<String> rowValues = new ArrayList<>(nrOfColumns);
            for (int i = 1; i <= nrOfColumns; i++) {
                final int javaSqlType = meta.getColumnType(i);
                final Object value = rs.getObject(i);


                switch (javaSqlType) {
                    case CHAR:
                    case LONGNVARCHAR:
                    case LONGVARCHAR:
                    case NCHAR:
                    case NVARCHAR:
                    case VARCHAR:
                        String valueString = rs.getString(i);
                        if (valueString != null) {
                            rowValues.add("\"" + StringEscapeUtils.escapeCsv(valueString) + "\"");
                        } else {
                            rowValues.add("");
                        }
                        break;
                    case ARRAY:
                    case STRUCT:
                    case JAVA_OBJECT:
                        String complexValueString = rs.getString(i);
                        if (complexValueString != null) {
                            rowValues.add(StringEscapeUtils.escapeCsv(complexValueString));
                        } else {
                            rowValues.add("");
                        }
                        break;
                    default:
                        if (value != null) {
                            rowValues.add(value.toString());
                        } else {
                            rowValues.add("");
                        }
                }
            }
            // Write row values
            outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8));
            outStream.write("\n".getBytes(StandardCharsets.UTF_8));
            nrOfRows++;
        }
        return nrOfRows;
    }

The code is very simple. Basically where it add a comma, you an replace with your delimiter. build the nar (very simple) and there you go. in reality I believe this code can be easily enhanced by accepting a input parameter ie delimiter, and the processor would spit out the result set in the delimiter you have identified. If I have a hour or so next week I will post the custom nar info here.

View solution in original post

5 REPLIES 5

avatar

Would it be possible to get the result as an AVRO file and work from there?

avatar
Expert Contributor

The only option with Avro available is to split into JSON, but there on any thoughts on how that can be emitted in to a delimited format?

avatar
Super Collaborator

you could use a simple python script with executescript to achieve that.

http://funnifi.blogspot.com/2016/03/executescript-json-to-json-revisited_14.html

avatar
Master Guru

Here is what i suggest. the code is simple.

The processor calls HiveJdbcCommon.convertToCsvStream which is a custom class built

public void process(final OutputStream out) throws IOException {
                    try {
                        logger.debug("Executing query {}", new Object[]{selectQuery});
                        final ResultSet resultSet = st.executeQuery(selectQuery);
                        if (AVRO.equals(outputFormat)) {
                            nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out));
                        } else if (CSV.equals(outputFormat)) {
                            nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out));
                        } else {
                            nrOfRows.set(0L);
                            throw new ProcessException("Unsupported output format: " + outputFormat);
                        }

The custom class is here

public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
            throws SQLException, IOException {


        final ResultSetMetaData meta = rs.getMetaData();
        final int nrOfColumns = meta.getColumnCount();
        List<String> columnNames = new ArrayList<>(nrOfColumns);


        for (int i = 1; i <= nrOfColumns; i++) {
            String columnNameFromMeta = meta.getColumnName(i);
            // Hive returns table.column for column name. Grab the column name as the string after the last period
            int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
            columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
        }


        // Write column names as header row
        outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8));
        outStream.write("\n".getBytes(StandardCharsets.UTF_8));


        // Iterate over the rows
        long nrOfRows = 0;
        while (rs.next()) {
            if (callback != null) {
                callback.processRow(rs);
            }
            List<String> rowValues = new ArrayList<>(nrOfColumns);
            for (int i = 1; i <= nrOfColumns; i++) {
                final int javaSqlType = meta.getColumnType(i);
                final Object value = rs.getObject(i);


                switch (javaSqlType) {
                    case CHAR:
                    case LONGNVARCHAR:
                    case LONGVARCHAR:
                    case NCHAR:
                    case NVARCHAR:
                    case VARCHAR:
                        String valueString = rs.getString(i);
                        if (valueString != null) {
                            rowValues.add("\"" + StringEscapeUtils.escapeCsv(valueString) + "\"");
                        } else {
                            rowValues.add("");
                        }
                        break;
                    case ARRAY:
                    case STRUCT:
                    case JAVA_OBJECT:
                        String complexValueString = rs.getString(i);
                        if (complexValueString != null) {
                            rowValues.add(StringEscapeUtils.escapeCsv(complexValueString));
                        } else {
                            rowValues.add("");
                        }
                        break;
                    default:
                        if (value != null) {
                            rowValues.add(value.toString());
                        } else {
                            rowValues.add("");
                        }
                }
            }
            // Write row values
            outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8));
            outStream.write("\n".getBytes(StandardCharsets.UTF_8));
            nrOfRows++;
        }
        return nrOfRows;
    }

The code is very simple. Basically where it add a comma, you an replace with your delimiter. build the nar (very simple) and there you go. in reality I believe this code can be easily enhanced by accepting a input parameter ie delimiter, and the processor would spit out the result set in the delimiter you have identified. If I have a hour or so next week I will post the custom nar info here.

avatar
Expert Contributor

Thanks, that appears to be relatively simple change. I will give a try