Member since
11-16-2015
911
Posts
668
Kudos Received
249
Solutions
My Accepted Solutions
| Views | Posted |
|---|---|
| 706 | 09-30-2025 05:23 AM |
| 1076 | 06-26-2025 01:21 PM |
| 932 | 06-19-2025 02:48 PM |
| 1103 | 05-30-2025 01:53 PM |
| 12290 | 02-22-2024 12:38 PM |
06-30-2017
05:11 PM
@Alvin Jin To answer your question about which processors to use: it depends on what you want to do with the whole CSV file. Your question only mentions splitting and ignoring the header, and the CSVReader takes care of that. The record-aware processors in NiFi 1.3.0 include:

- ConsumeKafkaRecord_0_10: Gets messages from a Kafka topic and bundles them into a single flow file instead of one per message
- ConvertRecord: Converts records from one data format to another (Avro to JSON, e.g.)
- LookupRecord: Uses fields from a record to look up a value, which can be added back to the record
- PartitionRecord: Groups "like" records (based on user-provided criteria) into individual flow files
- PublishKafkaRecord_0_10: Posts messages to a Kafka topic
- PutDatabaseRecord: Executes a specified operation (INSERT, UPDATE, DELETE, e.g.) on a database for each record in a flow file
- PutElasticsearchHttpRecord: Executes a specified operation ("index", e.g.) on an Elasticsearch cluster for each record in a flow file
- QueryRecord: Executes SQL queries on fields from the records. This can be used to filter, aggregate, etc.
- SplitRecord: Splits records into smaller flow files. Usually only used when downstream processors are not record-aware
- UpdateRecord: Updates field(s) in each record of a flow file

Also, if all your CSV columns are strings, you can set "Schema Access Strategy" to "Use String Fields From Header", and then you don't need a schema or schema registry. Otherwise, if you want to provide a schema, you're not required to use a schema registry: you can paste your schema into the Schema Text property and set "Schema Access Strategy" to "Use Schema Text Property".
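If you go the Schema Text route, the property takes an Avro-format schema. Here is a minimal sketch of generating one, written in Python purely for illustration. The record name `csv_row` and the three columns (`id`, `name`, `createdOn`) are hypothetical and not taken from the question, so substitute your own header names and types.

```python
import json

# Hypothetical columns: replace with the header names and types of your own CSV.
columns = [("id", "long"), ("name", "string"), ("createdOn", "string")]


def build_schema_text(record_name, columns):
    """Return an Avro record schema as a JSON string."""
    schema = {
        "type": "record",
        "name": record_name,
        "fields": [{"name": name, "type": avro_type} for name, avro_type in columns],
    }
    return json.dumps(schema, indent=2)


# The printed JSON is what you would paste into the Schema Text property.
schema_text = build_schema_text("csv_row", columns)
print(schema_text)
```

Writing the JSON by hand works just as well; generating it is only convenient when the CSV has many columns.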
06-29-2017
06:54 PM
In addition to @Wynner's answer, if you'd like to keep using ExecuteScript, you can pass in arguments as user-defined properties (aka dynamic properties) or flow file attributes and use them in ExecuteScript. For examples of leveraging user-defined properties in ExecuteScript, check out Part 3 of my ExecuteScript Cookbook article series on HCC; it has examples in Jython.
06-28-2017
08:32 PM
1 Kudo
You could set the Header Line Count to 0 in SplitText, then send the flow files to a RouteOnAttribute processor where you can "skip" the first line by routing on the following Expression Language statement: ${fragment.index:gt(0)} The first line will be routed to "unmatched" and the rest to "matched" or the user-defined property name (depending on the value of the Routing Strategy property). Note that this requires the Line Split Count property to be set to 1 in SplitText. Alternatively, if you are using (or can upgrade to) NiFi 1.3.0, you can use a record-aware processor with a CSVReader. This reader can be configured to (among other things) skip the header line. The record-aware processors also offer better performance when working with flow files that contain many "records" (such as a CSV file where each "record" is a row).
06-28-2017
08:16 PM
4 Kudos
As of NiFi 1.3.0, you can use UpdateRecord for this. If your incoming field name is "createdOn", you can add a user-defined property named "/createdOn" whose value is the following: ${field.value:toDate('yyyy-MM-dd HH:mm:ss.SSS'):toNumber()} Note that you may need to change the type of createdOn from String (in the Reader's schema) to Long (in the Writer's schema).
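To sanity-check the numbers that come out of the expression above, the same conversion (timestamp string in, epoch milliseconds out) can be reproduced outside NiFi. This is a minimal Python sketch, not NiFi code. The function name `to_epoch_millis` is hypothetical, and it assumes the timestamps are in UTC, which may not match the time zone your NiFi instance applies.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(text):
    """Parse 'YYYY-MM-DD HH:MM:SS.mmm' (assumed UTC) into epoch milliseconds."""
    # In Python's format codes, %m is the month and %M the minute.
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%f")
    parsed = parsed.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_millis("2017-06-28 20:16:00.000"))
```

The result is an integer, which is why the Writer's schema needs a Long for the field rather than a String.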
06-23-2017
05:23 PM
You can try a thread dump (with jstack or nifi.sh dump) while it is waiting to shut down; you may be able to spot the culprit in the output.
06-21-2017
06:24 PM
I tested this with Arabic characters in my text field, and it worked fine. You're saying you still get the error when using my suggested lines?
06-21-2017
06:23 PM
The documentation says "The Expression Language allows single quotes and double quotes to be used interchangeably". Try double-quotes in your EL expression.
06-21-2017
05:27 PM
1 Kudo
You can accomplish this with ExecuteScript. The following example uses Groovy as the language and Jayway's JsonPath as the library for JSONPath parsing. First I downloaded 3 JARs (json-path and its required transitive dependencies) into a directory, then set the Module Directory property to the path of that directory. I also added a dynamic property: its name becomes the attribute name, and its value supports Expression Language and (after evaluation) should contain the JSONPath expression used to retrieve the value from the content of the JSON flow file. The Script Body is the following Groovy script:

import com.jayway.jsonpath.*

def flowFile = session.get()
if (!flowFile) return

// Parse the flow file content once; every JSONPath below reads from this parsed document
def inputStream = session.read(flowFile)
def json = JsonPath.parse(inputStream)
inputStream.close()

// Each dynamic property yields one attribute: property name = attribute name, property value = JSONPath
context.properties.findAll { pd, value -> pd.dynamic }.each { pd, value ->
    def prop = context.getProperty(pd)
    try {
        // Evaluate Expression Language first, then use the result as the JSONPath expression
        def jsonPath = prop.evaluateAttributeExpressions(flowFile).value
        flowFile = session.putAttribute(flowFile, pd.name, json.read(jsonPath))
    } catch (e) {
        log.error("Error evaluating JSONPath expression in property ${pd.name}: ${prop?.value}, ignoring...", e)
    }
}
session.transfer(flowFile, REL_SUCCESS)

I tested this with a GenerateFlowFile. After ExecuteScript transfers the flow file, it has the desired attribute name/value. This should work with any number of attributes/JSONPaths per flow file. In addition, I have written NIFI-4100 to cover the improvement to the EvaluateJsonPath processor to support Expression Language.
06-21-2017
04:35 PM
Also, as of NiFi 1.3.0 / HDF 3.0.0, GenerateTableFetch accepts incoming connections/flow files, so you can use ListDatabaseTables -> GenerateTableFetch -> RPG -> Input Port -> ExecuteSQL to fully distribute the fetching of batches of rows across your NiFi cluster. The RPG -> Input Port part is optional and only used on a cluster if you want to fetch rows in parallel.
06-21-2017
04:33 PM
5 Kudos
The answer in this StackOverflow post refers to documentation saying an array will be returned; this appears to be happening after the JSONPath is evaluated (which is why appending a [0] does not work). The post also implies the answer: in NiFi, you can choose "json" as the Return Type and "flowfile-attribute" as the Destination in EvaluateJsonPath (let's say your dynamic property has key "my.attr" and a value of your JSONPath expression above), then follow that processor with an UpdateAttribute processor, setting "my.attr" to the following: ${my.attr:jsonPath('$[0]')} This will overwrite the original "my.attr" value by hoisting the value out of the array. If you need that value back in the content, you can follow this with a ReplaceText processor to replace the Entire Text with the value of "my.attr".
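To make the "hoisting" step concrete: after EvaluateJsonPath, the attribute holds a JSON array as text, and the UpdateAttribute expression pulls out its first element. The Python sketch below mimics that step outside NiFi for illustration only. The function name `hoist_first` is hypothetical, and the handling of non-string elements (re-serializing them as JSON) is an assumption rather than documented NiFi behavior.

```python
import json


def hoist_first(attribute_value):
    """Return the first element of a JSON array given as text."""
    parsed = json.loads(attribute_value)
    first = parsed[0]
    # Strings are returned as-is; other JSON values are re-serialized (assumption).
    return first if isinstance(first, str) else json.dumps(first)


# An attribute value as it might look after EvaluateJsonPath with Return Type "json".
print(hoist_first('["abc"]'))
```

Note that indexing happens on the already-evaluated array, which matches the observation above that appending [0] to the original JSONPath has no effect.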