Member since: 08-01-2021
Posts: 52
Kudos Received: 10
Solutions: 7
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2940 | 11-18-2022 09:06 AM |
| | 4154 | 11-15-2022 05:46 PM |
| | 2973 | 10-12-2022 03:18 AM |
| | 2134 | 10-11-2022 08:52 AM |
| | 5226 | 10-08-2022 08:23 AM |
10-08-2022
10:15 PM
1 Kudo
The ReplaceText processor has a prepend/append mode which might be of help.
10-08-2022
08:23 AM
1 Kudo
Hey @Fredi, I believe the answer to your problem is the UpdateRecord processor. UpdateRecord allows you to directly manipulate the fields in your file content. You add dynamic properties to the processor where the key of the property is /<field> (so in your case, '/api_value'), and in the value of this dynamic property you write the logic that determines what value to insert into api_value.

The processor has a property called "Replacement Value Strategy", which defines how the value of the dynamic property is read. If you set it to "Record Path Value", you can give a path to a different field in your file (url_value!). I can't test this right now because I'm not at my office, so I'm not entirely sure whether you can manipulate the result after giving a record path (to extract the api_value from the evaluated url_value).

Regardless, I'm just about 100% sure this can be done with two processors: one EvaluateJsonPath to extract url_value into an attribute, then an UpdateRecord that uses the 'Literal Value' replacement strategy. With this strategy, you can add a property with key '/api_value' and value '${url_value}' (or whatever attribute name you gave to the extracted url_value). Once you can access url_value through the expression language (via ${url_value}), you can use all the available functions to manipulate it.

Here's an article with a couple of examples on UpdateRecord: https://community.cloudera.com/t5/Community-Articles/Update-the-Contents-of-FlowFile-by-using-UpdateRecord/ta-p/248267

(I noticed the article uses some RecordPath functions like "replaceRegex", so I believe there might be a way to use these and limit the entire solution to just one UpdateRecord processor! Sadly I'm not too familiar with these myself and this was the first time I've seen them.)

And here's the expression language documentation: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html

You can see there are lots of useful functions to extract your api_value once you have ${url_value} as an attribute, for example "substring", "find"/"replace", "ifElse", etc., all of which you can try to use to ensure only the api_value is left in the end.

Hope this helps! I'm sure ReplaceText and possibly JoltTransform could provide alternative solutions, but I believe UpdateRecord is the cleanest one here and truly makes use of the processor's abilities. If you struggle to use it correctly, you can reply with an example JSON and the expected output and I'll try to write down the flow when I have time.
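To make the intended transformation concrete, here is a minimal Python sketch of what the flow should do to each record: read url_value, pull a piece out of it, and write that piece into api_value. This is not NiFi code. The record shape, the example URL, and the regex pattern are all hypothetical, since the actual data was not posted.

```python
import re


def fill_api_value(record: dict, pattern: str) -> dict:
    """Return a copy of the record with api_value derived from url_value.

    `pattern` must contain one capture group; what it captures becomes api_value.
    If the pattern does not match, the record is returned unchanged.
    """
    updated = dict(record)
    match = re.search(pattern, record.get("url_value") or "")
    if match:
        updated["api_value"] = match.group(1)
    return updated


# Hypothetical record and pattern, for illustration only.
example = fill_api_value(
    {"url_value": "https://example.com/api/orders", "api_value": None},
    r"/api/([^/?]+)",
)
print(example["api_value"])
```

In the flow itself, the same extraction would be written with expression language functions applied to ${url_value}.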
10-06-2022
08:33 AM
1 Kudo
Since ReplaceText can make use of regex groups, I believe you could do something along the lines of:

Match text: (.*)
Replace text: <xml>'$1'</xml>

$1 allows you to inject the first regex group you match, which in the case of the regex above would be the entire file content. I may be wrong about needing to surround it with single quotes, but a quick read of the processor's documentation should clear things up.

This could be a hefty task if you need to load massive files into memory, but I don't believe your encoded strings should pose a problem. Hope this helps, it's always nice to optimize flows 🙂
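For anyone who wants to try the regex idea outside NiFi first, here is a small Python sketch of the same "match everything, wrap it" replacement. It is only an analogy: NiFi uses Java regex and writes the group as $1, while Python writes it as \g<1>. The function name and the sample string are made up for illustration.

```python
import re


def wrap_in_xml(content: str) -> str:
    """Wrap the whole input in <xml>...</xml> using a single capture group."""
    # (?s) lets '.' match newlines; \A and \Z anchor the match to the whole input,
    # so the group captures the entire content exactly once.
    return re.sub(r"(?s)\A(.*)\Z", r"<xml>\g<1></xml>", content, count=1)


print(wrap_in_xml("aGVsbG8="))
```

Anchoring the pattern matters here: an unanchored (.*) can also match the empty string at the end of the input and produce a second, empty wrapper.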
10-06-2022
04:20 AM
I believe a ReplaceText where you just match the entire encoded content and then inject it into an xml already written as the replacement value would be the ideal way to do this.
10-06-2022
04:12 AM
I've run into similar issues and haven't reached a clear conclusion either. It seems you have very high heap usage, which might be relevant.
04-27-2022
12:29 AM
Hi, I have a flow that receives JSON arrays as input. I would like to validate each of these JSONs against a schema, but the ValidateRecord processor doesn't quite seem to do the job. I need to validate things such as certain fields being enum values, fields having a max/min length, and required fields being present (sometimes inside optional nested JSONs). It seems an Avro schema does not allow some of these checks, so the Record processors can't validate my data the way I need.

I would love to hear if anyone has had a similar use case and what they did to solve it. I am considering the ScriptedValidateRecord processor, but I would prefer to avoid that and might instead use EvaluateJsonPath to extract all the fields I want to validate and then RouteOnAttribute with the expression language to filter out bad records. If there is a more appropriate way to validate records like this then I'm all ears. Thanks in advance!
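To illustrate the kind of checks I mean, here is a rough Python sketch of the rules (not something that runs inside NiFi as-is). The field names "name", "status", "address" and "city", the allowed values, and the length limits are all hypothetical placeholders for my real schema.

```python
# Hypothetical enum values, for illustration only.
ALLOWED_STATUS = {"NEW", "ACTIVE", "CLOSED"}


def validate_record(record: dict) -> list:
    """Return a list of error messages; an empty list means the record is valid."""
    errors = []

    # Required field with a min/max length.
    name = record.get("name")
    if not isinstance(name, str):
        errors.append("name is required")
    elif not 1 <= len(name) <= 50:
        errors.append("name length out of range")

    # Field restricted to a fixed set of values.
    if record.get("status") not in ALLOWED_STATUS:
        errors.append("status is not an allowed value")

    # Optional nested object that has a required field when present.
    address = record.get("address")
    if address is not None:
        if not isinstance(address, dict) or "city" not in address:
            errors.append("address.city is required when address is present")

    return errors


def split_records(records: list) -> tuple:
    """Split a JSON array into (valid, invalid) lists."""
    valid = [r for r in records if not validate_record(r)]
    invalid = [r for r in records if validate_record(r)]
    return valid, invalid
```

The optional-but-then-required nested case is the part I could not express with the Record processors.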
Labels: Apache NiFi
10-02-2021
03:56 PM
@yashratan Is it possible your nifi is configured to run the embedded zookeeper despite your trying to connect to your own zookeepers? Check if the nifi.state.management.embedded.zookeeper.start property in your nifi.properties file is set to true. Check if you are able to communicate with all your zookeepers from each of your nodes. This definitely seems like an issue communicating with your ZKs.
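If you want to check this on every node without opening the file by hand, a small script along these lines could help. It is only a sketch: it parses the properties text in a simplified way (key=value lines, # comments) and the sample content below is made up. The property name is the one mentioned above.

```python
EMBEDDED_ZK_KEY = "nifi.state.management.embedded.zookeeper.start"


def read_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def embedded_zookeeper_enabled(text: str) -> bool:
    """True if the embedded ZooKeeper is configured to start with NiFi."""
    return read_properties(text).get(EMBEDDED_ZK_KEY, "false").lower() == "true"


# Made-up file content, for illustration only.
sample = "# state management\nnifi.state.management.embedded.zookeeper.start=true\n"
print(embedded_zookeeper_enabled(sample))
```

In practice you would pass in the contents of the nifi.properties file from each node's conf directory.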
09-08-2021
07:54 AM
1 Kudo
EDIT: @MattWho's answer made it clear to me I slightly misunderstood the question. His suggestion of managing the entire process using a script is definitely the way to go and would perfectly fit your use case of someone creating a new instance of an existing process group. If I may add, it sounds like using the NiFi Registry might benefit you. You could upload your base process group to the registry and use version control with it. Then, when creating new copies, you would 'pull' the same process group from the registry instead of copying a process group that exists on your canvas and might unintentionally get changed or deleted.

----------------

I do not believe there is a built-in way to directly run an event when a process group is started/stopped, but a reasonable workaround would be to monitor NiFi's app logs and trigger your own event when you receive a log that mentions starting your PG. If you only want to use tools offered by NiFi, you could use the TailFile processor and configure it to run over your app log file, then use another processor (such as RouteOnContent) to match the log line for starting the PG. From there you can do whatever it is that starts the administrative tasks (such as sending an HTTP request with the InvokeHTTP processor).

If this answer helped, please mark it as 'solved' and/or apply 'kudos' 🙂
09-06-2021
08:56 AM
It's a bit hard to imagine your flow just from the description, but I think I understood it. What other questions do you have about it?

In my opinion, adding an attribute to every flowfile after it is written to the DB, only to then write it to a cache which Control M will query (if I understood correctly), doesn't sound too great. If your only requirement is to know whether all the files were successfully written to your DB, you should simply ignore files which were successfully inserted and only apply some logic when an insert fails. Perhaps if a file fails you can write it somewhere else so you will be able to investigate why it failed (some place more persistent than a cache). If you just want to be alerted when an insert fails, or want to return a response to Control M, just add an InvokeHTTP processor after the failure relationship of your putDB processor (if I correctly understood that Control M expects HTTP calls).

Because NiFi is stream oriented, it's hard to tell exactly when a batch of files has finished writing to your DB unless you know exactly how many records should be written (in which case counting the flowfiles passed to success is actually reasonable).
09-04-2021
03:15 PM
In general, NiFi is not very well suited for event-based processing (e.g. an external scheduling tool pinging NiFi to start a process group run). I do not know how Control M works, but what you're describing sounds like it could be achieved with NiFi's REST API (you can directly start/stop a specific process group by its ID).

The requirement for checking if everything got inserted into your database is also quite hard to accomplish accurately. You could use the REST API once more to check that your process group has no queued files (which would mean all your flowfiles passed through the flow), though you'll also have to think about what should happen if writing to the DB fails. I don't believe there is any great way to check if your scheduled run 'completed', but you could definitely use some other processor to 'notify' yourself if something failed.

If this answer helped, please mark it as 'solved' and/or apply 'kudos' 🙂.
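As a rough idea of what the start/stop call could look like from a script, here is a Python sketch that only builds the request without sending it. The endpoint path and body shape are written from my recollection of the NiFi REST API, so please verify them against the REST API docs for your NiFi version. The base URL and process group ID below are placeholders, and authentication is left out entirely.

```python
import json
import urllib.request


def build_schedule_request(base_url: str, process_group_id: str, state: str) -> urllib.request.Request:
    """Build (but do not send) a request that sets a process group to RUNNING or STOPPED."""
    if state not in ("RUNNING", "STOPPED"):
        raise ValueError("state must be RUNNING or STOPPED")
    # Assumed endpoint: PUT /nifi-api/flow/process-groups/{id}
    url = f"{base_url.rstrip('/')}/nifi-api/flow/process-groups/{process_group_id}"
    body = json.dumps({"id": process_group_id, "state": state}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Placeholder host and ID, for illustration only.
request = build_schedule_request("https://nifi.example.com:8443", "my-pg-id", "RUNNING")
print(request.get_method(), request.full_url)
```

An external scheduler could send a request like this to start the group, then later poll the process group's status to see whether anything is still queued.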