Member since: 06-27-2019
Posts: 147
Kudos Received: 8
Solutions: 11
My Accepted Solutions
Title | Views | Posted
---|---|---
| 890 | 01-31-2022 08:42 AM
| 170 | 11-24-2021 12:11 PM
| 303 | 11-24-2021 12:05 PM
| 1041 | 10-08-2019 10:00 AM
| 1378 | 10-07-2019 12:08 PM
02-01-2022
08:23 PM
This article provides the steps and fields required to configure Streams Replication Manager (SRM) using external accounts between two Kerberized clusters.
It assumes that Kerberos is configured properly between the clusters and that data can already be produced and consumed correctly.
Environment details:
Cluster A: co-located cluster (SRM and Kafka running in this cluster), CDP 7.1.7
Cluster B: external cluster, also CDP 7.1.7
Both clusters use the SASL_PLAINTEXT security.protocol for their clients
Clusters are configured with a cross-realm trust (MIT Kerberos); more details on cross-realm configuration here
Steps to configure external accounts (feature available from CDP 7.1.7)
Go to cluster A > Cloudera Manager > Administration > External accounts > Kafka Credentials tab
Configure the following fields:
Name
Bootstrap servers
Security protocol
JAAS Secret [1-3]
JAAS Template
Kerberos Service Name
SASL Mechanism
Example:
Name: c289
Bootstrap servers: c289-node2.clusterB.com:9092,c289-node3.clusterB.com:9092
Security protocol: SASL_PLAINTEXT
JAAS Secret 1: kafka/c189-node4.clusterA.com@CLUSTERA.COM
JAAS Secret 2: /opt/cloudera/kafka.keytab
JAAS Template: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="##JAAS_SECRET_2##" principal="##JAAS_SECRET_1##";
Kerberos Service Name: kafka
SASL Mechanism: GSSAPI
In the JAAS secret fields, we can also use a username and password and replace Krb5LoginModule with PlainLoginModule.
Also, if more than one SRM driver is used, make sure to copy the keytab to each srm-driver host with the correct permissions (owned by the SRM PID owner), for example: -rw------- 1 streamsrepmgr streamsrepmgr 216 Jan 28 14:32 kafka.keytab
We can also configure an external account for the co-located cluster, but this is not required
Note: fields marked with * are not required.
Finally, go to cluster A > Cloudera Manager > Streams Replication Manager > Configuration and add the external account name c289 in the External Kafka Accounts field:
Configure the replication details under cluster A > Cloudera Manager > Streams Replication Manager > Configuration > Streams Replication Manager's Replication Configs
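As a rough sketch of what those Replication Configs entries could look like (the aliases "c289" for the external cluster and "primary" for the co-located cluster are assumptions, adjust them to your own alias names):
c289->primary.enabled=true
c289->primary.topics=.*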
Start the SRM cluster and validate that the properties are correct in the srm-driver.log files.
Additional details about SRM configuration can be found in Configuring Streams Replication Manager.
01-31-2022
08:42 AM
1 Kudo
@inyongkim Try CM > Kafka > Configuration > (use the filter) Kafka Connect Advanced Configuration Snippet (Safety Valve) for connect-distributed.properties and add: connector.client.config.override.policy=All
Then check the Kafka Connect log files for the line "org.apache.kafka.connect.runtime.distributed.DistributedConfig: DistributedConfig values" and see if the property changed properly. Let me know if that helped.
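Once that policy is set to All, individual connectors are allowed to override client settings by prefixing them in their own configuration; a minimal sketch (the property values are only hypothetical illustrations):
consumer.override.max.poll.records=100
producer.override.compression.type=snappy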
01-24-2022
10:40 AM
1 Kudo
@mike_bronson7 In Kafka 0.1x we will see the statement (Consumer group 'deeg_data' is rebalancing) when the group is rebalancing, but in newer versions we will see something like:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
GroupName topicName 0 0 0 0 - - -
This means there are no active consumers in the group (or it is rebalancing). A group rebalance can be triggered for multiple reasons, but mostly because of:
1. A new consumer was added/joined to the group
2. A consumer was removed from the group (because of client shutdown, timeout, or network glitches)
3. Timeout issues between brokers and clients
To get more details about consumer rebalancing (if there are no errors on the broker side), checking the application log files might provide some details about the underlying issue.
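For reference, the output above is what the consumer group describe command prints; a sketch of the call, assuming a reachable broker: kafka-consumer-groups --bootstrap-server <brokerHost>:<brokerPort> --describe --group deeg_data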
01-24-2022
10:06 AM
1 Kudo
Hi @mike_bronson7
1. Do you see anything interesting in the broker 1010 log file? This is to try to understand why 1010 is not able to register in ZooKeeper.
2. Try forcing a new controller by using: [zk: localhost:2181(CONNECTED) 11] rmr /controller
3. Are these broker ids unique? If you describe other topics, do you see the same broker ids and the same behavior (leader none for some partitions)?
4. Finally, if this is a dev environment:
4.1 You can enable unclean leader election = true and restart the brokers
Or:
4.2 (if this is happening just for this topic) remove the __consumer_offsets topic (just from ZooKeeper) and restart Kafka
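For step 4.1, a possible sketch of enabling it per topic instead of cluster-wide (topic name and ZooKeeper address are placeholders): kafka-configs --zookeeper <zkHost>:<zkPort> --alter --entity-type topics --entity-name <topicName> --add-config unclean.leader.election.enable=true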
01-17-2022
05:17 AM
Hi @danurag It's recommended to set retention at the topic level (unless you want all your topics to use 24 hours by default), for example: kafka-configs --bootstrap-server <brokerHost:brokerPort> --alter --entity-type topics --entity-name <topicName> --add-config retention.ms=3600000
The most common way to configure how long Kafka retains messages is by time. The default is specified in the configuration file using the log.retention.hours parameter, and it is set to 168 hours, or one week. However, there are two other parameters allowed, log.retention.minutes and log.retention.ms. All three control the same goal (the amount of time after which messages may be deleted), but the recommended parameter to use is log.retention.ms, because if more than one is specified, the smaller unit size takes precedence; this makes sure that the value set for log.retention.ms is always the one used.
11-30-2021
09:48 AM
@sarm After digging a little bit more, there is a metric exposed by producers under the MBean kafka.producer:type=producer-metrics,client-id=producer-1; its record-send-total attribute shows the total number of records sent by that producer. To get more details about the available metrics in Kafka, I would suggest checking this Cloudera community article.
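A sketch of reading that attribute with the JmxTool class, assuming the producer exposes JMX on clientHost:clientJMXPort and uses the client id producer-1: kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://clientHost:clientJMXPort/jmxrmi --object-name 'kafka.producer:type=producer-metrics,client-id=producer-1'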
11-29-2021
08:22 PM
1 Kudo
Components required:
jconsole (UI required)
jmxterm (for Linux environment - CLI only)
Kafka client (java producer/consumer) exposing JMX
Kafka Brokers exposing jmx
Steps to get the available metrics (mbeans) available in a Kafka consumer (Environment with a UI available):
Add the following JVM properties to your java consumer: -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.port=9090 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
After starting your consumer, use jconsole to connect to the host:port specified in Step 1.
After we add <consumer-hostname>:<JMXPort>, we should be able to see the following in the jconsole UI.
Here, we have to click on the mbeans tab and navigate through the available metrics, in this example, we want to see the "records-consumer-rate" which in the program has the following definition "The average number of records consumed per second". In this case, the average number of messages processed by this consumer is 0.5.
If we want to pass this to a Kafka command line, we have to get the ObjectName from jconsole.
After that, run the following command line and replace the "objectName" accordingly. Example output: ./kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://clientHost:clientJMXPort/jmxrmi --object-name 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1'
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://clientHost:clientJMXPort/jmxrmi.
"time","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:bytes-consumed-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:bytes-consumed-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-latency-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-latency-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-size-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-size-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-throttle-time-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-throttle-time-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-consumed-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-consumed-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-lag-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-lead-min","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-per-request-avg"
1638221356007,9.076115605876556,12850.0,669.0075187969925,770.0,3.0063291139240507,18.0,18.0,0.0,0.0,1755.0,0.5042286447709198,720.0,0.0,2002.0,1.0
1638221358013,9.072183021431389,12868.0,669.2446043165468,770.0,3.005860346430811,18.0,18.0,0.0,0.0,1761.0,0.5040101678572994,721.0,0.0,2002.0,1.0
1638221360012,9.068771517339826,12886.0,669.951724137931,770.0,3.005492797181055,18.0,18.0,0.0,0.0,1767.0,0.5038206398522126,722.0,0.0,2002.0,1.0
Each comma-separated value is a different metric. If you want to identify "records-consumed-rate", count its position in the jconsole list and then count the same number of fields in the output; in this example, records-consumed-rate is listed on jconsole line 12. Taking one line from the terminal output, the value in field 12 is "0.5038206398522126": 1638221360012,9.068771517339826,12886.0,669.951724137931,770.0,3.005492797181055,18.0,18.0,0.0,0.0,1767.0,0.5038206398522126,722.0,0.0,2002.0,1.0
The above steps also apply to a producer and to brokers; we just have to identify the JMX port used by the service and make sure we have access to get the metrics. In case we don't have a UI or access to the JMX ports from external hosts, jmxterm is a good alternative to list the available mbeans. See the steps to run jmxterm below:
Download jmxterm from the official site.
In the terminal (make sure the JMX port is available for your service), execute the following: java -jar jmxterm-1.0.2-uber.jar --url <kafkahost>:<kafkaJMXPort>
If the connection is successful, we will see the following: Welcome to JMX terminal. Type "help" for available commands.
$>
Here we can list the mbeans available for the service that we are connected to, for example, trimmed for a broker host: $>beans
...
...
#domain = kafka.controller:
kafka.controller:name=ActiveControllerCount,type=KafkaController
kafka.controller:name=AutoLeaderBalanceRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControlledShutdownRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControllerChangeRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControllerShutdownRateAndTimeMs,type=ControllerStats
...
...
Then if we want to get the active controller metric, we can use: [root@brokerHost ~]# kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://brokerHost:brokerJMXPort/jmxrmi --object-name 'kafka.controller:name=ActiveControllerCount,type=KafkaController'
21/11/29 21:50:26 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://brokerHost:brokerJMXPort/jmxrmi.
"time","kafka.controller:type=KafkaController,name=ActiveControllerCount:Value"
1638222626788,1
1638222628783,1
1638222630783,1
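The same attribute can also be read from inside a jmxterm session; a sketch, assuming jmxterm's get command and the bean name listed earlier:
$>get -b kafka.controller:name=ActiveControllerCount,type=KafkaController Value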
11-29-2021
05:20 AM
Hi @hbinduni From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at-least-once to exactly-once delivery; in particular, producer retries will no longer introduce duplicates. It's important to mention that if the producer is already configured with acks=all, there will be no difference in performance. Additionally, the order of messages produced to each partition will be guaranteed through all failure scenarios, even if max.in.flight.requests.per.connection is set to more than 1 (5 is the default, and also the highest value supported by the idempotent producer). More details in the document below: https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
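A minimal sketch of the producer properties involved (only the relevant entries are shown, the rest of the producer configuration is omitted):
enable.idempotence=true
acks=all
max.in.flight.requests.per.connection=5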
11-29-2021
05:07 AM
Hi @dansteu The Kerberos service name property has to be the service name specified for the Kafka service, which is usually "kafka".
11-24-2021
02:22 PM
Hi @sarm I think there is no metric for that; on the other hand, you can create a simple Java consumer along the following lines (bootstrap servers, group id, and topic name are placeholders):
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

// Basic consumer setup
Properties props = new Properties();
props.put("bootstrap.servers", "<brokerHost>:<brokerPort>");
props.put("group.id", "timestamp-checker");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("<topicName>"));

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // record.timestamp() is the message timestamp in epoch milliseconds
            System.out.println(new Date(record.timestamp()));
            System.out.printf("Partition = %d%n", record.partition());
            System.out.printf("Offset = %d%n", record.offset());
            System.out.printf("Key = %s%n", record.key());
            System.out.printf("Value = %s%n", record.value());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
This should produce output like the following:
Wed Nov 24 19:16:27 CLST 2021
Partition = 0
Offset = 439
Key = null
Value = S
Then you can create some logic to count the number of messages between specific timestamps. I hope that helps.
11-24-2021
02:00 PM
Hi @AshwinPatil If I understood correctly, the question is whether topic-level alter configs take precedence over the broker global settings, right? If yes, then the answer is "yes": if we alter the topic using retention.ms, for example, it will take precedence over the log.retention.hours specified in the brokers.
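To confirm which override is in effect on a topic, a sketch (broker and topic names are placeholders): kafka-configs --bootstrap-server <brokerHost>:<brokerPort> --describe --entity-type topics --entity-name <topicName>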
11-24-2021
12:24 PM
@Ani1991 From the documentation: https://docs.cloudera.com/cdp-private-cloud-base/7.1.7/smm-security/topics/smm-securing-streams-messaging-manager.html "If you deploy SMM without security, the login page is not enabled on the SMM UI by default. When you enable Kerberos authentication, SMM uses SPNEGO to authenticate users and allows them to view or create topics within Kafka by administering Ranger Kafka Policies." This looks like a Kerberos issue with the cached token on the machine from which you're trying to access the SMM UI. Can you try using the Firefox browser and make sure it's configured properly? See the documentation for more details: https://docs.cloudera.com/documentation/enterprise/latest/topics/cdh_sg_browser_access_kerberos_protected_url.html
11-24-2021
12:11 PM
Hi @jaeseung For SMM, the "WARNING" status of a replication flow is calculated based on the replication latency and the throughput:
if any of the metrics are not present -> INACTIVE
if latency max and latency age are smaller than a fixed grace period (60 sec), and throughput max is not zero -> ACTIVE
if throughput age is smaller than a fixed grace period (60 sec), and throughput max is not zero -> WARNING
otherwise -> INACTIVE
11-24-2021
12:05 PM
Hi @jaeseung The client configurations have to be passed using the cluster alias replication pair:
for consumer configs: primary->secondary.consumer.<config>
for producer configs: primary->secondary.producer.override.<config>
Please try adding the following under the SRM configs: <source>-><target>.producer.override.max.request.size=<desired value>
If that doesn't work, use: <source>-><target>.producer.max.request.size=<desired value>
04-30-2021
08:41 AM
From the Kafka perspective, max.poll.records is an upper bound on the number of messages that can be retrieved in a single poll call; a consumer is constantly consuming messages. For example, imagine you have a topic and you send 10 messages: if max.poll.records (say 10000) had to be reached before a poll returned, those messages would never be consumed, which is exactly why it is only an upper bound and poll simply returns whatever is available, up to that limit. It's usually tuned when consumers start timing out because the processing of the polled messages is not finishing within max.poll.interval.ms (default 5 minutes). In summary, consumers are constantly consuming messages (1 or many), and max.poll.records is just an upper bound used to control the number of messages we can get in each poll call, to make sure those messages are processed in time (max.poll.interval.ms), as sketched below. Hope that information clarifies the usage of that property.
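For reference, the two consumer properties with their default values (tune them to your actual processing time):
max.poll.records=500
max.poll.interval.ms=300000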
04-30-2021
08:31 AM
I'm afraid that Kafka doesn't come with an HDFS sink connector or anything similar out of the box in HDP 2.6.5; this is coming in CDP 7.1.1. I believe NiFi or Spark are alternatives that can be used for this.
04-30-2021
08:14 AM
I would suggest checking the keystores you're using in the NiFi consumer with a simple producer/consumer on the Kafka host itself. For example, create a file called client.properties and add the SSL details; example below: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/configuring-wire-encryption/content/configuring_kafka_producer_and_kafka_consumer.html
Then run the consumer and see if the issue is reproduced; if yes, you can enable debugging for the client to get more details about the exception. I hope that helps to find the root cause.
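A minimal sketch of what that client.properties could contain for an SSL listener (paths, password, and port are placeholders):
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=<truststore password>
Then run, for example: /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server <brokerHost>:<sslPort> --topic <topicName> --consumer.config client.properties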
04-30-2021
08:05 AM
Support for Kafka connect is added in CDP 7.1.1, please review the documentation below: https://docs.cloudera.com/runtime/7.1.1/release-notes/topics/rt-whats-new-kafka.html
04-30-2021
07:54 AM
If the connector is running on top of CDP, you can check the log files under /var/log/kafka or /run/cloudera-scm-agent/process/xxxxxxxx-kafka-KAFKA_CONNECT/logs. If this is standalone Kafka Connect and there are no details in any log file, I would suggest adding the following JVM property to the process: -XX:ErrorFile=targetDir/hs_err_pid_%p.log
This property creates a file when the JVM crashes; when that happens, no details are added to the regular log files. Hope that helps to find the root cause.
04-30-2021
07:44 AM
If I understood correctly, you're asking for the connectors provided by Cloudera, could you please confirm? If yes, the document below lists the connectors currently supported by Cloudera: https://docs.cloudera.com/cdp-private-cloud-base/7.1.5/kafka-connect/topics/kafka-connect-connector.html On the other hand, you can load any connector by following the steps mentioned in the document below: https://docs.cloudera.com/cdp-private-cloud-base/7.1.5/kafka-connect/topics/kafka-connect-connector-install.html Please let us know if that answers your question.
06-18-2020
12:01 PM
This is a step by step guide to test Kafka clients from a Windows machine that connects to an HDF/HDP environment.
We start with the review of the current Kafka broker listeners. In this case, we will cover the following:
SASL_PLAINTEXT > Kerberized environments
PLAINTEXT > Plain connections
This can be done by using the Ambari console > Kafka > Configs > Kafka Broker. After that, search for listeners and make sure either one or both protocols are enabled.
PLAINTEXT security protocol
Go to your Windows machine and download the apache Kafka software.
It is recommended to download the same version that is running in your HDP/HDF cluster. Select the "Scala 2.12" link to avoid exceptions while running the Kafka clients.
Extract the content of this folder in a preferred location in the Windows host.
While connecting to Kafka through the PLAINTEXT listener, Kafka does not have a way to identify you as a user. Hence, add Kafka ACLs and give permissions to ANONYMOUS users. To achieve this, run the following command as the Kafka user on one of the Kafka brokers: /usr/hd<p/f>/current/kafka-broker/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zkHost>:<zkPort> --add --allow-principal User:ANONYMOUS --operation All --topic=* --group=* --cluster
The above command gives all permissions to the anonymous user in Kafka; change the topic and group to specific ones if required.
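To confirm the ACLs were created, a sketch of listing them with the same tool: /usr/hd<p/f>/current/kafka-broker/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zkHost>:<zkPort> --list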
In a Kafka host, create a new test topic or use an existing one. To create a new topic, run the following command with the Kafka user: kafka-topics --create --topic <topicName> --partitions <N of partitions> --replication-factor <N of replicas> --zookeeper <zkHost>:<zkPort>
After adding the anonymous user permissions, go to the Windows machine and navigate to the following Kafka folder (note: this step assumes that we already have connectivity to the brokers and that the firewall and DNS, if any, are configured properly): C:\<preferred location>\kafka_<version>\bin\windows
In this folder, there is a list of .bat files, similar to the ones with the .sh extension on Linux hosts. To run the producer, use the following command: C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName>
To run a consumer, use the following command: C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning
Run the clients using Kerberos (SASL_PLAINTEXT)
To run the clients using Kerberos (SASL_PLAINTEXT), first ensure that Kerberos is configured properly in the environment. Once you get valid tickets, do the following to connect with the Kafka clients:
If using Kafka Ranger plugin, go to Ranger Admin UI -> Kafka and add a new policy for the user that is used to connect from Windows host pointing to the topic/s that needs access.
After the Ranger policies are configured, then go to the Windows Host and configure the Kerberos details for the Kafka client connection. To achieve this, do the following:
Create a file with extension .conf and add the following content: KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
useTicketCache=false
serviceName="kafka"
keyTab="/path_to_file/file.keytab"
principal="principal_name@REALM.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/path_to_file/file.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="principal_name@REALM.COM";
};
Client: used to connect to ZooKeeper; KafkaClient: used to connect to the Kafka brokers.
principal: the user that will be used to connect from Windows to the Kafka brokers (the same user we granted permissions to in the Ranger UI).
keyTab: the keytab file that contains the principal specified in "principal".
With that file created, open a Windows Command Prompt and execute the following command before running any command line: set KAFKA_OPTS="-Djava.security.auth.login.config=/path_to_conf_file/file.conf" That command will pass the keytab/principal to the Kafka client.
In the same command prompt, run a Kafka producer/consumer using the following commands for Kafka versions <= 1.0: C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName> --security-protocol SASL_PLAINTEXT
For the consumer, use the following command line: C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning --security-protocol SASL_PLAINTEXT
For Kafka versions > 1.0, use the following producer command line: C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName> --producer-property security.protocol=SASL_PLAINTEXT
For a Kafka consumer > 1.0, use the following command line: C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT
06-15-2020
09:37 AM
From the jaas file, I see that debug=true was added; on the other hand, the debug output is not showing up in the producer output, which means that the jaas file provided is not picked up properly. If you check kafka-console-producer.sh, you'll notice the lines below: # check if kafka_jaas.conf in config , only enable client_kerberos_params in secure mode.
KAFKA_HOME="$(dirname $(cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ))"
KAFKA_JAAS_CONF=$KAFKA_HOME/config/kafka_jaas.conf
if [ -f $KAFKA_JAAS_CONF ]; then
export KAFKA_CLIENT_KERBEROS_PARAMS="-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_client_jaas.conf"
fi
Try editing kafka_client_jaas.conf, or you can also try exporting KAFKA_CLIENT_KERBEROS_PARAMS directly (see the sketch below) and see if that helps. Regards, Manuel.
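A sketch of that export, assuming the path below is wherever your client jaas file actually lives: export KAFKA_CLIENT_KERBEROS_PARAMS="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"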
03-02-2020
12:27 PM
1 Kudo
This community article assumes that we already have CDH 6.x and Kerberos enabled, in case we have to install Kerberos, please use the document below:
https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/cm_sg_intro_kerb.html
1. Install a database
In this case, we are using MySQL:
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-installing_mysql.html
2. Configure the database for schema registry and SMM
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-configuring-schema-registry-metadata-stores-in-mysql.html
3. Download Schema Registry and SMM parcels
SMM https://www.cloudera.com/downloads/cdf/csm.html
CSR https://www.cloudera.com/downloads/cdf/csp.html
4. Install the Parcels
Install the services in this order:
1. Schema Registry
2. SRM (if no SRM installation, avoid this step)
3. SMM
https://docs.cloudera.com/csp/2.0.1/deployment/topics/csp-get-parcel-csd.html
5. Distribute and activate the parcels.
In Schema registry point “Schema Registry storage connector url” to the mysql hostname. Check “Enable Kerberos Authentication”.
Use the database registry password for “Schema Registry storage connector password”
5.1 For SMM use
cm.metrics.host = cloudera manager host
cm.metrics.password = cloudera manager UI password
cm.metrics.service.name = kafka (default)
Streams Messaging Manager storage connector url = jdbc:mysql://FQDN_MYHSQL:3306/streamsmsgmgr
Streams Messaging Manager storage connector password = user database password specified
Check “Enable Kerberos Authentication”
6. Add Kafka service
Check "Enable Kerberos Authentication"
7. Configure and access the SMM UI
Property "cm.metrics.service.name" must match with the Kafka service name, by default is "kafka"
Create the streamsmsgmgr principal in the KDC; example when using an MIT KDC:
kadmin.local
add_principal streamsmsgmgr
Finally, copy /etc/krb5.conf to your local machine and get a valid Kerberos ticket for the streamsmsgmgr user by running "kinit streamsmsgmgr", using the same password chosen at user creation time.
01-31-2020
09:06 AM
@mike_bronson7 A Kafka broker needs at least the following number of file descriptors just to track log segment files: (number of partitions)*(partition size / segment size)
You can review the current limits under: cat /proc/<kafka_pid>/limits
If you want to change them and you're using the Ambari console, go to Kafka > Configs and search for "kafka_user_nofile_limit".
Finally, to see open file descriptors, run: lsof -p <KAFKA_BROKER_PID>
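As a purely hypothetical illustration of the descriptor formula above: a broker hosting 2000 partitions of roughly 10 GB each, with the default 1 GB segment size, needs about 2000 * (10 GB / 1 GB) = 20000 descriptors just for segment files, so the nofile limit should sit comfortably above that.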
01-24-2020
07:13 AM
1 Kudo
Sometimes when we create our clusters, we use a small amount of Kafka brokers. After some time, there may be a requirement for adding more brokers, usually because of load, performance or high availability. In this article, we provide the considerations and steps to keep in mind after adding new brokers to HDP/HDF clusters.
If there is one Kafka broker in a cluster and plans are to add more brokers for high availability, it's important to mention that Kafka has an internal topic called __consumer_offsets. This topic is created by Kafka internally to store the consumer's committed offsets. At this point, if there is only one broker, Kafka will, by default, create this topic using one single replica and 50 partitions. As a result, it's highly recommended to change the number of replicas for this topic when adding more brokers to the cluster. These numbers are handled by the following properties:
offsets.topic.num.partitions: The number of partitions for the offset commit topic (should not change after deployment).
offsets.topic.replication.factor: The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
One of the considerations after adding new Kafka brokers to a cluster, is that Kafka doesn't have a way to reassign the current existing topics to the new brokers automatically. This has to be done manually using a script that comes with Kafka installation called Kafka reassign partition tool, bin/kafka-reassign-partitions.sh. In other words, if N topics are already created and assigned to a broker and a replica, after adding more brokers, use the reassign partition tool to increase the number of replicas. The following is a Kafka command-line example to add more partitions:
Topic using 1 partition and 1 replica:
[kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
After adding 2 new brokers, the topic "testTopic2" will remain exactly the same. To add more replicas and partitions, the following steps need to be performed:
Increase the number of partitions, for example: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper c489-node2:2181 --alter --topic testTopic2 --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions
The following should be the output of describe: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:3 ReplicationFactor:1 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002 Isr: 1002
Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003 Isr: 1003
Increase the number of replicas:
First create a JSON file with the topic that has to be modified: {
"version":1,
"partitions":[
{"topic":"testTopic2","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic2","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic2","partition":2,"replicas":[1003,1001,1002]}
]
} In the JSON above, the replicas are listed in a different order for each partition. This is because the first broker id in the list of replicas will be the preferred partition leader, so varying the order helps distribute partition leadership among the brokers.
Run the following command to apply the changes: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper c489-node2:2181 --reassignment-json-file /tmp/topic-replication.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"testTopic2","partition":0,"replicas":[1001],"log_dirs":["any"]},{"topic":"testTopic2","partition":2,"replicas":[1003],"log_dirs":["any"]},{"topic":"testTopic2","partition":1,"replicas":[1002],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
The following will be the output: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --topic testTopic2 --zookeeper c489-node2:2181
Topic:testTopic2 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: testTopic2 Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1001,1002,1003
Topic: testTopic2 Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1001,1003
Topic: testTopic2 Partition: 2 Leader: 1003 Replicas: 1003,1001,1002 Isr: 1003,1001,1002
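Once the reassignment finishes, it can be confirmed with the tool's --verify option; a sketch using the same JSON file: [kafka@c489-node2 ~]$ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper c489-node2:2181 --reassignment-json-file /tmp/topic-replication.json --verify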
To modify multiple topics, use the following JSON template:
{
"version":1,
"partitions":[
{"topic":"testTopic3","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic3","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic3","partition":2,"replicas":[1003,1001,1002]},
{"topic":"testTopic4","partition":0,"replicas":[1001,1002,1003]},
{"topic":"testTopic4","partition":1,"replicas":[1002,1003,1001]},
{"topic":"testTopic4","partition":2,"replicas":[1003,1001,1002]}
]
}
In the above JSON template, testTopic3 and testTopic4 are modified. To add more, the important thing to note is that the last "topic" line must not have a trailing comma.
In summary:
Add the new brokers to HDP/HDF using ambari UI.
Kafka doesn't automatically reassign existing topics after new brokers are added. Follow the steps previously provided to reassign the already created topics. For new topics, use the --replication-factor and --partitions properties.
01-20-2020
06:59 AM
4 Kudos
When we face high CPU issues, the next step is to identify the application and the thread causing them. This article explains how to identify the thread(s) causing high CPU usage.
In order to identify high CPU issues, the first thing we have to do is to identify the PID. This can be done by using a simple "top" command, so if you see a PID constantly using more than 70-100% CPU, then run the following command:
top -H -b -n1 -p <PID>
The command above will show us all the threads associated with that specific PID. The following is an example:
[kafka@c489-node2 conf]$ top -H -b -n1 -p 25910
top - 14:21:54 up 84 days, 12:32, 1 user, load average: 6.79, 6.62, 6.69
Threads: 145 total, 0 running, 145 sleeping, 0 stopped, 0 zombie
%Cpu(s): 18.3 us, 4.0 sy, 0.0 ni, 77.6 id, 0.0 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem : 20971520 total, 16874368 free, 1725732 used, 2371420 buff/cache
KiB Swap: 5242872 total, 5242872 free, 0 used. 17874368 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
25910 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.04 java
26259 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:02.96 java
26260 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26261 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26262 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26263 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26264 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26265 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26266 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26267 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26268 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26269 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.27 java
26270 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26271 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26272 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26273 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.26 java
26274 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.23 java
26275 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.27 java
26276 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26277 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26278 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26279 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26280 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.25 java
26281 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.23 java
26282 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.24 java
26283 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.53 java
26284 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
26285 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
26286 kafka 20 0 13.6g 680608 22108 S 0.0 3.2 0:00.00 java
From the above output, we have to check the %CPU column. The thread ids you see constantly using 70% or more are the threads we are looking for. The above command should be run 5 to 10 times, every 5 to 10 seconds, during the issue; this helps us spot a pattern in the threads, because sometimes a thread that was using 70% drops to 0 or 10% ten seconds later. As a result, we have to look for the threads constantly using most of the CPU: if one or more threads are using 70% or more in all the snapshots, we have identified our high-CPU thread(s). At the same time, we have to take some thread dumps with the same timing (5 to 10 seconds apart, 5 to 10 times each); this will print all the threads associated with the PID.
Thread dumps can be taken as follows:
1. Change to the PID user owner
2. <JAVA_HOME>/bin/jstack -l <PID>
The example output is very large, on the other hand, each thread is seen as follows:
[kafka@c489-node2 conf]$ /usr/jdk64/jdk1.8.0_112/bin/jstack -l 25910
2020-01-20 14:29:45
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.112-b15 mixed mode):
"Attach Listener" #386 daemon prio=9 os_prio=0 tid=0x00007fd524001000 nid=0xad96 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
Locked ownable synchronizers:
- None
....
more lines here
....
"kafka-network-thread-1003-ListenerName(PLAINTEXT)-PLAINTEXT-2" #95 prio=5 os_prio=0 tid=0x00007fd629b43000 nid=0x672a runnable [0x00007fd3a23da000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000c94d9e70> (a sun.nio.ch.Util$3)
- locked <0x00000000c94d9e60> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c94d9d48> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.java:686)
at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
at kafka.network.Processor.poll(SocketServer.scala:665)
at kafka.network.Processor.run(SocketServer.scala:582)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
From the above output, you can see that the thread has the following identifier which is a hexadecimal value: nid=0x672a
Then, from the previous "top -H -b -n1 -p <PID>" output, after identifying the thread or threads constantly using more than 70%, we get a decimal thread id, let's say 25910, which is the first one in the thread list added earlier in this article. We have to convert this value from decimal to hexadecimal, for example:
25910 is 6536 in hexadecimal
After that, we have to search for 6536 in our thread dumps to identify the thread causing the high CPU issue. In some cases, you may see that the identified thread is something like the following:
"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007fd62803f800 nid=0x669a runnable
The above thread usually means that the available memory for the application is not enough and more memory is required.
In summary, the steps to identify a high CPU issue are:
Run top command to identify the PID using high CPU.
When the PID is identified, run the following command 5 to 10 times, every 5 to 10 seconds, to identify the threads associated with the PID and their usage: top -H -b -n1 -p <PID>. At the same time, take the same number of thread dumps (5 to 10, every 5 to 10 seconds) using <JAVA_HOME>/bin/jstack -l <PID>, run as the PID owner.
Finally, when we identify the thread/s consuming most of the CPU, change the value from decimal to hexadecimal and search for those in the thread dumps.
There is a command we can run to automate this process; it basically takes the thread dumps and the per-thread CPU output in one go:
for i in {1..10}; do echo $i ; top -H -b -n1 -p <PID> > /tmp/cpu-$(date +%s)-$i.log; <JAVA_HOME>/bin/jstack -l <PID> > /tmp/jstack-$(date +%s)-$i ;sleep 2; done
Example command with values:
for i in {1..10}; do echo $i ; /usr/jdk64/jdk1.8.0_112/bin/jstack -l 293558 > /tmp/kafka/jstack-$(date +%s)-$i ;sleep 2; done
Also, I have created a simple script to identify the threads causing the issue, feel free to modify the same accordingly:
#!/bin/bash
# Execution: ./highCpu.sh <argument1 = PID> <argument2 = sleep in seconds>
echo "=== Script to identify threads for High CPU issues ==="
read -p "Select the path to store jstacks - i.e /tmp/highCPU: " jPath
read -p "Select the JVM path - i.e /usr/jdk64/jdk1.8.0_112/bin: " JVMPath
# Collect 10 rounds of per-thread CPU usage and thread dumps for the given PID
for i in {1..10}; do echo "Collecting thread dumps and cpu: " $i "of 10" ; top -H -b -n1 -p $1 > $jPath/cpu-$(date +%s)-$i.log; $JVMPath/jstack -l $1 > $jPath/jstack-$(date +%s)-$i ;sleep $2; done
echo
echo "PID %CPU"
# Print thread ids that used more than 80% CPU in any snapshot
tail -n +6 $jPath/cpu-* | awk '{ if ($9 > 80) { print $1 " " $9} }'
column1=$(tail -n +6 $jPath/cpu-* | awk '{ if ($9 > 80) { print $1} }')
# Convert the decimal thread ids to the hex "nid" values used in the jstack output
hexValues=$(printf '%x\n' $column1)
echo
echo "Printing HEX values of high usage threads: " $hexValues
11-27-2019
07:01 AM
@Sriram When a record is added to a batch, there is a time limit for sending that batch, to ensure it is sent within a specified duration; this is controlled by request.timeout.ms (default 30 seconds). You can try increasing this value and monitor. Can you share the processor configuration to get more details?
11-26-2019
09:40 AM
@sagarshimpi I believe debug on the worker nodes is fine for the Storm side. From the Kafka side, we can review the broker log files, usually located under /var/log/kafka/server.log. Also, you can monitor whether there are any under-replicated partitions by running the command line below: bin/kafka-topics.sh $ZK --describe --under-replicated-partitions
11-26-2019
07:14 AM
@sagarshimpi So if I understood correctly, the issue is not occurring now, right? If not, then this could be related to a connection issue: [ERROR] Failed to reconnect to cluster (consider increasing 'networkTimeout' configuration property) [networkTimeout=5000]
11-26-2019
07:10 AM
@sampathkumar_ma Exporting KAFKA_OPTS should work in this case. Could you please add "debug=true" to the jaas file: KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/home/<user>/user.keytab"
storeKey=true
debug=true
useTicketCache=false
serviceName="kafka"
principal="user@domain.COM";
};
Share the complete output; we should see something similar to:
Debug is true storeKey false useTicketCache true useKeyTab false doNotPrompt false ticketCache is null isInitiator true KeyTab is null refreshKrb5Config is false principal is null tryFirstPass is false useFirstPass is false storePass is false clearPass is false
Acquire TGT from Cache
Principal is kafka/host@EXAMPLE.COM
Commit Succeeded
Also, along with that, you can enable DEBUG under /etc/kafka/conf/tools-log4j.properties (change WARN to DEBUG), run the client, and share the details.