Member since
06-27-2019
147
Posts
9
Kudos Received
11
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3936 | 01-31-2022 08:42 AM |
| | 1127 | 11-24-2021 12:11 PM |
| | 1984 | 11-24-2021 12:05 PM |
| | 2872 | 10-08-2019 10:00 AM |
| | 3872 | 10-07-2019 12:08 PM |
02-01-2022
08:23 PM
1 Kudo
This article describes the steps and fields required to configure Streams Replication Manager (SRM) using external accounts between two Kerberized clusters.
It assumes that Kerberos is already configured correctly between the clusters and that data can be produced and consumed successfully.
Environment details:
Cluster A: co-located cluster (SRM and Kafka running in this cluster) CDP 7.1.7
Cluster B: External cluster which is another CDP 7.1.7
Both clusters are using SASL_PLAINTEXT security.protocol for their clients
Clusters are configured with cross-realm trust (MIT Kerberos); more details on cross-realm configuration here
Steps to configure external accounts (feature available from CDP 7.1.7)
Go to cluster A > Cloudera Manager > Administration > External accounts > Kafka Credentials tab
Configure the following fields:
Name
Bootstrap servers
Security protocol
JAAS Secret [1-3]
JAAS Template
Kerberos Service Name
SASL Mechanism
Example:
Name
c289
Bootstrap servers
c289-node2.clusterB.com:9092,c289-node3.clusterB.com:9092
Security protocol
SASL_PLAINTEXT
JAAS Secret 1
kafka/c189-node4.clusterA.com@CLUSTERA.COM
JAAS Secret 2
/opt/cloudera/kafka.keytab
JAAS Template
com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="##JAAS_SECRET_2##" principal="##JAAS_SECRET_1##";
Kerberos Service Name
kafka
SASL Mechanism
GSSAPI
In the JAAS secret fields, we can also use a username and password pair and replace Krb5LoginModule with PlainLoginModule (for SASL/PLAIN instead of Kerberos).
Also, if more than one SRM driver is used, make sure to copy the keytab to each srm-driver host with the correct ownership and permissions (the SRM process owner), for example: -rw------- 1 streamsrepmgr streamsrepmgr 216 Jan 28 14:32 kafka.keytab
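The permissions step above can be sketched as follows. This uses a temporary file in place of the real keytab so it can run anywhere; on an actual srm-driver host you would run it against /opt/cloudera/kafka.keytab and also chown it to the SRM process owner (e.g. streamsrepmgr, as in the example), which requires root:

```shell
# Sketch: restrict the keytab to its owner only.
KEYTAB=$(mktemp)                     # stands in for /opt/cloudera/kafka.keytab
chmod 600 "$KEYTAB"                  # -rw------- : readable only by the owner
stat -c '%a' "$KEYTAB" 2>/dev/null || stat -f '%Lp' "$KEYTAB"   # prints 600
rm -f "$KEYTAB"
```

Repeat the chown/chmod on every srm-driver host that receives a copy of the keytab.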
We can also configure an external account for the co-located cluster, but this is not required
Fields marked with * are not required.
Finally, go to cluster A > Cloudera Manager > Streams Replication Manager > Configuration and add the external account name c289 in the External Kafka Accounts field:
Configure the replication details under cluster A > Cloudera Manager > Streams Replication Manager > Configuration > Streams Replication Manager's Replication Configs
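As a sketch of what goes into Streams Replication Manager's Replication Configs, assuming the external account name c289 is used as the source cluster alias and clusterA is the co-located cluster alias (both aliases are assumptions; use whatever aliases your SRM setup defines):

```properties
# Hypothetical aliases: replicate all topics from the external cluster c289
# into the co-located cluster clusterA
c289->clusterA.enabled=true
c289->clusterA.topics=.*
```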
Start the SRM service and validate that the properties are correct in the srm-driver.log files.
Additional details about SRM configuration can be found in Configuring Streams Replication Manager.
01-31-2022
08:42 AM
1 Kudo
@inyongkim Try CM > Kafka > Configuration > (use filter) Kafka Connect Advanced Configuration Snippet (Safety Valve) for connect-distributed.properties, then add:
connector.client.config.override.policy=All
Check the Kafka Connect log files for the line "org.apache.kafka.connect.runtime.distributed.DistributedConfig: DistributedConfig values" and see if the property changed properly. Let me know if that helped.
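For context, once that policy is set to All, individual connectors can override their client configs with the producer.override. / consumer.override. prefixes (this is standard Kafka Connect behavior from KIP-458; the values below are only illustrative):

```properties
# In connect-distributed.properties (via the safety valve):
connector.client.config.override.policy=All

# Then, in an individual connector's configuration (hypothetical values):
consumer.override.max.poll.records=100
producer.override.compression.type=lz4
```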
01-24-2022
10:06 AM
1 Kudo
Hi @mike_bronson7
1. Do you see anything interesting in the broker 1010 log file? This is to understand why 1010 is not able to register in ZooKeeper.
2. Try forcing a new controller election: [zk: localhost:2181(CONNECTED) 11] rmr /controller
3. Are these broker ids unique? If you describe other topics, do you see the same broker ids and the same behavior (leader none for some partitions)?
4. Finally, if this is a dev environment:
4.1 Enable unclean.leader.election.enable=true and restart the brokers, or
4.2 (if this is happening only for this topic) remove the __consumer_offsets topic (just from ZooKeeper) and restart Kafka.
01-17-2022
05:17 AM
Hi @danurag It's recommended to set retention at the topic level (unless you want all your topics to use 24 hours by default), for example: kafka-configs --bootstrap-server <brokerHost:brokerPort> --alter --entity-type topics --entity-name <topicName> --add-config retention.ms=3600000 The most common way to configure how long Kafka retains messages is by time. The default is specified in the configuration file using the log.retention.hours parameter, and it is set to 168 hours, or one week. There are two other parameters, log.retention.minutes and log.retention.ms, and all three control the same setting (the amount of time after which messages may be deleted). If more than one is specified, the smaller unit size takes precedence, so the recommended parameter is log.retention.ms: the value set for log.retention.ms is then always the one used.
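As a quick sanity check on the units (the broker and topic names below are only illustrative), a 24-hour retention expressed as retention.ms is:

```shell
# 24 hours expressed in milliseconds for retention.ms
RETENTION_MS=$((24 * 60 * 60 * 1000))
echo "$RETENTION_MS"    # prints 86400000

# Hypothetical usage:
# kafka-configs --bootstrap-server broker1:9092 --alter --entity-type topics \
#   --entity-name my-topic --add-config retention.ms=${RETENTION_MS}
```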
11-30-2021
09:48 AM
@sarm After digging a little bit more: there is a metric exposed by producers, kafka.producer:type=producer-metrics,client-id=producer-1, with the attribute record-send-total, which shows the total number of records sent by that producer. To get more details about the available metrics in Kafka, I would suggest checking this Cloudera Community article.
11-29-2021
08:22 PM
1 Kudo
Components required:
jconsole (UI required)
jmxterm (for Linux environment - CLI only)
Kafka client (java producer/consumer) exposing JMX
Kafka brokers exposing JMX
Steps to get the available metrics (mbeans) available in a Kafka consumer (Environment with a UI available):
Add the following JVM properties to your java consumer: -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.port=9090 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
After starting your consumer, use jconsole to connect to the host:port specified in Step 1.
After we add <consumer-hostname>:<JMXPort>, we should be able to see the consumer's metrics in the jconsole UI.
Here, we have to click the MBeans tab and navigate through the available metrics. In this example, we want to see "records-consumed-rate", which in the program has the following definition: "The average number of records consumed per second". In this case, the average number of messages processed by this consumer is 0.5.
If we want to pass this to a Kafka command line, we have to get the ObjectName from jconsole.
After that, run the following command line and replace the "objectName" accordingly. Example output: ./kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://clientHost:clientJMXPort/jmxrmi --object-name 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1'
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://clientHost:clientJMXPort/jmxrmi.
"time","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:bytes-consumed-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:bytes-consumed-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-latency-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-latency-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-size-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-size-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-throttle-time-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-throttle-time-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-consumed-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-consumed-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-lag-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-lead-min","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-per-request-avg"
1638221356007,9.076115605876556,12850.0,669.0075187969925,770.0,3.0063291139240507,18.0,18.0,0.0,0.0,1755.0,0.5042286447709198,720.0,0.0,2002.0,1.0
1638221358013,9.072183021431389,12868.0,669.2446043165468,770.0,3.005860346430811,18.0,18.0,0.0,0.0,1761.0,0.5040101678572994,721.0,0.0,2002.0,1.0
1638221360012,9.068771517339826,12886.0,669.951724137931,770.0,3.005492797181055,18.0,18.0,0.0,0.0,1767.0,0.5038206398522126,722.0,0.0,2002.0,1.0

Each comma-separated field is a different metric. If you want to identify "records-consumed-rate", count its position in the jconsole metric list and then count the fields in the output; in this case, records-consumed-rate appears at jconsole line 12. Taking one line from the terminal output, the value of field 12 is "0.5038206398522126":

1638221360012,9.068771517339826,12886.0,669.951724137931,770.0,3.005492797181055,18.0,18.0,0.0,0.0,1767.0,0.5038206398522126,722.0,0.0,2002.0,1.0

The above steps also apply to producers and brokers; we just have to identify the JMX port used by the service and make sure we have access to get the metrics. In case we don't have a UI or access to the JMX ports from external hosts, jmxterm is a good alternative to list the available mbeans. See the steps to run jmxterm below:
Download jmxterm from the official site.
In the terminal (make sure the JMX port is reachable for your service), execute the following: java -jar jmxterm-1.0.2-uber.jar --url <kafkahost>:<kafkaJMXPort>
If the connection is successful, we will see the following: Welcome to JMX terminal. Type "help" for available commands.
$>
Here we can list the mbeans available for the service we are connected to; for example (output trimmed, from a broker host): $>beans
...
...
#domain = kafka.controller:
kafka.controller:name=ActiveControllerCount,type=KafkaController
kafka.controller:name=AutoLeaderBalanceRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControlledShutdownRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControllerChangeRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControllerShutdownRateAndTimeMs,type=ControllerStats
...
...
Then if we want to get the active controller metric, we can use: [root@brokerHost ~]# kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://brokerHost:brokerJMXPort/jmxrmi --object-name 'kafka.controller:name=ActiveControllerCount,type=KafkaController'
21/11/29 21:50:26 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://brokerHost:brokerJMXPort/jmxrmi.
"time","kafka.controller:type=KafkaController,name=ActiveControllerCount:Value"
1638222626788,1
1638222628783,1
1638222630783,1
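The comma counting described in the consumer example above can also be scripted. This sketch extracts field 12 (records-consumed-rate, counting the leading time column) from one of the JmxTool output lines shown earlier:

```shell
# One data line from the JmxTool consumer-metrics output shown earlier
LINE='1638221360012,9.068771517339826,12886.0,669.951724137931,770.0,3.005492797181055,18.0,18.0,0.0,0.0,1767.0,0.5038206398522126,722.0,0.0,2002.0,1.0'

# Field 12 is records-consumed-rate (field 1 is the timestamp)
echo "$LINE" | awk -F, '{print $12}'    # prints 0.5038206398522126
```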
11-24-2021
02:22 PM
Hi @sarm I think there is no metric for that; on the other hand, you can create a simple Java consumer and add the following details: // consumer details here
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println(new Date(record.timestamp()));
System.out.printf("Partition = %d\n",record.partition());
System.out.printf("Offset = %d\n", record.offset());
System.out.printf("Key = %s\n", record.key());
System.out.printf("Value = %s\n", record.value());
}
} catch (Exception e) {
e.printStackTrace();
}
}
This should provide the following output:
Wed Nov 24 19:16:27 CLST 2021
Partition = 0
Offset = 439
Key = null
Value = S
Then you can create some logic to count the number of messages within a specific time window. I hope that helps.
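As an alternative to writing a Java consumer, similar counting can be approximated from the console consumer: with --property print.timestamp=true, kafka-console-consumer prefixes each record with CreateTime:<epoch-ms>, which awk can bucket per second. The sketch below runs the awk part on made-up sample input rather than a live consumer:

```shell
# Sample lines in the format produced by:
#   kafka-console-consumer ... --property print.timestamp=true
# awk splits on ':' and tab, so $2 is the epoch-ms timestamp,
# and int($2/1000) buckets records into whole seconds.
printf 'CreateTime:1000\ta\nCreateTime:1500\tb\nCreateTime:2100\tc\n' |
  awk -F'[:\t]' '{count[int($2/1000)]++} END {for (s in count) print s, count[s]}' |
  sort
# prints:
# 1 2
# 2 1
```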
11-24-2021
12:11 PM
Hi @jaeseung For SMM "WARNING": the status of a replication flow is calculated based on the replication latency and the throughput:
if any of the metrics are not present -> INACTIVE
if latency max and latency age are smaller than a fixed grace period (60 sec), and throughput max is not zero -> ACTIVE
if throughput age is smaller than a fixed grace period (60 sec), and throughput max is not zero -> WARNING
otherwise -> INACTIVE
11-24-2021
12:05 PM
Hi @jaeseung The client configurations have to be passed using the cluster aliases of the replication:
for consumer configs: primary->secondary.consumer.<config>
for producer configs: primary->secondary.producer.override.<config>
Under SRM configs, please try:
<source>-><target>.producer.override.max.request.size=<desired value>
If that doesn't work, use:
<source>-><target>.producer.max.request.size=<desired value>
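For instance, raising the maximum request size to 2 MB for a primary->secondary replication could look like this (the aliases and the 2097152 value are only an example):

```properties
# Hypothetical: 2 MB max request size for the replication producer
primary->secondary.producer.override.max.request.size=2097152
```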
06-18-2020
12:01 PM
This is a step by step guide to test Kafka clients from a Windows machine that connects to an HDF/HDP environment.
We start with the review of the current Kafka broker listeners. In this case, we will cover the following:
SASL_PLAINTEXT > Kerberized environments
PLAINTEXT > Plain connections
This can be done by using the Ambari console > Kafka > configs > Kafka Broker. After that, search for listeners and make sure either one or both protocols are enabled.
PLAINTEXT security protocol
Go to your Windows machine and download the Apache Kafka software.
It is recommended to download the same version that is running in your HDP/HDF cluster. Select the "Scala 2.12" link to avoid exceptions while running the Kafka clients.
Extract the content of this folder in a preferred location in the Windows host.
While connecting to Kafka through the PLAINTEXT listener, Kafka does not have a way to identify you as a user, so add Kafka ACLs giving permissions to ANONYMOUS users. To achieve this, run the following command as the kafka user on one of the Kafka brokers: /usr/hd<p/f>/current/kafka-broker/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<zkHost>:<zkPort> --add --allow-principal User:ANONYMOUS --operation All --topic=* --group=* --cluster The above command gives all permissions to the anonymous user in Kafka; change the topic and group to specific ones if required.
In a Kafka host, create a new test topic or use an existing one. To create a new topic, run the following command with the Kafka user: kafka-topics --create --topic <topicName> --partitions <N of partitions> --replication-factor <N of replicas> --zookeeper <zkHost>:<zkPort>
After adding the anonymous user permissions, go to the Windows machine and navigate to the following Kafka folder: C:\<preferred location>\kafka_<version>\bin\windows Note: This step assumes that we already have connectivity to the brokers and that the firewall and DNS (if any) are configured properly.
In this folder, there is a list of .bat files, similar to the .sh files on Linux hosts. To run a producer, use the following command: C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName>
To run a consumer, use the following command: C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning
Run the clients using Kerberos (SASL_PLAINTEXT)
To run the clients using Kerberos (SASL_PLAINTEXT), first ensure that Kerberos is configured properly in the environment. Once you get valid tickets, do the following to connect with the Kafka clients:
If using Kafka Ranger plugin, go to Ranger Admin UI -> Kafka and add a new policy for the user that is used to connect from Windows host pointing to the topic/s that needs access.
After the Ranger policies are configured, then go to the Windows Host and configure the Kerberos details for the Kafka client connection. To achieve this, do the following:
Create a file with extension .conf and add the following content: KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
useTicketCache=false
serviceName="kafka";
keyTab="/path_to_file/file.keytab"
principal="principal_name@REALM.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/path_to_file/file.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="principal_name@REALM.COM";
};
Client: used to connect to ZooKeeper; KafkaClient: used to connect to the Kafka brokers.
principal: the user that will be used to connect from Windows to the Kafka brokers (the same user we granted permissions to in the Ranger UI).
keyTab: the keytab file that contains the principal specified in "principal".
With that file created, open a Windows Command Prompt and execute the following before running any client command: set KAFKA_OPTS="-Djava.security.auth.login.config=/path_to_conf_file/file.conf" This passes the JAAS configuration (keytab/principal) to the Kafka client.
In the same command prompt, run a Kafka producer/consumer. For Kafka versions <= 1.0, the producer:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName> --security-protocol SASL_PLAINTEXT
For the consumer:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning --security-protocol SASL_PLAINTEXT
For Kafka versions > 1.0, the producer:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-producer.bat --broker-list <brokerHost>:<brokerPort> --topic <topicName> --producer-property security.protocol=SASL_PLAINTEXT
For the consumer:
C:\<preferred location>\kafka_<version>\bin\windows\kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT
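An alternative to passing each property on the command line is to collect the security settings in a client properties file and pass it with --producer.config / --consumer.config, which the console clients support. A minimal sketch, assuming Kerberos (the file name client.properties is arbitrary):

```properties
# client.properties (hypothetical file name)
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
```

Then, for example: kafka-console-consumer.bat --bootstrap-server <brokerHost>:<brokerPort> --topic <topicName> --from-beginning --consumer.config client.properties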