Member since 07-30-2019
3471 Posts
1642 Kudos Received
1020 Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 146 | 06-03-2026 06:06 PM |
| | 459 | 05-06-2026 09:16 AM |
| | 826 | 05-04-2026 05:20 AM |
| | 495 | 05-01-2026 10:15 AM |
| | 621 | 03-23-2026 05:44 AM |
11-22-2021
05:12 AM
@Ankit13 I would still use Cron scheduling on the PutFile processor, but rather than having it run just once at, say, hour 7, I'd schedule it to run every second starting at hour 7. That way it starts putting files at 07:00:00 and continues to put files all the way until 07:59:59. Then it stops executing until the next day.
http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html
Hope this helps,
Matt
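As a rough illustration of the schedule described above: a Quartz expression has the fields seconds, minutes, hours, day-of-month, month, and day-of-week, so `* * 7 * * ?` fires every second while the hour is 7. The Python sketch below does not call Quartz or NiFi; `fires_in_hour_7` and `count_firings` are hypothetical helpers that only mirror that one expression.

```python
from datetime import datetime, timedelta

# Quartz cron fields: seconds minutes hours day-of-month month day-of-week
CRON_EXPRESSION = "* * 7 * * ?"  # every second of every minute while hour == 7


def fires_in_hour_7(moment: datetime) -> bool:
    """Hypothetical stand-in for the expression above: true only during hour 7."""
    return moment.hour == 7


def count_firings(day: datetime) -> int:
    """Count the seconds of one day on which the schedule would trigger."""
    start = day.replace(hour=0, minute=0, second=0, microsecond=0)
    return sum(fires_in_hour_7(start + timedelta(seconds=s)) for s in range(86400))
```

The processor would be triggered on each second from 07:00:00 through 07:59:59 and then sit idle until hour 7 of the next day.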
11-18-2021
06:45 AM
@prova Based on the timestamp shared, the source is RFC3164 syslog messages, in which the timestamp does not include a year. The SyslogReader supports both RFC3164 and RFC5424 syslog messages, but uses a generic syslog schema applied against the source data:
{
"type" : "record",
"name" : "nifiRecord",
"namespace" : "org.apache.nifi",
"fields" : [ {
"name" : "priority",
"type" : [ "null", "string" ]
}, {
"name" : "severity",
"type" : [ "null", "string" ]
}, {
"name" : "facility",
"type" : [ "null", "string" ]
}, {
"name" : "version",
"type" : [ "null", "string" ]
}, {
"name" : "timestamp",
"type" : [ "null", "string" ]
}, {
"name" : "hostname",
"type" : [ "null", "string" ]
}, {
"name" : "body",
"type" : [ "null", "string" ]
} ]
}
You can see that timestamp is treated as a string. When it comes to the reformatting the customer is looking for, where is NiFi expected to extract the year from, since it is not in the syslog message? Since the schema treats the timestamp as a string, it can't be treated like a timestamp type within the syslog record for reformatting. Reformatting is possible with RFC5424-formatted source syslog messages, whose timestamps include the year.

This is not to say that you could not manipulate this date string via some downstream processor, but you would still need to figure out where you are going to get the year from. NiFi can't assume that an RFC3164-formatted syslog message was produced in the same year that NiFi is parsing it. This becomes hard to handle even via some downstream processor at the end of the year, where the NiFi servers may already be in 2022, for example, but the received RFC3164 syslog messages were produced in 2021.

RFC3164 was obsoleted when RFC5424 was introduced. RFC3164 syslog messages are produced by older systems, and the options here are limited.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.
Thank you,
Matt
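To make the year-end problem concrete, here is a minimal Python sketch of one possible downstream heuristic. It is not something NiFi or the SyslogReader does: `infer_rfc3164_datetime`, the `received` argument, and the one-day `tolerance` are all assumptions made for illustration. It guesses the year by assuming the message was produced shortly before it was received.

```python
from datetime import datetime, timedelta
from typing import Optional


def infer_rfc3164_datetime(stamp: str, received: datetime,
                           tolerance: timedelta = timedelta(days=1)) -> Optional[datetime]:
    """Guess the year of a year-less RFC3164 timestamp such as 'Nov 18 06:45:00'.

    Assumption: the message was produced within roughly the year before
    `received`, and never more than `tolerance` after it.
    """
    cleaned = " ".join(stamp.split())  # RFC3164 pads single-digit days with a space
    for year in (received.year, received.year - 1):
        try:
            candidate = datetime.strptime(f"{year} {cleaned}", "%Y %b %d %H:%M:%S")
        except ValueError:
            continue  # e.g. Feb 29 combined with a non-leap year
        if candidate <= received + tolerance:
            return candidate
    return None
```

A message stamped `Dec 31 23:59:58` and received a few seconds into 2022 resolves to 2021 under this rule. The guess is still only a guess: delayed or replayed messages, or senders with badly skewed clocks, can land in the wrong year.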
11-16-2021
06:13 AM
@Yemre The ability to dynamically fetch secrets/passwords from an external source is not something that exists currently. Doing so would require modifying every component class that uses sensitive properties. There is some progress on this path, however: https://issues.apache.org/jira/browse/NIFI-5481
This new feature handles pulling secrets from an external vault, but it is a NiFi core-level feature and does not extend to the individual flow component level. I recommend raising an Apache NiFi Jira with your specific request: https://issues.apache.org/jira/projects/NIFI/
If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.
Thank you,
Matt
11-02-2021
08:23 AM
@AnnaBea Let me make sure I am clear on your ask here:
1. You have successfully split your source file into 3 parts (header line, body line(s), and footer line).
2. You have successfully modified all three split files as needed.
3. You are having issues re-assembling the three split files back into one file, in the order header, body, footer, using the MergeRecord processor?

With this particular dataflow design, the MergeRecord processor is not likely what you want to use. You probably want to use the MergeContent processor instead, with a "Merge Strategy" of "Defragment". But getting these three source FlowFiles merged in a specific order requires some additional work in your upstream flow.

In order to use "Defragment", your three source FlowFiles all need to have these FlowFile attributes:
fragment.identifier: All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute.
fragment.index: A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile.
fragment.count: The number of split FlowFiles generated from the parent FlowFile.

1. Add one UpdateAttribute processor before your RouteText and configure it to create the "fragment.identifier" attribute with a value of "${UUID()}" and another attribute, "fragment.count", with a value of "3". Each FlowFile produced by RouteText should then have these two attributes set on it.
2. Then add one UpdateAttribute processor to each of the 3 flow paths to set the "fragment.index" attribute uniquely per dataflow path: value=1 for header, value=2 for body, and value=3 for footer.
3. Now MergeContent will have what it needs to bin these three files by the UUID and merge them in the proper order.

There are often many ways to solve the same use case using NiFi components. Some design choices are better than others and use fewer resources to accomplish the end goal. While the above is one solution, there are others, I am sure. Cloudera's professional services is a great resource that can help with use case designs.

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.
Thank you,
Matt
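The binning and ordering that the three fragment attributes make possible can be sketched in a few lines of Python. This is a toy model for illustration only, not NiFi's implementation or API: the `FlowFile` tuple and the `defragment` function are hypothetical, and FlowFile content is simplified to a string.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

FlowFile = Tuple[Dict[str, str], str]  # (attributes, content); a toy model, not NiFi's API


def defragment(flowfiles: List[FlowFile]) -> Dict[str, str]:
    """Bin by fragment.identifier and join each complete bin in fragment.index order."""
    bins: Dict[str, List[FlowFile]] = defaultdict(list)
    for attributes, content in flowfiles:
        bins[attributes["fragment.identifier"]].append((attributes, content))

    merged: Dict[str, str] = {}
    for identifier, fragments in bins.items():
        expected = int(fragments[0][0]["fragment.count"])
        if len(fragments) != expected:
            continue  # incomplete bin: leave it unmerged
        fragments.sort(key=lambda item: int(item[0]["fragment.index"]))
        merged[identifier] = "".join(content for _, content in fragments)
    return merged
```

In this model the arrival order of header, body, and footer does not matter: the shared identifier groups them, the count says when the group is complete, and the index fixes the output order.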
10-27-2021
02:20 PM
@Apoo The EvaluateJsonPath processor's dynamic properties do not support NiFi Expression Language (EL), so passing dynamic strings to these dynamic properties from FlowFile attributes is not possible. The dynamic properties only support NiFi parameters. You may want to raise an Apache NiFi Jira requesting NiFi EL support for these dynamic properties, or even contribute to the open source code if you so choose.
Thank you,
Matt
10-22-2021
07:41 PM
Thanks Matt, it is very helpful.
10-19-2021
02:04 PM
@AA24 The easiest way to accomplish this is to use the PutDistributedMapCache processor in one flow to write the attribute values you want to share to a cache server, and in your other flow use the FetchDistributedMapCache processor to retrieve those cached values and add them to the FlowFiles that need them.

Another option is to use the MergeContent processor. In flow one, where it looks like you are extracting your session_id and job_id, you would use the ModifyBytes processor to zero out the content, leaving you with a FlowFile that only has attributes, and then use MergeContent to combine this FlowFile with the FlowFile in your second flow. In the MergeContent processor you would configure "Attribute Strategy" to use "Keep All Unique Attributes".

If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.
Thank you,
Matt
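For the MergeContent option, the effect of "Keep All Unique Attributes" can be pictured with a small Python sketch: an attribute found on any merged FlowFile is kept unless another FlowFile carries the same attribute with a different value. This is a toy model, not NiFi code; `keep_all_unique_attributes` is a hypothetical name, and attributes are modeled as plain dictionaries.

```python
from typing import Dict, List


def keep_all_unique_attributes(flowfiles: List[Dict[str, str]]) -> Dict[str, str]:
    """Keep every attribute unless two FlowFiles disagree on its value."""
    merged: Dict[str, str] = {}
    conflicting = set()
    for attributes in flowfiles:
        for name, value in attributes.items():
            if name in conflicting:
                continue  # already dropped because of an earlier disagreement
            if name in merged and merged[name] != value:
                del merged[name]
                conflicting.add(name)
            else:
                merged[name] = value
    return merged
```

In this model, session_id and job_id from the content-less FlowFile survive the merge because the second FlowFile does not define them, while an attribute such as filename, which differs between the two, is dropped.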
10-19-2021
01:55 PM
1 Kudo
@TRSS_Cloudera It is not clear to me how you have designed your dataflow to remove all files from the source SFTP server except the newest file. Assuming state was not an issue (since you said your flow works if you manually clear state), how do you have your flow built?

There exists a GetSFTP processor that does not maintain state. So you could have your flow use ListSFTP and FetchSFTP to always get the newest "history" file and record that latest "history" file's last modified timestamp in something like a DistributedMapCache server. Then have GetSFTP run once a day using the "Cron driven" scheduling strategy to get all files (Delete Original = false) in that directory (this would get the latest history file also), fetch the currently stored last modified time from the map cache, use RouteOnAttribute to route any FlowFiles whose last modified time is older than the stored value, and finally send those to a processor that removes them from the source SFTP server.

While the above would work in an "ideal" world, you would run into issues when there was an interruption in the running dataflow causing multiple new files to get listed by ListSFTP, because you would not know which one ended up having its last modified timestamp stored in the DistributedMapCache. But in that case, the worst outcome is a couple of files left lingering until a later run lists just one history file, at which point behavior goes back to what is expected.

Otherwise, there are script-based processors you could use to build your own scripted handling here. To be honest, it seems like wasted IO to have NiFi consume these files into NiFi just to auto-terminate them when you could use an ExecuteStreamCommand processor to invoke a script that connects to your SFTP server and simply removes what you do not want, without needing to pull anything across the network or write file content to NiFi that you don't need.

Hopefully this gives you some options to think about.
If you found this response assisted with your query, please take a moment to login and click on "Accept as Solution" below this post.
Thank you,
Matt
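As a sketch of the selection logic such a script might apply, the Python function below keeps the newest file and returns everything else for removal. It is an illustration under assumptions: `files_to_remove` is a hypothetical name, the input is assumed to be a mapping of file name to last-modified time in epoch milliseconds, and connecting to the SFTP server and deleting the files is deliberately left out.

```python
from typing import Dict, List


def files_to_remove(listing: Dict[str, int]) -> List[str]:
    """Return every file name except the one with the newest last-modified time.

    `listing` maps file name -> last-modified epoch milliseconds (hypothetical input).
    """
    if not listing:
        return []
    newest = max(listing, key=listing.get)
    return sorted(name for name in listing if name != newest)
```

Because only names and timestamps are inspected, no file content has to cross the network for the decision to be made.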