Member since
07-30-2019
3131
Posts
1564
Kudos Received
909
Solutions
05-25-2016
10:14 PM
1 Kudo
Yes, you can use that state directory; just create the zookeeper subdirectory inside it to hold the myid file. I do recommend creating your state directory somewhere outside of the base NiFi install path instead. This simplifies future upgrades of NiFi, since newer versions will still need to reference the cluster-wide state created by your existing NiFi version. If you do choose to move it from the default location, update the zookeeper.properties file and create the new path.
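A minimal sketch of the directory setup described above, assuming Python is available on the node. The function name and the example location are hypothetical; the only facts taken from the post are the zookeeper subdirectory and the myid file inside it.

```python
from pathlib import Path
import tempfile


def create_zookeeper_state_dir(state_root: Path, server_id: int) -> Path:
    """Create <state_root>/zookeeper and write the myid file inside it."""
    zk_dir = state_root / "zookeeper"
    zk_dir.mkdir(parents=True, exist_ok=True)
    myid = zk_dir / "myid"
    # The myid file holds only this server's numeric id.
    myid.write_text(f"{server_id}\n")
    return myid


if __name__ == "__main__":
    # A temp dir stands in for an external location outside the NiFi
    # install path (for example /opt/nifi-state, a hypothetical path).
    with tempfile.TemporaryDirectory() as tmp:
        myid_path = create_zookeeper_state_dir(Path(tmp) / "nifi-state", 1)
        print(myid_path, myid_path.read_text().strip())
```

After creating the directory, the dataDir entry in zookeeper.properties would need to point at the new zookeeper directory.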
05-17-2016
04:06 PM
2 Kudos
Is that the entire log message? Can you share the lines preceding this stack trace?
Marco,
The NoClassDefFoundError you have encountered is most likely caused by the contents of your core-site.xml file. Check whether the "io.compression.codecs" property in "core-site.xml" contains the following value, and if it does, remove it:
"com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec"
Thanks, Matt
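If editing the file by hand is awkward, the removal can be sketched in Python as below. This is only an illustration: the function name is hypothetical, it assumes a standard Hadoop-style configuration/property/name/value layout, and it works on the XML text rather than touching any file on disk.

```python
import xml.etree.ElementTree as ET

# The two codec class names called out in the post.
LZO_CODECS = {
    "com.hadoop.compression.lzo.LzoCodec",
    "com.hadoop.compression.lzo.LzopCodec",
}


def strip_lzo_codecs(core_site_xml: str) -> str:
    """Return core-site.xml text with the LZO codecs removed from io.compression.codecs."""
    root = ET.fromstring(core_site_xml)
    for prop in root.findall("property"):
        if prop.findtext("name", "").strip() != "io.compression.codecs":
            continue
        value = prop.find("value")
        if value is None or not value.text:
            continue
        codecs = [c.strip() for c in value.text.split(",") if c.strip()]
        # Keep every codec except the two LZO entries.
        value.text = ",".join(c for c in codecs if c not in LZO_CODECS)
    return ET.tostring(root, encoding="unicode")
```

Back up the original core-site.xml before writing any modified text over it.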
04-28-2016
09:36 PM
2 Kudos
Understanding your flow will help us understand what is going on.
1. Are you creating a zero byte file that you are using as the trigger for your InvokeHTTP processor?
2. How do you have the InvokeHTTP processor configured? (Is Put Response Body In Attribute set?)
If Put Response Body In Attribute is set to an attribute name, the FlowFile on the "original" relationship will still have zero bytes of content. NiFi does not support the replay of FlowFiles that are zero bytes in size. (A Jira is being entered for this, as I see that replay of zero byte files can have a valid use case at times.)
If you did not configure the "Put Response Body In Attribute" property, a new FlowFile would have been generated where the response becomes the content, and that FlowFile is routed to the "response" relationship. NiFi cannot replay files at their creation point in the flow. The way replay works, FlowFiles are reinserted on the connection feeding the processor that produced the event. In cases where the processor producing the event actually created the FlowFile, there is nowhere to reinsert that claim for replay. You should, however, be able to replay that file at the next processor that produced a provenance event.
If that replay message is generated at a later processing event in the flow, it indicates that the content no longer exists in the content repository's archive. Typically this is because the retention duration configured in the nifi.properties file has been exceeded for this content, but it could also be caused by other factors: the content repository may have exceeded the configured allowable disk utilization threshold percentage (also configured in the nifi.properties file), or the content may have been manually deleted from the repository (less likely). Queued active data in the flow takes precedence over archived data retention, so if you have a lot of queued data in your flow, you may not have any archived data at all because of the max disk utilization percentage configured for your NiFi.
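To see which archive limits apply, the relevant entries can be pulled out of nifi.properties. The sketch below assumes the property names documented in the NiFi administration guide (nifi.content.repository.archive.*); verify them against your NiFi version. The function name is hypothetical.

```python
ARCHIVE_KEYS = (
    "nifi.content.repository.archive.enabled",
    "nifi.content.repository.archive.max.retention.period",
    "nifi.content.repository.archive.max.usage.percentage",
)


def read_archive_settings(properties_text: str) -> dict:
    """Extract the content repository archive settings from nifi.properties text."""
    settings = {}
    for line in properties_text.splitlines():
        line = line.strip()
        # Skip blanks, comments and anything that is not key=value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        if key.strip() in ARCHIVE_KEYS:
            settings[key.strip()] = value.strip()
    return settings
```

A short retention period or a low usage percentage in the result would be consistent with the replay failure described above.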
04-26-2016
09:21 PM
There are additional items that will need to be taken into consideration if you are running a NiFi cluster. See the following for more details:
https://community.hortonworks.com/content/kbentry/28180/how-to-configure-hdf-12-to-send-to-and-get-data-fr.html
04-26-2016
07:28 PM
Can you provide a little more detail on your use case? Where will the URLs you want to use originate from?
04-18-2016
09:28 PM
4 Kudos
Setting up Hortonworks Dataflow (HDF) to work with kerberized Kafka in Hortonworks Data Platform (HDP)

HDF 1.2 does not contain the same Kafka client libraries as the Apache NiFi version. HDF Kafka libraries are specifically designed to work with the Kafka versions supplied with HDP. The following Kafka support matrix breaks down what is supported in each Kafka version:

*** (Apache) refers to the Kafka version downloadable from the Apache website.

For newer versions of HDF (1.1.2+), NiFi uses zookeeper to maintain cluster-wide state, so the following only applies if this is an HDF NiFi cluster:

1. If a NiFi cluster has been set up to use a kerberized external or internal zookeeper for state, every kerberized connection to any other zookeeper requires using the same keytab and principal. For example, a kerberized embedded zookeeper in NiFi would need to be configured to use the same client keytab and principal you want to use to authenticate with, say, a Kafka zookeeper.
2. If a NiFi cluster has been set up to use a non-kerberized zookeeper for state, it cannot then talk to any other zookeeper that does use Kerberos.
3. If a NiFi cluster has been set up to use a kerberized zookeeper for state, it cannot then communicate with any other non-kerberized zookeeper.

With that being said, the PutKafka and GetKafka processors do not have keytab and principal properties like the HDFS processors do. The keytab and principal are instead defined in the same jaas file used when you set up HDF cluster state management. So before even trying to connect to kerberized Kafka, we need to get NiFi state management configured to use either an embedded or external kerberized zookeeper for state. Even if you are not clustered right now, you need to take the above into consideration if you plan on moving to a cluster later.

----------
NiFi Cluster Kerberized State Management:
https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#state_management

Let's assume you followed the above linked procedure to set up your NiFi cluster with an embedded zookeeper. At the end of that procedure you will have made the following config changes on each of your NiFi nodes:

1. Created a zookeeper-jaas.conf file.

On nodes with embedded zookeeper, it will contain something like this:

    Server {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="./conf/zookeeper-server.keytab"
      storeKey=true
      useTicketCache=false
      principal="zookeeper/myHost.example.com@EXAMPLE.COM";
    };
    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="./conf/nifi.keytab"
      storeKey=true
      useTicketCache=false
      principal="nifi@EXAMPLE.COM";
    };

On nodes without embedded zookeeper, it will look something like this:

    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="./conf/nifi.keytab"
      storeKey=true
      useTicketCache=false
      principal="nifi@EXAMPLE.COM";
    };

2. Added a config line to the NiFi bootstrap.conf file:

    java.arg.15=-Djava.security.auth.login.config=/<path>/zookeeper-jaas.conf

*** The arg number (15 in this case) must be unused by any other java.arg line in the bootstrap.conf file.

3. Added 3 additional properties to the bottom of the zookeeper.properties file you configured per the linked procedure above:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    jaasLoginRenew=3600000
    requireClientAuthScheme=sasl
----------
Scenario 1: Kerberized Kafka setup for NiFi Cluster:

For scenario one, we will assume you are running a NiFi cluster that has been set up per the above to use a kerberized zookeeper for NiFi state management. With that in place, you have the foundation needed to add support for connecting to kerberized Kafka brokers and Kafka zookeepers. The PutKafka processor connects to the Kafka broker and the GetKafka processor connects to the Kafka zookeepers. In order to connect via Kerberos, we will need to do the following:

1. Modify the zookeeper-jaas.conf file we created when setting up kerberized state management above. You will need to add a new section to the zookeeper-jaas.conf file for the Kafka client.

If your NiFi node is running an embedded zookeeper node, your zookeeper-jaas.conf file will contain:

    Server {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="./conf/zookeeper-server.keytab"
      storeKey=true
      useTicketCache=false
      principal="zookeeper/myHost.example.com@EXAMPLE.COM";
    };
    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="./conf/nifi.keytab"
      storeKey=true
      useTicketCache=false
      principal="nifi@EXAMPLE.COM";
    };
    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useTicketCache=true
      renewTicket=true
      serviceName="kafka"
      useKeyTab=true
      keyTab="./conf/nifi.keytab"
      principal="nifi@EXAMPLE.COM";
    };

*** What is important to note here is that both the "KafkaClient" and "Client" (used for both embedded zookeeper and Kafka zookeeper) use the same principal and keytab ***
*** The principal and keytab for the "Server" (used by the embedded NiFi zookeeper) do not need to be the same as those used by the "KafkaClient" and "Client" ***

If your NiFi cluster node is not running an embedded zookeeper node, your zookeeper-jaas.conf file will contain:

    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="./conf/nifi.keytab"
      storeKey=true
      useTicketCache=false
      principal="nifi@EXAMPLE.COM";
    };
    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useTicketCache=true
      renewTicket=true
      serviceName="kafka"
      useKeyTab=true
      keyTab="./conf/nifi.keytab"
      principal="nifi@EXAMPLE.COM";
    };
*** What is important to note here is that both the KafkaClient and the Client (used for both embedded zookeeper and Kafka zookeeper) use the same principal and keytab ***

2. Add an additional property to the PutKafka and GetKafka processors:

Now all the pieces are in place and we can start our NiFi(s) and add/modify the PutKafka and GetKafka processors. You will need to add one new property, security.protocol, on the "Properties" tab of each PutKafka and GetKafka processor. Use this same security.protocol value (PLAINTEXTSASL) when interacting with HDP Kafka versions 0.8.2 and 0.9.0.

----------
Scenario 2: Kerberized Kafka setup for Standalone NiFi instance:

For scenario two, a standalone NiFi does not use zookeeper for state management. So rather than modifying an existing jaas.conf file, we will need to create one from scratch. The PutKafka processor connects to the Kafka broker and the GetKafka processor connects to the Kafka zookeepers. In order to connect via Kerberos, we will need to do the following:

1. Create a jaas.conf file somewhere on the server running your NiFi instance. This file can be named whatever you want, but to avoid confusion later should you turn your standalone NiFi deployment into a NiFi cluster deployment, I recommend continuing to name the file zookeeper-jaas.conf. Add the following lines to this zookeeper-jaas.conf file; they will be used to communicate with the kerberized Kafka brokers and kerberized Kafka zookeeper(s):

    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="./conf/nifi.keytab"
      storeKey=true
      useTicketCache=false
      principal="nifi@EXAMPLE.COM";
    };
    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useTicketCache=true
      renewTicket=true
      serviceName="kafka"
      useKeyTab=true
      keyTab="./conf/nifi.keytab"
      principal="nifi@EXAMPLE.COM";
    };

*** What is important to note here is that both the KafkaClient and Client configs use the same principal and keytab ***

2. Add a config line to the NiFi bootstrap.conf file:

    java.arg.15=-Djava.security.auth.login.config=/<path>/zookeeper-jaas.conf

*** The arg number (15 in this case) must be unused by any other java.arg line in the bootstrap.conf file.

3. Add an additional property to the PutKafka and GetKafka processors:

Now all the pieces are in place and we can start our NiFi(s) and add/modify the PutKafka and GetKafka processors. You will need to add one new property, security.protocol, on the "Properties" tab of each PutKafka and GetKafka processor. Use this same security.protocol value (PLAINTEXTSASL) when interacting with HDP Kafka versions 0.8.2 and 0.9.0.

----------
That should be all you need to get set up and going. Let me fill you in on a few configuration recommendations for your PutKafka and GetKafka processors to achieve better throughput:
PutKafka:
1. Ignore for now what the documentation says for the Batch Size property on the PutKafka processor. It is really a measure of bytes, so raise it from the default of 200 to a much larger value.
2. Kafka can be configured to accept larger files but is much more efficient working with smaller files. The default max message size accepted by Kafka is 1 MB, so try to keep the individual messages smaller than that. Set the Max Record Size property to the max size a message can be, as configured on your Kafka. Changing this value will not change what your Kafka can accept, but it will prevent NiFi from trying to send something too big.
3. The Max Buffer Size property should be set to a value large enough to accommodate the FlowFiles it is being fed. A single NiFi FlowFile can contain many individual messages, and the Message Delimiter property can be used to split that large FlowFile content into its smaller messages. The delimiter could be a new line or even a specific string of characters that denotes where one message ends and another begins.
4. Leave the run schedule at 0 sec. You may even want to give the PutKafka processor an extra thread (Concurrent Tasks).
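To make the relationship between the Message Delimiter and Max Record Size concrete, here is a small illustration of the idea in plain Python. It is not how the processor is implemented; the function names are hypothetical, and the 1 MB limit is just the Kafka default mentioned above.

```python
ONE_MB = 1024 * 1024  # Kafka's default max message size, per the note above


def split_messages(content: bytes, delimiter: bytes = b"\n") -> list:
    """Split FlowFile content into individual messages, dropping empty ones."""
    return [message for message in content.split(delimiter) if message]


def oversized_messages(messages: list, max_record_size: int = ONE_MB) -> list:
    """Return the indexes of messages larger than the configured max record size."""
    return [i for i, message in enumerate(messages) if len(message) > max_record_size]
```

Running a sample of your FlowFile content through a check like this before tuning the processor shows whether any single message would exceed what your Kafka accepts.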
GetKafka:
1. The Batch Size property on the GetKafka processor is correct in the documentation and does refer to the number of messages to batch together when pulled from a Kafka topic. The messages will end up in a single output FlowFile, and the configured Message Demarcator (default: new line) will be used to separate messages.
2. When pulling data from a Kafka topic that has been configured to allow messages larger than 1 MB, you must add an additional property to the GetKafka processor so it will pull those larger messages (the processor itself defaults to 1 MB). Add fetch.message.max.bytes and configure it to match the max allowed message size set on Kafka for the topic.
3. When using the GetKafka processor on a standalone instance of NiFi, the number of concurrent tasks should match the number of partitions on the Kafka topic. This is not the case (despite what the bulletin tells you when it is started) when the GetKafka processor is running on a NiFi cluster. Let's say you have a 3 node NiFi cluster. Each node in the cluster will pull from a different partition at the same time. So if the topic only has 3 partitions, you will want to leave concurrent tasks at 1 (indicating 1 thread per NiFi node). If the topic has 6 partitions, set concurrent tasks to 2. If the topic has 4 partitions, I would still use one concurrent task. NiFi will still pull from all partitions; the additional partition will be included in a round-robin fashion. If you were to set the same number of concurrent tasks as partitions in a NiFi cluster, you would end up with only one node pulling from every partition while your other nodes sit idle.
4. Set your run schedule to 500 ms to reduce excessive CPU utilization.
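The concurrent task guidance in item 3 reduces to a simple rule of thumb: divide the partition count by the number of NiFi nodes, round down, and never go below 1. The helper below is a hypothetical illustration of that arithmetic, not a NiFi API.

```python
def getkafka_concurrent_tasks(partitions: int, nifi_nodes: int = 1) -> int:
    """Suggest a Concurrent Tasks value per node for GetKafka.

    A standalone instance is simply the case nifi_nodes=1.
    """
    if partitions < 1 or nifi_nodes < 1:
        raise ValueError("partitions and nifi_nodes must both be at least 1")
    # Round down; leftover partitions are picked up round-robin.
    return max(1, partitions // nifi_nodes)


if __name__ == "__main__":
    for partitions in (3, 4, 6):
        print(partitions, "partitions on 3 nodes ->",
              getkafka_concurrent_tasks(partitions, 3))
```

For the 3 node examples in the post this gives 1 task for 3 partitions, 2 tasks for 6 partitions, and 1 task for 4 partitions.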
03-22-2016
03:38 PM
2 Kudos
Bulletins are intended to be short lived within the UI. The same error messages are also reported to the nifi-app.log, where the length of time they are preserved depends on how you have configured the NiFi instance's logback.xml file. There should be no difference between the detail in the bulletin and the detail in the nifi-app.log.
03-22-2016
02:27 PM
There have also been many improvements to the underlying code for the Kafka processors in newer releases of NiFi. I recommend upgrading.
03-22-2016
02:25 PM
5 Kudos
This ERROR message is informing you that the buffer configured in your PutKafka processor was not large enough to accommodate the batch of files it wanted to transfer to Kafka. The log above shows that a batch of 3 files was created: 2 of the files from that batch transferred successfully, and 1 file was routed to the PutKafka processor's failure relationship. The total size of the batch was recorded as 4294967296 bytes (4 GB). These are very large files for Kafka. The failure relationship should be looped back onto the PutKafka processor so that, after a short penalization, the failed file gets retransmitted. There are 4 settings at play here in the PutKafka processor that you will want to experiment with:
Max Buffer Size: <-- max amount of reserved buffer space
Max Record Size: <-- max size of any one record
Batch Size: <-- max number of records to batch
Queue Buffering Max Time: <-- max amount of time spent on batching before transmitting
*** The batch will be transmitted when either the Batch Size is satisfied or the Queue Buffering Max Time is reached.
Considering the size of the messages you are trying to send to your Kafka topic, I would recommend the following settings:
Max Buffer Size: 2 GB
Max Record Size: 2 GB
Batch Size: 1
Queue Buffering Max Time: 100 ms
Since you will be sending one file at a time, you may want to increase the number of Concurrent Tasks configured on the "Scheduling" tab of the PutKafka processor. Only do this if the processor cannot keep up with the flow of data. Start with the default of 1 and increase by only 1 at a time if needed. Keep in mind that the buffered records live in your JVM heap, so the more concurrent tasks and the larger the Max Buffer Size configuration, the more heap this processor will use.
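The transmit rule described above (send when either the Batch Size is satisfied or the Queue Buffering Max Time is reached) can be written as a one-line condition. This is only a sketch of the rule as stated in this post, with hypothetical names; it is not taken from the processor's source.

```python
def should_transmit(batched_records: int, elapsed_ms: float,
                    batch_size: int, queue_buffering_max_ms: float) -> bool:
    """True when the batch is full or the buffering time limit has passed."""
    return batched_records >= batch_size or elapsed_ms >= queue_buffering_max_ms
```

With the recommended Batch Size of 1, the first record already satisfies the condition, so each file is sent on its own without waiting out the 100 ms timer.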
Thanks, Matt
03-16-2016
10:13 PM
The NCM in a NiFi cluster typically needs more heap memory. The number of components on the graph (processors, input ports, output ports and relationships) multiplied by the number of nodes in the NiFi cluster drives how much memory your NCM will need. For roughly 300 - 400 components and a 3 - 4 node cluster, the NCM seems to do well with 8 GB of heap. If you still encounter heap issues, you would need to increase the heap size and/or reduce the stat buffer size and/or snapshot frequency in the nifi.properties files (NCM and nodes):
nifi.components.status.repository.buffer.size=360 (default is 1440)
nifi.components.status.snapshot.frequency=5 min (default is 1 min)
This information is accurate as of NiFi 0.5.1 and HDF 1.1.2.
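To see what the two settings trade off, the arithmetic can be worked through as below. This assumes the buffer size is the number of status snapshots kept per component and the frequency is the interval between snapshots, which is how I read those properties; the function name is hypothetical.

```python
def status_history(buffer_size: int, snapshot_minutes: int) -> tuple:
    """Return (snapshots kept per component, hours of history covered)."""
    hours = buffer_size * snapshot_minutes / 60
    return buffer_size, hours


if __name__ == "__main__":
    print("default:", status_history(1440, 1))  # 1440 snapshots, 24.0 hours
    print("tuned:  ", status_history(360, 5))   # 360 snapshots, 30.0 hours
```

Under that reading, the suggested values keep a quarter as many snapshots in heap while still covering a slightly longer window of stats history, at a coarser 5 minute resolution.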