Member since
06-27-2019
147
Posts
9
Kudos Received
11
Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2393 | 01-31-2022 08:42 AM
 | 603 | 11-24-2021 12:11 PM
 | 1025 | 11-24-2021 12:05 PM
 | 1940 | 10-08-2019 10:00 AM
 | 2460 | 10-07-2019 12:08 PM
02-01-2022
08:23 PM
1 Kudo
This article provides the steps and fields required to configure Streams Replication Manager (SRM) using external accounts between two Kerberized clusters.
This article assumes that Kerberos is configured properly between the clusters and that we can already produce and consume data correctly.
Environment details:
Cluster A: co-located cluster (SRM and Kafka run in this cluster), CDP 7.1.7
Cluster B: external cluster, another CDP 7.1.7 deployment
Both clusters use the SASL_PLAINTEXT security.protocol for their clients
Clusters are configured with MIT Kerberos cross-realm trust (see the MIT Kerberos documentation for cross-realm configuration details)
Steps to configure external accounts (feature available from CDP 7.1.7)
Go to cluster A > Cloudera Manager > Administration > External accounts > Kafka Credentials tab
Configure the following fields:
Name
Bootstrap servers
Security protocol
JAAS Secret [1-3]
JAAS Template
Kerberos Service Name
SASL Mechanism
Example:

Field | Value
---|---
Name | c289
Bootstrap servers | c289-node2.clusterB.com:9092,c289-node3.clusterB.com:9092
Security protocol | SASL_PLAINTEXT
JAAS Secret 1 | kafka/c189-node4.clusterA.com@CLUSTERA.COM
JAAS Secret 2 | /opt/cloudera/kafka.keytab
JAAS Template | com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="##JAAS_SECRET_2##" principal="##JAAS_SECRET_1##";
Kerberos Service Name | kafka
SASL Mechanism | GSSAPI
In the JAAS secret fields, we can also use a username and password instead, replacing the Krb5LoginModule with the PlainLoginModule.
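For illustration, a sketch of such a PLAIN template, assuming JAAS Secret 1 holds the username and JAAS Secret 2 the password (the secret layout here is an assumption, and the SASL Mechanism field would then be PLAIN):
org.apache.kafka.common.security.plain.PlainLoginModule required username="##JAAS_SECRET_1##" password="##JAAS_SECRET_2##";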
Also, if more than one SRM driver is in use, make sure to copy the keytab to each srm-driver host with the correct permissions (owned by the SRM PID owner), for example: -rw------- 1 streamsrepmgr streamsrepmgr 216 Jan 28 14:32 kafka.keytab
We can also configure an external account for the co-located cluster, but this is not required.
(In the credentials form, values marked with * are not required.)
Finally, go to cluster A > Cloudera Manager > Streams Replication Manager > Configuration and add the external account name c289 in the External Kafka Accounts field.
Configure the replication details under cluster A > Cloudera Manager > Streams Replication Manager > Configuration > Streams Replication Manager's Replication Configs
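A hypothetical sketch of what the replication entries can look like (the "clusterA" alias and the topic pattern are assumptions; "c289" matches the external account name above):
c289->clusterA.enabled=true
c289->clusterA.topics=.*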
Start the SRM cluster and validate that the properties are correct in the srm-driver.log files.
Additional details about SRM configuration can be found in Configuring Streams Replication Manager.
01-31-2022
08:42 AM
1 Kudo
@inyongkim Try CM > Kafka > Configuration > (use filter) Kafka Connect Advanced Configuration Snippet (Safety Valve) for connect-distributed.properties, then add:
connector.client.config.override.policy=All
Check the Kafka Connect log files for the line "org.apache.kafka.connect.runtime.distributed.DistributedConfig: DistributedConfig values" and verify that the property changed properly. Let me know if that helped.
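Once the policy is set to All, individual connectors can override client properties via the producer.override. / consumer.override. prefixes in their connector config; a hypothetical fragment (the property and value are just examples):
"consumer.override.max.poll.records": "500"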
01-24-2022
10:40 AM
1 Kudo
@mike_bronson7 In Kafka 0.1x, we will see the statement "Consumer group 'deeg_data' is rebalancing" when the group is rebalancing, but in newer versions, we will see something like:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
GroupName topicName 0 0 0 0 - - -
which means there are no active consumers in this group (or the group is rebalancing). A group rebalance can be triggered for multiple reasons, but mostly because of:
1. A new consumer joined the group
2. A consumer was removed from the group (because of client shutdown, timeout, or network glitches)
3. Timeout issues between brokers/clients
To get more details about consumer rebalancing (if there are no errors on the broker side), checking the application log files might reveal the underlying issue.
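The group state above comes from the stock consumer-groups tool; for reference, a describe sketch (broker host/port are placeholders):
kafka-consumer-groups --bootstrap-server <brokerHost:brokerPort> --describe --group deeg_data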
01-24-2022
10:06 AM
1 Kudo
Hi @mike_bronson7
1. Do you see anything interesting in the broker 1010 log file? This is to understand why broker 1010 is not able to register in ZooKeeper.
2. Try forcing a new controller election from the ZooKeeper CLI: [zk: localhost:2181(CONNECTED) 11] rmr /controller
3. Are these broker ids unique? If you describe other topics, do you see the same broker ids and the same behavior (leader "none" for some partitions)?
4. Finally, if this is a dev environment, either:
4.1 Enable unclean.leader.election.enable=true and restart the brokers, or
4.2 (if this is happening just for this topic) remove the __consumer_offsets topic (just from ZooKeeper) and restart Kafka
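For point 3, a describe sketch with the stock CLI (depending on your Kafka version this may need --zookeeper <zkHost:2181> instead of --bootstrap-server):
kafka-topics --bootstrap-server <brokerHost:brokerPort> --describe --topic <topicName>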
01-17-2022
05:17 AM
Hi @danurag It's recommended to set retention at the topic level (unless you want all your topics to use 24 hours by default), for example:
kafka-configs --bootstrap-server <brokerHost:brokerPort> --alter --entity-type topics --entity-name <topicName> --add-config retention.ms=3600000
The most common configuration for how long Kafka retains messages is by time. The default is specified in the configuration file using the log.retention.hours parameter, and it is set to 168 hours (one week). There are two other parameters allowed, log.retention.minutes and log.retention.ms. All three control the same goal (the amount of time after which messages may be deleted), but the recommended parameter is log.retention.ms: if more than one is specified, the smaller unit size takes precedence, which makes sure that the value set for log.retention.ms is always the one used.
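To verify the topic-level override afterward, the same tool can describe the topic configs:
kafka-configs --bootstrap-server <brokerHost:brokerPort> --describe --entity-type topics --entity-name <topicName>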
11-30-2021
09:48 AM
@sarm After digging a little bit more: producers expose a metric under the MBean kafka.producer:type=producer-metrics,client-id=producer-1, whose record-send-total attribute shows the total number of records sent by that producer. To get more details about the available metrics in Kafka, I would suggest checking this Cloudera community article.
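As a sketch, it can be read with the same JmxTool approach described in that article (host, port, and client-id are placeholders):
kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://producerHost:producerJMXPort/jmxrmi --object-name 'kafka.producer:type=producer-metrics,client-id=producer-1'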
11-29-2021
08:22 PM
1 Kudo
Components required:
jconsole (UI required)
jmxterm (for Linux environment - CLI only)
Kafka client (Java producer/consumer) exposing JMX
Kafka brokers exposing JMX
Steps to list the metrics (mbeans) available in a Kafka consumer (environment with a UI available):
Add the following JVM properties to your java consumer: -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.port=9090 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
After starting your consumer, use jconsole to connect to the host:port specified in Step 1.
After we add the <consumer-hostname>:<JMXPort>, we should be able to connect and browse the metrics in the jconsole UI.
Here, we have to click on the MBeans tab and navigate through the available metrics. In this example, we want to see "records-consumed-rate", which the program defines as "The average number of records consumed per second". In this case, the average number of messages processed by this consumer is 0.5.
If we want to pass this to a Kafka command line, we have to get the ObjectName from jconsole.
After that, run the following command line and replace the "objectName" accordingly. Example output: ./kafka-run-class.sh kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://clientHost:clientJMXPort/jmxrmi --object-name 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1'
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://clientHost:clientJMXPort/jmxrmi.
"time","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:bytes-consumed-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:bytes-consumed-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-latency-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-latency-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-size-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-size-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-throttle-time-avg","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-throttle-time-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:fetch-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-consumed-rate","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-consumed-total","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-lag-max","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-lead-min","kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-test1-1:records-per-request-avg"
1638221356007,9.076115605876556,12850.0,669.0075187969925,770.0,3.0063291139240507,18.0,18.0,0.0,0.0,1755.0,0.5042286447709198,720.0,0.0,2002.0,1.0
1638221358013,9.072183021431389,12868.0,669.2446043165468,770.0,3.005860346430811,18.0,18.0,0.0,0.0,1761.0,0.5040101678572994,721.0,0.0,2002.0,1.0
1638221360012,9.068771517339826,12886.0,669.951724137931,770.0,3.005492797181055,18.0,18.0,0.0,0.0,1767.0,0.5038206398522126,722.0,0.0,2002.0,1.0
Each comma-separated value is a different metric, in the same order as the header line. To identify "records-consumed-rate" in the output, find its position in the header (here it is the 12th field, the first field being the timestamp) and take the corresponding value; in the line above, that value is "0.5038206398522126". The above steps also apply to producers and brokers; we just have to identify the JMX port used by the service and make sure we have access to get the metrics. If we don't have a UI or access to the JMX ports from external hosts, jmxterm is a good alternative for listing the available mbeans. See the steps to run jmxterm below:
Download jmxterm from the official site.
In the terminal (make sure the JMX port is reachable for your service), execute the following: java -jar jmxterm-1.0.2-uber.jar --url <kafkahost>:<kafkaJMXPort>
If the connection is successful, we will see the following: Welcome to JMX terminal. Type "help" for available commands.
$>
Here we can list the mbeans available for the service we are connected to; for example, trimmed output from a broker host: $>beans
...
...
#domain = kafka.controller:
kafka.controller:name=ActiveControllerCount,type=KafkaController
kafka.controller:name=AutoLeaderBalanceRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControlledShutdownRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControllerChangeRateAndTimeMs,type=ControllerStats
kafka.controller:name=ControllerShutdownRateAndTimeMs,type=ControllerStats
...
...
Then if we want to get the active controller metric, we can use: [root@brokerHost ~]# kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://brokerHost:brokerJMXPort/jmxrmi --object-name 'kafka.controller:name=ActiveControllerCount,type=KafkaController'
21/11/29 21:50:26 INFO utils.Log4jControllerRegistration$: Registered kafka:type=kafka.Log4jController MBean
Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://brokerHost:brokerJMXPort/jmxrmi.
"time","kafka.controller:type=KafkaController,name=ActiveControllerCount:Value"
1638222626788,1
1638222628783,1
1638222630783,1
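The same attribute can also be read interactively from jmxterm, a quick sketch using its standard bean/get commands:
$>domain kafka.controller
$>bean kafka.controller:name=ActiveControllerCount,type=KafkaController
$>get Value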
11-29-2021
05:20 AM
Hi @hbinduni Since Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at-least-once to exactly-once delivery; in particular, producer retries will no longer introduce duplicates. It's important to mention that if the producer is already configured with acks=all, there will be no difference in performance. Additionally, the order of messages produced to each partition is guaranteed through all failure scenarios, even if max.in.flight.requests.per.connection is set to more than 1 (5 is the default, and also the highest value supported by the idempotent producer). More details in the document below: https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
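A minimal sketch of enabling it on a producer (the broker address is a placeholder; enable.idempotence is the standard property):
Properties props = new Properties();
props.put("bootstrap.servers", "brokerHost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Enabling idempotence implies acks=all and safe retries
props.put("enable.idempotence", "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);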
11-29-2021
05:07 AM
Hi @dansteu The Kerberos service name property has to be the service name specified for the Kafka service, which is usually "kafka".
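For example, in the client properties (sasl.kerberos.service.name is the standard client setting):
sasl.kerberos.service.name=kafka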
11-24-2021
02:22 PM
Hi @sarm I think there is no metric for that; on the other hand, you can create a simple Java consumer and add the following details:
import java.time.Duration;
import java.util.Date;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// consumer configuration and topic subscription go here
while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // record.timestamp() is the message timestamp in epoch milliseconds
            Date date = new Date(record.timestamp());
            System.out.println(date);
            System.out.printf("Partition = %d\n", record.partition());
            System.out.printf("Offset = %d\n", record.offset());
            System.out.printf("Key = %s\n", record.key());
            System.out.printf("Value = %s\n", record.value());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
This should provide the following output:
Wed Nov 24 19:16:27 CLST 2021
Partition = 0
Offset = 439
Key = null
Value = S
Then you can add some logic to count the number of messages within a specific time window. I hope that helps.