Member since: 11-16-2015
Posts: 902
Kudos Received: 664
Solutions: 249
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 148 | 09-30-2025 05:23 AM |
| | 618 | 06-26-2025 01:21 PM |
| | 452 | 06-19-2025 02:48 PM |
| | 697 | 05-30-2025 01:53 PM |
| | 9713 | 02-22-2024 12:38 PM |
08-22-2018
03:37 PM
2 Kudos
ConvertRecord is mostly for changing data formats, not structure. UpdateRecord is more appropriate, but I don't believe this is currently possible, as the "address" field doesn't exist in the input, and we don't currently update the schema in that case. I've filed NIFI-5524 to cover this improvement. You'll also be able to accomplish this with JoltTransformRecord when NIFI-5353 is implemented. In the meantime you can use ConvertRecord to convert from flat CSV into flat JSON (the writer can inherit the record schema), then use JoltTransformJSON to push the fields into the "address" object. Here's a spec that will do that:
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "address1": "[#2].address.address_address1",
        "zipcode": "[#2].address.address_zipcode",
        "*": "[#2].&"
      }
    }
  }
]
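For illustration (the "firstname" field and the values are assumptions; only "address1" and "zipcode" come from the spec above), a flat record like
[ { "firstname": "John", "address1": "123 Main St", "zipcode": "12345" } ]
would come out of that shift as
[ { "address": { "address_address1": "123 Main St", "address_zipcode": "12345" }, "firstname": "John" } ]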
08-22-2018
03:20 PM
1 Kudo
In addition to Shu's suggestion, you can use UpdateAttribute with Store State set to "Store state locally". This gives you a little more flexibility over the sequence values: you can set an initial value with the "Stateful Variables Initial Value" property, increment by any formula you want by specifying it in a user-defined property value using NiFi Expression Language, and so on. If you must get the value from an external database sequence, you could use a ScriptedLookupService with LookupRecord; you'd write a script that queries the database sequence and returns it as the lookup value. I've written a blog post on how to do this with Groovy and PostgreSQL, but it can be done in any supported scripting language against any database (you have to "bring your own SQL" in the script).
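As a minimal sketch of the stateful-counter approach (the attribute name "my_sequence" and the values are just examples), UpdateAttribute could be configured with:
Store State = Store state locally
Stateful Variables Initial Value = 0
my_sequence = ${getStateValue('my_sequence'):plus(1)}
so the my_sequence attribute counts up by one for each flow file, starting from the configured initial value.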
08-21-2018
02:09 PM
Yes, that's correct; otherwise all nodes could grab the same data. If you want to distribute the fetching among the nodes in your cluster, use GenerateTableFetch (still on the primary node only) -> RPG -> Input Port (on the same cluster) -> ExecuteSQL. GTF does not execute the SQL like QueryDatabaseTable does; it just generates the statements to be executed. The RPG -> Input Port step distributes the flow files (containing SQL statements) among the nodes in the cluster, then each node's ExecuteSQL takes the ones it receives and actually does the fetch. Note that you can't merge the results back together on a single node once they've been distributed, but that's usually a good thing, as you can often put the results to a target (such as another DB) in a distributed fashion.
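For a sense of what GTF generates (this is only illustrative; the exact SQL depends on the configured Database Type, partition size, and max-value column, and the table/column names here are made up), each outgoing flow file carries one paged statement along the lines of:
SELECT * FROM mytable ORDER BY id LIMIT 10000 OFFSET 20000
so each downstream ExecuteSQL fetches a different page of the table.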
08-20-2018
03:26 PM
2 Kudos
You can replace everything from SplitAvro -> PutSQL with PutDatabaseRecord; that should give you a pretty good speedup, as it takes the Avro in directly, generates the PreparedStatement once, and then does batch inserts for the entire incoming file. As of NiFi 1.6.0 (via NIFI-4836), if you don't care about the maxvalue.* and fragment.count attributes, you can also set the Output Batch Size property, which sends batches of flow files downstream as soon as they are ready rather than keeping them all in the session until every row has been processed. This allows you to start processing rows downstream while QueryDatabaseTable is still working through the result set, which comes in handy on the initial load of 2M+ records.
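As a rough sketch of the simplified flow (the specific values here are assumptions for illustration), the two processors could be configured like:
QueryDatabaseTable: Max Rows Per Flow File = 10000, Output Batch Size = 1
PutDatabaseRecord: Record Reader = an Avro reader service, Statement Type = INSERT, Table Name = mytable, plus your connection pool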
08-20-2018
01:50 PM
1 Kudo
It sounds like you're trying to put the content of your flow file into an attribute in order to use getDelimitedField(). Instead you should keep it as content and use something like ExtractText to pull only the desired field into an attribute. Can you explain more about your use case and incoming data? Are you always getting the same field out?
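For example, assuming simple comma-delimited content and that you want the third field (the property name "field3" is made up for this illustration), you could add a user-defined property to ExtractText like:
field3 = ^(?:[^,]*,){2}([^,]*)
which captures the third comma-separated value into the field3 (and field3.1) attribute for use downstream.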
08-15-2018
08:10 PM
dcachegroovy.txt Here you go! The command is called "keys" and prints each key on its own line.
08-14-2018
11:37 PM
1 Kudo
The default implementation is to throw UnsupportedOperationException, but many/most/all of the subclasses override it. Check the reference impl, and since you are removing things, also check removeAndGet(), removeByPattern(), and removeByPatternAndGet() to see if they would help; they might save you the trouble of fetching all the keys just to remove some of them.
08-14-2018
06:42 PM
This one is only more complex because you want to convert the field names at the second level rather than the first, so you want to match "address" first, then apply the above spec to each field in there, and then also transfer any fields at the top level over as-is (namely "firstname"). The spec (which is specific to this example) is:
[
  {
    "operation": "shift",
    "spec": {
      "address": {
        "*-*-*": "&(0,1)_&(0,2)_&(0,3)",
        "*-*": "&(0,1)_&(0,2)",
        "*": "&"
      },
      "*": "&"
    }
  }
]
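As an illustration of what that spec produces (the field names and values here are assumed for the example), input like
{ "firstname": "John", "address": { "address-line-1": "123 Main St", "zip-code": "12345" } }
comes out as
{ "address_line_1": "123 Main St", "zip_code": "12345", "firstname": "John" }
with the hyphens in the nested names replaced by underscores; note that with this spec the renamed fields end up at the top level of the output.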
08-09-2018
02:45 PM
I just tried the same line of CSV and the same regex and it works fine. Can you share the entire stack trace from the logs? There might be more information about where it's failing while being scheduled. Also, if you copied/pasted that regex from somewhere, it may have picked up some hidden/unprintable characters; try typing it in by hand instead.
08-09-2018
02:39 AM
Can you share your ExtractText configuration and possibly some sample input? This error occurs when the processor is scheduled, and all it does when scheduled is try to compile the regular expressions, so I presume there is some error in your regex somewhere.