Member since
06-27-2019
147
Posts
9
Kudos Received
11
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 3939 | 01-31-2022 08:42 AM | |
| 1128 | 11-24-2021 12:11 PM | |
| 1984 | 11-24-2021 12:05 PM | |
| 2872 | 10-08-2019 10:00 AM | |
| 3872 | 10-07-2019 12:08 PM |
03-02-2020
12:27 PM
1 Kudo
This community article assumes that we already have CDH 6.x and Kerberos enabled, in case we have to install Kerberos, please use the document below:
https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/cm_sg_intro_kerb.html
1. Install a database
In this case, we are using MySQL:
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-installing_mysql.html
2. Configure the database for schema registry and SMM
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-configuring-schema-registry-metadata-stores-in-mysql.html
3. Download Schema Registry and SMM parcels
SMM https://www.cloudera.com/downloads/cdf/csm.html
CSR https://www.cloudera.com/downloads/cdf/csp.html
4. Install the Parcels
Install the services in this order:
1. Schema Registry
2. SRM (if no SRM installation, avoid this step)
3. SMM
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-get-parcel-csd.html
5. Distribute and activate the parcels.
In Schema registry point “Schema Registry storage connector url” to the mysql hostname. Check “Enable Kerberos Authentication”.
Use the database registry password for “Schema Registry storage connector password”
5.1 For SMM use
cm.metrics.host = cloudera manager host
cm.metrics.password = cloudera manager UI password
cm.metrics.service.name = kafka (default)
Streams Messaging Manager storage connector url = jdbc:mysql://FQDN_MYHSQL:3306/streamsmsgmgr
Streams Messaging Manager storage connector password = user database password specified
Check “Enable Kerberos Authentication”
6. Add Kafka service
Check "Enable Kerberos Authentication"
7. Configure and access the SMM UI
Property "cm.metrics.service.name" must match with the Kafka service name, by default is "kafka"
Create streamsmsgmgr principal in the KDC, example when using MIT KDC
kadmin.local
add_principal streammsmmgr
Finally copy the /etc/krb5.conf to your local machine and get a valid kerberos ticket for streammsmmgr user by using "kinit streammsmmgr" and use the same password chosen for the user creation time.
... View more
01-24-2020
07:13 AM
1 Kudo
Sometimes when we create our clusters, we use a small amount of Kafka brokers. After some time, there may be a requirement for adding more brokers, usually because of load, performance or high availability. In this article, we provide the considerations and steps to keep in mind after adding new brokers to HDP/HDF clusters.
If there is one Kafka broker in a cluster and plans are to add more brokers for high availability, it's important to mention that Kafka has an internal topic called __consumer_offsets. This topic is created by Kafka internally to store the consumer's committed offsets. At this point, if there is only one broker, Kafka will, by default, create this topic using one single replica and 50 partitions. As a result, it's highly recommended to change the number of replicas for this topic when adding more brokers to the cluster. These numbers are handled by the following properties:
offsets.topic.num.partitions: The number of partitions for the offset commit topic (should not change after deployment).
offsets.topic.replication.factor: The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
One of the considerations after adding new Kafka brokers to a cluster, is that Kafka doesn't have a way to reassign the current existing topics to the new brokers automatically. This has to be done manually using a script that comes with Kafka installation called Kafka reassign partition tool, bin/kafka-reassign-partitions.sh. In other words, if N topics are already created and assigned to a broker and a replica, after adding more brokers, use the reassign partition tool to increase the number of replicas. The following is a Kafka command-line example to add more partitions:
Topic using 1 partition and 1 replica:
[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
After adding 2 new brokers, the topic "testTopic2" will remain exactly the same. To add more replicas and partitions, the following steps need to be performed:
Increase the number of partitions: /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3 Example: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partition The following should be the output: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003 Isr: 1003
Increase the number of replicas:
First create a JSON file with the topic that has to be modified: {
"version":1,
"partitions":[
{"topic":"testTopic2","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic2","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic2","partition":2,"replicas":[1003,1001,1002]}
]
} From the above output, specify different orders in the replicas. This is because the first broker id in the list of replicas will be the partition leader. This helps to distribute the partitions among brokers.
Run the following command to apply the changes: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper c489-node2:2181 --reassignment-json-file /tmp/topic-replication.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"testTopic2","partition":0,"replicas":[1001],"log_dirs":["any"]},{"topic":"testTopic2","partition":2,"replicas":[1003],"log_dirs":["any"]},{"topic":"testTopic2","partition":1,"replicas":[1002],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
The following will be the output: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1001,1002,1003
Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1001,1003
Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003,1001,1002 Isr: 1003,1001,1002
To modify multiple topics, use the following JSON template:
{
"version":1,
"partitions":[
{"topic":"testTopic3","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic3","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic3","partition":2,"replicas":[1003,1001,1002]},
{"topic":"testTopic4","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic4","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic4","partition":2,"replicas":[1003,1001,1002]}
]
}
In the above JSON template, testTopic3 and testTopic4 are modified. To add more, the important thing to note is that the latest "topic" line must not have a comma.
In summary:
Add the new brokers to HDP/HDF using ambari UI.
Kafka doesn't reassign the topics that are created automatically after adding new brokers. Follow the steps previously provided to reassign the already created topics. For new topics, use --replication-factor and --partitions properties.
... View more
Labels:
01-20-2020
06:59 AM
4 Kudos
When we face high CPU issues, the next step is to identify the application and the thread causing the same. This article explains how to identify the thread/s causing high CPU issues.
In order to identify high CPU issues, the first thing we have to do is to identify the PID. This can be done by using a simple "top" command, so if you see a PID constantly using more than 70-100% CPU, then run the following command:
top -H -b -n1 -p <PID>
The command above will show us all the threads associated with that specific PID. The following is an example:
[kafka@c489-node2 conf]$ top -H -b -n1 -p 25910
top - 14:21:54 up 84 days, 12:32, 1 user, load average: 6.79, 6.62, 6.69
Threads: 145 total, 0 running, 145 sleeping, 0 stopped, 0 zombie
%Cpu(s): 18.3 us, 4.0 sy, 0.0 ni, 77.6 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 20971520 total, 16874368 free, 1725732 used, 2371420 buff/cache
KiB Swap: 5242872 total, 5242872 free, 0 used. 17874368 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
25910 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.04 java
26259 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:02.96 java
26260 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26261 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26262 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26263 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26264 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26265 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26266 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26267 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26268 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26269 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.27 java
26270 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26271 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26272 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26273 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26274 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.23 java
26275 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.27 java
26276 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26277 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26278 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26279 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26280 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26281 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.23 java
26282 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26283 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.53 java
26284 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
26285 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
26286 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
From the above output, we have to check %CPU column. Thread ids that you identify using 70% or more are the threads we are looking for. Above command should be taken 5 to 10 times every 5 to 10 seconds during the issue, this will help us to check a pattern in the threads, because sometimes one thread that was using 70%, 10 seconds later goes down to 0 or 10%, as a result, we have to check for the threads constantly using most of the %CPU memory, then if you see that one or more threads are using 70% or more in all the snapshots, then we have identified our high CPU thread. At the same time, we have to take some thread dumps, same timings 5 to 10 seconds, 5 to 10 times each, this will print all PID associated threads.
Thread dumps can be taken as follows:
1. Change to the PID user owner
2. JAVA_HOME/jstack -l <PID>
The example output is very large, on the other hand, each thread is seen as follows:
[kafka@c489-node2 conf]$ /usr/jdk64/jdk1.8.0_112/bin/jstack -l 25910
2020-01-20 14:29:45
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.112-b15 mixed mode):
"Attach Listener" #386 daemon prio=9 os_prio=0 tid=0x00007fd524001000 nid=0xad96 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
....
more lines here
....
"kafka-network-thread-1003-ListenerName(PLAINTEXT)-PLAINTEXT-2" #95 prio=5 os_prio=0 tid=0x00007fd629b43000 nid=0x672a runnable [0x00007fd3a23da000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000c94d9e70> (a sun.nio.ch.Util$3)
- locked <0x00000000c94d9e60> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c94d9d48> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:686)
at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
at kafka.network.Processor.poll(SocketServer.scala:665)
at kafka.network.Processor.run(SocketServer.scala:582)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
From the above output, you can see that the thread has the following identifier which is a hexadecimal value: nid=0x672a
Then from the previous output "top -H -b -n1 -p <PID>" and after identified our thread or threads using more than 70% constantly, we will get a decimal number, let's say 25910, which is the first in the thread list previously added to this article. We have to change this value from decimal to hexadecimal, for example,
25910 is 6536 in hexadecimal value
After that, we have to search for 6536 un our thread dumps to identify the thread causing the high CPU issue. In some cases you may see that the identifies thread is something like below:
"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007fd62803f800 nid=0x669a runnable
The above thread usually means that the available memory for the application is not enough and more memory is required.
In summary, the steps to identify a high CPU issue are:
Run top command to identify the PID using high CPU.
When PID is identified, run the following command 5 to 10 times every 5 to 10 seconds to identify the threads associated with the PID and their usage top -H -b -n1 -p <PID>, at the same time take some thread dumps, same amount (5 to 10 every 5 to 10 seconds) using jstack -l <pid> with the PID owner.
Finally, when we identify the thread/s consuming most of the CPU, change the value from decimal to hexadecimal and search for those in the thread dumps.
There is a command we can run to automate this process, basically, take the thread dumps and CPU thread output.
for i in {1..10}; do echo $i ; top -H -b -n1 -p <PID> > /tmp/cpu-$(date +%s)-$i.log; JAVA_HOME/jstack -l <PID> > /tmp/jstack-$(date +%s)-$i ;sleep 2; done
Example command with values:
for i in {1..10}; do echo $i ; /usr/jdk64/jdk1.8.0_112/bin/jstack -l 293558 > /tmp/kafka/jstack-$(date +%s)-$i ;sleep 2; done
Also, I have created a simple script to identify the threads causing the issue, feel free to modify the same accordingly:
#Execution: ./highCpu.sh <argument1 = PID> <argument2 = sleep in seconds>
echo "=== Script to identify threads for High CPU issues ==="
read -p "Select the path to store jstacks - i.e /tmp/highCPU: " jPath
read -p "Select the JVM path - i.e /usr/jdk64/jdk1.8.0_112/bin: " JVMPath
for i in {1..10}; do echo "Collecting thread dumps and cpu: " $i "of 10" ; top -H -b -n1 -p $1 > $jPath/cpu-$(date +%s)-$i.log; $JVMPath/jstack -l $1 > $jPath/jstack-$(date +%s)-$i ;sleep $2; done
echo
echo "PID %CPU"
tail -n +6 $jPath/cpu-* | awk '{ if ($9 > 80) { print $1 " " $9} }'
column1=$(tail -n +6 $jPath/cpu-* | awk '{ if ($9 > 80) { print $1} }')
hexValues=$(printf '%x\n' $column1)
echo
echo "Printing HEX values of high usage threads: " $hexValues
... View more
Labels:
10-15-2019
07:18 AM
Hi @Peruvian81 It's difficult to suggest if there are no details about the cluster usage, but it would be useful to start reviewing the below article that provides Kafka best practices. https://community.cloudera.com/t5/Community-Articles/Kafka-Best-Practices/ta-p/249371 I hope that helps. Regards, Manuel.
... View more
10-08-2019
10:00 AM
@Peruvian81 You can try below flow which is just for testing purposes: Basically I have a tailFile processor passing data through splitText then these messages are sent to PublishKafka_1_0(use this processor for this test), finally I created a consumer to consume data from the same topic configured in PublishKafka_1_0 storing the data in the file system with putFile. In putFile I have configured Maximum File Count to 10, to avoid excessive space usage in the file system.
... View more
10-07-2019
12:08 PM
Hi @shashank_naresh Did you test your connectivity with the sandbox from your host?, if not, you can try below commands: ping sandbox-hdp.hortonworks.com Also, you can test if the port is reachable by using: telnet sandbox-hdp.hortonworks.com 6667 Expected output from telnet: PC:~ mrodriguez$ telnet 172.25.40.164 6667
Trying 172.25.40.164...
Connected to c489-xx.labs.xxxx.xxxx.com.
Escape character is '^]'. Cheers.
... View more
10-07-2019
11:59 AM
@Peruvian81 You can start testing a flow like below: tailFile --> PublishKafka_1_0(2_0 depending on your Kafka version) In publishKafka you can use a configuration example like below: Ensure that the principal has Ranger authorization to publish data to the topic. In Kafka brokers, provide the brokers FQDM, do not use localhost or IPs
... View more
10-03-2019
12:34 PM
Hi @Seaport If you're using ambari, Enable Atlas Hook should take case of that. In addition to that, follow the steps below: cp /usr/hdp/current/atlas-server/conf/atlas-application.properties /etc/hbase/conf get a valid ticket from atlas user export HBASE_CONF_DIR=/usr/hdp/current/hbase-client/conf In ambari > hbase > advanced hbase-site add: hbase.coprocessor.master.classes=org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor, org.apache.atlas.hbase.hook.HBaseAtlasCoprocessor (Restart required). Finally run: /usr/hdp/current/atlas-server/hook-bin/import-hbase.sh
... View more
10-03-2019
12:24 PM
Hi @Peruvian81 Kafka has multiple ways to be secured: SSL Kerberos PLAINTEXT No No SSL Yes No SASL_PLAINTEXT No Yes SASL_SSL Yes Yes If you already are using Kerberos, you can check the document below: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/authentication-with-kerberos/content/kerberos_kafka_configuring_kafka_for_kerberos_using_ambari.html For your clients, you can use below command line depending of the Kafka version: consumer example: bin/kafka-console-consumer.sh --bootstrap-server <kafkaHost>:<kafkaPort> --topic <topicName> --security-protocol SASL_PLAINTEXT For newer versions, consumer example: bin/kafka-console-consumer.sh --topic <topicName> --bootstrap-server <brokerHost>:<brokerPort> --consumer-property security.protocol=SASL_PLAINTEXT * Make sure to get a valid Kerberos ticket before running these commands (kinit -kt keytab principal) ** Ensure the Kerberos principal has permissions to publish/consume data from/to the selected topic
... View more
09-26-2019
07:44 AM
@Peruvian81 You can try below command for the consumer: ./kafka-console-consumer.sh --bootstrap-server w01.s03.hortonweb.com:6667 --topic PruebaNYC --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning If that solves your issue, kindly put this thread as solved. Thanks.
... View more
- « Previous
-
- 1
- 2
- Next »