Member since
06-27-2019
147
Posts
9
Kudos Received
11
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 4016 | 01-31-2022 08:42 AM | |
| 1150 | 11-24-2021 12:11 PM | |
| 2019 | 11-24-2021 12:05 PM | |
| 2917 | 10-08-2019 10:00 AM | |
| 3908 | 10-07-2019 12:08 PM |
03-02-2020
12:27 PM
1 Kudo
This community article assumes that we already have CDH 6.x and Kerberos enabled, in case we have to install Kerberos, please use the document below:
https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/cm_sg_intro_kerb.html
1. Install a database
In this case, we are using MySQL:
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-installing_mysql.html
2. Configure the database for schema registry and SMM
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-configuring-schema-registry-metadata-stores-in-mysql.html
3. Download Schema Registry and SMM parcels
SMM https://www.cloudera.com/downloads/cdf/csm.html
CSR https://www.cloudera.com/downloads/cdf/csp.html
4. Install the Parcels
Install the services in this order:
1. Schema Registry
2. SRM (if no SRM installation, avoid this step)
3. SMM
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-get-parcel-csd.html
5. Distribute and activate the parcels.
In Schema registry point “Schema Registry storage connector url” to the mysql hostname. Check “Enable Kerberos Authentication”.
Use the database registry password for “Schema Registry storage connector password”
5.1 For SMM use
cm.metrics.host = cloudera manager host
cm.metrics.password = cloudera manager UI password
cm.metrics.service.name = kafka (default)
Streams Messaging Manager storage connector url = jdbc:mysql://FQDN_MYHSQL:3306/streamsmsgmgr
Streams Messaging Manager storage connector password = user database password specified
Check “Enable Kerberos Authentication”
6. Add Kafka service
Check "Enable Kerberos Authentication"
7. Configure and access the SMM UI
Property "cm.metrics.service.name" must match with the Kafka service name, by default is "kafka"
Create streamsmsgmgr principal in the KDC, example when using MIT KDC
kadmin.local
add_principal streammsmmgr
Finally copy the /etc/krb5.conf to your local machine and get a valid kerberos ticket for streammsmmgr user by using "kinit streammsmmgr" and use the same password chosen for the user creation time.
... View more
01-24-2020
07:13 AM
1 Kudo
Sometimes when we create our clusters, we use a small amount of Kafka brokers. After some time, there may be a requirement for adding more brokers, usually because of load, performance or high availability. In this article, we provide the considerations and steps to keep in mind after adding new brokers to HDP/HDF clusters.
If there is one Kafka broker in a cluster and plans are to add more brokers for high availability, it's important to mention that Kafka has an internal topic called __consumer_offsets. This topic is created by Kafka internally to store the consumer's committed offsets. At this point, if there is only one broker, Kafka will, by default, create this topic using one single replica and 50 partitions. As a result, it's highly recommended to change the number of replicas for this topic when adding more brokers to the cluster. These numbers are handled by the following properties:
offsets.topic.num.partitions: The number of partitions for the offset commit topic (should not change after deployment).
offsets.topic.replication.factor: The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
One of the considerations after adding new Kafka brokers to a cluster, is that Kafka doesn't have a way to reassign the current existing topics to the new brokers automatically. This has to be done manually using a script that comes with Kafka installation called Kafka reassign partition tool, bin/kafka-reassign-partitions.sh. In other words, if N topics are already created and assigned to a broker and a replica, after adding more brokers, use the reassign partition tool to increase the number of replicas. The following is a Kafka command-line example to add more partitions:
Topic using 1 partition and 1 replica:
[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
After adding 2 new brokers, the topic "testTopic2" will remain exactly the same. To add more replicas and partitions, the following steps need to be performed:
Increase the number of partitions: /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3 Example: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partition The following should be the output: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003 Isr: 1003
Increase the number of replicas:
First create a JSON file with the topic that has to be modified: {
"version":1,
"partitions":[
{"topic":"testTopic2","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic2","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic2","partition":2,"replicas":[1003,1001,1002]}
]
} From the above output, specify different orders in the replicas. This is because the first broker id in the list of replicas will be the partition leader. This helps to distribute the partitions among brokers.
Run the following command to apply the changes: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper c489-node2:2181 --reassignment-json-file /tmp/topic-replication.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"testTopic2","partition":0,"replicas":[1001],"log_dirs":["any"]},{"topic":"testTopic2","partition":2,"replicas":[1003],"log_dirs":["any"]},{"topic":"testTopic2","partition":1,"replicas":[1002],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
The following will be the output: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1001,1002,1003
Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1001,1003
Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003,1001,1002 Isr: 1003,1001,1002
To modify multiple topics, use the following JSON template:
{
"version":1,
"partitions":[
{"topic":"testTopic3","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic3","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic3","partition":2,"replicas":[1003,1001,1002]},
{"topic":"testTopic4","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic4","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic4","partition":2,"replicas":[1003,1001,1002]}
]
}
In the above JSON template, testTopic3 and testTopic4 are modified. To add more, the important thing to note is that the latest "topic" line must not have a comma.
In summary:
Add the new brokers to HDP/HDF using ambari UI.
Kafka doesn't reassign the topics that are created automatically after adding new brokers. Follow the steps previously provided to reassign the already created topics. For new topics, use --replication-factor and --partitions properties.
... View more
Labels:
01-20-2020
06:59 AM
4 Kudos
When we face high CPU issues, the next step is to identify the application and the thread causing the same. This article explains how to identify the thread/s causing high CPU issues.
In order to identify high CPU issues, the first thing we have to do is to identify the PID. This can be done by using a simple "top" command, so if you see a PID constantly using more than 70-100% CPU, then run the following command:
top -H -b -n1 -p <PID>
The command above will show us all the threads associated with that specific PID. The following is an example:
[kafka@c489-node2 conf]$ top -H -b -n1 -p 25910
top - 14:21:54 up 84 days, 12:32, 1 user, load average: 6.79, 6.62, 6.69
Threads: 145 total, 0 running, 145 sleeping, 0 stopped, 0 zombie
%Cpu(s): 18.3 us, 4.0 sy, 0.0 ni, 77.6 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 20971520 total, 16874368 free, 1725732 used, 2371420 buff/cache
KiB Swap: 5242872 total, 5242872 free, 0 used. 17874368 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
25910 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.04 java
26259 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:02.96 java
26260 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26261 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26262 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26263 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26264 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26265 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26266 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26267 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26268 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26269 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.27 java
26270 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26271 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26272 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26273 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26274 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.23 java
26275 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.27 java
26276 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26277 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26278 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26279 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26280 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26281 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.23 java
26282 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26283 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.53 java
26284 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
26285 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
26286 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
From the above output, we have to check %CPU column. Thread ids that you identify using 70% or more are the threads we are looking for. Above command should be taken 5 to 10 times every 5 to 10 seconds during the issue, this will help us to check a pattern in the threads, because sometimes one thread that was using 70%, 10 seconds later goes down to 0 or 10%, as a result, we have to check for the threads constantly using most of the %CPU memory, then if you see that one or more threads are using 70% or more in all the snapshots, then we have identified our high CPU thread. At the same time, we have to take some thread dumps, same timings 5 to 10 seconds, 5 to 10 times each, this will print all PID associated threads.
Thread dumps can be taken as follows:
1. Change to the PID user owner
2. JAVA_HOME/jstack -l <PID>
The example output is very large, on the other hand, each thread is seen as follows:
[kafka@c489-node2 conf]$ /usr/jdk64/jdk1.8.0_112/bin/jstack -l 25910
2020-01-20 14:29:45
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.112-b15 mixed mode):
"Attach Listener" #386 daemon prio=9 os_prio=0 tid=0x00007fd524001000 nid=0xad96 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
....
more lines here
....
"kafka-network-thread-1003-ListenerName(PLAINTEXT)-PLAINTEXT-2" #95 prio=5 os_prio=0 tid=0x00007fd629b43000 nid=0x672a runnable [0x00007fd3a23da000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000c94d9e70> (a sun.nio.ch.Util$3)
- locked <0x00000000c94d9e60> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c94d9d48> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:686)
at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
at kafka.network.Processor.poll(SocketServer.scala:665)
at kafka.network.Processor.run(SocketServer.scala:582)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
From the above output, you can see that the thread has the following identifier which is a hexadecimal value: nid=0x672a
Then from the previous output "top -H -b -n1 -p <PID>" and after identified our thread or threads using more than 70% constantly, we will get a decimal number, let's say 25910, which is the first in the thread list previously added to this article. We have to change this value from decimal to hexadecimal, for example,
25910 is 6536 in hexadecimal value
After that, we have to search for 6536 un our thread dumps to identify the thread causing the high CPU issue. In some cases you may see that the identifies thread is something like below:
"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007fd62803f800 nid=0x669a runnable
The above thread usually means that the available memory for the application is not enough and more memory is required.
In summary, the steps to identify a high CPU issue are:
Run top command to identify the PID using high CPU.
When PID is identified, run the following command 5 to 10 times every 5 to 10 seconds to identify the threads associated with the PID and their usage top -H -b -n1 -p <PID>, at the same time take some thread dumps, same amount (5 to 10 every 5 to 10 seconds) using jstack -l <pid> with the PID owner.
Finally, when we identify the thread/s consuming most of the CPU, change the value from decimal to hexadecimal and search for those in the thread dumps.
There is a command we can run to automate this process, basically, take the thread dumps and CPU thread output.
for i in {1..10}; do echo $i ; top -H -b -n1 -p <PID> > /tmp/cpu-$(date +%s)-$i.log; JAVA_HOME/jstack -l <PID> > /tmp/jstack-$(date +%s)-$i ;sleep 2; done
Example command with values:
for i in {1..10}; do echo $i ; /usr/jdk64/jdk1.8.0_112/bin/jstack -l 293558 > /tmp/kafka/jstack-$(date +%s)-$i ;sleep 2; done
Also, I have created a simple script to identify the threads causing the issue, feel free to modify the same accordingly:
#Execution: ./highCpu.sh <argument1 = PID> <argument2 = sleep in seconds>
echo "=== Script to identify threads for High CPU issues ==="
read -p "Select the path to store jstacks - i.e /tmp/highCPU: " jPath
read -p "Select the JVM path - i.e /usr/jdk64/jdk1.8.0_112/bin: " JVMPath
for i in {1..10}; do echo "Collecting thread dumps and cpu: " $i "of 10" ; top -H -b -n1 -p $1 > $jPath/cpu-$(date +%s)-$i.log; $JVMPath/jstack -l $1 > $jPath/jstack-$(date +%s)-$i ;sleep $2; done
echo
echo "PID %CPU"
tail -n +6 $jPath/cpu-* | awk '{ if ($9 > 80) { print $1 " " $9} }'
column1=$(tail -n +6 $jPath/cpu-* | awk '{ if ($9 > 80) { print $1} }')
hexValues=$(printf '%x\n' $column1)
echo
echo "Printing HEX values of high usage threads: " $hexValues
... View more
Labels:
11-20-2019
09:58 AM
@ManuelCalvo Thanks. Your solution works. My environment does not have Kerberos and Ranger enabled. So, I skipped step 2 For step 4, hbase.coprocessor.master.classes = org.apache.atlas.hbase.hook.HBaseAtlasCoprocessor Also, as I found out, Atlas currently only stores HBase metadata and does not store HBase lineage data as it does for Hive.
... View more
10-17-2019
04:47 AM
Hey, Optimizing your Kafka Cluster depends upon your cluster usage & use-case. Based on your main concern like throughput or CPU utilization or Memory/Disk usage, you need to modify different parameters and some changes may have an impact on other aspects. For example, if acknowledgments is set to "all", all brokers that replicate the partitions need to acknowledge that the data was written prior to confirming the next message needs to be sent. This will ensure data consistency but increase CPU utilization and network latency. Refer Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines) article[1] written by Jay Kreps(Co-founder and CEO at Confluent). [1]https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines Please let me know if this helps. Regards, Ankit.
... View more
10-08-2019
10:00 AM
@Peruvian81 You can try below flow which is just for testing purposes: Basically I have a tailFile processor passing data through splitText then these messages are sent to PublishKafka_1_0(use this processor for this test), finally I created a consumer to consume data from the same topic configured in PublishKafka_1_0 storing the data in the file system with putFile. In putFile I have configured Maximum File Count to 10, to avoid excessive space usage in the file system.
... View more
10-07-2019
02:01 PM
I got the solution - I should send messages to sandbox-hdf.hortonworks.com and see those messages in HDF rather than HDP.
... View more
10-03-2019
12:24 PM
Hi @Peruvian81 Kafka has multiple ways to be secured: SSL Kerberos PLAINTEXT No No SSL Yes No SASL_PLAINTEXT No Yes SASL_SSL Yes Yes If you already are using Kerberos, you can check the document below: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.4/authentication-with-kerberos/content/kerberos_kafka_configuring_kafka_for_kerberos_using_ambari.html For your clients, you can use below command line depending of the Kafka version: consumer example: bin/kafka-console-consumer.sh --bootstrap-server <kafkaHost>:<kafkaPort> --topic <topicName> --security-protocol SASL_PLAINTEXT For newer versions, consumer example: bin/kafka-console-consumer.sh --topic <topicName> --bootstrap-server <brokerHost>:<brokerPort> --consumer-property security.protocol=SASL_PLAINTEXT * Make sure to get a valid Kerberos ticket before running these commands (kinit -kt keytab principal) ** Ensure the Kerberos principal has permissions to publish/consume data from/to the selected topic
... View more
09-26-2019
07:44 AM
@Peruvian81 You can try below command for the consumer: ./kafka-console-consumer.sh --bootstrap-server w01.s03.hortonweb.com:6667 --topic PruebaNYC --consumer-property security.protocol=SASL_PLAINTEXT --from-beginning If that solves your issue, kindly put this thread as solved. Thanks.
... View more
08-23-2019
12:50 PM
Hi @BaluSaiD, If the topics you're producing/consuming data to/from have at least 2/3 in-sync replicas and min.insync.replicas=2, then it should be ok. If some topics have just 1 replica, and this broker dies, then you will not be able to produce/consume data from this topic. Properties to keep in mind: 1. Server-side: min.insync.replicas: When a producer sets acks to "all" (or "-1"), min.insync.replicas specify the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write. 2. Producer Side: ack: The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgment from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting. Regards, Manuel.
... View more
- « Previous
-
- 1
- 2
- Next »