We're looking for advice on how to identify possible improvements to our current NiFi flow, which replicates what we had previously implemented in Flume; both run on AWS with CDP Private Cloud.
Our use case begins with an HTTPS listener that, in its simplest form, receives GET requests, processes them, and writes events to Kafka.
Between those two ends, we perform some validations and transformations:
- convert the request parameters into JSON (using 5 processors: AttributesToJSON and ReplaceText)
- rename JSON properties using JoltTransformJSON
- validate with a couple of RouteOnAttribute + ReplaceText to confirm mandatory fields and transform null values
- map numeric values to strings using reference data (using UpdateAttribute, RouteOnAttribute and 2 LookupAttribute)
- convert JSON to Avro with ConvertRecord
- return the HTTP response and publish each individual record to Kafka
In total, we have around 20-25 processors on the critical path and about as many handling errors along the way, and we reach about 3,000 records/second using 3 nodes with 16 cores and a 16 GB heap each (sizing recommended by Cloudera).
Previously, we reached 9,000 records/second in Flume, with 3 identical nodes handling HTTP and validations and 3 more receiving the Avro messages and publishing them to Kafka.
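The two setups don't use the same number of nodes, so it may help to normalize by node count. A back-of-the-envelope sketch using only the figures above, assuming (not measured) that throughput scales roughly linearly with node count:

```python
# Per-node throughput from the figures quoted above (records/second).
nifi_total, nifi_nodes = 3_000, 3
flume_total, flume_nodes = 9_000, 6  # 3 HTTP/validation nodes + 3 Kafka nodes

nifi_per_node = nifi_total / nifi_nodes
flume_per_node = flume_total / flume_nodes

print(f"NiFi:  {nifi_per_node:.0f} records/s per node")
print(f"Flume: {flume_per_node:.0f} records/s per node")
print(f"Flume/NiFi per-node ratio: {flume_per_node / nifi_per_node:.1f}x")
```

Under that linear-scaling assumption, the per-node gap is about 1.5x rather than 3x.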
We've already tried switching our gp2 disks (3,000 IOPS, 125 MiB/s) to gp3 (3,000 IOPS, 500 MiB/s) and putting the repositories on separate volumes, but neither change improved performance.
Any ideas on how to identify improvements in areas such as CPU (which only reaches 60% usage even with 128 threads), heap (currently 16 GB), disks (configured as described above, though monitoring doesn't show them under stress), or developing custom processors for our custom validations and transformations?
There are probably ways to reduce the number of processors you use, but as a start I would try to monitor where the slowness comes from. Is there any backpressure in the flow (flowfiles queueing up before a processor)? Are you sending requests at the same rate? If so, do the 6,000 records/second you lost translate into 5xx responses from NiFi to the origin service?
In terms of optimizing, perhaps you can transform null values in your initial JoltTransformJSON. It sounds like you use a lot of ReplaceText processors too - perhaps UpdateRecord would be more useful for updating your fields. I'd recommend looking at QueryRecord for your validation/routing instead of UpdateAttribute -> RouteOnAttribute (if applicable, i.e. if you route based on the JSON fields).
There is a single point in the flow where a queue is filling and backpressure is being applied: a LogAttribute processor that records the IPs of the incoming requests.
Apart from that, there are roughly 5,000 flowfiles in the system at any given moment while we're processing a steady stream of events, which seems acceptable given that end-to-end latency is approximately 1 second.
We're obtaining these results in a development environment, where we run stress tests with Apache Benchmark, so the request rate is steady.
The requests are sent to a load balancer. The failed requests seem to originate from that load balancer, which fills its queue and drops incoming requests when that happens.
1. UpdateRecord: I could not find a free-form text processor. Here is an example of the incoming data, in the form of query parameters:
TS=20221108-093700&UID=12345&ETID=22&EGID=44&DID=Hello%20World!&HHID=492489&OID=666
When the event type ID (ETID) is 11 and the household ID (HHID) is 0, "" or " ", we want to change HHID to null.
I could not find a way to change the value to null, nor to evaluate one field and transform another based on it. Is it possible to set a value to null (the actual null value, not the string "null")? What am I doing wrong in the example below?
We're doing control flow: we evaluate whether a given field is numeric, then check whether there was any error, and route to success or error based on the messageError attribute. With QueryRecord we can apply a transformation, but we cannot make a routing decision based on the result of the SQL query. Is this interpretation correct, in your opinion?
There is a single point in the flow where a queue is filling and backpressure is being applied: a LogAttribute processor that records the IPs of the incoming requests
Have you tried increasing the "Run Duration" property in the scheduling tab of the LogAttribute processor?
Sometimes setting it to even 25 ms can considerably increase your flow's throughput. I'd recommend experimenting with this setting to see what works best.
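Why a longer Run Duration can help, as we understand it: it lets a processor handle more flowfiles per scheduled invocation before committing its session, so fixed per-invocation overhead is amortized over a batch, at the cost of some latency. The toy model below illustrates only that amortization; the cost figures are hypothetical, not NiFi measurements:

```python
def throughput_per_thread(batch_size: int, commit_cost_ms: float, per_flowfile_ms: float) -> float:
    """Flowfiles/second for one thread when a fixed overhead is paid once per batch.

    Toy model only: commit_cost_ms and per_flowfile_ms are hypothetical inputs.
    """
    batch_time_ms = commit_cost_ms + batch_size * per_flowfile_ms
    return batch_size / batch_time_ms * 1000


# Hypothetical costs: 1 ms fixed overhead per batch, 0.05 ms of work per flowfile.
for batch in (1, 10, 100):
    rate = throughput_per_thread(batch, commit_cost_ms=1.0, per_flowfile_ms=0.05)
    print(f"batch of {batch:>3}: ~{rate:,.0f} flowfiles/s per thread")
```

The point is the shape, not the numbers: when per-flowfile work is small relative to the fixed overhead, batching dominates throughput, which would be consistent with a cheap processor like LogAttribute benefiting from a longer Run Duration.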
1. UpdateRecord: I could not find a free-form text processor. Here is an example of the incoming data, in the form of query parameters: TS=20221108-093700&UID=12345&ETID=22&EGID=44&DID=Hello%20World!&HHID=492489&OID=666. What reader could be used to transform free text into JSON?
If the data arrives as query parameters, there is a simpler route. For starters, query parameters are added to your received flowfiles as attributes IF you use the HandleHttpRequest and HandleHttpResponse processors (which you should ideally be using instead of the limited ListenHTTP processor). HandleHttpRequest adds every query parameter it receives as an attribute named http.query.param.<parameter name>. If you only need to process the query parameters in your flow, you can then use the AttributesToJSON processor and list these query-param attributes to generate the JSON more strictly. If you want to process the query params in addition to your request body, please give me a more detailed input/output example and I'll post a better solution.
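A minimal Python sketch of that idea, assuming the http.query.param.<name> naming described above. The helper names query_to_attributes and attributes_to_json are hypothetical and only approximate what HandleHttpRequest and AttributesToJSON do; in particular, how AttributesToJSON treats listed-but-missing attributes is not modeled.

```python
import json
from urllib.parse import parse_qsl


def query_to_attributes(query: str) -> dict:
    # Each query parameter becomes an attribute named http.query.param.<name>,
    # mirroring the naming HandleHttpRequest uses. Percent-encoding is decoded.
    return {
        f"http.query.param.{name}": value
        for name, value in parse_qsl(query, keep_blank_values=True)
    }


def attributes_to_json(attributes: dict, attribute_list: list) -> str:
    # Simplified stand-in for AttributesToJSON with an explicit attribute list:
    # only the listed attributes are emitted, keyed by their full names.
    # Listed-but-missing attributes are simply skipped in this sketch.
    selected = {name: attributes[name] for name in attribute_list if name in attributes}
    return json.dumps(selected)


query = "TS=20221108-093700&UID=12345&ETID=22&EGID=44&DID=Hello%20World!&HHID=492489&OID=666"
attrs = query_to_attributes(query)
wanted = [f"http.query.param.{p}" for p in ("TS", "UID", "ETID", "EGID", "DID", "HHID", "OID")]
print(attributes_to_json(attrs, wanted))
```

Renaming the keys (for example to HouseholdId) would then be a job for the existing JoltTransformJSON step.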
2. I didn't fully understand your use case. Regarding inserting null values (and not the string 'null'), I found this thread in which a solution is offered. Essentially, you first use your shift operation to get rid of the keys for which you want to insert null, then you use a default operation and add null as the default value. I created the following spec and it seems to inject null correctly:
If you only wish to inject nulls when ETID == 11, the transformation will be a bit more complex. I think it would be easier overall to extract ETID and HHID to attributes and then use Expression Language with the "ifElse" function to evaluate what will be shifted into HouseholdId.
3. I'm afraid I didn't completely understand this point either. If the issue is routing to specific relationships with QueryRecord, every dynamic property you add creates a new relationship. For records to go to the right relationship, add a WHERE clause to each SQL query so that it only matches the records that should go to that specific route/relationship.
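The decision logic, combined with the shift-then-default idea, can be sketched in Python as a stand-in (this is not Jolt or Expression Language syntax). The empty-value set comes from the thread (0, "", " "); attribute values are treated as strings, as they would be for http.query.param.* attributes. The function names are hypothetical, and the rename to HouseholdId is omitted to keep the sketch short.

```python
import json

# HHID values treated as "empty" in the thread; attributes are strings, so 0 arrives as "0".
EMPTY_HHID = {"0", "", " "}


def should_nullify(record: dict) -> bool:
    # The ifElse-style condition: only when ETID is 11 and HHID is "empty".
    return record.get("ETID") == "11" and record.get("HHID") in EMPTY_HHID


def shift(record: dict) -> dict:
    # Shift step: copy every key, except an HHID that should become null.
    if should_nullify(record):
        return {k: v for k, v in record.items() if k != "HHID"}
    return dict(record)


def default(record: dict, defaults: dict) -> dict:
    # Default step: add keys that are missing; keys already present are kept as-is.
    out = dict(record)
    for key, value in defaults.items():
        out.setdefault(key, value)
    return out


for rec in ({"ETID": "11", "HHID": " "}, {"ETID": "22", "HHID": " "}, {"ETID": "11", "HHID": "492489"}):
    print(json.dumps(default(shift(rec), {"HHID": None})))
```

Python's None serializes to a real JSON null, not the string "null", which is the distinction asked about earlier in the thread.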
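To illustrate one query per relationship, here is a sketch that uses Python's sqlite3 as a stand-in for QueryRecord. QueryRecord runs its queries through Apache Calcite, whose SQL dialect differs (GLOB in particular is SQLite-specific); the table name FLOWFILE mirrors QueryRecord's convention, while the relationship names and sample rows are hypothetical. It also shows a pitfall when the second query simply uses the opposite WHERE clause: a NULL field makes both predicates unknown, so that record matches neither query unless NULL is handled explicitly.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Stand-in for the records in a flowfile; QueryRecord exposes them as a table named FLOWFILE.
conn.execute("CREATE TABLE FLOWFILE (UID TEXT, HHID TEXT)")
conn.executemany(
    "INSERT INTO FLOWFILE VALUES (?, ?)",
    [("12345", "492489"), ("12346", "abc"), ("12347", None)],
)

# One query per dynamic property, i.e. per relationship (names are hypothetical).
# HHID counts as numeric here if it is non-empty and contains no non-digit character.
routes = {
    "valid": "SELECT * FROM FLOWFILE WHERE HHID <> '' AND HHID NOT GLOB '*[^0-9]*'",
    # Naive complement: silently misses rows where HHID IS NULL.
    "invalid_naive": "SELECT * FROM FLOWFILE WHERE HHID = '' OR HHID GLOB '*[^0-9]*'",
    # Complement that also catches NULLs.
    "invalid": "SELECT * FROM FLOWFILE WHERE HHID IS NULL OR HHID = '' OR HHID GLOB '*[^0-9]*'",
}

routed = {name: conn.execute(sql).fetchall() for name, sql in routes.items()}
for name, rows in routed.items():
    print(name, rows)
```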
I'd be glad to help more 🙂! If you still have questions, providing examples (expected input/output) will make it easier for me to offer a solution for the different parts in your flow.
Thank you for taking the time to respond; it has helped me learn more about what works in NiFi processors.
Increasing the Run Duration of the LogAttribute processor eliminated the queued flowfiles, which was good.
Before any changes, the baseline throughput of the system is about 5,600 req/s.
The installation includes 6 t5.4xlarge EC2 instances (each with 16 threads).
I have been changing the flow and also tuning the NiFi configuration. Regarding the changes to the flow:
1. Parsing query parameters
I used the http.query.param.YYY attributes with the AttributesToJSON processor instead of parsing the HTTP query string. This replaced 6 processors with 1, and the performance gain was about 5% (approximately 5,800 req/s).
2. When http.query.param.ETID = 11 and http.query.param.HHID is in (0, '0', ''), I assigned http.query.param.HHID a special value it should never have (it is supposed to contain either an integer or numeric characters), and then removed http.query.param.HHID. Then I applied the Jolt spec that you shared. Here there was no significant performance gain, probably because the approach is not the best one.
3. I used the QueryRecord processor with an SQL query that tests whether a field is numeric and routes the flowfile to error processing accordingly, and a SELECT * FROM FLOWFILE with the opposite WHERE clause to route it to non-error processing. Here, the performance gain was also about 5%.
So at this point I was at about 6,100 events/second; no major breakthrough can be attributed to the changes in the NiFi flow. This is far from what I expected.
As for tuning the NiFi configuration:
The first thing I did was switch the provenance repository to in-memory, which had a good impact on performance: a gain of about 600 req/s.
At this point I had a 3-node cluster and added 3 more nodes. I was very impressed with the first tests: I consistently got above 7,700 req/s, and at one point about 14,200 req/s.
I could never reproduce those results; subsequent tests showed a decrease in performance.
I saw some posts that seemed to describe a situation similar to mine: a cluster that performed well at first and then got worse results. They suggested garbage collector interference as a possible cause and reported that the initial throughput was achieved again after restarting the service. That did not happen in my case; performance stayed degraded.
I am using 5 clients with a concurrency level of 1,000 simultaneous requests per client, and at any given time there are about 5,000 flowfiles being routed. This means there are between several dozen and a couple of hundred requests per queue at any given time. So in my opinion no backpressure is being applied (the queues are configured to start backpressure when 20k flowfiles are queued).
I am not sure about HandleHttpRequest; it could be a factor, but requests are not being dropped: during the tests I only see several hundred failed requests out of 500,000.
I will try the recommendations in that thread and report back.
Regarding the load balancer, it is a Classic Load Balancer in AWS, which offers little configuration apart from security, target groups, etc. It was dropping requests during my first tests, and I could not figure out why. But it is not happening now, and it offers limited configuration options anyway.
I went through the response you shared. My cluster nodes have 16-core CPUs, so I set the maximum thread pool size to 96.
HandleHttpRequest and HandleHttpResponse both have 16 concurrent tasks. I tried assigning 24 to each of them but got no performance increase, so I lowered them again.
Another topic addressed in that post was the load balancer, which in my case seems to be distributing the load properly without dropping requests.
In summary, I'm puzzled by the low CPU and disk utilization, and I cannot figure out how to make better use of the resources: when I increase the number of concurrent tasks, there is no significant change.
Perhaps the source of my own confusion is that I do not know how Apache Benchmark works.
Considering that only a few hundred requests get dropped (out of hundreds of thousands), NiFi is indeed able to ingest (almost) all the data you send it. As such, I think we can rule out the load balancer and the HandleHttpRequest processor.
You said you don't believe any backpressure is applied in the flow because the queues between processors are configured to hold 20,000 flowfiles whilst you only send about 5,000 concurrent requests (this also confuses me: 5,000 concurrent requests, but at what rate? Every second?). The way you described this sounds odd to me: whether a flow is under backpressure isn't something you can deduce from its configuration and the data ingestion rate. Rather, I expected you to visually examine the flow while running your benchmark and check whether the queues fill up and turn red at any point. Could you confirm that this is how you checked for backpressure?
If no queues fill up in NiFi and no requests get dropped, I can't think of an explanation for why the req/s rate changes. No backpressure implies the flow is never under duress, and no dropped requests means that all requests that get sent are processed. Again, perhaps my lack of understanding is due to not knowing how Apache Benchmark works.
I believe if you'll clarify these points for me we could get much closer to understanding what the source of the issue is 🙂
Hello Eyal, thanks once again for your interest in helping out.
I did not express myself clearly. I believe there is no backpressure, as I confirmed visually in the NiFi UI. In the status bar at the top I see about 5,000 flowfiles in the system overall at any given time. Some of the queues have 20, 50, 100 or 200 flowfiles waiting at a given instant. The queue with the most flowfiles is the one feeding the Kafka producer processor, with about 2,000. I believe these numbers are not high enough to trigger backpressure-driven adjustments to task scheduling.
Apache Benchmark sends concurrent HTTP requests. In this case, I set each of the 5 client instances to send 1,000 requests concurrently.
I have the thread pool sized to 96 and 16 concurrent tasks assigned to all the processors on the critical path. Despite this, I get a low load average (between 2 and 7), and the disks' IOPS are in the dozens per second (the disks allow a maximum of 3,000). I'm all out of ideas at this point. 😄
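One way to see how the load generator itself can cap the measured rate: ab keeps up to -c requests outstanding per client and issues a new one only when a response comes back, so it behaves as a closed loop. By Little's law (requests in flight = throughput × response time), the rate it can drive is bounded by in-flight requests divided by latency. A sketch using the 5 × 1,000 concurrency from this thread; the latencies are hypothetical inputs, and treating each in-flight request as one flowfile is a simplifying assumption:

```python
def closed_loop_throughput(clients: int, concurrency_per_client: int, latency_s: float) -> float:
    """Upper bound on req/s for a closed-loop load generator (Little's law).

    With a fixed number of requests in flight, throughput = in_flight / response_time.
    """
    in_flight = clients * concurrency_per_client
    return in_flight / latency_s


# 5 ab clients x 1,000 concurrent requests each; latencies are hypothetical.
for latency in (0.5, 0.9, 1.0):
    bound = closed_loop_throughput(5, 1000, latency)
    print(f"latency {latency:.1f} s -> at most ~{bound:,.0f} req/s")
```

With roughly 1 s of end-to-end latency, as reported earlier in the thread, the bound comes out near 5,000 req/s, in the same range as the measured rates. That suggests, as a hypothesis to test, that more clients or a higher -c may be needed before NiFi itself becomes the limit; it would also be consistent with the ~5,000 flowfiles seen in the UI.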