Member since: 11-16-2015
Posts: 905
Kudos Received: 666
Solutions: 249
07-01-2018
04:30 AM
1 Kudo
In general, QueryDatabaseTable (QDT) must be scheduled at a rate commensurate with the granularity of the Maximum Value Column. Using a Maximum Value Column of type Date (with no time component) means you should only check for new values (i.e., schedule the QDT processor) once a day, not every 5 minutes. Otherwise, how would the processor know to grab rows for the current day that have come in since the last time it checked? It only looks at the date, so it would either (hypothetically) grab all rows with that Date value again, leading to duplicate data, or (as it actually behaves) look for rows with the next day's value, which it won't find until the following day, leading to data loss. Possible workarounds are to schedule QDT to run once a day, to add or alter a column to include a timestamp and schedule QDT accordingly, or to use a different column (possibly via a DB view) that is always increasing for each new row as the Maximum Value Column. Hopefully someday there will also be an Oracle CDC processor (likely leveraging LogMiner) to fetch changes in near-real-time. In the meantime, there may be a way to use QDT against Oracle/LogMiner tables to emulate this capability, provided the changes are available in a table with the same qualities (i.e., a Maximum Value Column that is always increasing with respect to the Run Schedule of QDT).
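As a rough sketch of the view-based workaround (all table and column names here are hypothetical), assume the source table has the DATE column plus a sequence-backed key that always increases; a view can expose that key so QDT can use it as the Maximum Value Column:

```sql
-- Hypothetical source table: order_date is a DATE with no time component,
-- but order_id comes from a sequence and always increases with each new row.
-- Pointing QueryDatabaseTable at this view and using order_id as the
-- Maximum Value Column lets QDT run on a short schedule without
-- duplicating or missing rows.
CREATE OR REPLACE VIEW orders_for_nifi AS
SELECT order_id,      -- use as the Maximum Value Column
       order_date,    -- original DATE column, still available downstream
       customer_id,
       amount
FROM   orders;
```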
06-30-2018
04:52 AM
Record-based processors should write a "record.count" attribute to outgoing flow files, but PutElasticsearchHttpRecord does not currently. I have written NIFI-5356 to cover this improvement. In the meantime, if you are using other record-based processors upstream, you may find the flow file already has a record.count attribute. Since PutElasticsearchHttpRecord transfers the entire flow file to either success or failure, when that attribute is present and the flow file is transferred to success, "record.count" correctly indicates the number of records loaded into ES by that processor for that flow file.
06-29-2018
03:06 PM
As of NiFi 1.5.0 (via NIFI-4522), you can issue a SQL query in PutSQL while still retaining the incoming flow file contents. For your case, you could send the CSV file to PutSQL and execute a "CREATE TABLE IF NOT EXISTS" statement, which will create the table the first time but allow the CSV to proceed to the "real" destination processor, likely PutDatabaseRecord.
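For example (the table and column names are placeholders, not from your CSV), the statement issued from PutSQL could look like the following; because it is idempotent, it is safe to run for every flow file passing through:

```sql
-- Hypothetical DDL to run from PutSQL before the data reaches PutDatabaseRecord.
-- Only the first execution creates the table; later executions are no-ops.
-- (IF NOT EXISTS support varies slightly by database.)
CREATE TABLE IF NOT EXISTS my_csv_target (
    id      INTEGER,
    name    VARCHAR(255),
    amount  DECIMAL(10, 2)
);
```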
06-28-2018
06:44 PM
1 Kudo
HDF 3.1.1.0 was built with HDP Hive 2.6.4 libraries, which are not compatible with HDP Hive 2.6.2 (there were evidently some changes to the Thrift interface that were not backwards compatible). The HDF 3.0.x.y series should be compatible with HDP Hive 2.6.2; I think as of this writing the latest is HDF 3.0.2.6.
06-20-2018
10:20 PM
2 Kudos
This is a regression (NIFI-4862) introduced in 1.5.0 and will be fixed in 1.7.0 (HDF 3.2).
06-20-2018
03:41 PM
As of NiFi 1.6.0, there is a processor called RunMongoAggregation that should do what you want. I don't think you'll need the "db.Pdata.aggregate" part, just the JSON query body, possibly as an array (i.e. the Mongo shell may know to treat that list of objects as an array but the NiFi processor may expect a valid JSON array of objects).
06-20-2018
02:53 PM
2 Kudos
You can use UpdateRecord to add a field to your records, then PutDatabaseRecord to put them into your database. The record-aware processors give you better control over the content (unlike ReplaceText, which can be brittle and only works on text formats like CSV and JSON, not Avro), and you don't need the Split/Merge pattern; instead these processors operate on many records in a single flow file, making things more efficient. If the table has already been created and you are trying to insert new rows with an additional column, then (as of NiFi 1.5.0) you might be able to use PutSQL before PutDatabaseRecord to execute a statement like ALTER TABLE ADD COLUMN IF NOT EXISTS or something like that (see the sketch below), but it is probably better to issue that statement once on your table externally so you don't have to do it for each flow file coming through NiFi.
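As a sketch (PostgreSQL-flavored; IF NOT EXISTS on ADD COLUMN is not supported by every database, and the table/column names here are placeholders), the one-time statement might look like this:

```sql
-- Hypothetical, PostgreSQL-style DDL: adds the extra column only if it is
-- not already there, so it is safe to repeat. Other databases may need a
-- different guard (e.g. checking the information schema first).
ALTER TABLE my_table
    ADD COLUMN IF NOT EXISTS load_timestamp TIMESTAMP;
```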
06-19-2018
10:33 PM
You will want to set whatever column has your "LAST MODIFIED" values as the Maximum Value Column in GenerateTableFetch. The first time, it will still generate SQL to pull the complete data set, but it will also keep track of the maximum observed value in that column. The next time GenerateTableFetch runs, it will only generate SQL to fetch the rows whose LAST MODIFIED value is greater than the last observed maximum. If you want the first run to start at a particular value (for the Maximum Value Column), you can add a user-defined property called "initial.maxvalue.<maxvaluecol>", where "<maxvaluecol>" is the name of the column you specified as the Maximum Value Column. This allows you to "skip ahead"; from then on GenerateTableFetch continues in normal operation, keeping track of the current maximum and only generating SQL to fetch rows whose values are larger than it. If you need a custom query (or, more precisely, you want to add a custom WHERE clause), you can do that by setting the Custom WHERE Clause property of GenerateTableFetch. If you need completely arbitrary queries, then as of NiFi 1.7.0 (via NIFI-1706) QueryDatabaseTable can be given an arbitrary query. That capability does not exist for GenerateTableFetch, but we can investigate adding it as an improvement; please feel free to file a Jira for it.
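To make that concrete, here is roughly the shape of the SQL GenerateTableFetch produces on a later run (the table, column, and values are hypothetical, and the real statements also include paging clauses that depend on the configured Database Type). An "initial.maxvalue.last_modified" property would simply seed that lower bound on the very first run.

```sql
-- Sketch only: GenerateTableFetch tracks the last observed maximum of the
-- Maximum Value Column (here last_modified) and constrains each new fetch
-- to rows beyond it; a Custom WHERE Clause, if set, is added as a filter.
SELECT *
FROM   my_table
WHERE  last_modified > '2018-06-19 10:00:00'   -- last observed maximum
  AND  region = 'EMEA'                         -- hypothetical Custom WHERE Clause
ORDER  BY last_modified;
```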
06-19-2018
06:17 PM
Yes, I think that will work. Also, if you convert from decimal to a different type, you should be able to use PutHiveStreaming, although it isn't always as performant as it could be. In the upcoming Hive 3 bundle, there is a new Streaming API, and PutHive3Streaming should be much faster (and Avro logical types are supported).
06-19-2018
05:36 PM
I recommend using MergeRecord before JoltTransformJSON, as the Jolt transform can then be applied to the whole JSON array (after your smaller JSON objects have been merged). You'll want to use a JsonTreeReader and provide an Avro schema that matches your input data above. mergerecord-example.xml is an example template where I generate data similar to yours, use MergeRecord to bundle the records 20 at a time, and then run the Jolt spec on the merged flow file; it includes the associated Avro schema and hopefully all the configuration you need to get up and going.