Member since
03-01-2016
104
Posts
97
Kudos Received
3
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1674 | 06-03-2018 09:22 PM | |
28354 | 05-21-2018 10:31 PM | |
2210 | 10-19-2016 07:13 AM |
06-03-2018
09:22 PM
@MB My responses: 1. Yes more datanodes always help , you can have data replicated across nodes by choosing replication factor of your choice. Default is 3. Having DN and NN with sufficient resources on node is not a bad idea.Critical prod clusters could have a dedicated NN to avoid IO overheads caused by DNs creating any potential issues for NN. 2. Not sure the context behind Cassandra but having different spec nodes in a cluster should not be a problem. For better management of resources , try creating config groups of nodes and allocate / isolate better spec nodes to components which are critical for your use case or which may need more resources. 3. Research more but I dont think it should be a problem. 4. Should not be.
... View more
05-21-2018
10:31 PM
@Mike Wong 1. Is it a new or an existing cluster ? How many total nodes you have in cluster ? 2. Plz provide us output of following command from all zookeeper server nodes echo 'stat' | nc <ZK_HOST> 2181 Keeperexceptions could many times be due to large number of znode counts in zookeeper for various services. Also check zoo.cfg of all ZK nodes and verify if this file is identical across all nodes and hostnames for zk nodes referred are identical as well.
... View more
05-16-2018
07:15 AM
1 Kudo
Sqoop has been popular to be a client interfacing between relational databases and Hive / HDFS etc for importing and exporting datasets. It also supports importing table to HBase as well. The most common use case is to import data in bulk from Databases such as Oracle for the initial loads and then using tools like GoldenGate to continuously replicate data. In this article, we will import 4 million rows from a Mysql table to HBase table. Steps are as follows: Create a table in mysql CREATE TABLE sqooptest (id int NOT NULL PRIMARY KEY AUTO_INCREMENT, val int); Create a procedure in Mysql to generate some random data in the columns defined. DELIMITER $$
CREATE PROCEDURE prepare_data1()
BEGIN
DECLARE i INT DEFAULT 100;
WHILE i < 4000000 DO
INSERT INTO sqooptest (val) VALUES (i);
SET i = i + 1;END WHILE;
END$$
DELIMITER ; When you call this procedure , you'll have 4 million records in mysql table : CALL prepare_data1(); Lets Create HBase table from HBase shell. Shell > create ‘sqooptest’, ‘A’ insert one dummy record. Shell > put ‘sqooptest’, ‘1’, ‘A:val’, ‘123’ Now lets run Sqoop command to import data from mysql to HBase ( uses regular “put” internally ).There is another way and which is bulk loading using sqoop , we will discuss that shortly. sqoop import --connect jdbc:mysql://example.com/test --driver com.mysql.jdbc.Driver--username root -P --table sqooptest --hbase-table sqooptest --hbase-row-key id --column-family A —split-by id -m 8 Where --driver specifies database driver --connect indicates connection string to Mysql. --username is database user --table is database table name --hbase-table is table name at HBase side. --hbase-row-key is the primary key of source DB which you would like to consider as HBase row key. It would be a comma separated composite key as well. -- column-family is the family where writes are going to land in HBase. -- split-by is the column name at source DB which should be used to split the data to be imported across give number of mappers. So if you have defined 8 mappers for this job , column specified in this field would be divided equally in parts to be served by each mapper. Some additional useful options: --fetch-size is the number of rows which should be fetched by each mapper in one iteration from source DB. Increasing fetch-size corresponding to mapper container size (memory / CPU / bandwidth to be considered ) has proved to be beneficial in making the job faster and efficient. --hbase-bulkload enables bulkloading of data in HBase. Finally, the output of this job looks like any other regular mapreduce job. 18/05/16 06:42:52 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 32.4191 seconds (0 bytes/sec)18/05/16 06:42:52 INFO mapreduce.ImportJobBase: Retrieved 3999900 records. References: http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_importing_data_into_hbase
... View more
Labels:
04-15-2018
12:02 PM
8 Kudos
In this article series, part 1 , part 2 , part 3, part 4 covered various Hbase tuning parameters, scenarios, system side of things etc, in last and part 5 of this series, I will discuss little bit about Phoenix performance parameters and general tips for tuning.
I am taking an example of a query which was performing very slow and how we investigated this situation. We will start by reading explain plan of this query. I cannot quote exact query (customer's data) here but it was a select query with some where clause and finally order by conditions. The explain plan of the query is as follows:
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
| CLIENT 5853-CHUNK 234762256 ROWS 1671023360974 BYTES PARALLEL 5853-WAY FULL SCAN OVER MESSAGE_LOGS.TEST_MESSAGE | | SERVER FILTER BY ((ROUTING IS NULL OR ROUTING = ‘TEST') AND TESTNAME = 'createTest' AND TESTID = ’TEST’ AND ERRORCODE | | SERVER TOP 20 ROWS SORTED BY [CREATEDTIMESTAMP DESC, TESTID] | | CLIENT MERGE SORT |
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
4 rows selected (0.958 seconds)
Firstly, lets learn to dissect an explain plan of a Phoenix query, following are my observations looking at this plan:
First statement
”CLIENT” , means this statement shall be executed at client side.
“5853-CHUNK”, this means query plan has logically divided the data in about 5853 chunks . And each chunk would utilize one single thread. So for future reference, lets keep it in mind, one chunk == one thread of client thread-pool.
“234762256 ROWS”, means these many rows will be processed by this query. Self explanatory.
“1671023360974 BYTES”, means about 1.6 TB of data will be processed.
“PARALLEL”, so the query processing on these 5853 chunks (5853-WAY) would be done in parallel.
“FULL SCAN OVER MESSAGE_LOGS.TEST_MESSAGE“ , this means it will scan entire table , most inefficient way and anti-pattern for Hbase / Phoenix use cases. This table requires secondary index to convert this full scan into a range scan.
Second Statement
“SERVER” , processing would happen in region servers
“FILTER BY” , returns only results that match the expression
Third Statement
“SERVER” , processing happening on server side, specifically “SORTED BY”
Fourth Statement
“CLIENT MERGE SORT”, meaning all the SORTED ROWS at server side would be brought back to client node and be merge sorted again.
What tuning was done to make query run faster ?
5853 chunks appeared too much for the query specially with a thread-pool having at default value of 128, this was making whole query get slower as only 128 threads would work at a time and rest all tasks would wait in queue. (phoenix.query.queueSize)
We decided to bump up thread-pool (phoenix.query.threadPoolSize) from default 128 to about 1000 , but customer did not have enough CPU cores on client side and he feared CPU contention there if we go beyond this number, so we decided to go for another tuning.
We increased guidepost width (phoenix.stats.guidepost.width) which are markers to logically distribute data in chunks. (from its default 100 MB to 500 MB ). This effectively reduced the number of chunks and hence the threads.
Read more about all tuning parameters including above ones here.
For making this query more effective, recommended customer to create secondary index on top of this data table and include most frequently used columns in it. Read more about secondary index here.
Thus after all the changes in place, the query which was earlier taking about 5 minutes was now taking about 15 - 20 seconds.
Tuning recommendations in general :
For improving read performance, create global secondary index, it will have some write penalty as data of chosen columns for index would be duplicated to another table.
For improving write performance, pre-split the table if you know the key ranges , also consider going for Local index which is written in same table and being added as another column. Local indexes will be more stable with HDP 3.0 with lot of bug fixes.
Choose most frequently used columns for primary key. Since all these columns are concatenated to form Hbase’s “row key” ,their order of appearance in row-key as well as its length matters. Order matters because if most frequently used column comes first in row key the range scan becomes more efficient. Length matters because this row key will be part of each cell and hence would occupy some memory and some disk.
Use Salt Buckets if you have a monotonically increasing row-key . Read more about it here
Please note Salting would incur read penalties as scans would be repeated for each bucket.
Don’t create too many salt buckets , thumb rule is to be equal to number of region servers in your cluster.
Reference:
http://phoenix.apache.org/explainplan.html
https://phoenix.apache.org/tuning.html
https://phoenix.apache.org/tuning_guide.html
Also see : PART 1 , PART 2 , PART 3, PART 4
... View more
Labels:
04-15-2018
11:36 AM
11 Kudos
In my articles part 1 and part 2 , I explained various parameters which could be tuned for achieving optimized performance from Hbase. In part 3, we discussed some scenarios and aspects to focus while investigating performance issues. Continuing this series, part 4 covers some system and network level investigations.
DISKS
Along with investigating potential issues at HBASE and HDFS layer, we must not ignore system side of thingslike OS, network and disks. We see several cases everyday when severe issues at this layer are identified.Detailed investigation is beyond the scope of this article , but we should know where to point fingers. The triggering factor to look at system side are messages such as following in datanode logs at the time of performance issue.
WARN datanode.DataNode (BlockReceiver.java:receivePacket(694)) - Slow BlockReceiver write data to disk cost:317ms (threshold=300ms)
Following are some of several tests we could do to ascertain disk performance:
- Run dd test to check read and write throughput and latencies.
For checking write throughput:
dd bs=1M count=10000 if=/dev/zero of=/data01/test.img conv=fdatasync
For checking read throughput:
dd if=/data01/test.img of=/dev/null bs=1M count=10000
Where /data is one of your data node data disk.
- For checking latencies either during read or write, prepend “time” command before above commands and you will know how much time it took to complete these operations and also if the actual delays were from user side or system side. Compare these results with the agreed upon throughputs with your storage vendor / Cloud service providers.
- Another important tool is Linux “ iostat” command which provides great deal of advanced information to diagnose such as how much time an IO request was in IO scheduler queue, disk controller queue, how many requests were waiting in queues, how much time disk took to complete an IO operation etc.
- This command could very well explain if your work load is way beyond your disk capacities or if your disks have issues either at hardware or driver / firmware level.
Another detailed article could be written to explain each and every parameter specified in this command but that’s beyond the scope of this article, some parameters though need highlighted:
A. Await: Covers the time that is taken through scheduler, driver, controller, transport (for example fibre san), and storage needed to complete each IO. Await is the average time, in milliseconds, for I/O requests completed by storage and includes the time spent by the requests in the scheduler queue and time spent by storage servicing them.
B.. avgqu-sz : the average number of IO queuedwithin both the IO scheduler queue and storage controller queue.
C. Svctm : Actual service time storage / disk took to serve IO request excluding all queue latencies.
D. Util : Percentage utilization of each disk.
- Needless to say, you would always check commands like top / vmstat/mpstat for identifying issues related to CPU / Memory / Swapping etc.
- Last but most important command to see live streaming of whats happening at your IO layer is “iotop” command.This command would give you real time details of which command , process and user is actually clogging your disks.
Some general tuning tips :
- Selecting right IO scheduler is very critical to latency sensitive work loads. “deadline” scheduler is proven to be the best scheduler for such use cases.Check and correct which scheduler you are getting your IO processed with:
[root@example.com hbase]# cat /sys/block/sd*/queue/scheduler
noop [deadline] cfq
- Choose right mount options for your data disks. Options such as “noatime” saves a great deal of IO overhead on data disks and in turn improve their performance.
- Check which mode your CPU cores are running in. We recommend them to run in performance mode. Virtual machines and cloud instances may not have this file.
echo performance | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
- Flushing of large pool of accumulated “dirty pages” to disks has been seen to be causing a significant IO overhead on systems. Please tune the following kernel parameters controlling this behavior. There is no single number which is suited for all and trial and error is a best resort here, but with systems having large amount of memory, we can keep this ratio smaller than their default values so that we dont end up accumulating a huge pool of dirty pages in memory eventually to be burst synced to disks with limited capacities and degrading application performance.
vm.dirty_background_ratio (default is 10)
vm.dirty_ratio (default is 40 )
Read more about this parameter here :
NETWORK
Network bandwith across the nodes of a HDP cluster plays critical role in heavy read / write use cases. It becomes further critical in distributed computing because any one limping node is capable of degrading entire cluster performance. While we are already in the world of gigabit networks and generally things are stable at this front. However we continue to see issues this side, messages such as the following seen in datanode logs could prove to be the triggering factor to investigate network side of things:
WARN datanode.DataNode (BlockReceiver.java:receivePacket(571)) - Slow BlockReceiver write packet to mirror took 319ms (threshold=300ms)
Following are some of the tools / commands we can use to find out if something is wrong here:
- “iperf” to test network bandwidth between nodes. See more details about iperf here.
- Use Linux commands like ping / ifconfig and “netstat -s” to find out there are any significant packet drops / socket buffer overruns and if this number is increasing over time.
-ethtool ethX command would help you provide negotiated network bandwidth.
- ethtool -S would help collect NIC and driver statistics.
Some general tuning tips:
- Generally, any sort of NIC level receive acceleration does not work well with our use cases and in turn prove to be a performance bottleneck in most scenarios. Disable any acceleration enabled on your NIC cards ( of course after consultation with your platform teams) :
- Check if receive offloading is enabled:
$ grep 'receive-offload' sos_commands/networking/ethtool_-k_eth0 | grep ': on'
generic-receive-offload: on
large-receive-offload: on
- Disable them using following commands:
# ethtool -K eth0 gro off
# ethtool -K eth0 lro off
Referenece
- Make sure MTU size is uniform across all nodes and switches in your network.
- Increase socket buffer sizes if you observe consistent overruns / prunes / collapses of packets as explained above. Consult your network and platform teams as how to tweak these values.
KERNEL / MEMORY
A tuned kernel is a mandatory requirement for the nature of work load you are expecting any node to process. However it has been seen that kernel tuning is often ignored at the time of design of such infrastructures. Although its a very vast topic to cover and is beyond the scope of this article, I will mention some important kernel parameters related to memory management which must be tuned on HDP cluster nodes. These configuration parameters stay in /etc/sysctl.conf
- vm.min_free_kbytes: Kernel tries to ensure that min_free_kbytes of memory is always available on the system. To achieve this the kernel will reclaim memory. Keeping this parameter to be about 2 - 5 % of total memory on node makes sure that your applications do not suffer due to prevailing memory fragmentation.
The first symptom of memory fragmentation is the appearance of message such as “Page Allocation Failures” in /var/log/messages or “dmesg” (kernel ring buffer) or worst, when kernel starts killing processes to free up memory by “OOM Killer”.
- vm.swappiness : This is a parameter reflecting tendency of a system to swap. Default value is 60. We dont want system to swap on its will , so keep this value to about 0 - 5 to keep system's swap tendencies to be minimal.
- It has been seen that transparent hugepages do not work well with the kind of work load we have. Its thus recommended to disable THP on our cluster nodes.
echo never > /sys/kernel/mm/transparent_hugepage/enabled
echo never > /sys/kernel/mm/transparent_hugepage/defrag
- On modern NUMA (read here about NUMA ) systems , we strongly recommend to disable zone reclaim mode. This is based on understanding that performance penalties incurred due to reclaiming pages from within a zone are far worse than having the requested page served from another zone. Usually applications strongly dependent on using cache prefer having this parameter disabled.
vm.zone_reclaim_mode = 0
All these kernel level changes can be made on running systems by editing /etc/sysctl.conf and running sysctl -p command to bring them into effect.
In last part (PART 5) of this article series , I will discuss Phoenix performance tuning in details.
Also see : PART 1 , PART 2 , PART 3
... View more
Labels:
04-15-2018
10:15 AM
13 Kudos
As we understood important tuning parameters of Hbase in part 1 and part 2 of this article series, this article focuses on various areas which should be investigated when handling any Hbase performance issue.
Locality
By locality we mean the physical HDFS blocks related to Hbase Hfiles need to be local to the region server node where this respective region is online. This locality is important because Hbase prefers to use short circuit reads directly from physical disks bypassing HDFS. If a region’s Hfiles are not local to it, it will incur cost to a read latency by doing reads across the node over network.
One can monitor locality of a table / region from Hbase Master UI specifically on table’s page by clicking on the listed table name. The value of “1”on the locality column means 100% block locality. Overall Hbase locality is visible on Ambari metrics section of Hbase ( in percentage).
Here, Major compaction tries to bring back all Hfiles related to a region on a single region server thus restoring locality to a great extent.
Locality is generally messed up due to balancer run by HDFS which tries to balance disk space across data nodes OR by Hbase balancer which tries to move regions across region server nodes to balance the number of regions on each server.
Hbase balancer (default is Stochastic Load Balancer ) can be tuned by tweaking various costs ( region load, table load, data locality, MemStore sizes, store file sizes) associated with it and have it run according to our requirements, for example , to have balancer prefer Locality cost more than anything else , we can add following parameter in hbase configs and give it a higher value. (Default value is 25).
( an advanced parameter and an expert must be consulted before such an addition.)
hbase.master.balancer.stochastic.localityCost
To overcome locality harm done by HDFS balancer we have no solution as of date except running compaction immediately after HDFS balancer is run. There are some unfinished JIRAs which once implemented would bring in features like block pinning and favored nodes , once they are available Hbase can configure its favored nodes and writes would be dedicated only to those nodes and HDFS balancer won’t be able to touch its respective blocks. (Refer HBASE-15531 to see all unfinished work on this feature )
Hotspotting
Hotspotting has been discussed quiet a lot but its important to mention it here as its a very crucial aspect to be investigated during performance issues. Basically hotspotting appears when all your write traffic is hitting only on a particular region server. And this might have happened because of the row key design which might be sequential in nature and due to that all writes get landed to a node which has this hot spotted region online.
We can come over this problem using three ways (I mean I know three ways) :
Use random keys - Not so ideal solution as it would not help in range scans with start and stop keys
Use Salt buckets - If you have Phoenix tables on top of hbase tables, use this feature. Read more about this here
Use Pre-splitting - If you know the start and end keys of your sequential keys, you can pre split the table by giving split key points beforehand at the time of creation. This would distribute empty regions across nodes and whenever writes come on for a particular key it would get landed to respective node eventually distributing the write traffic across nodes.Read more about here.
HDFS
HDFS layer is very important layer as no matter how optimized your Hbase is, if datanodes are not responding as expected you would not get performance as expected. Anytime you have latencies on Hbase / Phoenix queries and you observe following messages in region server logs in a large number :
2018-03-29 13:49:20,903 INFO [regionserver/example.com/10.9.2.35:16020] wal.FSHLog: Slow sync cost: 18546 ms, current pipeline: [xyz]
OR
Following messages in datanodes logs at the time of query run:
2018-04-12 08:22:51,910 WARN datanode.DataNode (BlockReceiver.java:receivePacket(571)) - Slow BlockReceiver write packet to mirror took 34548ms (threshold=300ms)
OR
2018-04-12 09:20:57,423 WARN datanode.DataNode (BlockReceiver.java:receivePacket(703)) - Slow BlockReceiver write data to disk cost:3440 ms (threshold=300ms)
If you see such messages in your logs, Its time to investigate things from HDFS side such as if we have sufficient datanode transfer threads , heap , file descriptors, checking logs further to see if there are any GC or Non GC pauses etc. Once confirmed on HDFS side we must also look at underlying infrastructure side (network, disk, OS). This is because these messages mostly convey that HDFS is having hard time receiving / transferring block from/to another node or to sync the data to disk. We will discuss about system side of things in part 4 of this article series.
BlockCache Utilization and hitRatio
When investigating performance issues for read traffic, its worth checking how much your Block Cache and Bucket cache are helpful and whether they are getting utilized or not.
2017-06-12 19:01:48,453 INFO [LruBlockCacheStatsExecutor] hfile.LruBlockCache: totalSize=33.67 MB, freeSize=31.33 GB, max=31.36 GB, blockCount=7, accesses=4626646, hits=4626646, hitRatio=100.00%, , cachingAccesses=4625995, cachingHits=4625995, cachingHitsRatio=100.00%, evictions=24749, evicted=662, evictedPerRun=0.026748554781079292
2017-06-12 19:02:07,429 INFO [BucketCacheStatsExecutor] bucket.BucketCache: failedBlockAdditions=0, totalSize=46.00 GB, freeSize=45.90 GB, usedSize=106.77 MB, cacheSize=93.21 MB, accesses=9018587, hits=4350242, IOhitsPerSecond=2, IOTimePerHit=0.03, hitRatio=48.24%, cachingAccesses=4354489, cachingHits=4350242, cachingHitsRatio=99.90%, evictions=0, evicted=234, evictedPerRun=Infinity
Flush Queue / Compaction queue
During the crisis hours when you are facing severe write latencies ,its very important to check how memstore flush queue and compaction queue look like. Lets discuss couple of scenarios here and some possible remedies here (need expert consultation )
A. Flush queue not reducing: This leads us to three additional possibilities :
A.1 Flushes have been suspended for some reason , one such reason could be a condition called “too many store files” seen somewhere down in region server logs (dictated by hbase.hstore.blockingStoreFiles). Check my part 2 article to know more about this parameter and how to tune it. Simply put, this parameter blocks flushing temporarily till minor compaction is completed on existing Hfiles. May be increasing this number few folds at the time of heavy write load should help.
Here , we can even help minor compaction by assigning it more threads so that it finishes compaction of these files faster:
hbase.regionserver.thread.compaction.small (default value is 1 , we can tune it to say 3 )
A.2 Another possibility could be that flusher operation itself is slow and not able to cope up with write traffic which lead to slow down of flushes. We can help this flusher by allocating few more handler threads using :
hbase.hstore.flusher.count (default value is 2, we can bump it to say 4 )
A.3 There is another possibility seen in such cases and which is of “flush storm” , this behavior triggers when number of Write ahead log files reach their defined limit (hbase.regionserver.maxlogs) and and region server is forced to trigger flush on all memstores until WAL files are archived and enough room is created to resume write operations. You would see messages like:
2017-09-23 17:43:49,356 INFO[regionserver//10.22.100.5:16020.logRoller] wal.FSHLog:Too many wals: logs=35, maxlogs=32; forcing flush of 20 regions(s): d4kjnfnkf34335666d03cb1f
Such behaviors could be controlled by bumping up:
hbase.regionserver.maxlogs (default value is 32, double or triple up this number if you know you have a heavy write load )
B.Compaction queue growing : compaction_queue=0:30 —> ( meaning 0 major compactions and 30 minor compactions in queue) . Please note that compaction whether minor or major is an additional IO overhead on system, so whenever you are trying to fix a performance problem by making compactions faster or accommodating more Hfiles in one compaction thread, you must remember that this medicine itself has its own side effects. Nevertheless, we can tune to make compaction more efficient by bumping up following parameters:
Lower bound on number of files in any minor compaction
hbase.hstore.compactionThreshold : ( Default3 )
Upper bound on number of files in any minor compaction.
hbase.hstore.compaction.max ( Default10 )
The number of threads to handle a minor compaction.
hbase.regionserver.thread.compaction.small ( Default1 )
The number of threads to handle a major compaction.
hbase.regionserver.thread.compaction.large ( Default => 1 )
JVM metrics
Various performance issues drill down to JVM level issues , most specifically related togarbage collection STW (stop the world) pauses which bring down application / region server performance or some times brings them down to halt / crash.
There are several possibilities under which you can have long GC pauses and I wont be able to consider them all here. But the least you can do is have your region server’s gc.log file analyzed by one of many online tools such as this , this tool would help you analyze what’s going wrong in your JVM or garbage collection behavior specifically.
Also , if you have huge number of regions on every region server (500 + ) , consider moving to G1 GC algorithm, even though Hortonworks does not officially support it, but we are in process of it and many of our customers have implemented it successfully.
In your spare time, also go through this GC tuning presentation.
Without going into too many details, I would like to mentionfew most basic thumb rules:
With CMS GC algorithm , never set region server heap greater than. 36 - 38 GB, if you have more requirements, switch to G1GC.
Young generation should never be more than 1/8th or 1/10 th of total region server heap.
Start your first tweak in reducing GC pauses by changing -XX:ParallelGCThreads , which is 8 by default and you can safely take it till 16 (watch the number of CPU cores you have though )
Check who contributed to GC pause “user” or “sys” or “real”
2017-10-11T14:06:17.492+0530: 646872.162: [GC [1 CMS-initial-mark: 17454832K(29458432K)] 20202988K(32871808K), 74.6880980 secs] [Times: user=37.61 sys=1.96, real=74.67 secs]
‘real’ time is the total elapsed time of the GC event. This is basically the time that you see in the clock.
‘user’ time is the CPU time spent in user-mode(outside the kernel).
‘Sys’ time is the amount of CPU time spent in the kernel. This means CPU time spent in system calls within the kernel.
Thus in above scenario, “sys” time is very small but still “real” time is very high indicating that GC did not get enough CPU cycles as it needed indicating a heavily resource clogged system from CPU side.
There is another category of pause which we see regularly:
2017-10-20 14:03:42,327 INFO [JvmPauseMonitor] util.JvmPauseMonitor: Detected pause in JVM or host machine (eg GC): pause of approximately 9056ms
No GCs detected
This category of Pause indicate that JVM was frozen for about 9 seconds without any GC event. This situation mostly indicate a problem at physical machine side possibly indicating an issue at Memory / CPU or any other OS issue which is causing whole JVM to freeze momentarily.
In part 4 of this series, I will cover some infrastructure level investigation of performance issues.
Also see : PART 1 , PART 2 , PART 4, PART 5.
... View more
Labels:
04-12-2018
05:44 PM
for exporting table to another cluster, snapshot is the best method. https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.2/bk_hbase_snapshots_guide/content/ch_hbase_snapshots_chapter.html
... View more
04-12-2018
05:31 PM
You can check hbase root directory from hbase-site.xml but usually the directory format is somthing like -> /apps/hbase/data/data/schema/table/region/column-family-store/store-files
... View more
04-12-2018
06:40 AM
12 Kudos
As we understood basic parameters of Hbase in Part 1 , lets try and understand some advanced parameters which should be tried after consultation with experts or by those who know what they are doing.
hbase.hstore.blockingStoreFiles
Default value of this parameter is 10. We know that each memstore flush creates an Hfile (hbase.hregion.memstore.flush.size), now the purpose this parameter serves is to send a message along the write pipeline that unless these many Hfiles are compacted using minor compaction, we should not go ahead with any more flushes. One would see messages in logs such as “Too many HFiles, delaying flush” . But like it says, it can only delay flush up to certain seconds, and even if the writes continue to happen, memstore could stretch only up to the size guided by :
hbase.hregion.memstore.flush.size X hbase.hregion.memstore.block.multiplier
Once this limit reaches , no more writes will be accepted by region server for this region and you will see messages like "org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit” in logs.
Situation will come under control and writes will resume once minor compaction gets over.One can always increase this parameter to avoid any potential issues during such heavy write traffic and could in turn make channels more productive.To help further, one could also increase hbase.hstore.compaction.max to a higher value so that more Hfiles are covered in compaction process. Lets discuss it below in details.
hbase.hstore.compaction.max
Default value is 10. Like I said above, under situations of heavy write load , you can tune this parameter and thus have minor compaction cover more Hfiles and help stuck write traffic resume. Please note that compaction itself has its own IO overhead so keep this in mind when you bump up this number.
hbase.hregion.max.filesize
Default value is 10 GB. Virtually can be used to control the rate of splitting of regions in Hbase. Once “any" one of the store (Column family) within a region reaches this value, the whole region would go for split. To disable splitting virtually , keep it to a very high number like ( 100 gb ) and set Splitting policy to be :
hbase.regionserver.region.split.policy = org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy
zookeeper.session.timeout
Default value is 90 seconds. We know that region server maintain a session with zookeeper server to remain active in cluster, for this each one of the region server has its own ephemeral znode in zookeeper. As soon as session gets disconnected or timed out for any reason, this znode get deleted and region server gets crashed. It is zookeeper.session.timeout which “partly” dictates negotiated session timeout with zookeeper server at the time of startup of region server. Now why “partly” ? This is because zookeeper server itself has a minimum and maximum session timeout defined for its clients like hbase, as per following formula :
Minimum session timeout : 2 X tick time
Maximum session timeout : 20 x tick time
Now, no matter what session timeout you set in your client configs, if your zookeeper server timeout is less than that, it would only negotiate that value with client. For example default tick time in zookeeper configuration is 2 seconds , which means maximum session timeout could not be bigger than 40 seconds, so no matter Hbase keeps its timeout to be 90 seconds, the negotiated value would only be 40 seconds. For increasing session timeouts in hbase so that your region servers could tolerate minor fluctuations coming out of GC pauses or due to network or any other transient problems either at hbase or zookeeper, consider increasing “tick time”.Tick time higher than 4 - 5 seconds is not recommended as it could impact the health of your zookeeper quorum.
hbase.rpc.timeout
Default value is 300000 ms. This is the timeout which a client gets for a RPC call which it makes to Hbase. Set it proportional to what your client / job / query requirements are. However don’t keep it too big that clients report issues and subsequently fail after a long time. When we talk about hbase.rpc.timeout, we talk about two more parameters in the same breath. They are right below.
hbase.client.scanner.timeout.period
Default value is 300000 ms.The time which any hbase client scanner gets to perform its task (scan) on hbase tables. It is a replacement of an earlier parameter called “hbase.regionserver.lease.period” . The difference here is, this timeout is specifically for RPCs that come from the HBase Scanner classes (e.g. ClientScanner) while hbase.rpc.timeout is the default timeout for any RPC call.Please also note that hbase.regionserver.lease.period parameter is deprecated now and this parameter replaces it. Thus looks like this parameter is going to take care of lease timeouts as well on scanners.
Rule of thumb is to keep hbase.rpc.timeout to be equal to or higher than hbase.client.scanner.timeout.period , this is because no matter scanner is doing its job scanning rows but if in between hbase.rpc.timeout expires , the client session would be expired.This could result in exceptions such as “ScannerTimeoutException or UnknownScannerException”.
However, there is one more parameter which comes at play when we discuss above timeouts and that I would discuss below:
hbase.client.scanner.caching
Default value is 100 rows. Basically this is the number of rows which a client scanner pulls from Hbase in one round ( before “scanner.next” could trigger ) and transfers back to the client. Multiple performance issues could arise (and in fact scanner timeout Exceptions as well ) if you set this value to a very high number as scanner would be burdened fetching these many rows and transferring them back to client , now assume region server or underlying HDFS or the network between client and server is slow for any reason, then it can very well lead to RPC session getting expired, which eventually leading failure of the job, messages like “ClosedChannel Exception” are seen when hbase tries to send back rows to the client whose session is already expired.
While Keeping a smaller count leaves cluster and scanner under utilized, higher number consumes resources such as region server heap and client memory in large quantity. Thus a good number is dependent upon how good resources like disk / memory / CPU you have on cluster nodes as well as how many million rows you need to scan. Go high if demands are high.
Also see : PART 1 , PART 3, PART 4, PART 5 of this series.
... View more
Labels:
04-10-2018
05:39 PM
23 Kudos
Hbase works smoothly in auto pilot mode if one knows how to tune several of the knobs on its dashboard. Not only its important to understand each knob but also what its dependencies are with other knobs. There are several parameters which require tuning based on your use case or work load to make Hbase work in an optimized way. I will try to explain some of the basic parameters in this article. More advanced parameters would be covered in next article.
1. Hbase_master_heapsize
To many’s surprise , a master in Hbase does not do any heavy lifting and hence never require more than 4 - 8 GB in regular setups. Master is basically responsible for meta operations such as create/ delete of tables , keeping check on region servers’ well being using watchers on zookeeper znodes , re-distribution of regions during startup (balancer) or when a region server shuts down. Please note that master's assignment manager keeps track of region states in this memory only and hence if you have huge number of tables / regions, you ought to have proportional amount of heap for master.
2. hbase_regionserver_heapsize
This is a very crucial parameter for region server as most of the data loading / processing would happen in allocated region server heap. This is the heap memory which would accommodate block cache to make your reads faster, and it is this heap that would have region memstores to hold all the writes coming from your users (until they get flushed to the disk). But what is the best value for this heap size? How do we calculate this number?
Well there is no direct formula for this , but if you are using CMS GC algorithm for JVMs, your hard stop for heap is about 36 - 38 GB , otherwise long "stop the world" GC pauses would turn Hbase not only unusable but bring lot of complications w.r.t. the data being stored. Use your best judgement based on number of regions hosted currently, your future projections, any number between 16 GB - 36 GB is a good number, also, you should have a proper plan to tune this parameter incrementally over time based on cluster usage and number of regions added to nodes. With G1GC algorithm there is no restriction on heap size.
One can always check heap usage from Ambari > Hbase> Master UI > Memory tab, if utilization shoots during peak hours to about 60 - 70 % of total heap , its time to increase the heap size further. (unless its a case of memory leak).
3. hbase_regionserver_xmn_max
This parameter sets upper bound on region server heap’s young generation size. Rule of thumb is to keep 1/8th - 1/10th of total heap and never exceeding 4000 Mb.
4. Number of regions on each region server
Discussing this aspect of tuning here as it would help you figure out the best heap size for region servers, memstore size as well as make you explain how these parameters are all dependent on number of regions and how performance of hbase is dependent on all these numbers. We never recommend more than 200 - 400 regions per region server. One can figure out if the existing count in his cluster is an optimized number or not using below formula :
(regionserver_memory_size) * (memstore_fraction) / ((memstore_size) * (num_column_families))
For example, assume :
region server with 16 Gb RAM (or 16384 Mb)
Memstore fraction of .4
Memstore with 128 Mb RAM
1 column family in table
The formula for this configuration would look as follows:
(16384 Mb * .4) / ((128 Mb * 1) = approximately 51 regions
5. file.block.cache.size
This is the portion of total heap which would be used by block cache to make your reads even faster. The data once accessed from disk gets loaded in this cache and the next time any user requests same data, its served from here which is way faster than being served from disk. Caveat here is, keep this number big only if :
a. You have a heavy read use case.
b. Even in read heavy use case , you have your users requesting same data repetitively.
If both conditions do not match , you will be wasting a whole lot of heap loading unnecessary data blocks. In matching conditions, any value between 20 - 40 % is a good value. Again,need to be tuned using trial and error method and what works best for you.
6. hbase.regionserver.global.memstore.size
Portion of total heap used for all the memstores opened for each column family per region per table. This is where all the edits and mutations get landed first during write operation.For write heavy use cases, any value between 20 - 40 % is a good value. Also note that sum of block cache as explained in point 4 above and global memstore size should never be greater than 70 - 75% of total heap, this is so that we have enough heap available for regular hbase operations apart from read and write caching.
7. hbase.hregion.memstore.flush.size
This is the size of each memstore opened for a single column family , during write operations, when this size is used completely , the memstore gets flushed to disk in the form of a hfile. Also to note here that all memstores for a single region would get flushed even if any one of them reaches this size. Each flush operation creates an hfile , so smaller this number, chances of having more frequent flushes, more IO overhead, greater the number of Hfiles getting created and subsequently, greater the number of Hfiles , quicker the compaction getting triggered. And we understand compaction involves additional round of write operations as it writes smaller Hfiles into a bigger Hfile and hence proving to be significant overhead if getting triggered very frequently.
Thus a significantly bigger flush size would ensure lesser Hfiles and lesser compactions, but caveat here is the total heap size and number of regions and column families on each region server. If you have too many regions and column families, you cannot afford to have a bigger flush size under limited total heap size. Ideal numbers are anything between 128 MB to 256 MB.
8. hbase.hregion.memstore.block.multiplier
This is a simple tuning parameter, allows single memstore to get stretched by this multiplier during heavy bursty writes. Once memstore reaches this size (flush size X multiplier ) , write operations are blocked on this column family until flushes are completed.
9. hbase.regionserver.handler.count
This parameter defines the number of RPC listeners / threads that are spun up to answer incoming requests from users. The default value is 30. Good to keep it higher if more concurrent users are trying to access Hbase , however the value should also be proportional to number of CPU cores and region server heap you have on each region server as each thread consumes some amount memory and CPU cycles.
A rule of thumb is to keep the value low when the payload for each request is large, and keep the value high when the payload is small. Start with a value double the number of cores on the node and increase it as per the requirements further.
More advanced parameters and performance related discussions in my next articles - PART 2, PART3, PART 4, PART5.
... View more
Labels: