Member since
08-01-2021
48
Posts
10
Kudos Received
7
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1794 | 11-18-2022 09:06 AM | |
2266 | 11-15-2022 05:46 PM | |
1734 | 10-12-2022 03:18 AM | |
1175 | 10-11-2022 08:52 AM | |
3091 | 10-08-2022 08:23 AM |
10-14-2022
02:07 AM
thanks ,Green! your idea helped me solve the problem
... View more
10-10-2022
10:43 PM
Perhaps the connection you set between PutHDFS and UpdateHive3Table doesn't send the original file? I am a bit confused by your flow in general.. why convert to avro? Where are you reading files from? Why do you PutHDFS and then UpdateHiveTable instead of just using PutHiveQL?
... View more
10-10-2022
10:36 PM
@morti To add to this, this is essentially (if not just very similar) to nifi's Record API where Record processors require a RecordReader/Writer controller service where the schema for the incoming/outgoing files is defined. All these processors can get their schema from some registry, or have it configured hard-coded, or try to infer the schema, or simply rely on the schema that was given to the flowfile earlier in the flow. I think it's worth looking into Records in nifi, they're designed specifically for well-defined data and use-cases similar to what your described
... View more
10-08-2022
10:15 PM
1 Kudo
The ReplaceText procesor has a prepend/append mode which might be of help
... View more
10-08-2022
08:23 AM
1 Kudo
Hey @Fredi , I believe the answer for your problem is the processor UpdateRecord. Update record allows you to directly manipulate the fields in your file content. You add dynamic properties to the processor where the key of the property is /<field> (so in your case, '/api_value'), and in the value of this dynamic property you can write down some logic to determine what value to insert into api_value. In the processor, there is a field called "Replacement Value Strategy", which defines how the value of the property will be read. If you set this to "Record Path Value", it means you can now give a path to a different field in your file (url_value!) - I can't test this right now because I'm not at my office, but I'm not entirely sure whether you can manipulate the result after giving a record path (to extract the api_value from the evaluated url_value). Regardless, I'm just about 100% sure this can be done with two processors - One EvaluateJsonPath to extract the url_value into an attribute, then UpdateRecord that uses the 'Literal Value' replacement strategy - with this strategy, you can just add a property with key '/api_value' and value '${url_value}' (or whatever attribute name you gave to the extracted url_value) and once you can access url_value with the expression language (via ${url_value}) you can use all the available functions to manipulate expression language variables. Here's an article with a couple of examples on UpdateRecord: https://community.cloudera.com/t5/Community-Articles/Update-the-Contents-of-FlowFile-by-using-UpdateRecord/ta-p/248267 (I noticed in the article they used some recordPath related functions like "replaceRegex", so I believe there might be a way to use these and then limit the entire issue to just one UpdateRecord processor! Sadly I'm not too familiar with these myself and this was the first time I've seen them) And here's the expression language documentation: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html You can see there are lots of useful functions to extract your api_value once you have ${url_value} as an attribute variable, for example "substring", "find"/"replace", "ifElse", etc. all of which you can try and use to ensure only the api_value is left in the end. Hope this helps! I'm sure using ReplaceText and possibly JoltTransform could provide alternate solutions to the issue, however I believe UpdateRecord is the cleanest solution for this and truly makes use of the processor's abilities. If you struggle to use it correctly, you can reply with an example json and expected output and I'll try to write down the flow when I have time.
... View more
10-06-2022
11:42 AM
Brilliant! That does the trick. Thanks!
... View more
10-06-2022
05:11 AM
@Green_ wrote: I've run into similar issues and haven't reached a clear conclusion either. It seems your have very high heap usage which might potentially be relevant. Once we stop nifi then only it is reducing..there is no data in queue but it is in high heap only.. unable to understanding why it is not reducing automatically..
... View more
10-06-2022
04:04 AM
Hello, I would like to monitor many clusters' status data. I have gone with the method of adding a SiteToSiteStatusReportingTask to each of my clusters and sending all the statuses to a single remote input port in a dedicated cluster. The issue I've encountered is that I couldn't find a reasonable way to aggregate statuses from different nodes to get a clearer picture of a component's status. For example, in a 4 node cluster I've got a process group with 250 files queued on each node, 1000 in total. The reporting task on this cluster would send 4 separate flowfiles to my dedicated cluster, each describing the 250 metric. All of these flowfiles have attributes describing their origin reporting task, however (perhaps for valid reasons) there is no unique identifier describing that all 4 of these flowfiles originated from a specific run of the reporting task. As such, I cannot aggregate these 4 files with a sum and get a more accurate number similar to what I can see in the UI when looking at the aforementioned process group. Since my clusters have varying numbers of nodes I cannot attempt to merge 4 records specifically and I do not want to create duplicate flows for differing node counts. In comparison, when using the REST API to GET a process group, there is a field named "aggregate Snapshot" which accurately calculates the sum of all nodes - this is what I am interested in. Overall I would like to use the reporting tasks so I remain in a data-push method rather than data-pull by sampling all my clusters' rest API. If anyone has any recommendations for how to do this they would be very much appreciated. Thanks in advance, Eyal
... View more
Labels:
- Labels:
-
Apache NiFi
07-21-2022
02:50 AM
Hello, I have a MergeRecord processor that is not merging despite my conditions (with the exception of bin age). I have configured: - 1 minimum record - 2000 maximum records - 1 MB minimum size - no size limit - 10 bins - 30 min max age - bin packing strategy, no correlation attribute I have a 3 node cluster, and the queue before the MergeRecord processor has got millions of files before it that reach a couple Gigabytes in size. I can see the processor keeps opening/closing tasks (by the thread count icon on it), but no files are getting merged and outputted, except for the bin age being reached. I believe all the minimum merge requirements are met, and even the max record limit should be reached, yet the processor isn't working as I understand. I would appreciate any help debugging why it is not merging as expecteded. If it is relevant, I use a json reader and parquet writer. Thanks, Eyal.
... View more
Labels:
- Labels:
-
Apache NiFi
- « Previous
-
- 1
- 2
- Next »