Member since
09-23-2015
81
Posts
108
Kudos Received
41
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
5702 | 08-17-2017 05:00 PM | |
2262 | 06-03-2017 09:07 PM | |
2783 | 03-29-2017 06:02 PM | |
5276 | 03-07-2017 06:16 PM | |
1959 | 02-26-2017 06:30 PM |
03-14-2017
09:38 PM
@Qingyang Kong Kafka rest proxy uses old client apis, which are not supported in secure cluster in Apache. However HDP kafka supports security in old consumer apis. To enable this you need to build kafka rest proxy code with HDP kafka_core dependency and pass a KafkaClient jaas config to your kafka rest server JVM.
... View more
03-07-2017
06:16 PM
2 Kudos
@Lester Martin To allow user to deploy a storm topology in secure mode 1. create user level storm.yaml under /home/username/.storm/storm.yaml . This storm.yaml can only contain few required settings or you can just copy the entire storm.yaml from /etc/storm/conf/storm.yaml required settings if you are adding are 1. nimbus.seeds 2. storm.thrift.transport: "org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin" 3. java.security.auth.login.config: "/etc/storm/conf/client_jaas.conf" 4. Make sure /etc/storm/conf/client_jaas.conf contents looks like this StormClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=false
useTicketCache=true
serviceName="nimbus";
}; 5. Run kinit with user's keytab before running storm jar command By default Storm uses SimpleACLAuthorizer as authorizer to give permissions to user. This will allow all user's with valid keytab to submit topologies.
... View more
02-26-2017
06:30 PM
1 Kudo
@Ryan Spring As part of Kafka Security work, we shipped Kerberos security in old consumer api's for Kafka. This is not part of Apache Kafka and hence Apache Storm's storm-kafka (old implementation) which uses old consumer API won't be able to talk to secure Apache Kafka. This is only possible with HDP Kafka as we support old Kafka client API connecting Kerberos Kafka. So if you make dependency from the hortonworks artifacts you'll get this functionality.
... View more
02-26-2017
06:20 PM
3 Kudos
@Param NC Kafka security is developed by Hortonworks. Before it shipped into Apache Kafka we shipped it in HDP. At that time we called the SASL protocol as PLAINTEXTSASL which later changed SASL_PLAINTEXT. These protocols are synonymous so you can use them interchangeably. Older version of AMBARI still calls it as PLAINTEXTSASL which will be changing to SASL_PLAINTEXT in upcoming version. In your case, producer & consumer are working because PLAINTEXTSASL or SASL_PLAINTEXT means the same thing and can be used interchangeably. For consistency purpose , we recommend you to use SASL_PLAINTEXT everywhere.
... View more
02-26-2017
06:16 PM
1 Kudo
@Sandeep R here are the best practices to follow when provisioning Kafka. https://community.hortonworks.com/articles/80813/kafka-best-practices-1.html
... View more
02-01-2017
05:52 AM
1 Kudo
@Edgar Orendain Definitely do not recommend dot as part of topic names this will cause issues with kafka metrics. Using _ is better option.
... View more
01-31-2017
11:29 PM
19 Kudos
Kafka Broker: Java Version We recommend latest java 1.8 with G1 collector ( which is default in new version). If you are using Java 1.7 and G1 collector make sure you are on u51 or higher. A recommended setting for JVM looks like following -Xmx8g -Xms8g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 OS Settings Once the JVM size is determined leave rest of the RAM to OS for page caching. You need sufficient memory for page cache to buffer for the active writers and readers. In general disk throughput is a performance bottleneck and more disks are better. Depending on how one configures the flush behavior , a faster disk will be beneficial if they log.flush.interval.messages set to flush for every 100k messages or so. File Descriptors limits: Kafka needs open file descriptors for files and network connections . We recommend at least 128000 allowed for file descriptors. Max socket buffer size , can be increased to enable high-performance data transfer. More details are here http://www.psc.edu/index.php/networking/641-tcp-tune Disks And File System We recommend using multiple drives to get good throughput. Do not share the same drives with any other application or for kafka application logs. Multiple drives can be configured using log.dirs in server.properties. Kafka assigns partitions in round-robin fashion to log.dirs directories. Note: If the data is not well balanced among partitions this can lead to load imbalance among the disks. Also kafka currently doesn’t good job of distributing data to less occupied disk in terms of space. So users can easily run out of disk space on 1 disk and other drives have free disk space and which itself can bring the Kafka down. We highly recommend users to create alerts on disk usage for kafka drives to avoid any interruptions to running Kafka service. RAID can potentially do better load balancing among the disks. But RAID can cause performance bottleneck due to slower writes and reduces available disk space. Although RAID can tolerate disk failures but rebuilding RAID array is I/O intensive that effectively disables the server. So RAID doesn’t provide much real availability improvement. Log Flush Management Kafka always write data to files immediately and allows users to configure log.flush.interval.messages to enforce flush for every configure number of messages. One needs to set log.flush.scheduler.interval.ms to a reasonable value for the above config to take into affect. Also Kafka flushes the log file to disk whenever a log file reaches log.segment.bytes or log.roll.hours. Note: durability in kafka does not require syncing data to disk, as a failed broker can recover the topic-partitions from its replicas. But pay attention to replica.lag.time.max.ms , defaults to 10 secs If a follower didn’t issue any fetch request or hasn’t consumed from leaders log-end offset for at least this time , leader will remove the follower from ISR. Due to the nature of this there is slight chance of message loss if you do not explicitly set log.flush.interval.messages . If the leader broker goes down and if the follower didn’t caught up to the leader it can still be under ISR for those 10 secs and the messages during this leader transition to follower can be lost. We recommend using the default flush settings which disables the explicit fsync entirely. This means relying on background flush done by OS and Kafka’s own background flush. This provides great throughput and latency and full recovery guarantees provided by replication are stronger than sync to the local disk. The drawback of enforcing the flushing is that its less efficient in its disk usage pattern and it can introduce latency as fsync in most Linux filesystems blocks writes to the file system compared to background flushing does much more granular page-level locking. FileSystem Selection Kafka uses regular files on disk, and such it has no hard dependency on a specific file system. We recommend EXT4 or XFS. Recent improvements to the XFS file system have shown it to have the better performance characteristics for Kafka’s workload without any compromise in stability. Note: Do not use mounted shared drives and any network file systems. In our experience Kafka is known to have index failures on such file systems. Kafka uses MemoryMapped files to store the offset index which has known issues on a network file systems. Zookeeper Do not co-locate zookeeper on the same boxes as Kafka We recommend zookeeper to isolate and only use for Kafka not any other systems should be depend on this zookeeper cluster Make sure you allocate sufficient JVM , good starting point is 4Gb Monitor: Use JMX metrics to monitor the zookeeper instance Choosing Topic/Partitions Topic/Partition is unit of parallelism in Kafka Partitions in Kafka drives the parallelism of consumers Higher the number of partitions more parallel consumers can be added , thus resulting in a higher throughput. Based on throughput requirements one can pick a rough number of partitions. Lets call the throughput from producer to a single partition is P Throughput from a single partition to a consumer is C Target throughput is T Required partitions = Max (T/P, T/C) More partitions can increase the latency The end-to-end latency in Kafka is defined by the time from when a message is published by the producer to when the message is read by the consumer. Kafka only exposes a message to a consumer after it has been committed, i.e., when the message is replicated to all the in-sync replicas. Replication 1000 partitions from one broker to another can take up 20ms. This can be too high for some real-time applications In new Kafka producer , messages will be accumulated on the producer side. It allows users to set upper bound on the amount of memory used for buffering incoming messages. Internally, producers buffers the message per partition. After enough data has been accumulated or enough time has passed, the accumulated messages will be removed and sent to the broker If we have more partitions , messages will be accumulated for more partitions on producer side. Similarly on the consumer side , it fetches batch of messages per partitions . The more partitions that consumer is subscribing to, the more memory it needs. Factors Affecting Performance Main memory. More specifically File system buffer cache. Multiple dedicated disks. Partitions per topic. More partitions allows increased parallelism. Ethernet bandwidth. Kafka Broker configs Set kafka broker JVM by exporting KAFKA_HEAP_OPTS Log.retention.hours . This setting controls when the old messages in a topic will be deleted. Take into consideration of your disk space and how long you would the messages to be available. An active consumer can read fast and deliver the message to destination. Message.max.bytes . Maximum size of the message the server can receive. Make sure you set replica.fetch.max.bytes to be equal or greater than message.max.bytes Delete.topic.enable - This will allow users to delete a topic from Kafka. This is set to false by default. Delete topic functionality will only work from Kafka 0.9 onwards. unclean.leader.election - This config set to true by default. By turning this on, User is making choice of availability over durability. For example, If a broker which is hosting leader of a topic partition and for some reason it goes down. Then a replica of that partition gets elected as leader when the original broker comes backup it becomes leader without it being a ISR. This means there is chance of data loss. If durability is more important, we recommend you to set this to false. Kafka Producer: org.apache.kafka.producer.KafkaProduer , Upgrade to the New producer . Critical Configs Batch.size (size based batching) Linger.ms ( time based batching) Compression.type Max.in.flight.requests.per.connection (affects ordering) Acks ( affects durability) Performance Notes 1. A producer thread going to the same partition is faster than a producer thread that sprays to multiple partitions. 2. The new Producer API provides a flush() call that client can optionally choose to invoke. If using it, the key number of bytes between two flush() calls is key factor for good performance. Microbenchmarking shows that around 4MB we get good perf (we used event of 1KB size). Thumb rule to set batch size when using flush() batch.size = total bytes between flush() / partition count. 3. If producer throughput maxes out and there is spare CPU and network capacity on box, add more producer processes. 4. Performance is sensitive to event size. In our measurements, 1KB events streamed faster than 100byte events. Larger events are likely to give better throughput. 5. No simple rule of thumb for linger.ms. Needs to be tried out on specific use cases. For small events (100 bytes or less), it did not not seem to have much impact in microbenchmarks. Lifecycle of a request from Producer to Broker Polls batch from the batch queue , 1 batch per partition Groups batches based on the leader broker Sends the grouped batches to the brokers Pipelining if max.in.flight.requests.per.connection > 1 A batch is ready when one of the following is true: Batch.size is reached Linger.ms is reached Another batch to the same broker is ready flush() or close() is called Big batching means Better compression ratio , higher throughput Higher Latency Compression.type Compression is major part of producer’s work Speed of different compression types differs a lot Compression is in user thread, so adding more threads helps with the throughput if compression is slow ACKs Defines durability level for producer. Acks Throughput Latency Durability 0 High Low No Gurantee 1 Medium Medium Leader -1 Low High ISR Max.in.flight.requests.per.connection Max.in.flight.requests.per.connection > 1 means pipelining. Gives better throughput May cause out of order delivery when retry occurs Excessive pipelining , drops throughput Kafka Consumer: Performance Notes On the consumer side it is generally easy to get good performance without need for tweaking. Simple rule of thumb for good consumer performance is to keep Number of consumer threads = Partition count Microbenchmarking showed that Consumer performance was not as sensitive to event size or batch size as compared to Producer. Both 1kb and 100byte events showed similar throughput. More details on Kafka microbenchmarking can be found here. https://drive.google.com/drive/u/1/folders/0ByKuMXNl6yEPfjVTRXIwaU45Qmh1Y3ktaExQa3YwZlR6SlZQTVVMckY2RGptb09QRS0zbVE
... View more
Labels:
01-27-2017
07:29 PM
@Ganesh Raju It looks like you are having issues with connecting to maven repository (https://repository.apache.org/content/repositories/snapshots): peer not authenticated -> [Help 1] Also recommend you to add hortonworks maven repo as well <repository>
<id>hortonworks.repo</id>
<url>http://nexus-private.hortonworks.com/nexus/content/groups/public/</url>
<name>Hortonworks Repo</name>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
... View more
01-26-2017
05:14 AM
14 Kudos
Kafka's mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. This tool uses Kafka consumer to consume messages from the source cluster, and
re-publishes those messages to the target cluster using an
embedded Kafka producer.
Running Mirror Maker
To set up a mirror, run kafka.tools.MirrorMaker . The following table
lists configuration options.
At a minimum, MirrorMaker requires one or more consumer configuration files, a
producer configuration file, and either a whitelist or a blacklist of topics. In the
consumer and producer configuration files, point the consumer to the the source cluster, and point the producer to the destination (mirror) cluster, respectively. bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"
Parameter Description Examples --consumer.config Specifies a file that contains configuration settings
for the source cluster. For more information about this file, see
the "Consumer Configuration File" subsection. --consumer.config
hdp1-consumer.properties --producer.config Specifies the file that contains configuration settings
for the target cluster. For more information about this file, see
the "Producer Configuration File" subsection. --producer.config
hdp1-producer.properties --whitelist --blacklist (Optional) For a partial mirror, you can specify
exactly one comma-separated list of topics to include
(--whitelist) or exclude (--blacklist). In general, these options accept Java regex patterns. For caveats, see the note after this
table. --whitelist my-topic --num.streams Specifies the number of consumer stream threads to
create. --num.streams 4 --num.producers Specifies the number of producer instances. Setting
this to a value greater than one establishes a producer pool that
can increase throughput. --num.producers 2 --queue.size Queue size: number of messages that are buffered, in
terms of number of messages between the consumer and producer.
Default = 10000. --queue.size 2000 --help List MirrorMaker command-line options. A comma (',') is interpreted as the regex-choice symbol ('|')
for convenience. If you specify - -white-list=".*" , MirrorMaker tries to
fetch data from the system-level topic
__consumer-offsets and produce that data to the
target cluster. Make sure you added exclude.internal.topics=true in consumer properties
Workaround: Specify topic names, or to replicate all topics, specify
--blacklist="__consumer-offsets" .
Example Consumer config file
Consumer config bootstrap.servers should point to source cluster
Here is a sample consumer configuration file: bootstrap.servers=kafka-source-server1:6667,kafka-source-server2:6667,kafka-source-server3-6667
groupid=dp-MirrorMaker-group
exclude.internal.topics=true
mirror.topics.whitelist=app_log
client.id=mirror_maker_consumer
Example Producer Configuration File Producer config bootstrap.servers should point to target cluster Here is a sample producer configuration file: bootstrap.servers=kafka-target-server1:6667,kafka-target-server2:6667,kafka-target-server3-6667
acks=1
batch.size=100
client.id=mirror_maker_producer Best Practices Create topics in target cluster If you have consumers that are going to consume data from target cluster and your parallelism requirement for a consumer is same as your source cluster, Its important that you create a same topic in target cluster with same no.of partitions. Example: If you have a topic name called "click-logs" with 6 partitions in source cluster , make sure you have same no.of partitions in the target cluster. If you are using a target cluster as more of a backup, not active this might not need to be same. If users didn't create a topic in target cluster, producer in mirrormaker will attempt to create a topic and target cluster broker will create a topic with configured num.partitions and num.replicas, this may not be the partitions and replication that the user wants. Where to run MirrorMaker We recommend to run MirrorMaker on target cluster. No data loss Make sure you've following configs in consumer config and producer config for No data loss. For Consumer, set auto.commit.enabled=false in consumer.properties For Producer
max.in.flight.requests.per.connection=1 retries=Int.MaxValue acks=-1 block.on.buffer.full=true For MirrorMaker, set --abortOnSendFail The following actions will be taken by MirrorMaker
Mirror maker will only send one request to a broker at any given point. If any exception is caught in mirror maker thread, mirror maker will try to commit the acked offsets then exit immediately. For
RetriableException in producer, producer will retry indefinitely. If
retry did not work, eventually the entire mirror maker will block on
producer buffer full. For None-retriable exception, if
--abort.on.send.fail is specified, stops the mirror maker. Otherwise
producer callback will record the message that was not successfully sent
but let the mirror maker move on. In this case, that message will be lost in target cluster. As the last point stated if there is any error occurred your mirror maker process will be killed. So users are recommend to use a watchdog process like supervisord to restart the killed mirrormaker process. Mirror Maker Sizing
num.streams
num.streams option in mirror-maker allows you to create specified no.of consumers. Mirror-Maker deploys the specified no.of threads in num.streams
Each thread instantiates and uses one consumer. That is a 1:1 mapping between mirror-maker threads and consumers. Each thread shares the same producer. That is a N:1 mapping between threads and producers. Keep in mind, a topic-partition is the unit of parallelism in Kafka. If you have a topic called "click-logs" with 6 partitions then max no.of consumers you can run is 6. If you run more than 6 , additional consumers will be idle and if you run less than 6 , all 6 partitions will be distributed among available consumers. More partitions leads to more throughput. So before going further into num.streams, we recommend you to run multiple instances of mirror-maker across the machines with same "groupId" in consumer.config. This will help if a mirror-maker process goes down for any reason and the topic-partitions owned by killed mirror-maker will be re-balanced among other running mirror-maker processes. So this will give high-availability of mirror-maker. Coming back to choosing num.streams. Lets say you've 3 topics with 4 partitions each and you are running 3 mirror maker processes. You can choose 4 as your num.streams this way each instance of mirror-maker starts 4 consumers reading 4 topic-partitions each and writing to target cluster. If you just run 1 mirror maker by choosing 4 as num.streams then 4 consumers will be reading from all 12 topic-partitions . This means lot more traffic into a single machine and it will be slower. Also if the mirror-maker process is stopped there are no other mirror-maker processes to take over. For maximum performance, total number of num.streams should match all of the topic partitions that the mirror maker trying to copy to target cluster. co-locate One co-locate more than one mirror maker in a single machine. Always run more than one mirror-maker processes. Make sure you use the same groupId in consumer config. Socket buffer sizes In general, you should set a high value for the socket buffer size on the mirror-maker's consumer configuration (socket.buffersize) and the source cluster's broker configuration (socket.send.buffer). Also, the mirror-maker consumer's fetch size (fetch.size)
should be higher than the consumer's socket buffer size. Note that the
socket buffer size configurations are a hint to the underlying
platform's networking code. Check the health of Mirror-Maker The consumer offset checker tool is useful to gauge how well your mirror is keeping up with the source cluster. Note that the --zkconnect argument
should point to the source cluster's ZooKeeper.
Also, if the topic is not specified, then the tool prints information
for all topics under the given consumer group. For example: bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect dc1-zookeeper:2181 --topic test-topic minimal lag here indicates healthy Mirror-Maker Running in Secure Clusters SSL We recommend to use SSL in mirror-maker from kafka 0.10.x version onwards. You can learn more details about setting SSL for brokers, producers and consumers here http://kafka.apache.org/documentation.html#security_ssl . Users can share the same key & trust stores for both producer & consumer in Mirror-Maker. Make sure you give read & write permissions for the certificate/hostname if you are using a authorizer with SSL. KERBEROS In kafka 0.9.x and 0.10.0.1, 0.10.1.0 , consumers and producers in mirror-maker cannot run with different principals/keytabs as they both run inside a single JVM. So the users need to use single principal to configure both consumer and producer. This means same principal needs to have at least read & describe access on the source cluster topics and write & describe access to topics on target cluster. In future version of kafka users can configure different principal/keytabs for consumer & producer in mirror-maker.
... View more
Labels:
01-18-2017
05:43 AM
3 Kudos
@Christian Guegi You can do the rolling upgrade without impacting downtime. Before starting the upgrade add these two properties to kafka config. You can add the below configs via ambari -> kafka -> Advanced custom properties. 1. inter.broker.protocol.version = 0.9.0 ( HDP 2.4 kafka version) 2. log.message.format.version = 0.9.0 once these properties are set you can upgrade one broker at a time. Once the broker upgraded and made it into ISR move onto next one to minimize any downtime clients might see. Once the upgrade finished you can remove inter.broker.protocol.version . Once you upgrade the clients to kafka_clients 0.10.0.1 version you can remove the log.message.format.version as well. More details are on this page https://kafka.apache.org/0100/documentation.html#upgrade
... View more