Member since: 11-16-2015
Posts: 911
Kudos Received: 668
Solutions: 249
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 695 | 09-30-2025 05:23 AM |
| | 1069 | 06-26-2025 01:21 PM |
| | 929 | 06-19-2025 02:48 PM |
| | 1093 | 05-30-2025 01:53 PM |
| | 12258 | 02-22-2024 12:38 PM |
07-24-2018
09:57 PM
I'm not sure what you meant about changing the "-" to "_" with InferAvroSchema and such, but here's a different approach (assuming you have HDF 3.1 / Apache NiFi 1.5.0+) that lets you use the "correct" Avro schema even though the field names contain Avro-invalid characters: Create an AvroSchemaRegistry controller service and add a property called "mySchema" (or whatever you want to call it) with the original schema as the value. Then set the "Validate Field Names" property to false (added in NiFi 1.5.0 via NIFI-4612); this allows field names such as "general-1" without throwing an error. In your JsonTreeReader you can then use a schema access strategy of "Use Schema Name", specifying your AvroSchemaRegistry in the "Schema Registry" property and "mySchema" as the value of the "Schema Name" property. The JSON writer can Inherit Schema, so you don't need to put the schema in there either. When the schema comes from the schema registry with Validate Field Names set to false, you can use it even when the field names are not Avro-valid.
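For illustration, the value of the "mySchema" property in the AvroSchemaRegistry is just the original schema text. A minimal sketch might look like the following (only "general-1" comes from your example; the record name, the second field, and the types are made up):

```json
{
  "type": "record",
  "name": "mySchema",
  "fields": [
    { "name": "general-1", "type": ["null", "string"] },
    { "name": "general-2", "type": ["null", "long"] }
  ]
}
```

With "Validate Field Names" set to false on the registry, the hyphenated names are accepted as-is.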
07-24-2018
04:12 PM
2 Kudos
Try ListDatabaseTables -> GenerateTableFetch -> RemoteProcessGroup -> Input Port -> ExecuteSQL in place of QueryDatabaseTable. ListDatabaseTables will give you an empty flow file per table, with attributes set on the flow file such as table name. QueryDatabaseTable (QDT) doesn't allow incoming connections, but GenerateTableFetch (GTF) does. The biggest difference between QDT and GTF is that QDT generates and executes the SQL statements, whereas GTF just generates the SQL statements. This allows you to send to a Remote Process Group that points back to an Input Port on your NiFi cluster/instance. The RPG->Input Port pattern distributes the flow files among the nodes in your cluster. If you have a standalone NiFi instance, you won't need the RPG->Input Port part, but note that since there is only one node, the tables will not be fetched in parallel (but perhaps concurrently if you set Max Concurrent Tasks for ExecuteSQL).
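To make the handoff more concrete: each flow file GTF emits contains a generated SQL statement that ExecuteSQL simply runs. A rough sketch of one such statement (table/column names and the paging syntax are hypothetical and depend on the configured Database Type and Partition Size):

```sql
-- One "page" of a table fetch as GenerateTableFetch might emit it
SELECT * FROM customers
ORDER BY id
LIMIT 10000 OFFSET 20000
```

Because the statements travel as flow files, the RPG->Input Port hop is what spreads them across the nodes for parallel execution.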
07-23-2018
12:49 PM
1 Kudo
You can use a JOIN clause in the SELECT statement, but it will only work against a single RDBMS. You may find you can join two tables from two databases/schemas in the same RDBMS (if that system allows it), but you can't currently join two tables from totally separate database systems. You could investigate Presto; it allows joining tables across multiple systems, and you could have a single connection to it from NiFi in ExecuteSQL. That way it looks like a single RDBMS to NiFi, but Presto can be configured to do the cross-DB join.
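As a sketch of the single-RDBMS case (schema and table names are hypothetical), a join across two schemas in the same database can go straight into ExecuteSQL:

```sql
SELECT o.order_id, c.customer_name
FROM sales.orders o
JOIN crm.customers c ON o.customer_id = c.customer_id
```

From NiFi's point of view, the same kind of statement against Presto would look no different; Presto's catalogs take care of the two tables living in different systems.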
07-12-2018
11:25 PM
1 Kudo
Note that since your format is neither JSON nor JSON-per-line, you will have to do further processing before using any processors (record-based or not) that handle JSON. As of NiFi 1.7.0 (via NIFI-4456) the JsonTreeReader (and writer) allow for JSON-per-line, but your format is not exactly that either. If the existing processors and controller services (i.e. readers/writers) don't work, you might have to resort to a ScriptedRecordReader/Writer or a scripting processor to do custom handling.
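For reference, "JSON-per-line" here means one complete JSON object per line of the flow file, e.g. (field names made up):

```json
{"id": 1, "status": "ok"}
{"id": 2, "status": "failed"}
```

If your data can be massaged into that shape (for example with ReplaceText), the record readers can take it from there; otherwise a scripted reader is the fallback.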
07-12-2018
11:23 PM
1 Kudo
For that case, if you just need to replace the ESC with |, use ReplaceText with the Line-by-Line evaluation mode (with either Regex Replace or Literal Replace as the replacement strategy; either should work) to replace \x1b with |.
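A sketch of the ReplaceText settings (property names as in the 1.x line; verify against your NiFi build):

```
Evaluation Mode       : Line-by-Line
Replacement Strategy  : Regex Replace
Search Value          : \x1b
Replacement Value     : |
```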
07-12-2018
09:08 PM
Once you put special characters in the table name, column names, etc., it can cause problems for the fetch processors. Sometimes we get the table/column names from what the user provides in the config, and sometimes we get them from the result set(s). With special characters or other tweaks, this can cause a mismatch when the processor tries to determine the last known maximum value, etc.
07-12-2018
08:31 PM
There currently isn't a single processor that does an incremental fetch-and-insert all in one. However, you could use QueryDatabaseTable or GenerateTableFetch to do the incremental fetch, then PutDatabaseRecord to insert the rows into the target database. Either of the fetch processors, if you specify a Maximum Value Column, will determine the current maximum value for that column, and the next time it runs it will only fetch rows where (as in your statement above) the column value > the current maximum. The biggest difference between GenerateTableFetch and QueryDatabaseTable is that GTF generates SQL into the flow files, which then usually get executed by the ExecuteSQL processor. This allows you to distribute the SQL statements among the nodes in a NiFi cluster so they can be executed in parallel. QueryDatabaseTable generates and executes the SQL each time, and is designed to run on the primary node only (if in a cluster). GTF figures out the current maximum value by doing its own MAX() query, whereas QDT iterates through the rows and keeps the current maximum value while it is processing and outputting them.
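As a rough sketch of a subsequent run (table and column names are hypothetical): if "last_updated" is the Maximum Value Column and the previous run observed a maximum of '2018-07-01 00:00:00', the generated/executed statements are essentially:

```sql
-- The incremental fetch on a later run (only new rows):
SELECT * FROM orders
WHERE last_updated > '2018-07-01 00:00:00'

-- GenerateTableFetch also runs its own query to find the new high-water mark:
SELECT MAX(last_updated) FROM orders
```

QueryDatabaseTable skips the second query and instead tracks the maximum while streaming the result set.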
07-12-2018
01:59 PM
In this case the problem is that "time" is a reserved word in PostgreSQL, so it needs an explicit "AS" for the alias. If the column were not a reserved word, the processor should work fine (I tested with a timestamp column named "ts" and it works without the AS). I have written NIFI-5419 to improve the handling of aliases in the DB fetch processors (such as GenerateTableFetch). A workaround is to use QueryDatabaseTable; you can't distribute the SQL among a cluster, but it won't issue the MAX() query that fails.
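The failure boils down to something like this (table name hypothetical; the exact statement GenerateTableFetch builds may differ):

```sql
-- Rejected by PostgreSQL: "time" used as a column alias without AS
SELECT MAX(time) time FROM sensor_readings

-- Accepted: explicit AS for the alias
SELECT MAX(time) AS time FROM sensor_readings
```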
07-11-2018
01:21 PM
2 Kudos
It is usually not recommended to store large values in attributes, as they are kept in memory, which can cause issues for the entire flow. Can you share an example JSON and what you're trying to get as a result? You might be able to use UpdateRecord to create the new fields in-place (i.e. in the flow file content) rather than having to extract fields into attributes.
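As a rough sketch of the UpdateRecord approach (the record and field names below are invented, since the original JSON wasn't shared): with a JsonTreeReader and JsonRecordSetWriter configured, set "Replacement Value Strategy" to "Record Path Value" and add a dynamic property named "/full_name" with the value concat(/first_name, ' ', /last_name). A record like

```json
{"first_name": "Ada", "last_name": "Lovelace"}
```

would come out of the writer as {"first_name":"Ada","last_name":"Lovelace","full_name":"Ada Lovelace"}, all in the flow file content, with nothing pulled into attributes.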
07-10-2018
02:02 AM
You can't do it recursively, but you can do it manually if you know the max depth of the nested fields; you just need a similar spec to match each nested field at the level where it occurs. So instead of just "*": "&", you'd also match the nested fields underneath, with a sub-spec pretty much identical to the top-level one.
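To illustrate the shape only (the nested field name "details" is hypothetical, and this spec is deliberately a pass-through): a top-level "*": "&" plus a near-identical sub-spec for one level of nesting might look like:

```json
[
  {
    "operation": "shift",
    "spec": {
      "*": "&",
      "details": {
        "*": "&1.&"
      }
    }
  }
]
```

Whatever transformation the top-level match applies would be repeated inside the "details" block, and again for each additional level of depth you need to support.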