Member since: 11-16-2015
Posts: 911
Kudos Received: 668
Solutions: 249

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 694 | 09-30-2025 05:23 AM |
| | 1069 | 06-26-2025 01:21 PM |
| | 928 | 06-19-2025 02:48 PM |
| | 1091 | 05-30-2025 01:53 PM |
| | 12254 | 02-22-2024 12:38 PM |
07-27-2018 06:38 PM · 1 Kudo
What version of NiFi are you using? I tried this on the latest snapshot (to become NiFi 1.8.0) and using double-quotes in the Maximum Value Column worked for me (i.e., it does incremental fetching). Looking at the code, it seems like it should work in NiFi 1.7.x as well; I'm wondering if there was a bug in your version that we've since fixed.
07-27-2018 06:15 PM
The column name itself has double quotes?
07-27-2018 05:53 PM
What issues are you having? That flow description seems like it should work. Perhaps your regular expression or other config of ExtractText needs tweaking?
07-26-2018 05:46 PM · 1 Kudo
Currently the processor(s) do a simple "put" of the value into the attribute whose name is the same as the parameter key, so when a parameter appears more than once, each occurrence overwrites the previous one, which is why you're seeing only the last value. I have written up an improvement Jira (NIFI-5467) to let the user choose to allow multiple params with the same key, creating a comma-separated list when multiple values are encountered. As a workaround, you can parse http.query.string using the same technique with which I answered your related question: use a script to parse the string and create an array of values for the multi-valued param.
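Here's a minimal Groovy sketch of that workaround for ExecuteScript (the "param." attribute prefix and the JSON-array encoding of repeated values are my own choices for illustration):

```groovy
import groovy.json.JsonOutput

def flowFile = session.get()
if (!flowFile) return

// Raw query string as set by HandleHttpRequest, e.g. "sn=123&c=4&p=a&p=b"
def query = flowFile.getAttribute('http.query.string') ?: ''

// Group values by parameter key so repeated keys collect into a list
def params = [:].withDefault { [] }
query.tokenize('&').each { pair ->
    def (key, value) = pair.tokenize('=') + ['']
    params[key] << URLDecoder.decode(value, 'UTF-8')
}

// Single-valued params become plain attributes, multi-valued ones a JSON array
params.each { key, values ->
    flowFile = session.putAttribute(flowFile, "param.${key}".toString(),
            values.size() == 1 ? values[0] : JsonOutput.toJson(values))
}
session.transfer(flowFile, REL_SUCCESS)
```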
07-26-2018 05:02 PM · 1 Kudo
AFAIK, Jolt doesn't have functions for transforming a single value (such as split, extract, etc.). If the "params" field will always contain the same parameters in the same order (at least "sn" first, "c" second, and four "p" entries), you can extract "params" into an attribute using EvaluateJsonPath (with a JSONPath of $.params), then pull the individual values out with UpdateAttribute, along the lines of the Expression Language sketch below.
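For example, a minimal UpdateAttribute sketch (the attribute names are mine, and the positions assume the fixed order above), using getDelimitedField to grab each positional parameter and substringAfter to drop its key:

```
# "params" holds the raw string extracted by EvaluateJsonPath,
# e.g. sn=123&c=4&p=colA&p=colB&p=colC&p=colD
sn.value = ${params:getDelimitedField(1, '&'):substringAfter('=')}
c.value  = ${params:getDelimitedField(2, '&'):substringAfter('=')}
p.1      = ${params:getDelimitedField(3, '&'):substringAfter('=')}
p.2      = ${params:getDelimitedField(4, '&'):substringAfter('=')}
```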
If the parameters are arbitrary (such as a varying number of "p" entries, or parameters in any order), you might be better off with ExecuteScript for your custom logic. Here's a Groovy script you can use in ExecuteScript to do what you describe above:

```groovy
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
import groovy.json.*

def flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    def obj = new JsonSlurper().parseText(text)
    // Split the "params" field (e.g. "sn=123&c=4&p=a&p=b...") into key=value pairs
    def params = obj.params.tokenize('&')
    def builder = new JsonBuilder()
    builder.call {
        'queryType' 'scan'
        'dataSource' 'xyz'
        'resultFormat' 'list'
        // Each "p" parameter's value becomes a column name
        'columns' params.findAll { p -> p.tokenize('=')[0] == 'p' }.collect { p -> p.tokenize('=')[1] }
        'intervals'([] << '2018-01-01/2018-02-09')
        // Filter on the value of the "sn" parameter
        'filter' {
            'type' 'selector'
            'dimension' 'sn'
            'value' params.find { p -> p.tokenize('=')[0] == 'sn' }.tokenize('=')[1]
        }
    }
    outputStream.write(builder.toPrettyString().getBytes(StandardCharsets.UTF_8))
} as StreamCallback)

// Rename the flow file to reflect the translation
flowFile = session.putAttribute(flowFile, 'filename',
        flowFile.getAttribute('filename').tokenize('.')[0] + '_translated.json')
session.transfer(flowFile, REL_SUCCESS)
```
07-26-2018 02:38 AM · 1 Kudo
For PartitionRecord, you'll want a RecordPath that points to the field in the schema that holds the table code value. Assuming it's called "tableCode", the RecordPath would be /tableCode.
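In PartitionRecord that's a dynamic property whose name becomes the attribute written to each outgoing flow file; for example (the property name is just an illustration):

```
# PartitionRecord dynamic property
table.code = /tableCode
```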
07-26-2018 02:36 AM · 1 Kudo
If all the CSV columns are strings and you have a header line, you could set up a CSVReader with "Use String Fields From Header" as the schema access strategy. If there are non-String columns, you could use InferAvroSchema, which will try to figure out the fields' types and generate an Avro schema in the "inferred.avro.schema" attribute; you can then point a CSV Reader/Writer at it via "Use Schema Text" with ${inferred.avro.schema} as the value of "Schema Text" (see the sketch below). If all else fails, you could always create an AvroSchemaRegistry and add each of the 23 table schemas manually, then refer to them by name (if the name is the table code, you'll save a step) by using "Use Schema Name" with ${table.code} as the value. The general idea is to enrich/annotate each FlowFile so you can reuse the same controller services, processors, and other components, configuring them with the same properties and using Expression Language to supply the per-file information.
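As a rough sketch of the middle option, the CSVReader configuration would look something like this ("inferred.avro.schema" is InferAvroSchema's output attribute, as noted above):

```
CSVReader
  Schema Access Strategy     : Use 'Schema Text' Property
  Schema Text                : ${inferred.avro.schema}
  Treat First Line as Header : true
```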
07-25-2018 06:27 PM · 2 Kudos
Currently there is nothing OOTB that will parse Parquet files in NiFi, but I have written up NIFI-5455 to cover the addition of a ParquetReader, so that incoming Parquet files can be operated on like any other supported format. As a workaround, there is ScriptedReader, with which you could write your own reader in Groovy, Javascript, Jython, etc.
07-25-2018 06:17 PM · 1 Kudo
There are a few mechanisms and recent improvements that should allow you to create a more scalable flow. If my comments below don't apply or are a little off, please feel free to elaborate on any/all of them so I can understand your use case better.

It appears you have a code as a column in the data, so your incoming pipe-delimited data is arriving either one row at a time, as a batch with a single value for the "table code", or as a batch with various values for the "table code":

1. If each flow file has a single row, you may want ExtractText (to get the "table code" into an attribute; see the sketch after this list) followed by LookupAttribute to add the "table name" as an attribute looked up from the "table code" value.
2. If each flow file contains multiple rows with the same "table code" value, the same pattern should work, with perhaps a different regex/config for ExtractText.
3. If you're getting multiple rows with multiple "table code" values, use PartitionRecord to split the flow file into outgoing flow files based on the "table code" value. Then you'll end up with scenario #2 (multiple rows with the same table code value), but the attribute will already be available (see the PartitionRecord doc for details).

Hopefully (depending on your flow) this will let you avoid 23 process groups, and instead set attributes (and use NiFi Expression Language in the processors' configs) so you have a single flow. If you are replicating this data across 4 different targets, and each flow file has (or could be enriched with) enough data to "self-describe" its destination, then you might consider a Remote Process Group from any flow branch that ends up at the same target processor(s). That way you'd have one Input Port for each of the 4 targets, and any branch sending to target 1 would have an RPG pointing at the Input Port for target 1. If you can collapse your 23 process groups (PGs) into 1 as described above, you may not even need that; you could still have the 4 separate PGs fed from the single flow.
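For scenario #1, an ExtractText sketch (the property name is arbitrary, and the regex assumes the table code is the first pipe-delimited field in the row):

```
# ExtractText dynamic property; the first capture group is written to an
# attribute named after the property (here, "table.code")
table.code = ^([^|]+)\|
```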
07-25-2018 01:03 PM
Without a Maximum Value Column, both GenerateTableFetch (GTF) and QueryDatabaseTable (QDT) will continue to pull the same data; without a column whose "last" value we can track, how would they know which rows are new "since the last time"? Often an increasing ID or a "last updated" timestamp column is used as the Maximum Value Column. In NiFi 1.7.0 (via NIFI-5143) and in the upcoming HDF 3.2 release, GenerateTableFetch can page through a table using a column's values (versus the concept of "row number"), but without a Maximum Value Column you still can't do incremental fetching. The alternative would be a Change Data Capture (CDC) solution, but those are vendor-specific and currently only CaptureChangeMySQL exists (although an MS SQL Server version has been proposed).