Member since
03-09-2022
8
Posts
1
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
431 | 03-14-2022 11:21 PM |
04-25-2022
11:43 PM
Hi, I am using MirrorMaker 2 to copy one topic from a source cluster to a destination cluster. The data is being copied, but I see about 3 hours of lag, and I cannot tell whether the delay is on the consumer side or the producer side, because MirrorMaker 2 runs on the Kafka Connect framework. Configuration details: the test topic has 2 partitions and a replication factor of 3, and about 8K messages per second arrive on the source topic.
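To put the numbers above in perspective, here is a rough back-of-the-envelope sketch in Python. It assumes the 3-hour lag corresponds to a backlog that built up at the steady 8K messages per second quoted above, which the post does not confirm. The 10,000 messages per second replication rate in the usage lines is purely hypothetical.

```python
# Figures from the post; the interpretation of "lag" as a backlog is an assumption.
MESSAGES_PER_SECOND = 8_000
PARTITIONS = 2
LAG_HOURS = 3


def backlog_messages(rate_per_sec, lag_hours):
    """Messages produced during the lag window at a steady rate."""
    return rate_per_sec * lag_hours * 3600


def catch_up_hours(backlog, rate_in, rate_out):
    """Hours needed to drain the backlog; None if the mirror cannot keep up."""
    if rate_out <= rate_in:
        return None
    return backlog / (rate_out - rate_in) / 3600


backlog = backlog_messages(MESSAGES_PER_SECOND, LAG_HOURS)
per_partition_rate = MESSAGES_PER_SECOND / PARTITIONS

print("backlog (messages):", backlog)
print("incoming rate per partition (msg/s):", per_partition_rate)
# Hypothetical replication rate of 10,000 msg/s, for illustration only.
print("hours to catch up at 10,000 msg/s:", catch_up_hours(backlog, 8_000, 10_000))
```

If the mirror's sustained rate is not higher than the incoming rate, the function returns None, meaning the lag would not shrink under these assumptions.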
Labels: Apache Kafka
03-30-2022
11:26 PM
Hi, I have two Kafka clusters and need to copy only the topic configuration from one cluster to the other, without copying any data. How can this be done? I know that Replicator or MirrorMaker can do it, but I am not interested in the data here.
Labels: Apache Kafka
03-14-2022
11:21 PM
This is now sorted. Batching with a record-based processor improved performance many times over. I didn't know that an excess of FlowFiles could cause slowness in NiFi.
03-14-2022
11:19 PM
Thank you Araujo. This has helped a lot.
03-14-2022
01:01 AM
Hi, I am having trouble accessing a child JSON attribute when writing SQL in the QueryRecord processor. Details are below. Here is the Avro schema I added for the JSON reader and writer:

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "labels",
      "type": {
        "name": "labels",
        "type": "record",
        "fields": [
          { "name": "__name__", "type": "string" },
          { "name": "cucsMemoryUnitInstanceId", "type": "string" },
          { "name": "instance", "type": "string" },
          { "name": "job", "type": "string" },
          { "name": "monitor", "type": "string" },
          { "name": "site_identifier", "type": "string" }
        ]
      }
    },
    { "name": "name", "type": "string" },
    { "name": "timestamp", "type": "string" },
    { "name": "value", "type": "string" },
    { "name": "producedAt", "type": "string" }
  ]
}

SQL queries on first-level attributes work fine, for example SELECT * from FLOWFILE where name = 'xyz' or SELECT * from FLOWFILE where producedAt = 'dd-mm-yyyy'. However, accessing an attribute of the nested JSON does not work, e.g. SELECT * from FLOWFILE where site_identifier = 'xyz' --> this throws the error.
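The difference between the two cases can be seen on a sample record. The following is a minimal Python sketch, not NiFi code: the record values are made up to match the schema above, and `top_level_columns` and `lookup` are hypothetical helpers that only illustrate why `site_identifier` is not visible as a first-level column while `name` and `producedAt` are.

```python
import json

# Sample record shaped like the schema above; all values are invented.
record = json.loads("""
{
  "labels": {
    "__name__": "cucsMemoryUnit",
    "cucsMemoryUnitInstanceId": "1",
    "instance": "host-1",
    "job": "snmp",
    "monitor": "m1",
    "site_identifier": "xyz"
  },
  "name": "xyz",
  "timestamp": "1646900000",
  "value": "42",
  "producedAt": "10-03-2022"
}
""")


def top_level_columns(rec):
    """Field names that exist directly on the record."""
    return list(rec.keys())


def lookup(rec, path):
    """Walk a slash-separated path such as /labels/site_identifier."""
    node = rec
    for part in path.strip("/").split("/"):
        node = node[part]
    return node


print(top_level_columns(record))
print("site_identifier" in top_level_columns(record))
print(lookup(record, "/labels/site_identifier"))
```

The nested field is only reachable through its parent `labels`. The QueryRecord documentation describes record-path functions such as RPATH_STRING for this kind of access, which may be worth checking; this thread does not confirm that it resolves the error.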
Labels: Apache NiFi
03-10-2022
09:05 PM
Thanks Matt. The name field is different for each record JSON object. I will try PartitionRecord with JsonTreeReader and see whether I can segregate the flow based on the name field and process it in batches.
03-10-2022
12:51 AM
Thanks. But I am now stuck on how to fetch nested fields in the QueryRecord processor. Every time I use a nested attribute in the SQL, it throws an error saying no column was found in the table. The Avro schema is below. I am not able to access "cucsMemoryUnitInstanceId" or "site_identifier", but I am able to access "producedAt" and "name", which are present at the first level.

{
  "name": "MyClass",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "labels",
      "type": {
        "name": "labels",
        "type": "record",
        "fields": [
          { "name": "__name__", "type": "string" },
          { "name": "cucsMemoryUnitInstanceId", "type": "string" },
          { "name": "instance", "type": "string" },
          { "name": "job", "type": "string" },
          { "name": "monitor", "type": "string" },
          { "name": "site_identifier", "type": "string" }
        ]
      }
    },
    { "name": "name", "type": "string" },
    { "name": "timestamp", "type": "string" },
    { "name": "value", "type": "string" },
    { "name": "producedAt", "type": "string" }
  ]
}
03-09-2022
03:48 AM
1 Kudo
Hi,
We are trying to fetch data from one of our Kafka clusters (200,000 messages per second), where the topic has 19 partitions, using NiFi (a cluster of 4 hosts) and its built-in ConsumeKafka_2_6 processor. However, we are observing a delay in processing messages in the processor (a lineage duration of 7 seconds per message). We tried increasing the concurrent tasks to match the number of partitions, but it did not help. If we use the demarcation strategy (to combine multiple messages into a single FlowFile), it works fine, but we need individual messages for data transformation.
The downstream processor that does the JSON evaluation has a similar problem, as it cannot handle the large volume of incoming data from the ConsumeKafka processor.
Notes: the backpressure object threshold is 100,000 FlowFiles and the size threshold is 1 GB.
Please find below reference screenshots of the processor configuration.
EvaluateJsonPath config: run duration 0.05 ms, 4 concurrent tasks per node.
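For a rough sense of scale, the figures above can be turned into per-partition and per-node rates. This is only an arithmetic sketch in Python: it assumes messages are spread evenly across partitions and nodes and that each message becomes one FlowFile, and the batch size of 1,000 messages per FlowFile is a hypothetical value, not something from our configuration.

```python
# Figures from the post.
MESSAGES_PER_SECOND = 200_000
PARTITIONS = 19
NODES = 4
BACKPRESSURE_FLOWFILES = 100_000

# Hypothetical batch size, for illustration only.
BATCH_SIZE = 1_000


def seconds_to_fill(threshold, flowfiles_per_sec):
    """Time until a queue reaches its object threshold if nothing is drained."""
    return threshold / flowfiles_per_sec


# Assumes an even spread across partitions and nodes.
per_partition_rate = MESSAGES_PER_SECOND / PARTITIONS
per_node_rate = MESSAGES_PER_SECOND / NODES

# One FlowFile per message versus one FlowFile per batch of messages.
unbatched_fill = seconds_to_fill(BACKPRESSURE_FLOWFILES, per_node_rate)
batched_fill = seconds_to_fill(BACKPRESSURE_FLOWFILES, per_node_rate / BATCH_SIZE)

print("msg/s per partition:", round(per_partition_rate))
print("FlowFiles/s per node, unbatched:", per_node_rate)
print("seconds to hit threshold, unbatched:", unbatched_fill)
print("seconds to hit threshold, batched:", batched_fill)
```

Under these assumptions, one FlowFile per message means tens of thousands of FlowFiles per second on each node, which is consistent with the demarcated (batched) setup behaving much better.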
Labels: Apache Kafka, Apache NiFi