Member since
10-02-2015
76
Posts
80
Kudos Received
8
Solutions
09-08-2016
07:24 PM
8 Kudos
HBase supports several different compression algorithms which can be enabled on a ColumnFamily. Data block encoding attempts to limit duplication of information in keys, taking advantage of some of the fundamental designs and patterns of HBase, such as sorted row keys and the schema of a given table. Compressors reduce the size of large, opaque byte arrays in cells, and can significantly reduce the storage space needed to store uncompressed data.
Compressors and data block encoding can be used together on the same ColumnFamily.
Changes Take Effect Upon Compaction. If you change compression or encoding for a ColumnFamily, the changes take effect during compaction.
Some codecs take advantage of capabilities built into Java, such as GZip compression. Others rely on native libraries. Native libraries may be available as part of Hadoop, such as LZ4. In this case, HBase only needs access to the appropriate shared library. Other codecs, such as Google Snappy, need to be installed first. Some codecs are licensed in ways that conflict with HBase's license and cannot be shipped as part of HBase.
This article discusses common codecs that are used and tested with HBase. No matter what codec you use, be sure to test that it is installed correctly and is available on all nodes in your cluster.
Block Compressors
none
Snappy
LZO
LZ4
GZ
LZO
Unfortunately, HBase cannot ship with LZO because of licensing issues: HBase is Apache-licensed and LZO is GPL-licensed. LZO therefore has to be installed after HBase is installed. See the Using LZO Compression wiki page for how to make LZO work with HBase.
A common problem users run into when using LZO is that the initial setup of the cluster runs smoothly, but a month later a sysadmin adds a machine to the cluster and forgets to do the LZO fixup on the new machine. In versions since HBase 0.90.0, we should fail in a way that makes it plain what the problem is, but maybe not.
GZIP
GZIP will generally compress better than LZO, though it is slower. For some setups, better compression may be preferred. HBase will use Java's built-in GZIP unless the native Hadoop libs are available on the CLASSPATH, in which case it will use the native compressors instead. (If the native libs are NOT present, you will see lots of "Got brand-new compressor" reports in your logs.)
SNAPPY
If snappy is installed, HBase can make use of it (courtesy of hadoop-snappy).
Build and install snappy on all nodes of your cluster (see below)
Use CompressionTest to verify snappy support is enabled and the libs can be loaded ON ALL NODES of your cluster:
$ hbase org.apache.hadoop.hbase.util.CompressionTest hdfs://host/path/to/hbase snappy
Create a column family with snappy compression and verify it in the hbase shell:
hbase> create 't1', { NAME => 'cf1', COMPRESSION => 'SNAPPY' }
hbase> describe 't1'
In the output of the "describe" command, you need to ensure it lists "COMPRESSION => 'SNAPPY'"
Installation
You will find the snappy library file under the .libs directory from your Snappy build (for example, /home/hbase/snappy-1.0.5/.libs/). The file is called libsnappy.so.1.x.x, where 1.x.x is the version of the snappy code you are building. You can either copy this file into your hbase directory under the name libsnappy.so, or simply create a symbolic link to it.
The second file you need is the hadoop native library. You will find this file in your hadoop installation directory under lib/native/Linux-amd64-64/ or lib/native/Linux-i386-32/. The file you are looking for is libhadoop.so.1.x.x. Again, you can simply copy this file or link to it under the name libhadoop.so.
At the end of the installation, you should have both libsnappy.so and libhadoop.so links or files present in lib/native/Linux-amd64-64 or in lib/native/Linux-i386-32.
To point hbase at snappy support, set the following in hbase-env.sh:
export HBASE_LIBRARY_PATH=/pathtoyourhadoop/lib/native/Linux-amd64-64
In /pathtoyourhadoop/lib/native/Linux-amd64-64 you should have something like:
libsnappy.a
libsnappy.so
libsnappy.so.1
libsnappy.so.1.1.2
Data Block Encoding Types
Prefix - Often, keys are very similar. Specifically, keys often share a common prefix and only differ near the end. For instance, one key might be RowKey:Family:Qualifier0 and the next key might be RowKey:Family:Qualifier1. In Prefix encoding, an extra column is added which holds the length of the prefix shared between the current key and the previous key. Assuming the first key here is totally different from the key before, its prefix length is 0. The second key's prefix length is 23, since they have the first 23 characters in common.
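The prefix-length bookkeeping described above can be sketched in a few lines of Python. This is only an illustration of the idea, not HBase's actual on-disk format; the function names are hypothetical and keys are treated as plain byte strings.

```python
def shared_prefix_len(prev_key: bytes, key: bytes) -> int:
    """Count the leading bytes that key has in common with prev_key."""
    n = 0
    for a, b in zip(prev_key, key):
        if a != b:
            break
        n += 1
    return n


def prefix_encode(keys):
    """Return (prefix_length, remaining_suffix) for each key in sorted order."""
    encoded = []
    prev = b""  # nothing precedes the first key, so its prefix length is 0
    for key in keys:
        n = shared_prefix_len(prev, key)
        encoded.append((n, key[n:]))
        prev = key
    return encoded


keys = [b"RowKey:Family:Qualifier0", b"RowKey:Family:Qualifier1"]
for prefix_len, suffix in prefix_encode(keys):
    print(prefix_len, suffix)
```

With the two example keys, the second entry stores a prefix length of 23 and only the final byte of the key.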
Obviously if the keys tend to have nothing in common, Prefix will not provide much benefit.
Diff - Diff encoding expands upon Prefix encoding. Instead of considering the key sequentially as a monolithic series of bytes, each key field is split so that each part of the key can be compressed more efficiently. Two new fields are added: timestamp and type. If the ColumnFamily is the same as the previous row, it is omitted from the current row. If the key length, value length or type are the same as the previous row, the field is omitted. In addition, for increased compression, the timestamp is stored as a Diff from the previous row's timestamp, rather than being stored in full. Given the two row keys in the Prefix example, and given an exact match on timestamp and the same type, neither the value length nor the type needs to be stored for the second row, and the timestamp value for the second row is just 0, rather than a full timestamp.
Diff encoding is disabled by default because writing and scanning are slower but more data is cached.
Fast Diff - Fast Diff works similar to Diff, but uses a faster implementation. It also adds another field which stores a single bit to track whether the data itself is the same as the previous row. If it is, the data is not stored again. Fast Diff is the recommended codec to use if you have long keys or many columns. The data format is nearly identical to Diff encoding.
Prefix Tree - Prefix Tree encoding was introduced as an experimental feature in HBase 0.96. It provides similar memory savings to the Prefix, Diff, and Fast Diff encoders, but provides faster random access at a cost of slower encoding speed. Prefix Tree may be appropriate for applications that have high block cache hit ratios. It introduces new 'tree' fields for the row and column. The row tree field contains a list of offsets/references corresponding to the cells in that row. This allows for a good deal of compression.
Which Compressor or Data Block Encoder To Use?
The compression or codec type to use depends on the characteristics of your data. Choosing the wrong type could cause your data to take more space rather than less, and can have performance implications. In general, you need to weigh your options between smaller size and faster compression/decompression. Following are some general guidelines, expanded from a discussion at Documenting Guidance on compression and codecs.
If you have long keys (compared to the values) or many columns, use a prefix encoder. FAST_DIFF is recommended, as more testing is needed for Prefix Tree encoding.
If the values are large (and not precompressed, such as images), use a data block compressor.
Use GZIP for cold data, which is accessed infrequently. GZIP compression uses more CPU resources than Snappy or LZO, but provides a higher compression ratio.
Use Snappy or LZO for hot data, which is accessed frequently. Snappy and LZO use fewer CPU resources than GZIP, but do not provide as high a compression ratio.
In most cases, enabling Snappy or LZO by default is a good choice, because they have a low performance overhead and provide space savings.
Before Google made Snappy available in 2011, LZO was the default. Snappy has similar qualities to LZO but has been shown to perform better.
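As a rough illustration, the guidelines above can be written down as a small decision helper. This is a sketch under simplifying assumptions: the function name and its three boolean inputs are hypothetical, Snappy stands in for "Snappy or LZO", and real choices should be validated against your own data.

```python
def suggest_settings(long_keys_or_many_columns: bool,
                     values_precompressed: bool,
                     hot_data: bool) -> dict:
    """Map the general guidelines to ColumnFamily attribute values."""
    # Long keys (relative to values) or many columns: use a prefix-style encoder.
    encoding = "FAST_DIFF" if long_keys_or_many_columns else "NONE"

    if values_precompressed:
        # Already-compressed values (e.g. images) gain little from a block compressor.
        compression = "NONE"
    elif hot_data:
        # Frequently accessed data: favour low CPU overhead.
        compression = "SNAPPY"
    else:
        # Cold data: favour the higher compression ratio.
        compression = "GZ"

    return {"DATA_BLOCK_ENCODING": encoding, "COMPRESSION": compression}


print(suggest_settings(long_keys_or_many_columns=True,
                       values_precompressed=False,
                       hot_data=True))
```

The returned names correspond to the DATA_BLOCK_ENCODING and COMPRESSION attributes set on a column family in the hbase shell.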
08-04-2016
11:03 AM
12 Kudos
This article is applicable for anyone deploying a Kafka cluster for production use. It points out certain high level points that you should be thinking about before deploying your cluster.
It covers important configurations for the producer, broker, and consumer, followed by tips for performance tuning.
Important Client / Producer Configurations:
The most important producer configurations:
compression
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
Name: compression.type
Default: none
sync vs async production
Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request.
Name: producer.type
Default: sync
batch size (for async producers)
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully, as we will always allocate a buffer of the specified batch size in anticipation of additional records.
Name: batch.size
Default: 16384 (bytes)
maximum message size
This is the largest message size Kafka will allow to be appended to this topic. Note that if you increase this size you must also increase your consumers' fetch size so they can fetch messages this large.
Account for this when doing disk sizing. A rough estimate is average message size * message rate * retention period * replication factor.
Name: max.message.bytes
Default: 1,000,000
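A rough disk-sizing estimate can be sketched as follows. This assumes the estimate is the product of average message size, message rate, retention period, and replication factor; the message-rate term, the function name, and the example numbers are assumptions for illustration and not figures from this article.

```python
def estimate_disk_bytes(avg_message_bytes: int,
                        messages_per_sec: int,
                        retention_seconds: int,
                        replication_factor: int) -> int:
    """Approximate cluster-wide bytes on disk for one topic, ignoring indexes and compression."""
    return avg_message_bytes * messages_per_sec * retention_seconds * replication_factor


# Hypothetical workload: 1 KB messages, 10,000 messages/sec, 7 days retention, 3 replicas.
total = estimate_disk_bytes(1_000, 10_000, 7 * 24 * 3600, 3)
print(f"{total / 1e12:.1f} TB")
```

Divide the result by the number of brokers to get a per-broker figure, and leave headroom, since the estimate ignores index files and uneven partition placement.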
acks
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:
acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, then the record will be lost.
acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
Name: acks
Default: 1
To see the full list of producer configs:
http://kafka.apache.org/documentation.html#producerconfigs
Brokers:
The important configurations are the following:
compression.type
Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed', which is equivalent to no compression, and 'producer', which means retain the original compression codec set by the producer.
Name: compression.type
Default: producer
num.replica.fetchers
Number of fetcher threads used to replicate messages from a source broker. Increasing this value can increase the degree of I/O parallelism in the follower broker.
Name: num.replica.fetchers
Default: 1
offsets.topic.replication.factor
The replication factor for the offsets topic (set higher to ensure availability). To ensure that the effective replication factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor at the time of the first request for the offsets topic. If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor).
Name: offsets.topic.replication.factor
Default: 3
Credits: http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/
Number of partitions for a topic
The number of topics and partitions impacts how much can be stored in page cache
Topic/Partition is the unit of parallelism in Kafka
Partitions in Kafka drive the parallelism of consumers
Throughput requirements drive the number of partitions on a topic.
Formula:
P = Throughput from a producer to a single partition
https://github.com/apache/kafka/blob/trunk/bin/kafka-producer-perf-test.sh
C = Throughput from a single partition to a consumer
https://github.com/apache/kafka/blob/trunk/bin/kafka-consumer-perf-test.sh
T = Target throughput
Required # of partitions = Max (T/P, T/C)
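The partition formula above is easy to apply in code. The sketch below is only an illustration: the function name is hypothetical, rounding up to a whole partition is an added assumption, and the throughput figures are made-up placeholders for values you would measure with the perf-test scripts.

```python
import math


def required_partitions(target: float,
                        producer_per_partition: float,
                        consumer_per_partition: float) -> int:
    """Max(T/P, T/C), rounded up because partitions come in whole numbers."""
    return math.ceil(max(target / producer_per_partition,
                         target / consumer_per_partition))


# Hypothetical measurements, all in MB/sec: T = 1000, P = 50, C = 80.
print(required_partitions(1000, 50, 80))
```

Whichever side is slower per partition (producer or consumer) determines the count, which is why the formula takes the maximum of the two ratios.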
CONSUMERS:
max.partition.fetch.bytes
The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be #partitions * max.partition.fetch.bytes . This size must be at least as large as the message.max.bytes or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.
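The two constraints in this description, the memory bound and the minimum fetch size, can be checked with simple arithmetic. The helper names and the example values below are hypothetical; substitute the values from your own broker and consumer configuration.

```python
def max_fetch_memory(num_partitions: int, max_partition_fetch_bytes: int) -> int:
    """Upper bound on memory used by one fetch request: #partitions * max.partition.fetch.bytes."""
    return num_partitions * max_partition_fetch_bytes


def can_fetch_largest_message(max_partition_fetch_bytes: int, message_max_bytes: int) -> bool:
    """True if the consumer's per-partition fetch size can hold the largest allowed message."""
    return max_partition_fetch_bytes >= message_max_bytes


# Example values: 50 assigned partitions, 1 MiB per-partition fetch size.
fetch_bytes = 1024 * 1024
print(max_fetch_memory(50, fetch_bytes))                  # bytes needed in the worst case
print(can_fetch_largest_message(fetch_bytes, 1_000_000))  # fetch size covers a 1,000,000-byte limit
print(can_fetch_largest_message(fetch_bytes, 5_000_000))  # too small for a 5,000,000-byte limit
```

If the second check fails, either raise the consumer's fetch size or lower the broker's message size limit before the consumer gets stuck on an oversized message.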
fetch.message.max.bytes
The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as message.max.bytes or else it is possible for the producer to send messages larger than the consumer can fetch.
Performance Tuning
General
num.io.threads should be at least the number of disks dedicated to Kafka. I strongly recommend starting with a value equal to the number of disks.
Adjust num.network.threads based on the number of producers + number of consumers + replication factor
Message size will affect network bandwidth. For a higher-performance Kafka cluster, use 10GbE cards
Kernel tuning
While most Kafka deployments contain a lot of RAM, you can get comparable performance with spinning disks depending on the type of IOPS you need.
Provide a large page cache and avoid hitting the disks if you are going to be doing random IO operations.
Kafka broker will always write to page cache (OS) first
Messages are flushed based on several configurations
– http://kafka.apache.org/documentation.html
Flushing messages to disk is controlled by log.flush.interval.messages
Defaults to Long.MAX_VALUE which is very big
The number of messages written to a log partition before we force an fsync on the log
Name: log.flush.interval.messages
The per-topic override for log.flush.interval.messages
Name: log.flush.interval.ms.per.topic
OS will flush (pdflush) regardless of kafka params
https://en.wikipedia.org/wiki/Page_cache
Default flush configurations will cover most use cases as the durability is provided by replication
Source: http://queue.acm.org/detail.cfm?id=1563874
Disk throughput & durability
If you enable auto commit (enable.auto.commit), use longer intervals for auto.commit.interval.ms (default is 5000 ms)
Since Kafka has replication, the redundancy provided by RAID can also be provided at the application level
Isolate the disks and use them only to store the Kafka data
Using multiple disks with multiple directories often results in better performance than using RAID
RAID can potentially do a better job of load balancing the data at the low level. But the major downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space. Another potential benefit of RAID is the ability to tolerate disk failures. However, rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
Java/JVM tuning
Minimize GC pauses by using the Oracle JDK with the G1 (garbage-first) collector
Kafka Heap Size
Per an HCC article, the kafka-broker JVM heap is set to 1 GB by default; this can be increased using the Ambari kafka-env template. When you are sending large messages, JVM garbage collection can be an issue. Try to keep the Kafka heap size below 4GB.
Example: In kafka-env.sh add the following settings.
export KAFKA_HEAP_OPTS="-Xmx16g -Xms16g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
Kafka Deployment Example
8-core dual socket (16 CPUs)
64-128 GB of RAM
8-16GB Kafka JVM
Rest for OS + Page Cache
8x2TB disk
EXT4
XFS may handle locking during fsync better. EXT4 is mostly used in the field
SAS or SSD preferred
Based on Retention period
Account for Broker and # of partitions it will handle
JBOD – Assuming you will use replication
Optional
RAID10
The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.
10GigE Bonded NICs for extreme performance
LinkedIn Published Benchmarks
SOURCE: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
Single producer thread, no replication
821,557 records/sec (78.3 MB/sec)
Single producer thread, 3x asynchronous replication
786,980 records/sec (75.1 MB/sec)
07-25-2016
08:03 PM
1 Kudo
@Timothy Spann There is no officially supported processor to schedule VORA jobs using NiFi. However, a VORA agent communicates directly with the Spark client when running in YARN mode. You can write your program in Python or Scala, invoking the VORA classes, and then call those scripts through spark-submit from NiFi using the ExecuteCommand processor.
07-08-2016
04:56 PM
Hey Joe, What is the latest on this? Are we offering this or is there a timeline for it? Thanks, Vedant
05-11-2016
04:10 PM
1 Kudo
Hi Scott, I think you can ignore this message and wait for a few more minutes before the machines come up. This may just be a bug. --Vedant
02-02-2016
08:09 PM
I found the solution. I had copied this notebook from an earlier version of Zeppelin by pasting it into /opt/incubator-zeppelin/notebook/. That will not work. I believe the newer version of Zeppelin will include the ability to import notebooks. However, copying and pasting the notebook's note.json file does import the notebook successfully, as long as you are copying between installations of the same Zeppelin version.
02-02-2016
04:58 PM
@lmccay Thanks for the info Larry. We are in the process of working with the prospect. I'll let you know when we cross that bridge.
01-29-2016
11:11 PM
@Michael Young try $PHOENIX_HOME/bin/sqlline.py <hostname>:2181:/<hbase-unsecure>