Member since: 08-14-2023
Posts: 19
Kudos Received: 4
Solutions: 1
My Accepted Solutions
Title | Views | Posted
---|---|---
(title not captured) | 1311 | 01-31-2024 02:21 AM
04-02-2024 08:53 AM
I'm just curious: when I start a processor in a NiFi cluster, where does NiFi store the processor's "RUNNING" state?
Labels: Apache NiFi
01-31-2024 02:21 AM
1 Kudo
After testing, setting the queue's prioritizer to FirstInFirstOutPrioritizer and configuring the connection's load balance strategy to "Partition by attribute" with the kafka.partition attribute proved effective in maintaining the order.
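To illustrate why this combination keeps the order, here is a minimal simulation sketch. It is not NiFi code: `route_by_attribute` is a hypothetical helper, and the CRC32 hash is only a stand-in assumption for whatever NiFi uses internally to pick a node. The point is that when every FlowFile with the same kafka.partition value goes to the same node and each queue is FIFO, the offsets of one partition are never interleaved across nodes.

```python
import zlib
from collections import defaultdict, deque


def route_by_attribute(flowfiles, attribute, node_count):
    """Send each FlowFile to a node chosen from its attribute value.

    Hypothetical stand-in for "Partition by attribute": equal attribute
    values always map to the same node. CRC32 is an assumption, not
    NiFi's actual hashing.
    """
    queues = defaultdict(deque)  # one FIFO queue per node
    for flowfile in flowfiles:
        value = str(flowfile[attribute]).encode("utf-8")
        node = zlib.crc32(value) % node_count
        queues[node].append(flowfile)
    return queues


def offsets_by_partition(queues):
    """Collect offsets per Kafka partition in the order the queues hold them."""
    seen = defaultdict(list)
    for node in sorted(queues):
        for flowfile in queues[node]:
            seen[flowfile["kafka.partition"]].append(flowfile["kafka.offset"])
    return dict(seen)


# Interleaved input: offsets 0..4 for each of 4 partitions.
flowfiles = [
    {"kafka.partition": partition, "kafka.offset": offset}
    for offset in range(5)
    for partition in range(4)
]

queues = route_by_attribute(flowfiles, "kafka.partition", node_count=3)
per_partition = offsets_by_partition(queues)
in_order = all(offsets == sorted(offsets) for offsets in per_partition.values())
print(in_order)
```

The sketch only covers routing and queue order. It says nothing about what happens inside the publishing processor itself.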
01-31-2024 02:17 AM
1 Kudo
I'm trying to use CaptureChangeMySQL with MariaDB, but I get only one record, and it contains only metadata from the database (the table has 10 records):
{"type":"commit","timestamp":1706628370000,"binlog_filename":"mysql-bin.000001","binlog_position":516,"database":"copy"}
Can CaptureChangeMySQL work with MariaDB?
Labels: Apache NiFi
12-14-2023 12:53 AM
@MattWho Thank you for your comprehensive response. We are using a NiFi cluster with only two processors, ConsumeKafka and PublishKafka. Our topics are configured with 36 partitions and we have 12 nodes in our NiFi cluster, so we set the concurrent tasks to 3. As mentioned, our topics are defined with cleanup policy: compact, which requires a Kafka key for each record. Can we use ConsumeKafkaRecord and PublishKafkaRecord when a Kafka key is involved?
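The sizing above (36 partitions, 12 nodes, 3 concurrent tasks) can be written out as a small calculation. This is only a sketch of the arithmetic, assuming the goal is one consumer thread per partition across the whole cluster. The function name is hypothetical.

```python
import math


def concurrent_tasks_per_node(partitions, nodes):
    """Return (tasks per node, idle consumers) for one consumer per partition.

    Rounds up when partitions do not divide evenly, in which case some
    consumer threads would have no partition assigned.
    """
    tasks = math.ceil(partitions / nodes)
    idle = tasks * nodes - partitions
    return tasks, idle


tasks, idle = concurrent_tasks_per_node(partitions=36, nodes=12)
print(tasks, idle)  # 3 0
```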
12-14-2023 12:12 AM
Thank you for your insights, @steven-matison. I will check this out.
12-13-2023 04:47 AM
Hello,
I'm using NiFi to replicate 250 million messages between Kafka topics. The problem is that NiFi replicates the messages out of order, so the destination topic stores them in a different order than the source topic. For example:

source topic - partition 0
offset:5 key:a value:v1
offset:6 key:a value:v2
offset:7 key:a value:v3

destination topic - partition 0
offset:5 key:a value:v2
offset:6 key:a value:v1
offset:7 key:a value:v3

The topics are configured with cleanup policy: compact. I'm using the ConsumeKafka and PublishKafka processors to replicate the topics.
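A mismatch like the one in this example can be checked with a small comparison. The sketch below is not tied to any Kafka client: it assumes the records of one partition have already been read from both topics into lists of (offset, key, value) tuples, and both function names are hypothetical. `compacted_view` shows what a compacted topic would eventually retain, which is the last value per key.

```python
def first_mismatch(source, destination):
    """Return the source offset of the first record whose key/value differs."""
    for (s_offset, s_key, s_value), (_, d_key, d_value) in zip(source, destination):
        if (s_key, s_value) != (d_key, d_value):
            return s_offset
    return None


def compacted_view(records):
    """Last value per key, i.e. what log compaction would eventually keep."""
    latest = {}
    for _, key, value in records:
        latest[key] = value
    return latest


# The records from the example above (partition 0).
source = [(5, "a", "v1"), (6, "a", "v2"), (7, "a", "v3")]
destination = [(5, "a", "v2"), (6, "a", "v1"), (7, "a", "v3")]

print(first_mismatch(source, destination))
print(compacted_view(source) == compacted_view(destination))
```

In this particular example the last record for key a is v3 in both topics, so the compacted result happens to match. If the reordering involved the final record of a key, the destination would retain a different value after compaction.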
Labels: Apache NiFi
11-13-2023 05:44 AM
It looks like the high heap usage on the primary node was caused by too many open files. Is there a way to identify the processor that opens so many files? We are using OpenJDK 17.0.7, NiFi 1.21.0, and custom processors.

@MattWho wrote:
@edim2525
GC kicks in around 80% of heap memory usage. You could certainly enable GC debug logging to verify that GC is executing. GC can only clean up unused memory (memory no longer being held by a process).
I see you have three NiFi nodes. Are you only having heap memory usage issues on the one node?
I see the node with growing heap usage is the elected primary node. What processors do you have running with "primary node" execution?
Does your primary node have a lot more queued FlowFiles than the other nodes?
If you disconnect the primary node from your cluster, which will force a new primary node to be elected, does the heap then start to grow on the newly elected primary node?
What version of NiFi are you running?
What version of Java is your NiFi running with?
Have you collected heap dumps and analyzed them to see where the heap is being used?
Do you have any custom processors added to your NiFi?
Are you using any scripting-based processors where you have written your own code that is being executed within NiFi?
Matt

2023-11-11 04:13:18,240 ERROR [Timer-Driven Process Thread-30] o.a.n.p.kafka.pubsub.PublishKafka_2_6 PublishKafka_2_6[id=c8d6883a-8b07-3d38-b28f-fc8263c83a39] Processing halted: yielding [1 sec]
java.lang.IllegalStateException: Cannot complete publishing to Kafka because Publisher Lease was already closed
    at org.apache.nifi.processors.kafka.pubsub.PublisherLease.complete(PublisherLease.java:476)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6.onTrigger(PublishKafka_2_6.java:490)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
2023-11-11 04:13:18,241 WARN [Timer-Driven Process Thread-30] o.a.n.controller.tasks.ConnectableTask Processing halted: uncaught exception in Component [PublishKafka_2_6[id=c8d6883a-8b07-3d38-b28f-fc8263c83a39]]
java.lang.IllegalStateException: Cannot complete publishing to Kafka because Publisher Lease was already closed
    at org.apache.nifi.processors.kafka.pubsub.PublisherLease.complete(PublisherLease.java:476)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6.onTrigger(PublishKafka_2_6.java:490)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1360)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
2023-11-11 04:13:18,247 ERROR [Listen to Bootstrap] org.apache.nifi.BootstrapListener Failed to process request from Bootstrap due to java.io.IOException: Too many open files
java.io.IOException: Too many open files
    at java.base/sun.nio.ch.Net.accept(Native Method)
    at java.base/sun.nio.ch.NioSocketImpl.timedAccept(NioSocketImpl.java:711)
    at java.base/sun.nio.ch.NioSocketImpl.accept(NioSocketImpl.java:752)
    at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:675)
    at java.base/java.net.ServerSocket.platformImplAccept(ServerSocket.java:641)
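
As a possible starting point for the open-files question, the sketch below groups the open file descriptors of a process by what they point to. It assumes a Linux host where `/proc/<pid>/fd` is readable by the current user, and `summarize_open_fds` is a hypothetical helper, not a NiFi feature. It cannot name the processor directly. It only shows whether the descriptors are mostly sockets or files under a particular directory, which may help narrow down the component responsible.

```python
import os
from collections import Counter


def summarize_open_fds(fd_dir):
    """Count open descriptors in fd_dir (e.g. /proc/<pid>/fd) by kind.

    Sockets, pipes, and anonymous inodes are grouped by type; regular
    files are grouped by their parent directory.
    """
    counts = Counter()
    for name in os.listdir(fd_dir):
        try:
            target = os.readlink(os.path.join(fd_dir, name))
        except OSError:
            continue  # descriptor closed between listing and reading
        if target.startswith("socket:"):
            kind = "socket"
        elif target.startswith("pipe:"):
            kind = "pipe"
        elif target.startswith("anon_inode:"):
            kind = "anon_inode"
        else:
            kind = os.path.dirname(target) or target
        counts[kind] += 1
    return counts


if __name__ == "__main__":
    # Inspects this Python process as a demo; for NiFi, pass the
    # directory /proc/<NiFi JVM pid>/fd instead.
    demo_dir = "/proc/self/fd"
    if os.path.isdir(demo_dir):
        for kind, count in summarize_open_fds(demo_dir).most_common(10):
            print(count, kind)
```

Running it a few times while the count grows would show which group is increasing.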
11-07-2023 05:27 AM
Thank you for your reply. I'll check and update.
10-03-2023 04:17 AM
Hi,
I have a NiFi cluster with three nodes. Each node has 16 CPUs and 100 GB of RAM. Currently, the JVM heap memory on the primary node gradually grows until it reaches full RAM capacity and NiFi crashes with an OOM error. Can you please help me address this issue? Why does the memory on the primary node keep growing? Isn't the GC supposed to clean up the heap?

bootstrap.conf:
# Java command to use when running NiFi
java=java
# Username to use when running NiFi. This value will be ignored on Windows.
run.as=nifi
# Preserve shell environment while runnning as "run.as" user
preserve.environment=false
# Configure where NiFi's lib and conf directories live
lib.dir=/opt/nifi/lib
conf.dir=/opt/nifi/conf
# How long to wait after telling NiFi to shutdown before explicitly killing the Process
graceful.shutdown.seconds=20
# Disable JSR 199 so that we can use JSP's without running a JDK
java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
# JVM memory settings
java.arg.2=-Xms40g
java.arg.3=-Xmx40g
# Enable Remote Debugging
java.arg.4=-Djava.net.preferIPv4Stack=true
# allowRestrictedHeaders is required for Cluster/Node communications to work properly
java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
# The G1GC is known to cause some problems in Java 8 and earlier, but the issues were addressed in Java 9. If using Java 8 or earlier,
# it is recommended that G1GC not be used, especially in conjunction with the Write Ahead Provenance Repository. However, if using a newer
# version of Java, it can result in better performance without significant "stop-the-world" delays.
#java.arg.13=-XX:+UseG1GC
java.arg.7=-XX:+UseG1GC
java.arg.9=-XX:+UseLargePages
java.arg.10=-XX:+AlwaysPreTouch
java.arg.11=-XX:MetaspaceSize=256m
java.arg.12=-XX:MaxGCPauseMillis=100
java.arg.13=-XX:G1HeapRegionSize=32M
#Set headless mode by default
java.arg.14=-Djava.awt.headless=true

The green line is the primary node.
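
To double-check which JVM options in the bootstrap.conf above are actually active, a small parser can list the uncommented java.arg entries and pull out the heap bounds. This is a minimal sketch with hypothetical helper names, run here against an excerpt of the posted file. Note that -Xmx only bounds the Java heap, so memory the JVM uses outside the heap is not limited by it.

```python
def parse_java_args(conf_text):
    """Return {suffix: value} for every active java.arg.* line."""
    args = {}
    for raw in conf_text.splitlines():
        line = raw.strip()
        if not line.startswith("java.arg."):
            continue  # skips blanks, comments, and other properties
        key, _, value = line.partition("=")
        args[key[len("java.arg."):]] = value
    return args


def heap_settings(args):
    """Extract the -Xms / -Xmx values from the parsed arguments."""
    heap = {}
    for value in args.values():
        for flag in ("-Xms", "-Xmx"):
            if value.startswith(flag):
                heap[flag] = value[len(flag):]
    return heap


# Excerpt of the bootstrap.conf posted above.
conf_excerpt = """
# JVM memory settings
java.arg.2=-Xms40g
java.arg.3=-Xmx40g
#java.arg.13=-XX:+UseG1GC
java.arg.7=-XX:+UseG1GC
java.arg.13=-XX:G1HeapRegionSize=32M
"""

args = parse_java_args(conf_excerpt)
heap = heap_settings(args)
print(heap)
print(args["13"])
```

The commented-out `#java.arg.13` line is ignored, so index 13 resolves to the G1 region size setting.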
Labels: Apache NiFi