Support Questions


QueryDatabaseTable, importing based on postgres Timestamp

Contributor

Hi all,

I've got a Postgres table with columns val text, created_at timestamp, and updated_at timestamp, and a QueryDatabaseTable processor running against it with updated_at as the "Maximum-value Columns" value.

It's returning some values over and over again, because the maximum value is being stored with timestamp(3) precision, whereas the column in the Postgres table has precision 6. I checked the query it was running, and it turned out to be something like:

select * from test where updated_at > '2016-08-09 15:52:14.731'; while the updated_at value in the table is '2016-08-09 15:52:14.731234', so the same data gets returned over and over again.

Is there a workaround for this? I'm not sure whether it's a limitation of the JDBC driver or of the way NiFi stores the max value each time.
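For anyone reproducing this, the truncation can be seen with plain JDBC classes: java.sql.Timestamp itself carries microsecond (indeed nanosecond) precision, but any millisecond-based formatting of the stored max value drops the extra digits. A minimal sketch (the class and variable names are illustrative, not from NiFi):

```java
import java.sql.Timestamp;
import java.text.SimpleDateFormat;

public class TimestampTruncation {
    public static void main(String[] args) {
        // Full microsecond precision, as stored in the Postgres column
        Timestamp ts = Timestamp.valueOf("2016-08-09 15:52:14.731234");

        // Timestamp itself keeps sub-millisecond digits ...
        System.out.println(ts);             // 2016-08-09 15:52:14.731234

        // ... but a millisecond-based formatter drops everything
        // past the third fractional digit.
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        System.out.println(fmt.format(ts)); // 2016-08-09 15:52:14.731
    }
}
```

Because '...14.731' is less than '...14.731234', a strictly-greater-than query built from the truncated value keeps matching the same row on every run.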

1 ACCEPTED SOLUTION

Master Guru

This is a bug in NiFi/HDF, recorded here. As you mentioned, the various JDBC drivers have their limitations, but the returned values are still expected to adhere to the JDBC specification; in this case, a Timestamp value should retain as much precision as feasible, hence the bug. For Time types, in contrast, driver behavior is not as consistent.

A possible workaround is to schedule the QueryDatabaseTable processor to run at intervals when you expect new data, and perhaps to add a DetectDuplicate processor somewhere in your flow.
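The DetectDuplicate step boils down to remembering a key per record and letting each key through only once. A rough stand-in sketch in plain Java (the class, the in-memory HashSet, and the row keys are all hypothetical; the actual DetectDuplicate processor uses a distributed cache service):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    // Hypothetical stand-in for DetectDuplicate's cache:
    // remembers each record key already emitted.
    private final Set<String> seen = new HashSet<>();

    // True only the first time a given key is observed
    // (Set.add returns false on duplicates).
    public boolean isNew(String recordKey) {
        return seen.add(recordKey);
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        List<String> emitted = new ArrayList<>();
        // The same row comes back on consecutive runs because of the
        // truncated max value; key it on a unique column combination.
        String[] rows = {"id=1|2016-08-09 15:52:14.731234",
                         "id=1|2016-08-09 15:52:14.731234",
                         "id=2|2016-08-09 15:52:15.000001"};
        for (String row : rows) {
            if (dedup.isNew(row)) {
                emitted.add(row);
            }
        }
        System.out.println(emitted.size()); // 2 -- the repeat was dropped
    }
}
```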


3 REPLIES


Contributor

Matt - thanks for fixing the AbstractDatabaseFetchProcessor timestamp bug that caused the truncation problem in the QueryDatabaseTable processor. I've encountered this same truncation phenomenon in both the ConvertJSONToSQL and QueryDatabaseTable processors. The ColumnDescription class in ConvertJSONToSQL does not appear to use ResultSetMetaData's getPrecision method to obtain the real physical size of each column, as reported by the database, from the result set that contains the table's schema metadata. When the generateInsert and generateUpdate methods execute and build the sql.args.N.value FlowFile attributes, they only have the data type size information from the generic ResultSet to establish each column's size, and that is the size used to truncate the result set's timestamp values held in the JSON string. If ColumnDescription were extended to use ResultSetMetaData.getPrecision(), the real physical column size could be applied when building the sql.args.N.value attributes, thereby solving the timestamp value truncation issue.

Contributor

Here's the fix for the timestamp value truncation that I attempted to explain above:

public static ColumnDescription from(final ResultSet resultSet) throws SQLException {
    final ResultSetMetaData md = resultSet.getMetaData();
    final List<String> columns = new ArrayList<>();
    final Map<String, Integer> columnCache = new HashMap<>(); // NEW - stores column size as reported by the database service

    for (int i = 1; i < md.getColumnCount() + 1; i++) {
        columns.add(md.getColumnName(i));
        columnCache.put(md.getColumnName(i), md.getPrecision(i)); // NEW - get physical column size
    }

    final String columnName = resultSet.getString("COLUMN_NAME");
    final int dataType = resultSet.getInt("DATA_TYPE");
    //final int colSize = resultSet.getInt("COLUMN_SIZE");
    final int colSize = columnCache.get(columnName); // NEW

    // the rest of the method's code has been omitted
}
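A complementary note on the value side: even once the correct column size is known, the timestamp literal itself must be rendered with full precision. java.sql.Timestamp.toString() keeps up to nanosecond digits, so building the string from it, rather than from a millisecond-based date formatter, avoids the truncation. A small sketch with hypothetical names:

```java
import java.sql.Timestamp;

public class FullPrecisionLiteral {
    // Build a SQL timestamp literal without losing sub-millisecond digits.
    // Timestamp.toString() renders up to nanosecond precision,
    // trimming trailing zeros, so nothing is truncated.
    static String toLiteral(Timestamp ts) {
        return "'" + ts.toString() + "'";
    }

    public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2016-08-09 15:52:14.731234");
        System.out.println(toLiteral(ts)); // '2016-08-09 15:52:14.731234'
    }
}
```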