12:01 PM
This is a step-by-step guide to testing Kafka clients from a Windows machine that connects to an HDF/HDP environment.
We start by reviewing the current Kafka broker listeners. This guide covers the following security protocols:
SASL_PLAINTEXT > Kerberized environments
PLAINTEXT > Plain connections
To review the listeners, go to the Ambari console > Kafka > Configs > Kafka Broker, search for "listeners", and make sure that one or both protocols are enabled.
PLAINTEXT security protocol
Go to your Windows machine and download the Apache Kafka software.
It is recommended to download the same version that is running in your HDP/HDF cluster. Select the "Scala 2.12" link to avoid exceptions while running the Kafka clients.
Extract the content of this folder in a preferred location in the Windows host.
While connecting to Kafka through the PLAINTEXT listener, Kafka has no way to identify you as a user. Hence, add Kafka ACLs that give permissions to the ANONYMOUS user. To do this, run the following command as the Kafka user on one of the Kafka brokers:
/usr/hd<p/f>/current/kafka-broker/bin/ --authorizer-properties zookeeper.connect=<zkHost>:<zkPort> --add --allow-principal User:ANONYMOUS --operation All --topic=* --group=* --cluster
The above command gives all permissions to the anonymous user in Kafka; change the topic and group to specific ones if required.
On a Kafka host, create a new test topic or use an existing one. To create a new topic, run the following command as the Kafka user:
kafka-topics --create --topic <topicName> --partitions <N of partitions> --replication-factor <N of replicas> --zookeeper <zkHost>:<zkPort>
After adding the anonymous user permissions, go to the Windows machine and navigate to the following Kafka folder:
C:\<preferred location>\kafka_<version>\bin\windows
Note: This step assumes that we already have connectivity to the brokers and that the firewall and DNS (if any) are configured properly.
In this folder, there is a list of .bat files, similar to the .sh files on Linux hosts. To run the console producer, use the following command:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName>
To run a consumer, use the following command:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning
Run the clients using Kerberos (SASL_PLAINTEXT)
To run the clients using Kerberos (SASL_PLAINTEXT), first ensure that Kerberos is configured properly in the environment. Once you get valid tickets, do the following to connect with the Kafka clients:
If using the Kafka Ranger plugin, go to Ranger Admin UI -> Kafka and add a new policy for the user that connects from the Windows host, pointing to the topics that need access.
After the Ranger policies are configured, go to the Windows host and configure the Kerberos details for the Kafka client connection. To achieve this, do the following:
Create a file with the .conf extension and add the following content:
KafkaClient { required
Client { required
Client: is used to connect to ZooKeeper, and KafkaClient is used to connect to the Kafka brokers.
principal: is the user that connects from Windows to the Kafka brokers (the same user that was granted access in the Ranger UI).
keyTab: is the keytab file that contains the principal specified in "principal".
With that file created, open a Windows Command Prompt and execute the following command before running any Kafka client:
set KAFKA_OPTS=""
That command passes the keytab/principal to the Kafka client.
In the same command prompt, run a Kafka producer/consumer. For Kafka versions <= 1.0, use the following producer command:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName> --security-protocol SASL_PLAINTEXT
For the consumer, use the following command:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning --security-protocol SASL_PLAINTEXT
For Kafka versions > 1.0, use the following producer command:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName> --producer-property security.protocol=SASL_PLAINTEXT
For the consumer on Kafka versions > 1.0, use the following command:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT
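The body of the JAAS file is not shown in the steps above. As a hedged sketch only, a typical keytab-based file with the two sections described (KafkaClient and Client) could be generated as follows. The function name write_kafka_jaas, the principal, the file paths and the serviceName values are assumptions for illustration, not values from this article; com.sun.security.auth.module.Krb5LoginModule is the standard JDK Kerberos login module. Forward slashes are used in the Windows paths to avoid backslash escaping inside the quoted JAAS values.

```shell
# Hypothetical helper: write a JAAS file with KafkaClient and Client sections.
# Arguments: <output file> <principal> <keytab path>
# All values passed in are placeholders; adjust them to your environment.
write_kafka_jaas() {
  cat > "$1" <<EOF
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  keyTab="$3"
  principal="$2"
  serviceName="kafka";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  useTicketCache=false
  keyTab="$3"
  principal="$2"
  serviceName="zookeeper";
};
EOF
}
```

For example, `write_kafka_jaas kafka_client_jaas.conf "user@EXAMPLE.COM" "C:/kerberos/user.keytab"` (run from any POSIX-style shell, or simply type the same content by hand) produces the file. The JVM reads a JAAS file through the java.security.auth.login.config system property, so KAFKA_OPTS would normally carry something like -Djava.security.auth.login.config=C:/kerberos/kafka_client_jaas.conf, with the path again being an assumption.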
09:37 AM
From the jaas file I see that debug=true was added; however, the debug output is not showing up in the producer output, which means that the jaas file provided is not being picked up properly. If you check the client script, you'll notice the lines below:
# check if kafka_jaas.conf in config , only enable client_kerberos_params in secure mode.
KAFKA_HOME="$(dirname $(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ))"
if [ -f $KAFKA_JAAS_CONF ]; then
export KAFKA_CLIENT_KERBEROS_PARAMS="$KAFKA_HOME/config/kafka_client_jaas.conf"
fi
Try editing kafka_client_jaas.conf, or try exporting KAFKA_CLIENT_KERBEROS_PARAMS yourself, and see if that helps.
Regards, Manuel.
12:27 PM
1 Kudo
This community article assumes that CDH 6.x is already installed and Kerberos is enabled. If Kerberos has to be installed, please use the document below:
1. Install a database
In this case, we are using MySQL:
2. Configure the database for schema registry and SMM
3. Download Schema Registry and SMM parcels
4. Install the Parcels
Install the services in this order:
1. Schema Registry
2. SRM (if no SRM installation, avoid this step)
3. SMM
5. Distribute and activate the parcels.
In Schema Registry, point “Schema Registry storage connector url” to the MySQL hostname. Check “Enable Kerberos Authentication”.
Use the Schema Registry database password for “Schema Registry storage connector password”.
5.1 For SMM use = cloudera manager host
cm.metrics.password = cloudera manager UI password = kafka (default)
Streams Messaging Manager storage connector url = jdbc:mysql://FQDN_MYSQL:3306/streamsmsgmgr
Streams Messaging Manager storage connector password = user database password specified
Check “Enable Kerberos Authentication”
6. Add Kafka service
Check "Enable Kerberos Authentication"
7. Configure and access the SMM UI
Property "" must match the Kafka service name, which is "kafka" by default.
Create the streamsmsgmgr principal in the KDC. Example when using MIT KDC:
add_principal streamsmsgmgr
Finally, copy /etc/krb5.conf to your local machine and get a valid Kerberos ticket for the streamsmsgmgr user by running "kinit streamsmsgmgr" with the same password chosen when the user was created.
09:06 AM
@mike_bronson7 A Kafka broker needs at least the following number of file descriptors just to track log segment files:
(number of partitions)*(partition size / segment size)
You can review the current limits with:
cat /proc/<kafka_pid>/limits
To change them from the Ambari console, go to Kafka > Configs and search for "kafka_user_nofile_limit".
Finally, to see the open file descriptors, run:
lsof -p KAFKA_BROKER_PID
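As a rough illustration of the estimate above, the arithmetic can be sketched in shell. The function name estimate_segment_fds is hypothetical, the sizes only need to share the same unit, and rounding the division up is an assumption made here so that a partially filled segment still counts as one file.

```shell
# Hypothetical helper implementing the estimate quoted above:
#   (number of partitions) * (partition size / segment size)
# Arguments: <partitions> <partition size> <segment size> (same unit for both sizes)
# The division is rounded up (an assumption, see the lead-in).
estimate_segment_fds() (
  partitions=$1
  partition_size=$2
  segment_size=$3
  segments_per_partition=$(( (partition_size + segment_size - 1) / segment_size ))
  echo $(( partitions * segments_per_partition ))
)
```

For example, `estimate_segment_fds 100 10 1` (100 partitions of 10 GB each with 1 GB segments) prints 1000. This is only the lower bound mentioned above; the value can be compared with the "Max open files" line in /proc/<kafka_pid>/limits.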
07:13 AM
1 Kudo
Sometimes when we create our clusters, we use a small number of Kafka brokers. After some time, there may be a requirement to add more brokers, usually because of load, performance or high availability. In this article, we provide the considerations and steps to keep in mind after adding new brokers to HDP/HDF clusters.
If there is one Kafka broker in a cluster and plans are to add more brokers for high availability, it's important to mention that Kafka has an internal topic called __consumer_offsets. This topic is created by Kafka internally to store the consumer's committed offsets. At this point, if there is only one broker, Kafka will, by default, create this topic using one single replica and 50 partitions. As a result, it's highly recommended to change the number of replicas for this topic when adding more brokers to the cluster. These numbers are handled by the following properties:
offsets.topic.num.partitions: The number of partitions for the offset commit topic (should not change after deployment).
offsets.topic.replication.factor: The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
One consideration after adding new Kafka brokers to a cluster is that Kafka has no way to reassign the existing topics to the new brokers automatically. This has to be done manually using a script that comes with the Kafka installation, the Kafka reassign partition tool (under bin/). In other words, if N topics are already created and assigned to one broker with one replica, use the reassign partition tool after adding more brokers to increase the number of replicas. The following Kafka command-line example adds more partitions and replicas to a topic:
Topic using 1 partition and 1 replica:
[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/ --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
After adding 2 new brokers, the topic "testTopic2" will remain exactly the same. To add more replicas and partitions, the following steps need to be performed:
Increase the number of partitions:
/usr/hdp/current/kafka-broker/bin/ --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3
Example:
[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/ --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partition
The following should be the output:
[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/ --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003 Isr: 1003
Increase the number of replicas:
First, create a JSON file with the topic that has to be modified:
{
}
In the file above, specify a different replica order for each partition. This is because the first broker id in the list of replicas becomes the preferred partition leader, so varying the order helps to distribute partition leadership among the brokers.
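The contents of the JSON file are not shown above. As a hedged sketch, a file in the {"version":1,"partitions":[...]} layout that the reassign partition tool reads can be generated by rotating the broker list, so that each partition starts with a different broker. The function name make_reassignment_json is hypothetical; the topic name, broker ids and the /tmp/topic-replication.json path in the usage note come from the commands in this article.

```shell
# Hypothetical helper: print a reassignment JSON for one topic.
# Arguments: <topic> <number of partitions> <broker id>...
# The broker list is rotated by one position per partition, so every
# partition gets a different first replica.
make_reassignment_json() (
  topic=$1
  partitions=$2
  shift 2
  entries=""
  p=0
  while [ "$p" -lt "$partitions" ]; do
    # Join the current broker order with commas, e.g. 1001,1002,1003
    replicas=$(echo "$*" | tr ' ' ',')
    entries="${entries:+$entries,}{\"topic\":\"$topic\",\"partition\":$p,\"replicas\":[$replicas]}"
    # Rotate: move the first broker to the end of the list
    first=$1
    shift
    set -- "$@" "$first"
    p=$((p + 1))
  done
  printf '{"version":1,"partitions":[%s]}\n' "$entries"
)
```

Running `make_reassignment_json testTopic2 3 1001 1002 1003 > /tmp/topic-replication.json` yields the replica lists 1001,1002,1003 for partition 0, 1002,1003,1001 for partition 1 and 1003,1001,1002 for partition 2. Review the generated file before passing it to the tool with --reassignment-json-file.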
Run the following command to apply the changes:
[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/ --zookeeper c489-node2:2181 --reassignment-json-file /tmp/topic-replication.json --execute
Current partition replica assignment
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
The following will be the output:
[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/ --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1001,1002,1003
Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1001,1003
Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003,1001,1002 Isr: 1003,1001,1002
To modify multiple topics, use the following JSON template:
The above JSON template modifies testTopic3 and testTopic4. When adding more topics, the important thing to note is that the last "topic" line must not end with a comma.
In summary:
Add the new brokers to HDP/HDF using the Ambari UI.
Kafka doesn't automatically reassign existing topics after new brokers are added. Follow the steps previously provided to reassign the topics that were already created. For new topics, use the --replication-factor and --partitions options.
06:59 AM
4 Kudos
When we face high CPU issues, the next step is to identify the application and the thread causing them. This article explains how to identify the thread or threads causing high CPU usage.
To investigate a high CPU issue, the first thing we have to do is identify the PID. This can be done with a simple "top" command. If you see a PID constantly using 70% CPU or more, run the following command:
top -H -b -n1 -p <PID>
The command above will show us all the threads associated with that specific PID. The following is an example:
[kafka@c489-node2 conf]$ top -H -b -n1 -p 25910
top - 14:21:54 up 84 days, 12:32, 1 user, load average: 6.79, 6.62, 6.69
Threads: 145 total, 0 running, 145 sleeping, 0 stopped, 0 zombie
%Cpu(s): 18.3 us, 4.0 sy, 0.0 ni, 77.6 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 20971520 total, 16874368 free, 1725732 used, 2371420 buff/cache
KiB Swap: 5242872 total, 5242872 free, 0 used. 17874368 avail Mem
25910 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.04 java
26259 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:02.96 java
26260 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26261 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26262 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26263 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26264 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26265 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26266 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26267 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26268 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26269 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.27 java
26270 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26271 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26272 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26273 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26274 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.23 java
26275 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.27 java
26276 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26277 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26278 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26279 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26280 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26281 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.23 java
26282 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26283 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.53 java
26284 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
26285 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
26286 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
From the above output, check the %CPU column. The thread ids that use 70% or more are the threads we are looking for. Run the above command 5 to 10 times, every 5 to 10 seconds, while the issue is happening. This helps to spot a pattern, because a thread that was using 70% can drop to 0 or 10% ten seconds later. We are looking for the threads that constantly use most of the CPU: if one or more threads use 70% or more in all the snapshots, we have identified our high CPU threads. At the same time, take thread dumps at the same cadence (5 to 10 dumps, every 5 to 10 seconds); each dump prints all the threads associated with the PID.
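The "70% or more in all the snapshots" check can be sketched as a small filter over saved top outputs. The function name hot_threads is hypothetical, and the sketch assumes the default top batch layout, where a header line starting with PID precedes the thread rows, the thread id is in column 1 and %CPU is in column 9.

```shell
# Hypothetical helper: list the thread ids whose %CPU is at or above a
# threshold in every given "top -H -b -n1" snapshot file.
# Usage: hot_threads <threshold> <snapshot file>...
hot_threads() (
  threshold=$1
  shift
  awk -v t="$threshold" -v n="$#" '
    FNR == 1 { seen_header = 0 }                  # new file: wait for its header
    seen_header && $9 + 0 >= t { count[$1]++ }    # thread row above the threshold
    $1 == "PID" { seen_header = 1 }               # rows after this line are threads
    END { for (tid in count) if (count[tid] == n) print tid }
  ' "$@" | sort -n
)
```

For example, `hot_threads 70 /tmp/cpu-*.log` prints only the thread ids that stayed at 70% or more in every snapshot, which filters out threads that spiked once and went back to idle.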
Thread dumps can be taken as follows:
1. Switch to the user that owns the PID
2. $JAVA_HOME/bin/jstack -l <PID>
The full output is very large; each thread appears as follows:
[kafka@c489-node2 conf]$ /usr/jdk64/jdk1.8.0_112/bin/jstack -l 25910
2020-01-20 14:29:45
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.112-b15 mixed mode):
"Attach Listener" #386 daemon prio=9 os_prio=0 tid=0x00007fd524001000 nid=0xad96 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
more lines here
"kafka-network-thread-1003-ListenerName(PLAINTEXT)-PLAINTEXT-2" #95 prio=5 os_prio=0 tid=0x00007fd629b43000 nid=0x672a runnable [0x00007fd3a23da000]
java.lang.Thread.State: RUNNABLE
at Method)
- locked <0x00000000c94d9e70> (a$3)
- locked <0x00000000c94d9e60> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c94d9d48> (a
Locked ownable synchronizers:
- None
From the above output, you can see that the thread has the following identifier, which is a hexadecimal value: nid=0x672a
In the previous "top -H -b -n1 -p <PID>" output, the threads constantly using more than 70% CPU are identified by a decimal number, for example 25910, which is the first in the thread list shown earlier in this article. We have to convert this value from decimal to hexadecimal, for example:
25910 is 6536 in hexadecimal value
After that, we have to search for 6536 in our thread dumps to identify the thread causing the high CPU issue. In some cases, you may see that the identified thread is something like the one below:
"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007fd62803f800 nid=0x669a runnable
The above thread usually means that the available memory for the application is not enough and more memory is required.
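The decimal-to-hexadecimal lookup described above can be sketched as two small shell helpers. The function names tid_to_nid and find_thread are hypothetical; the only tools used are printf and grep.

```shell
# Hypothetical helper: convert a decimal thread id from "top -H" into the
# nid=0x... form printed by jstack.
tid_to_nid() {
  printf 'nid=0x%x\n' "$1"
}

# Hypothetical helper: print the thread header lines that match the given
# decimal thread id in one or more thread dump files.
# Usage: find_thread <decimal thread id> <thread dump file>...
find_thread() (
  nid=$(tid_to_nid "$1")
  shift
  # The trailing space avoids matching a longer nid with the same prefix
  grep -- "$nid " "$@"
)
```

For example, `tid_to_nid 25910` prints nid=0x6536, and `find_thread 25910 /tmp/jstack-*` lists the matching thread line from each dump, so the thread name and state can be compared across snapshots.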
In summary, the steps to identify a high CPU issue are:
Run top command to identify the PID using high CPU.
When the PID is identified, run top -H -b -n1 -p <PID> 5 to 10 times, every 5 to 10 seconds, to identify the threads associated with the PID and their usage. At the same time, take the same number of thread dumps (5 to 10, every 5 to 10 seconds) using jstack -l <PID> as the PID owner.
Finally, when we identify the threads consuming most of the CPU, convert their ids from decimal to hexadecimal and search for those values in the thread dumps.
This process can be automated with a single command that collects both the thread dumps and the per-thread CPU output:
for i in {1..10}; do echo $i ; top -H -b -n1 -p <PID> > /tmp/cpu-$(date +%s)-$i.log; $JAVA_HOME/bin/jstack -l <PID> > /tmp/jstack-$(date +%s)-$i ;sleep 2; done
Example command with values:
for i in {1..10}; do echo $i ; /usr/jdk64/jdk1.8.0_112/bin/jstack -l 293558 > /tmp/kafka/jstack-$(date +%s)-$i ;sleep 2; done
Also, I have created a simple script to identify the threads causing the issue, feel free to modify the same accordingly:
# Execution: ./ <argument1 = PID> <argument2 = sleep in seconds>
echo "=== Script to identify threads for High CPU issues ==="
read -r -p "Select the path to store jstacks - i.e /tmp/highCPU: " jPath
read -r -p "Select the JVM path - i.e /usr/jdk64/jdk1.8.0_112/bin: " JVMPath
# Take 10 snapshots of the per-thread CPU usage and 10 thread dumps
for i in {1..10}; do
  echo "Collecting thread dumps and cpu: $i of 10"
  ts=$(date +%s)
  top -H -b -n1 -p "$1" > "$jPath/cpu-$ts-$i.log"
  "$JVMPath/jstack" -l "$1" > "$jPath/jstack-$ts-$i"
  sleep "$2"
done
# Print the thread ids (column 1) using more than 80% CPU (column 9)
echo "PID %CPU"
tail -n +6 "$jPath"/cpu-* | awk '$9 > 80 { print $1 " " $9 }'
column1=$(tail -n +6 "$jPath"/cpu-* | awk '$9 > 80 { print $1 }')
# Convert the thread ids to hexadecimal to match the nid values in the thread dumps
hexValues=$(printf '%x\n' $column1)
echo "Printing HEX values of high usage threads: " $hexValues
07:01 AM
@Sriram When a record is added to a batch, there is a time limit for sending that batch, which ensures that it is sent within a specified duration. The default is 30 seconds; you can try increasing this value and monitor the result. Can you share the processor configuration so we can get more details?
07:10 AM
@sampathkumar_ma export KAFKA_OPTS should work in this case. Could you please add "debug=true" to the jaas file:
KafkaClient { required
};
Share the complete output; we should see something similar to:
Debug is true storeKey false useTicketCache true useKeyTab false doNotPrompt false ticketCache is null isInitiator true KeyTab is null refreshKrb5Config is false principal is null tryFirstPass is false useFirstPass is false storePass is false clearPass is false
Acquire TGT from Cache
Principal is kafka/host@EXAMPLE.COM
Commit Succeeded
Along with that, you can also enable DEBUG under: /etc/kafka/conf/ Change WARN to DEBUG, run the client, and share the details.
09:01 AM
@sampathkumar_ma Could you please check whether the user running the command has permissions to get a valid ticket from "/home/<user>/user.keytab"? Also, add "debug=true" to the jaas file to get more details:
KafkaClient { required
06:55 AM
@Adarsh_ks Are you facing the same issue after the changes? Kindly share the exception if it is different from the original.
Client { required
};
Make sure that the keytab and principal specified are accessible to the user that is running the job.