Member since 08-01-2021
48 Posts, 10 Kudos Received, 7 Solutions
My Accepted Solutions
Views | Posted |
---|---|
1892 | 11-18-2022 09:06 AM |
2435 | 11-15-2022 05:46 PM |
1838 | 10-12-2022 03:18 AM |
1252 | 10-11-2022 08:52 AM |
3255 | 10-08-2022 08:23 AM |
09-21-2023
12:42 AM
I am also experiencing this issue when attempting to write data to Redis in cluster mode. Did you find a solution or workaround, @sofronic?
12-10-2022
06:18 AM
Hey Samuel,

Perhaps the source of my own confusion is that I do not know how Apache Benchmark works. Considering only a few hundred requests get dropped (out of hundreds of thousands), NiFi is indeed able to ingest (almost) all the data you send it. As such, I think we can rule out the load balancer and the HandleHttpRequest processor.

You expressed that you don't believe any backpressure is applied in the flow because the queues between processors are configured to hold 20,000 flowfiles at once, whilst you only send about 5000 concurrent requests. This also confuses me: 5000 concurrent requests, but at what rate? Every second?

The way you described this sounds odd to me. Whether a flow has backpressure cannot be inferred from its configuration and the data ingestion rate alone. Rather, I expected you would visually examine the flow as you run your benchmark test and check whether the queues fill up and turn red at any point in the flow. Could you verify that this is how you checked for backpressure?

If no queues fill up in NiFi and no requests get dropped, I can't think of an explanation for how the req/s rate changes. No backpressure implies the flow is never under duress, and no dropped requests means that all requests that get sent are processed. Again, perhaps my lack of understanding is due to not knowing how Apache Benchmark works.

I believe that if you clarify these points for me, we could get much closer to understanding the source of the issue 🙂

Regards,
Eyal
12-09-2022
05:17 AM
Hey @F_Amini, Has your issue been resolved? I'd be glad to help if needed.
12-09-2022
03:16 AM
Hello Samuel,

I would like to verify: as you test your flow, do the queues fill up and cause any backpressure? What is unclear to me is whether the bad performance originates from:

- the flow not running well (queues begin to fill up, which then causes upstream processors to stop getting scheduled, etc.), or
- the initial HandleHttpRequest processor not being capable of handling ingestion (so the flow looks healthy, but requests get dropped before being turned into flowfiles).

If the first case is true, I'd like to know where the backpressure in the flow originates. If the second case is true, I've found this post where a couple of NiFi experts advise how to increase HTTP ingestion.

As a side thought, is it possible your load balancer can't keep up with the throughput?

My regards,
Eyal
11-25-2022
10:32 AM
1 Kudo
As long as a queue has reached its backpressure threshold, the processor feeding that queue will not be scheduled. This propagates upstream through your flow until it reaches the processor that fetches the data and causes it to stop as well. As such, all it takes is one weak-link processor to limit your entire flow's throughput.

I see the relationship coming out of your HashContent is also red, which implies that a different processor further downstream is causing the backpressure. Can you describe which processor that is? Perhaps fixing its slow performance will improve your entire flow.
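To illustrate the weak-link effect described above, here is a minimal Python sketch of a toy model. It is not NiFi code: the `simulate` function, the per-tick rates, and the queue limit are all hypothetical, and the model only assumes that a processor is skipped while the queue after it is at or above its limit.

```python
def simulate(rates, queue_limit, ticks):
    """Toy model of a linear flow (an illustration, not NiFi's scheduler).

    rates[i] is the most flowfiles processor i can handle per tick.
    queues[i] is the connection between processor i and processor i + 1.
    A processor is not scheduled while its outgoing queue is at the limit.
    """
    n = len(rates)
    queues = [0] * (n - 1)
    delivered = 0
    for _ in range(ticks):
        # Walk from the last processor to the first, so output produced in
        # this tick is only consumed in the next one.
        for i in range(n - 1, -1, -1):
            if i < n - 1 and queues[i] >= queue_limit:
                continue  # backpressure: skip this processor for this tick
            # The first processor is the source; the rest read their input queue.
            batch = rates[i] if i == 0 else min(rates[i], queues[i - 1])
            if i > 0:
                queues[i - 1] -= batch
            if i < n - 1:
                queues[i] += batch
            else:
                delivered += batch
    return delivered, queues


# Hypothetical numbers: the third processor is ten times slower than the others.
slow_total, slow_queues = simulate([100, 100, 10], queue_limit=200, ticks=1000)
fast_total, fast_queues = simulate([100, 100, 100], queue_limit=200, ticks=1000)
print("with one slow processor:", slow_total, slow_queues)
print("with balanced processors:", fast_total, fast_queues)
```

In this model the total delivered with one slow processor is capped by that processor's rate, and the queues in front of it sit around the limit, which is the "red queue" picture from the flow.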
11-25-2022
10:25 AM
That's correct. I believe my earlier tests were failing because I used the "Inherit Record Schema" setting, which wouldn't work on the writer if the schema changes after forking. Here is an Avro schema I generated that should describe the output exactly as @Fredi described (generated using this website); however, even when using this schema in my record writer, the output still comes out empty.
{
"name": "MyClass",
"type": "record",
"namespace": "myNamespace",
"fields": [
{
"name": "Key1",
"type": {
"type": "array",
"items": {
"name": "Key1_record",
"type": "record",
"fields": [
{
"name": "NestedKey1",
"type": "string"
},
{
"name": "NestedKey2",
"type": "string"
},
{
"name": "NestedKey3",
"type": "string"
}
]
}
}
}
]
}
I believe at this point the challenge is simply writing an accurate Avro schema for the output data.
11-25-2022
09:34 AM
1 Kudo
"there is a single point of the flow where a queue is filling, and backpressure is being applied: a logattribute to register the ips of the incoming requests"

Have you tried increasing the "Run Duration" property in the Scheduling tab of the LogAttribute processor? Sometimes setting this to even 25 ms can considerably increase your flow's throughput. I'd recommend experimenting with this setting to see what works best.

1. "UpdateRecord I could not find a freeform text processor. Here is an example of the incoming data in the form of query parameters: TS=20221108-093700&UID=12345&ETID=22&EGID=44&DID=Hello%20World!&HHID=492489&OID=666 What reader could be used to transform free text to json?"

Since you receive the data in the form of query parameters, perhaps you can use the following approach. Query parameters are added to your received flowfiles as attributes if you use the HandleHttpRequest and HandleHttpResponse processors (which ideally you should be using instead of the more limited ListenHTTP processor). All query parameters received by the HandleHttpRequest processor are added as attributes named http.query.param.<parameter name>. If it is only the query parameters you wish to process in your flow, you can then use the AttributesToJSON processor and list these query param attributes to generate JSON from exactly those attributes. If you want to process the query params in addition to your request body, please give me a more detailed example of input/output and I'll post a better solution.

2. I didn't fully understand your use case. Regarding inserting null values (and not the string 'null'), I found this thread in which a solution is offered. Essentially, you first use your shift operation to get rid of the keys for which you want to insert null, then you use a default operation and add null as the default value. I created the following spec and it seems to inject null correctly:
[
{
"operation":"shift",
"spec":{
"TS":"TimeStamp",
"UID":"UserID",
"OID":"OpCoID",
"ETID":"EventTypeID",
"EGID":"EventGroupID",
"DID":"DeviceID",
"HHID":{
"0":{
"null":"HouseholdID"
},
" ":{
"null":"HouseholdID"
},
"":{
"null":"HouseholdID"
},
"*":{
"@(2,HHID)":"HouseholdID"
}
},
"*":"Custom.&"
}
},
{
"operation":"default",
"spec":{
"Custom":{
},
"HouseholdID": null
}
}
]
If you only wish to inject nulls when ETID == 11, the transformation will be a bit more complex. I think it would be easier overall to extract ETID and HHID to attributes and then use Expression Language with the "ifElse" function to evaluate what will be shifted into HouseholdID.

3. I'm afraid I didn't completely understand this point either. If the issue is routing to specific relationships with QueryRecord, every dynamic property you add creates a new relationship. For data to pass to the right relationship, add a WHERE clause to that property's SQL query so that it only matches the records that should pass to that specific relationship.

I'd be glad to help more 🙂! If you still have questions, providing examples (expected input/output) will make it easier for me to offer a solution for the different parts of your flow.

My regards,
Eyal.
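As a reference for what the shift and default operations in the spec above are meant to produce, here is a minimal plain-Python sketch applied to the example query string from the question. It does not run Jolt or NiFi; the `transform` function and the `RENAMES` and `NULL_HHID_VALUES` names are hypothetical and only mirror the intended result: known keys are renamed, other keys go under "Custom", and HouseholdID becomes null when HHID is "0", a single space, or empty.

```python
import json
from urllib.parse import parse_qs

# Key renames taken from the shift operation in the spec.
RENAMES = {
    "TS": "TimeStamp",
    "UID": "UserID",
    "OID": "OpCoID",
    "ETID": "EventTypeID",
    "EGID": "EventGroupID",
    "DID": "DeviceID",
}
# HHID values that the spec treats as "no household".
NULL_HHID_VALUES = {"0", " ", ""}


def transform(query: str) -> dict:
    """Emulate the intended output of the shift + default spec (illustration only)."""
    # keep_blank_values=True keeps "HHID=" as an empty string instead of dropping it.
    parsed = parse_qs(query, keep_blank_values=True)
    params = {key: values[0] for key, values in parsed.items()}

    # Equivalent of the "default" operation: these keys always exist.
    out = {"Custom": {}, "HouseholdID": None}
    for key, value in params.items():
        if key in RENAMES:
            out[RENAMES[key]] = value
        elif key == "HHID":
            if value not in NULL_HHID_VALUES:
                out["HouseholdID"] = value
        else:
            # Equivalent of the "*": "Custom.&" catch-all rule.
            out["Custom"][key] = value
    return out


example = "TS=20221108-093700&UID=12345&ETID=22&EGID=44&DID=Hello%20World!&HHID=492489&OID=666"
print(json.dumps(transform(example), indent=2))
print(json.dumps(transform("UID=12345&HHID=0"), indent=2))
```

Comparing this output with what the JoltTransformJSON processor actually emits for the same input is a quick way to check whether the spec behaves as intended.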
11-25-2022
07:43 AM
Hi @ripo, I've tried it out in my own environment and it does not seem like ports have an 'ENABLED' state. If a port is DISABLED, you can only update it to STOPPED. From the STOPPED state, you can either switch the port to RUNNING or back to DISABLED.
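The transitions described above can be written down as a small lookup table. This is a sketch in plain Python of the behaviour I observed, not NiFi's own code; the `ALLOWED_TRANSITIONS`, `can_transition`, and `path_to` names are hypothetical, and the RUNNING to STOPPED transition is an assumption (stopping a running port) that I did not explicitly test above.

```python
from collections import deque

# Observed port state transitions; RUNNING -> STOPPED is assumed.
ALLOWED_TRANSITIONS = {
    "DISABLED": {"STOPPED"},
    "STOPPED": {"RUNNING", "DISABLED"},
    "RUNNING": {"STOPPED"},
}


def can_transition(current: str, target: str) -> bool:
    """Return True if a port in state `current` may be updated to `target`."""
    return target in ALLOWED_TRANSITIONS.get(current, set())


def path_to(current: str, target: str):
    """Breadth-first search for the shortest sequence of states, or None."""
    queue = deque([[current]])
    seen = {current}
    while queue:
        path = queue.popleft()
        if path[-1] == target:
            return path
        for nxt in sorted(ALLOWED_TRANSITIONS.get(path[-1], set())):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(path + [nxt])
    return None


print(can_transition("DISABLED", "RUNNING"))
print(path_to("DISABLED", "RUNNING"))
```

So a script that wants to start a DISABLED port would first need to move it to STOPPED and only then to RUNNING.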
11-25-2022
07:25 AM
Hey @F_Amini, If the data has already been loaded into NiFi, I don't believe you can do much other than increase your resources. I see you're already familiar with increasing concurrent tasks (it appears your HashContent processor has an increased number). If your entire flow is getting bottlenecked by a single processor, perhaps you could temporarily increase its concurrent tasks even further (at the cost of other processors getting fewer resources).
11-25-2022
05:11 AM
Hi @Fredi, The 'ForkRecord' processor is exactly what you are looking for. However, I am currently trying to configure it to split exactly as you have described and am having trouble getting it to work. I am attempting to use the 'extract' mode with a fork path of "/Key1[*]/NestedKey2", which, according to the documentation, is supposed to do exactly what you described (split on nested JSON). For some reason the output is coming out empty, though. Perhaps someone more familiar with the processor could reply and explain how to use it correctly for your use case.