Member since
11-16-2015
905
Posts
666
Kudos Received
249
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 511 | 09-30-2025 05:23 AM |
| | 843 | 06-26-2025 01:21 PM |
| | 759 | 06-19-2025 02:48 PM |
| | 935 | 05-30-2025 01:53 PM |
| | 11696 | 02-22-2024 12:38 PM |
07-26-2018
02:36 AM
1 Kudo
If all the CSV columns are strings and you have a header line, you could set up a CSVReader with "Use String Fields From Header" as the access strategy. If there are non-String columns, you could use InferAvroSchema, which will try to figure out the fields' types and generate an Avro schema in the "inferred.avro.schema" attribute; you can then use that in a CSV Reader/Writer by choosing "Use Schema Text" with "${inferred.avro.schema}" as the value of "Schema Text". If all else fails, you could always create an AvroSchemaRegistry and add each of the 23 table schemas manually; you can then refer to them by name (if the name is the table code, you'll save a step) by using "Use Schema Name" with ${table.code} as the value. The general idea is to enrich/annotate each FlowFile and reuse the same controller services, processors, and other components, configuring them dynamically with Expression Language to supply the per-file information.
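To make the "string fields from header" idea concrete, here is a minimal Python sketch of what such a schema looks like. It is not NiFi's implementation; the function name, the record name, and the sample data are made up for illustration. The JSON it prints is the kind of text you could use as "Schema Text" or as a property value in an AvroSchemaRegistry.

```python
import csv
import io
import json


def string_schema_from_header(csv_text, record_name="csv_record"):
    """Build an Avro record schema in which every header column is a string.

    Hypothetical helper for illustration only; NiFi does this internally.
    """
    header = next(csv.reader(io.StringIO(csv_text)))
    return {
        "type": "record",
        "name": record_name,
        "fields": [{"name": col.strip(), "type": "string"} for col in header],
    }


# Made-up sample data with a header line.
sample = "id,name,city\n1,Ada,London\n"
schema = string_schema_from_header(sample)
print(json.dumps(schema, indent=2))
```

If some columns are not strings, the field types would differ, which is the part InferAvroSchema tries to work out for you.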
07-25-2018
06:27 PM
2 Kudos
Currently there is nothing out of the box (OOTB) that will parse Parquet files in NiFi, but I have written NIFI-5455 to cover the addition of a ParquetReader, so that incoming Parquet files can be operated on like the other supported formats. As a workaround, there is a ScriptedReader, for which you could write your own reader in Groovy, JavaScript, Jython, etc.
07-25-2018
06:17 PM
1 Kudo
There are a few mechanisms and recent improvements that should allow you to create a more scalable flow. If my comments below don't apply or are a little off, please feel free to elaborate on any/all of them so I can understand your use case better.

It appears you have a code as a column in the data, so your incoming pipe-delimited data is arriving either (1) one row at a time, (2) as a batch with the same value for the "table code", or (3) as a batch with various values for the "table code".

If each flow file has a single row, you may want ExtractText (to get the "table code" into an attribute) followed by LookupAttribute to add the "table name" as an attribute from the "table code" value. You should be able to use the same pattern (with perhaps a different regex/config for ExtractText) if each flow file contains multiple rows with the same value for "table code". If you're getting multiple rows with multiple "table code" values, use PartitionRecord to split the flow file into outgoing flow files based on the "table code" value. Then you'll end up with scenario #2 (multiple rows with the same table code value) but the attribute will already be available (see the PartitionRecord documentation for details).

Hopefully (depending on your flow) this will allow you to avoid 23 process groups, and instead have attributes set (and use NiFi Expression Language in the processors' configs) to have a single flow.

If you are replicating this data across 4 different targets, and each flow file has (or could be enriched with) enough data to "self-describe" its destination, then you might consider a Remote Process Group (RPG) from any of your flow branches that will end up at the same target processor(s). That way you'd have one Input Port for each of the 4 targets, and any branch sending to target 1 would have an RPG pointing at the Input Port for target 1. If you can collapse your 23 process groups (PGs) into 1 as described above, then you may not even need that; you could still have the 4 separate PGs coming from the single flow.
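As a rough illustration of the partition-then-annotate pattern described above (this is plain Python, not NiFi code), the sketch below groups pipe-delimited rows by their table code and attaches a table name to each group. The codes, the names in TABLE_NAMES, the column position, and the attribute keys are all assumptions made up for the example.

```python
from collections import defaultdict

# Hypothetical code-to-name mapping (the role LookupAttribute would play).
TABLE_NAMES = {"T01": "customers", "T02": "orders"}


def partition_by_table_code(lines, code_index=0, delimiter="|"):
    """Group pipe-delimited rows by the value in the table-code column."""
    groups = defaultdict(list)
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        code = line.split(delimiter)[code_index]
        groups[code].append(line)
    return dict(groups)


# Made-up batch containing rows for more than one table code.
batch = [
    "T01|1|Ada",
    "T02|17|2018-07-25",
    "T01|2|Grace",
]

for code, rows in partition_by_table_code(batch).items():
    # Each group stands in for one outgoing flow file plus its attributes.
    attributes = {"table.code": code, "table.name": TABLE_NAMES.get(code, "unknown")}
    print(attributes, rows)
```

Once every group carries its own attributes, a single downstream flow can be configured from those attributes instead of needing one flow per table.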
07-25-2018
01:03 PM
Without a Maximum Value Column, both GenerateTableFetch (GTF) and QueryDatabaseTable (QDT) will continue to pull the same data. Without a column from which they could figure out the "last" value, how would they know to pull only the rows added "since the last time"? Often an increasing ID or a "last updated" timestamp column is used as the Maximum Value Column. In NiFi 1.7.0 (via NIFI-5143) and in the upcoming HDF 3.2 release, GenerateTableFetch can use the values in a column (rather than the concept of "row number"), but without a Maximum Value Column you still can't do an incremental fetch. The alternative would be a Change Data Capture (CDC) solution, but these are vendor-specific and currently only CaptureChangeMySQL exists (although there is a proposed addition of an MS SQL Server version).
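Here is a small Python sketch of why a maximum-value column is needed for incremental fetch. It uses an in-memory SQLite database, and the table, column names, and helper function are invented for the example; it only mimics the idea of remembering the largest value seen so far, not how the NiFi processors store their state.

```python
import sqlite3


def fetch_new_rows(conn, last_max):
    """Return rows whose id exceeds the stored maximum, plus the new maximum."""
    rows = conn.execute(
        "SELECT id, payload FROM events WHERE id > ? ORDER BY id", (last_max,)
    ).fetchall()
    # If nothing new arrived, keep the previous maximum.
    new_max = rows[-1][0] if rows else last_max
    return rows, new_max


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany("INSERT INTO events VALUES (?, ?)", [(1, "a"), (2, "b")])

# First run: nothing seen yet, so both rows come back.
first, state = fetch_new_rows(conn, last_max=0)

# A new row arrives; the second run returns only that row.
conn.execute("INSERT INTO events VALUES (3, 'c')")
second, state = fetch_new_rows(conn, last_max=state)
print(first, second, state)
```

Without the `id > ?` condition there is nothing to distinguish old rows from new ones, so every run would return the whole table again.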
07-24-2018
09:57 PM
I'm not sure what you meant about changing the "-" to "_" with InferAvroSchema and such, but here's a different approach (assuming you have HDF 3.1 / Apache NiFi 1.5.0+) so you can use the "correct" Avro schema even though the field names have Avro-invalid characters. Create an AvroSchemaRegistry controller service, and add a property called "mySchema" (or whatever you want to call it) with the original schema as the value. Then set the "Validate Field Names" property to false (this was added in NiFi 1.5.0 via NIFI-4612); this will allow field names such as "general-1" without throwing an error. Then in your JsonTreeReader you can have an access strategy of "Use Schema Name", specifying your AvroSchemaRegistry in the "Schema Registry" property and "mySchema" as the value for the "Schema Name" property. The JSON writer can Inherit Schema, so you don't need to put the schema in there either. When the schema comes from the schema registry with Validate Field Names set to false, you can use it even when the field names are not Avro-valid.
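For reference, the Avro specification requires names to start with a letter or underscore and to contain only letters, digits, and underscores after that. The short Python check below (the function name and sample field names are just for illustration) shows why "general-1" is rejected when field-name validation is on.

```python
import re

# Avro name rule: first character [A-Za-z_], then only [A-Za-z0-9_].
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_avro_valid(name):
    """Return True if the whole string is a legal Avro name."""
    return AVRO_NAME.fullmatch(name) is not None


for field in ["general_1", "general-1", "1st_field"]:
    print(field, is_avro_valid(field))
```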
07-24-2018
04:12 PM
2 Kudos
Try ListDatabaseTables -> GenerateTableFetch -> RemoteProcessGroup -> Input Port -> ExecuteSQL in place of QueryDatabaseTable. ListDatabaseTables will give you an empty flow file per table, with attributes set on the flow file such as table name. QueryDatabaseTable (QDT) doesn't allow incoming connections, but GenerateTableFetch (GTF) does. The biggest difference between QDT and GTF is that QDT generates and executes the SQL statements, whereas GTF just generates the SQL statements. This allows you to send to a Remote Process Group that points back to an Input Port on your NiFi cluster/instance. The RPG->Input Port pattern distributes the flow files among the nodes in your cluster. If you have a standalone NiFi instance, you won't need the RPG->Input Port part, but note that since there is only one node, the tables will not be fetched in parallel (but perhaps concurrently if you set Max Concurrent Tasks for ExecuteSQL).
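To illustrate the "generate versus execute" split, here is a rough Python sketch of a generator that only produces paged SELECT statements, leaving execution to something downstream. The function name, the table name, and the LIMIT/OFFSET syntax are assumptions for the example; the SQL that GenerateTableFetch actually emits depends on the database type it is configured for.

```python
def generate_fetch_statements(table, row_count, partition_size):
    """Yield one SELECT per page of rows; nothing is executed here."""
    for offset in range(0, row_count, partition_size):
        # LIMIT/OFFSET is assumed here; real paging syntax varies by database.
        yield f"SELECT * FROM {table} LIMIT {partition_size} OFFSET {offset}"


# Hypothetical table with 25 rows, fetched 10 rows at a time.
statements = list(generate_fetch_statements("orders", row_count=25, partition_size=10))
for sql in statements:
    print(sql)
```

Because each statement is an independent unit of work, the statements can be handed out to different nodes (or concurrent tasks) and executed separately.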
07-23-2018
12:49 PM
1 Kudo
You can use a JOIN clause in the select statement, but it will only work for a single RDBMS. You may find you can join two tables from two databases/schemas in the same RDBMS (if that system lets you), but you can't currently join two tables from totally separate database systems. You could investigate Presto, which allows joining tables across multiple systems, and you could have a single connection to it from NiFi in ExecuteSQL. That way it will look like a single RDBMS to NiFi, but Presto can be configured to do the cross-DB join.
07-12-2018
11:25 PM
1 Kudo
Note that since your format is neither JSON nor JSON-per-line, you will have to do further processing before using any processors (record-based or not) that handle JSON. As of NiFi 1.7.0 (via NIFI-4456) the JsonTreeReader (and writer) allow for JSON-per-line, but your format is not exactly that either. If the existing processors or controller services (i.e. readers/writers) don't work, you might have to resort to a ScriptedRecordReader/Writer or a scripting processor to do custom handling.
07-12-2018
11:23 PM
1 Kudo
For that case, if you just need to replace the ESC character with |, then use ReplaceText with the Line-by-Line evaluation mode (with either Regex Replace or Literal Replace; one or the other, or both, should work) to replace \x1b with |
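Outside NiFi, the same substitution looks like this in Python, shown both as a literal replacement and as a regex replacement. The function names and the sample line are made up for the example, and it says nothing about how ReplaceText itself interprets the \x1b escape.

```python
import re

ESC = "\x1b"  # the ASCII escape character (decimal 27)


def esc_to_pipe_literal(line):
    """Literal replacement of every ESC character with a pipe."""
    return line.replace(ESC, "|")


def esc_to_pipe_regex(line):
    """Regex replacement; \\x1b in the pattern matches the ESC character."""
    return re.sub(r"\x1b", "|", line)


sample = "a\x1bb\x1bc"
print(esc_to_pipe_literal(sample))
print(esc_to_pipe_regex(sample))
```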
07-12-2018
09:08 PM
Once you put special characters in the table name, column names, etc., it can cause problems for the fetch processors. Sometimes we get the table/column names from what the user provides in the config, and sometimes we get them from the result set(s). With special characters or other tweaks, the two can differ, which causes a mismatch when the processor tries to determine the last known max value, etc.