
NiFi: Write to custom delimited file

Solved


Rising Star

After querying from Hive, I want to write the results to a file with a custom delimiter, say |. Is there a way to achieve that? I cannot use ReplaceText to swap in the custom delimiter, because the column values may contain commas.

One other option I see is to have the query produce a single string with the custom delimiter. But that encloses string-type column values in double quotes, which breaks the required file format.
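For example (illustrative values), a row whose first column contains a comma would come out as

    "Doe, John",25,NY

where the required pipe-delimited output is

    Doe, John|25|NY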

1 ACCEPTED SOLUTION

Re: NiFi: Write to custom delimited file

Super Guru

Here is what I suggest; the code is simple.

The processor calls HiveJdbcCommon.convertToCsvStream, a method of a custom utility class built for the Hive processors:

public void process(final OutputStream out) throws IOException {
    try {
        logger.debug("Executing query {}", new Object[]{selectQuery});
        final ResultSet resultSet = st.executeQuery(selectQuery);
        if (AVRO.equals(outputFormat)) {
            nrOfRows.set(HiveJdbcCommon.convertToAvroStream(resultSet, out));
        } else if (CSV.equals(outputFormat)) {
            nrOfRows.set(HiveJdbcCommon.convertToCsvStream(resultSet, out));
        } else {
            nrOfRows.set(0L);
            throw new ProcessException("Unsupported output format: " + outputFormat);
        }
    } catch (final SQLException e) {
        // surface SQL errors to the framework as a ProcessException
        throw new ProcessException(e);
    }
}

The conversion method in the custom class is here (the two-argument call above presumably goes through an overload that passes null for the record name and the row callback):

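// Note: CHAR, VARCHAR, ARRAY, etc. below are constants statically imported from java.sql.Types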
public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
            throws SQLException, IOException {


        final ResultSetMetaData meta = rs.getMetaData();
        final int nrOfColumns = meta.getColumnCount();
        List<String> columnNames = new ArrayList<>(nrOfColumns);


        for (int i = 1; i <= nrOfColumns; i++) {
            String columnNameFromMeta = meta.getColumnName(i);
            // Hive returns table.column for column name. Grab the column name as the string after the last period
            int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
            columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
        }


        // Write column names as header row
        outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8));
        outStream.write("\n".getBytes(StandardCharsets.UTF_8));


        // Iterate over the rows
        long nrOfRows = 0;
        while (rs.next()) {
            if (callback != null) {
                callback.processRow(rs);
            }
            List<String> rowValues = new ArrayList<>(nrOfColumns);
            for (int i = 1; i <= nrOfColumns; i++) {
                final int javaSqlType = meta.getColumnType(i);
                final Object value = rs.getObject(i);


                switch (javaSqlType) {
                    case CHAR:
                    case LONGNVARCHAR:
                    case LONGVARCHAR:
                    case NCHAR:
                    case NVARCHAR:
                    case VARCHAR:
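                        // This CSV quoting is what wraps string values in double quotes
                        // (the behavior the question complains about)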
                        String valueString = rs.getString(i);
                        if (valueString != null) {
                            rowValues.add("\"" + StringEscapeUtils.escapeCsv(valueString) + "\"");
                        } else {
                            rowValues.add("");
                        }
                        break;
                    case ARRAY:
                    case STRUCT:
                    case JAVA_OBJECT:
                        String complexValueString = rs.getString(i);
                        if (complexValueString != null) {
                            rowValues.add(StringEscapeUtils.escapeCsv(complexValueString));
                        } else {
                            rowValues.add("");
                        }
                        break;
                    default:
                        if (value != null) {
                            rowValues.add(value.toString());
                        } else {
                            rowValues.add("");
                        }
                }
            }
            // Write row values
            outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8));
            outStream.write("\n".getBytes(StandardCharsets.UTF_8));
            nrOfRows++;
        }
        return nrOfRows;
    }

The code is very simple. Basically, wherever it adds a comma, you can replace it with your delimiter. Build the NAR (very simple) and there you go. In reality, I believe this code could easily be enhanced to accept an input parameter, i.e. the delimiter, so the processor would emit the result set with whatever delimiter you have specified. If I have an hour or so next week I will post the custom NAR info here.
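To illustrate, here is a minimal sketch of that enhancement. The convertToDelimitedStream method and its delimiter parameter are hypothetical (not part of the shipped HiveJdbcCommon class); it reuses the logic above but drops the CSV quoting, since with a delimiter such as | the embedded commas no longer need escaping:

public static long convertToDelimitedStream(final ResultSet rs, final OutputStream outStream, final String delimiter)
        throws SQLException, IOException {

    final ResultSetMetaData meta = rs.getMetaData();
    final int nrOfColumns = meta.getColumnCount();
    List<String> columnNames = new ArrayList<>(nrOfColumns);

    for (int i = 1; i <= nrOfColumns; i++) {
        String columnNameFromMeta = meta.getColumnName(i);
        // Hive returns table.column for column names; keep the part after the last period
        int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
        columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
    }

    // Header row, joined with the caller-supplied delimiter instead of a hard-coded comma
    outStream.write(StringUtils.join(columnNames, delimiter).getBytes(StandardCharsets.UTF_8));
    outStream.write("\n".getBytes(StandardCharsets.UTF_8));

    long nrOfRows = 0;
    while (rs.next()) {
        List<String> rowValues = new ArrayList<>(nrOfColumns);
        for (int i = 1; i <= nrOfColumns; i++) {
            final Object value = rs.getObject(i);
            // No quoting or escaping: values containing commas are safe with a non-comma delimiter
            rowValues.add(value == null ? "" : value.toString());
        }
        outStream.write(StringUtils.join(rowValues, delimiter).getBytes(StandardCharsets.UTF_8));
        outStream.write("\n".getBytes(StandardCharsets.UTF_8));
        nrOfRows++;
    }
    return nrOfRows;
}

The process() callback shown earlier would then call HiveJdbcCommon.convertToDelimitedStream(resultSet, out, "|") in place of the CSV branch, ideally with the delimiter exposed as a processor property.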

5 REPLIES

Re: NiFi: Write to custom delimited file

Would it be possible to get the result as an Avro file and work from there?


Re: NiFi: Write to custom delimited file

Rising Star

The only option available with Avro is to split it into JSON, but are there any thoughts on how that can then be emitted in a delimited format?

Re: NiFi: Write to custom delimited file

Expert Contributor

You could use a simple Python script with ExecuteScript to achieve that.

http://funnifi.blogspot.com/2016/03/executescript-json-to-json-revisited_14.html

Re: NiFi: Write to custom delimited file

Super Guru

(This is the accepted solution, reproduced in full above.)

Re: NiFi: Write to custom delimited file

Rising Star

Thanks, that appears to be a relatively simple change. I will give it a try.