Member since 03-16-2016 · 707 Posts · 1753 Kudos Received · 203 Solutions
12-26-2016
09:02 PM
2 Kudos
Introduction
h2o is a package for running H2O via its REST API from within R. This package allows the user to run basic H2O commands using R commands. No actual data is stored in the R workspace, and no actual work is carried out by R. R only saves the named objects, which uniquely identify the data set, model, etc. on the server. When the user makes a request, R queries the server via the REST API, which returns a JSON response with the relevant information that R then displays in the console.
Scope
I tested this installation guide on CentOS 7.2, but it
should work on similar RedHat/Fedora/CentOS releases.
Steps
1. Install R
sudo yum install R
2. Install Java
https://www.java.com/en/download/help/linux_x64rpm_install.xml
3. Start R and install dependencies
install.packages("RCurl")
install.packages("bitops")
install.packages("rjson")
install.packages("statmod")
install.packages("tools")
4. Install the h2o package and load the library for use
install.packages("h2o")
library(h2o)
If this is your first time using CRAN, it will ask for a
mirror to use. If you want H2O installed site-wide (i.e., usable by all users
on that machine), run R as root (sudo R), then type
install.packages("h2o")
5. Test H2O installation
Type:
library(h2o)
If nothing complains, launch h2o:
h2o.init()
If all went well then you’ll see lots of output about how it
is starting up H2O on your behalf, and then it should tell you all about your
cluster. If not, the error message should be telling you what dependency is
missing, or what the problem is. Post a note to this article and I will get
back to you.
Tips
#1 - The version of H2O on CRAN might be up to a month or two
behind the latest and greatest. Unless you are affected by a bug that you know
has been fixed, don’t worry about it.
#2 - h2o.init() will only use two cores on your machine and maybe
a quarter of your system memory by default. To resize resources, use h2o.shutdown() and start it again:
a) using all your cores:
h2o.init(nthreads = -1)
b) using all your cores and 4 GB:
h2o.init(nthreads = -1, max_mem_size = "4g")
#3 - To run H2O on your local machine, you could call h2o.init without any
arguments, and H2O will be automatically launched at localhost:54321, where the
IP is "127.0.0.1" and the port is 54321.
#4 - If H2O is running on a
cluster, you must provide the IP and port of the remote machine as arguments to
the h2o.init() call. The operation will be done on the server associated with
the data object where H2O is running, not within the R environment. Tutorials
H2O Tutorial on the Hortonworks Data Platform Sandbox:
http://hortonworks.com/blog/oxdata-h2o-tutorial-hortonworks-sandbox/
Walk-Through Tutorials for Web UI:
http://h2o-release.s3.amazonaws.com/h2o/rel-lambert/5/docs-website/tutorial/top.html
12-23-2016
02:59 AM
12 Kudos
Introduction
The producer sends data directly to the broker that is the leader for the partition, without any intervening routing tier.
Optimization Approach
Batching is one of the big drivers of efficiency. To enable batching, the Kafka producer will attempt to accumulate data in memory and send out larger batches in a single request. Batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 64 KB or 10 ms). This allows the accumulation of more bytes to send, and fewer, larger I/O operations on the servers. This buffering is configurable and gives a mechanism to trade a small amount of additional latency for better throughput. To find the optimal batch size and latency, iterative testing supported by producer statistics monitoring is needed.
Enable Monitoring
Start the producer with the JMX parameters enabled:
JMX_PORT=10102 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic
Producer Metrics
Use the jconsole application via JMX at port 10102. Tip: run jconsole remotely to avoid impact on the broker machine. See the metrics in the MBeans tab. The clientId parameter is the producer client ID for which you want the statistics.
kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=console-producer
This MBean gives the rate of producer requests as well as the latencies involved. It gives latencies as a mean and as the 50th, 75th, 95th, 98th, 99th, and 99.9th percentiles. It also gives the time taken to produce the data as a mean, one-minute average, five-minute average, and fifteen-minute average, along with the count.
kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestSize,clientId=console-producer
This MBean gives the request size for the producer.
It gives the count, mean, max, min, standard deviation, and the 50th, 75th, 95th, 98th, 99th, and 99.9th percentiles of request sizes.
kafka.producer:type=ProducerStats,name=FailedSendsPerSec,clientId=console-producer
This gives the number of failed sends per second: the count, mean rate, one-minute average, five-minute average, and fifteen-minute average.
kafka.producer:type=ProducerStats,name=SerializationErrorsPerSec,clientId=console-producer
This gives the number of serialization errors per second: the count, mean rate, one-minute average, five-minute average, and fifteen-minute average.
kafka.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=console-producer
This gives the number of messages produced per second: the count, mean rate, one-minute average, five-minute average, and fifteen-minute average.
References
https://kafka.apache.org/documentation.html#monitoring
Apache Kafka Cookbook by Saurabh Minni, 2015
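The accumulate-until-size-or-deadline trade-off described under Optimization Approach can be sketched with a toy model. This is a Python illustration for intuition only; the class and thresholds are my own assumptions, not Kafka's implementation (Kafka's equivalent knobs are the producer batch size and linger settings).

```python
# Toy model of producer batching: a batch is sent when it reaches
# batch_size bytes, or when the oldest record has waited linger_ms.
class BatchingBuffer:
    def __init__(self, batch_size=64 * 1024, linger_ms=10):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.batch = []
        self.bytes = 0
        self.first_append = None   # timestamp of the oldest buffered record

    def append(self, record, now_ms):
        """Buffer a record; return the flushed batch if a trigger fired."""
        if self.first_append is None:
            self.first_append = now_ms
        self.batch.append(record)
        self.bytes += len(record)
        full = self.bytes >= self.batch_size
        lingered = now_ms - self.first_append >= self.linger_ms
        if full or lingered:
            sent = self.batch
            self.batch, self.bytes, self.first_append = [], 0, None
            return sent
        return None

buf = BatchingBuffer(batch_size=100, linger_ms=10)
assert buf.append(b"x" * 60, now_ms=0) is None   # 60 bytes, just arrived
sent = buf.append(b"y" * 60, now_ms=5)           # 120 bytes >= 100: flush
assert len(sent) == 2
```

Larger batch_size means fewer, larger I/O operations; larger linger_ms means more latency before a send. That is exactly the tuning surface the JMX metrics above help you explore.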
12-20-2016
03:14 AM
13 Kudos
Pre-requisites
Hortonworks Data Platform 2.5 on CentOS 7.2
Python distribution that comes with HDP 2.5 (Python 2.7.5)
Download and install pip:
# wget https://bootstrap.pypa.io/get-pip.py
Install add-on package:
# pip install requests
Start Python CLI (default version):
# python
Import pre-reqs:
>>>import requests
>>>import json
>>>import sys
Environment Variables
Set the Ambari domain variable to the IP address or FQDN of your Ambari node:
>>>AMBARI_DOMAIN = '127.0.0.1'
Set the Ambari port, Ambari user and password variables to match your specifics:
>>>AMBARI_PORT = '8080'
>>>AMBARI_USER_ID = 'admin'
>>>AMBARI_USER_PW = 'admin'
Set the following variable to the IP address or FQDN of your ResourceManager node:
>>>RM_DOMAIN = '127.0.0.1'
Set the Resource Manager port variable:
>>>RM_PORT = '8088'
Ambari REST API Call Examples
Let's find the cluster name, cluster version, stack and stack version:
>>>restAPI='/api/v1/clusters'
>>>url="http://"+AMBARI_DOMAIN+":"+AMBARI_PORT+restAPI
>>>r=requests.get(url, auth=(AMBARI_USER_ID, AMBARI_USER_PW))
>>>json_data=json.loads(r.text)
>>>CLUSTER_NAME = json_data["items"][0]["Clusters"]["cluster_name"]
>>>print(CLUSTER_NAME)
>>>CLUSTER_VERSION = json_data["items"][0]["Clusters"]["version"]
>>>print(CLUSTER_VERSION)
>>>STACK = CLUSTER_VERSION.split('-')[0]
>>>print(STACK)
>>>STACK_VERSION = CLUSTER_VERSION.split('-')[1]
>>>print(STACK_VERSION)
>>>CLUSTER_INFO=json_data
>>>print(CLUSTER_INFO)
Let's find the HDP stack repository:
>>>restAPI = "/api/v1/stacks/"+STACK+"/versions/"+STACK_VERSION+"/operating_systems/redhat7/repositories/"+CLUSTER_VERSION
>>>url = "http://"+AMBARI_DOMAIN+":"+AMBARI_PORT+restAPI
>>>r= requests.get(url, auth=(AMBARI_USER_ID, AMBARI_USER_PW))
>>>json_data=json.loads(r.text)
>>>print(json_data)
>>>REPOSITORY_NAME=json_data["Repositories"]["latest_base_url"]
>>>print(REPOSITORY_NAME)
A more elegant approach is to create utility functions. See my repo: https://github.com/cstanca1/HDP-restAPI/. The restAPIFunctions.py script in the repo defines a number of useful functions that I have collected over time.
Run restAPIFunctions.py
The same example presented above can now be implemented with a single call that returns CLUSTER_NAME, CLUSTER_VERSION and CLUSTER_INFO, using the getClusterVersionAndName() function:
>>>CLUSTER_NAME,CLUSTER_VERSION,CLUSTER_INFO = getClusterVersionAndName()
>>>print(CLUSTER_NAME)
>>>print(CLUSTER_VERSION)
>>>print(CLUSTER_INFO)
Resource Manager REST API Call Examples
>>>RM_INFO=getResourceManagerInfo()
>>>RM_SCHEDULER_INFO=getRMschedulerInfo()
>>>print(RM_INFO)
>>>print(RM_SCHEDULER_INFO)
Other Functions
These are other functions included in the restAPIFunctions.py script:
getServiceActualConfigurations()
getClusterRepository()
getAmbariHosts()
getResourceManagerInfo()
getRMschedulerInfo()
getAppsSummary()
getNodesSummary()
getServiceConfigTypes()
getResourceManagerMetrics()
getCheckClusterForRollingUpgrades()
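As an illustration of the kind of helper collected in restAPIFunctions.py, here is a minimal sketch of the parsing step from the cluster example above. The function name and the sample payload are my own; only the JSON field names come from the Ambari response shown earlier.

```python
import json

def parse_cluster_info(json_text):
    """Extract cluster name, version, stack and stack version from the
    JSON returned by Ambari's /api/v1/clusters endpoint."""
    data = json.loads(json_text)
    clusters = data["items"][0]["Clusters"]
    name = clusters["cluster_name"]
    version = clusters["version"]              # e.g. "HDP-2.5"
    stack, stack_version = version.split("-", 1)
    return name, version, stack, stack_version

# Sample payload mirroring only the fields used above
sample = json.dumps(
    {"items": [{"Clusters": {"cluster_name": "mycluster", "version": "HDP-2.5"}}]}
)
print(parse_cluster_info(sample))   # ('mycluster', 'HDP-2.5', 'HDP', '2.5')
```

Keeping the HTTP call and the JSON parsing in separate functions makes helpers like this testable without a live Ambari server.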
12-18-2016
10:17 PM
13 Kudos
Background
Tungsten became the default in Spark 1.5 and can be enabled in earlier versions by setting spark.sql.tungsten.enabled to true (or disabled in later versions by setting it to false). Even without Tungsten, Spark SQL uses a columnar storage format with Kryo serialization to minimize storage cost.
Goal
The goal of Project Tungsten is to improve Spark execution by optimizing Spark jobs for CPU and memory efficiency (as opposed to network and disk I/O, which are considered fast enough).
Scope
Tungsten focuses on the hardware architecture of the platform Spark runs on, including but not limited to the JVM, LLVM, GPU, NVRAM, etc.
Optimization Features
Off-Heap Memory Management: a binary in-memory data representation (aka the Tungsten row format) with explicitly managed memory.
Cache Locality: cache-aware computations with cache-aware layout for high cache hit rates.
Whole-Stage Code Generation (aka CodeGen).
Design Improvements
Tungsten includes specialized in-memory data structures tuned for the type of operations required by Spark, improved code generation, and a specialized wire protocol. Tungsten's representation is substantially smaller than objects serialized using Java or even Kryo serializers. As Tungsten does not depend on Java objects, both on-heap and off-heap allocations are supported. Not only is the format more compact, serialization times can be substantially faster than with native serialization.
Since Tungsten no longer depends on working with Java objects, you can use either on-heap (in the JVM) or off-heap storage. If you use off-heap storage, it is important to leave enough room in your containers for the off-heap allocations; you can get an approximate idea of their size from the web UI.
Tungsten's data structures are also designed with the kind of processing for which they are used in mind. The classic example is sorting, a common and expensive operation: the on-wire representation is implemented so that sorting can be done without having to deserialize the data again. By avoiding the memory and GC overhead of regular Java objects, Tungsten is able to process larger data sets than the same hand-written aggregations.
Benefits
The following Spark jobs will benefit from Tungsten:
DataFrames: Java, Scala, Python, R
Spark SQL queries
Some RDD API programs, via general serialization and compression optimizations
Next Steps
In the future, Tungsten may make it more feasible to use certain non-JVM libraries. For many simple operations, the cost of using BLAS or similar linear algebra packages from the JVM is dominated by the cost of copying the data off-heap.
References
Project Tungsten: Bringing Apache Spark Closer to Bare Metal
High Performance Spark by Holden Karau and Rachel Warren
Slides: Deep Dive into Project Tungsten - Josh Rosen
Video: Deep Dive into Project Tungsten: Bringing Spark Closer to Bare Metal - Josh Rosen (Databricks)
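As noted under Background, the feature is controlled by the spark.sql.tungsten.enabled property. A minimal sketch of toggling it, assuming a Spark 1.4 deployment where Tungsten is still opt-in (the property name is from the text above; the file placement is standard Spark configuration):

```
# conf/spark-defaults.conf
# Opt in on Spark 1.4; from Spark 1.5 onward this is true by default,
# and setting it to false disables Tungsten instead.
spark.sql.tungsten.enabled   true
```

The same key can be passed per job, e.g. via --conf on spark-submit or spark-shell.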
12-17-2016
01:33 AM
11 Kudos
Introduction
Many organizations have come to rely on Hadoop for dealing with the ever-increasing quantities of data that they gather. Today, it is clear what problems Hadoop can solve; however, cloud is still not the first choice for Hadoop deployment. Pros and cons for Hadoop in the cloud have been shared across multiple blogs and books, but the question always comes up in discussions with enterprises considering Hadoop in the cloud. Thus, I thought it would be useful to collate a few pros and cons, as well as mention a pragmatic approach: a hybrid cloud for organizations that have made significant investments on-prem. For organizations adopting Hadoop for the first time, cloud is probably a better bet, especially if they don't have a lot of IT expertise and a great stream of revenue exists and needs to be exploited immediately.
Pro Cloud
Lack of space. You don't have space to keep racks of physical servers, along with the necessary power and cooling.
Flexibility. It is much easier to reorganize instances, or expand or contract your footprint, for changing business needs. Everything is controlled through cloud provider APIs and web consoles. Changes can be scripted and put into effect manually, or even automatically and dynamically based on current conditions.
New usage patterns. Cloud providers abstract computing resources such that they are not tied to physical configurations, which means they can be managed in ways that are otherwise impractical. For example, individuals could have their own instances, clusters, and even networks to work with, without much managerial overhead. The overall budget for CPU cores in your cloud provider account can be concentrated in a set of large instances, a larger set of smaller instances, or some mixture, and can even change over time. When an instance malfunctions, instead of troubleshooting what went wrong, you can just tear it down and replace it.
Worldwide availability. The largest cloud providers have data centers around the world. You can use resources close to where you work, or close to where your customers are, for the best performance. You can set up redundant clusters, or even entire computing environments, in multiple data centers, so that if local problems occur in one data center, you can shift to working elsewhere.
Data retention restrictions. If you have data that is required by law to be stored within specific geographic areas, you can keep it in clusters that are hosted in data centers in those areas.
Cloud provider features. Each major cloud provider offers an ecosystem of features to support the core functions of computing, networking, and storage. To use those features most effectively, your clusters should run in the cloud provider as well.
Capacity. Very few customers tax the infrastructure of a major cloud provider. You can establish large systems in the cloud that are not nearly as easy to put together, not to mention maintain, on-prem.
Pro On-Prem
Simplicity. Cloud providers start you off with reasonable defaults, but then it is up to you to figure out how all of their features work and when they are appropriate. It takes a lot of experience to become proficient at picking the right types of instances and arranging networks properly.
High levels of control. Beyond the general geographic locations of cloud provider data centers and the hardware specifications that providers reveal for their resources, it is not possible to have exacting, precise control over your cloud architecture. You cannot tell exactly where the physical devices sit, or what the devices near them are doing, or how data across them shares the same physical network. When the cloud provider has internal problems such as network outages, there's not much you can do but wait.
Unique hardware needs. You cannot have cloud providers attach specialized peripherals or dongles to their hardware for you. If your application requires resources that exceed what a cloud provider offers, you will need to host that part on-prem, away from your Hadoop clusters.
Saving money. For one thing, you are still paying for the resources you use. The hope is that the economy of scale a cloud provider can achieve makes it more economical for you to pay to "rent" their hardware than to run your own. You will also still need people who understand system administration and networking to take care of your cloud infrastructure. Inefficient architectures can cost a lot of money in storage and data transfer costs, or in instances that run idle.
Best of Both
Instead of running your clusters and associated applications completely in the cloud or completely on-prem, the overall system is split between the two: a hybrid cloud. Data channels are established between the cloud and on-prem worlds to connect the components needed to perform work.
Examples
Suppose there is a large, existing on-prem data processing system, perhaps using Hadoop clusters, which works well. In order to expand its capacity for running new analyses, rather than adding more on-prem hardware, Hadoop clusters can be created in the cloud. Data needed for the analyses is copied up to the cloud clusters, where it is analyzed, and the results are sent back on-prem. The cloud clusters can be brought up and torn down in response to demand, which helps keep costs lower.
Now assume the management of vast amounts of incoming data that needs to be centralized and processed. To avoid having a single choke point where all of the raw data is sent, a set of cloud clusters can share the load, perhaps each in a geographic location convenient to where the data is generated. These clusters can perform pre-processing of the data, such as cleaning and summarization, to reduce the work that the final centralized system must perform.
References
Moving Hadoop to the Cloud by Bill Havanki, O'Reilly Media, Inc., 2017.
11-24-2016
02:10 AM
10 Kudos
Behavior
The number of cells returned to the client is normally filtered based on the table configuration; however, when using the RAW => true parameter, you can retrieve all of the versions kept by HBase, unless a major compaction or a flush to disk happened in the meantime.
Demonstration
Create a table with a single column family:
create 't1', 'f1'
Configure it to retain a maximum version count of 3:
alter 't1', NAME => 'f1', VERSIONS => 3
Perform 4 puts:
put 't1','r1','f1:c1',1
put 't1','r1','f1:c1',2
put 't1','r1','f1:c1',3
put 't1','r1','f1:c1',4
Scan with RAW => true. I used VERSIONS as 100 as a catch-all; it could have been anything greater than 3 (the number of versions set previously). Unless specified, only the latest version is returned by the scan command.
scan 't1',{RAW=>true,VERSIONS=>100}
The above scan returns all four versions:
ROW    COLUMN+CELL
r1 column=f1:c1,timestamp=1479950685181, value=4
r1 column=f1:c1,timestamp=1479950685155, value=3
r1 column=f1:c1,timestamp=1479950685132, value=2
r1 column=f1:c1,timestamp=1479950627736, value=1
Flush to disk:
flush 't1'
Then scan:
scan 't1',{RAW=>true,VERSIONS=>100}
Three versions are returned:
ROW    COLUMN+CELL
r1 column=f1:c1,timestamp=1479952079260, value=4
r1 column=f1:c1,timestamp=1479952079234, value=3
r1 column=f1:c1,timestamp=1479952079209, value=2
Do four more puts:
put 't1','r1','f1:c1',5
put 't1','r1','f1:c1',6
put 't1','r1','f1:c1',7
put 't1','r1','f1:c1',8
Flush to disk:
flush 't1'
Scan:
scan 't1',{RAW=>true,VERSIONS=>100}
Six versions are returned:
ROW    COLUMN+CELL
r1 column=f1:c1,timestamp=1479952349970, value=8
r1 column=f1:c1,timestamp=1479952349925, value=7
r1 column=f1:c1,timestamp=1479952349895, value=6
r1 column=f1:c1,timestamp=1479952079260, value=4
r1 column=f1:c1,timestamp=1479952079234, value=3
r1 column=f1:c1,timestamp=1479952079209, value=2
Force major compaction:
major_compact 't1'
Scan:
scan 't1',{RAW=>true,VERSIONS=>100}
Three versions are returned:
ROW    COLUMN+CELL
r1 column=f1:c1,timestamp=1479952349970, value=8
r1 column=f1:c1,timestamp=1479952349925, value=7
r1 column=f1:c1,timestamp=1479952349895, value=6
Conclusion
When deciding the number of versions to retain, it is best to treat that number as the minimum version count available at a given time, not as a constant. Until a flush to disk and a major compaction occur, the number of versions available can be higher than what is configured for the table.
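The flush/compaction interplay demonstrated above can be sketched with a toy model. This is a Python illustration under simplifying assumptions, not HBase internals: cells accumulate in a memstore, version limits are enforced only when files are written or rewritten, and a raw scan sees every cell still held anywhere.

```python
# Toy model: RAW => true scans can see "extra" versions until retention
# is enforced by a flush or a major compaction.
class ToyStore:
    def __init__(self, max_versions):
        self.max_versions = max_versions
        self.memstore = []   # newest first, as HBase returns cells
        self.files = []      # flushed cell lists, newest first

    def put(self, value):
        self.memstore.insert(0, value)

    def raw_scan(self):
        # RAW => true: return every cell still held anywhere
        cells = list(self.memstore)
        for f in self.files:
            cells.extend(f)
        return cells

    def flush(self):
        # a flush writes at most max_versions cells into a new file
        self.files.insert(0, self.memstore[: self.max_versions])
        self.memstore = []

    def major_compact(self):
        # a major compaction rewrites everything into one file,
        # enforcing the configured retention
        merged = self.raw_scan()[: self.max_versions]
        self.memstore = []
        self.files = [merged]

store = ToyStore(max_versions=3)
for v in (1, 2, 3, 4):
    store.put(v)
print(store.raw_scan())   # [4, 3, 2, 1]     -> all four versions
store.flush()
for v in (5, 6, 7, 8):
    store.put(v)
store.flush()
print(store.raw_scan())   # [8, 7, 6, 4, 3, 2] -> six versions
store.major_compact()
print(store.raw_scan())   # [8, 7, 6]        -> the configured three
```

The three printed scans mirror the three scan results in the demonstration: four versions before any flush, six after two flushes, and three after a major compaction.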
11-16-2016
09:59 PM
11 Kudos
Introduction
This article is a continuation of the Geo-spatial Queries with Hive Using ESRI Geometry Libraries article published a few months ago.
Objective
Demonstrate how to use a Hive context and invoke built-in ESRI UDFs for Hive from Spark SQL.
Pre-requisites
HDP 2.4.2
Steps documented in Geo-spatial Queries with Hive Using ESRI Geometry Libraries
Steps
1. Launch spark-shell with --jars as its parameter:
spark-shell --jars /home/spark/esri/esri-geometry-api.jar,/home/spark/esri/spatial-sdk-hive-1.1.1-SNAPSHOT.jar
I placed the dependency jars in the /home/spark/esri path, but you can store them in HDFS or the local filesystem and grant proper privileges to your spark user.
2. Instantiate sqlContext:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
3. From spark-shell, define temporary functions:
sqlContext.sql("""create temporary function st_point as 'com.esri.hadoop.hive.ST_Point'""");
sqlContext.sql("""create temporary function st_x as 'com.esri.hadoop.hive.ST_X'""");
4. From spark-shell, invoke your UDF:
sqlContext.sql("""from geospatial.demo_shape_point select st_x(st_point(shape))""").show;
Note: geospatial is the Hive database where the demo_shape_point table was created.
Conclusion
The Esri Geometry API for Java and the Spatial Framework for Hadoop can be used by developers building geometry functions for various geo-spatial applications using Spark as well, not only Hive.
11-03-2016
07:59 PM
13 Kudos
Introduction
This is a continuation of the Apache Storm Tuning Approach for Squirrels article that I published a couple of weeks ago. A topology is going to have to coexist on a Storm cluster with a variety of other topologies. Some of those topologies will burn CPU doing heavy calculations; others will consume large amounts of network bandwidth. No one can sanely claim to provide a silver bullet for setting up your cluster for the best performance. However, I'd like to share a few recipes and guidelines for dealing with issues as they arise. Below are six categories of contention, with common problems and strategies to alleviate or eliminate resource contention. I hope this article at least provides a good map for a deep dive into Storm tuning; each resource contention is probably worth an article of its own.
Worker Processes in a Cluster
A Storm cluster is installed with a fixed number of available worker processes across all worker nodes. Each time you deploy a new topology to the cluster, you specify how many worker processes that topology should consume; this number is specified in the code for building and submitting your topology to the Storm cluster. It is possible to deploy a topology that requires a certain number of worker processes but can't acquire them because they've all been assigned to existing topologies. This problem is easy to detect: look at the cluster summary page of the Storm UI and identify the free slots (slots correspond to worker processes). It's important to always be aware of the resources available in your cluster when deploying new topologies. If you ignore what's available, you can easily affect every topology in your cluster by deploying something that consumes too many resources.
Let's assume that you notice a topology isn't processing any data, or has a sudden drop in throughput, and zero free slots are available. You have a fixed number of worker processes that can be allocated to the topologies requesting them. You can address this problem with these strategies:
Decreasing the number of worker processes in use by existing topologies
Increasing the total number of worker processes in the cluster
Decreasing the number of worker processes in use by existing topologies is the quickest and easiest way to free up slots for other topologies in your cluster, but this may or may not be possible depending on the SLAs for your existing topologies. If you can reduce the number of worker processes being used by a topology without violating the SLA, go for it. If your SLAs don't allow you to reduce the number of slots being used by any of the topologies in your cluster, you'll have to add new worker processes to the cluster.
There are two ways to increase the total number of worker processes in the cluster. One is adding more worker processes to your worker nodes, but this won't work if your worker nodes don't have the resources to support additional JVMs. In that case, you'll need to add more worker nodes to your cluster, thus adding to the pool of worker processes. Adding new worker nodes has the least impact on existing topologies, because adding worker processes to existing nodes has the potential to cause other types of contention that must then be addressed.
Topology Worker Nodes and Processes
Let's assume that you have a problematic topology and need to identify the worker nodes and worker processes that topology is executing on. The way to do this is by looking at the Storm UI. Start by checking the Bolts section to see if anything looks amiss. Having identified the problematic bolt, you now want to see more details about what's happening with it. To do so, click on that bolt's name in the UI to get a more detailed view. From here, turn your attention to the Executors and Errors sections for the individual bolt. The Executors section is of particular interest; it tells you which worker nodes and worker processes the bolt is executing on.
From here, given the type of contention being experienced, you can take the necessary steps to identify and solve the problem at hand. Though a great tool, the Storm UI may not always show you what you need. This is where additional monitoring can help, whether that is monitoring the health of individual worker nodes or adding custom metrics in your bolt's code to give you deeper insight into how well the bolt is performing. The bottom line: don't rely solely on the Storm UI. Put other measures in place to make sure you have coverage everywhere.
Now that you have identified the contention, you'd like to change the number of worker processes running on a worker node. That number is defined by the supervisor.slots.ports property in each worker node's storm.yaml configuration file. This property defines the ports that each worker process will use to listen for messages. To increase the number of worker processes that can be run on a worker node, add a port to this list for each worker process to be added; to decrease it, remove a port for each worker process to be removed. After updating this property, you'll need to restart the Supervisor process on the worker node to apply the change. Upon restarting, Nimbus will be aware of the updated configuration and send messages only to the ports defined in this list.
Another thing to consider is the number of worker nodes you have in your cluster. If widespread changes are needed, updating the configuration and restarting the Supervisor process across hundreds or even tens of nodes is a tedious and time-consuming task, so try to use tools like Puppet, Chef, or Ansible.
Worker Process Memory
Just as you install
a Storm cluster with a fixed number of worker processes, you also set up each worker process (JVM) with a fixed amount of memory it can grow to use. The amount of memory limits the number of threads (executors) that can be launched on that JVM; each thread takes a certain amount of memory (the default is 1 MB on a 64-bit Linux JVM).
JVM contention can be a problem on a per-topology basis. The combination of memory used by your bolts, spouts, threads, and so forth might exceed what is allocated to the JVMs they're running on. JVM contention usually manifests itself as out-of-memory (OOM) errors and/or excessively long garbage collection (GC) pauses. OOM errors will appear in the Storm logs and Storm UI, usually as a stack trace starting with java.lang.OutOfMemoryError: Java heap space. Gaining visibility into GC issues requires a little more setup, but it's easily supported by both the JVM and Storm configuration. The JVM offers startup options for tracking and logging GC usage, and Storm provides a way to specify JVM startup options for your worker processes: the worker.childopts property in storm.yaml is where you'd specify these JVM options.
One interesting item to note is the value of the -Xloggc setting. Remember that you can have multiple worker processes per worker node. The worker.childopts property applies to all worker processes on a node, so specifying a regular log filename would produce one log file for all the worker processes combined. A separate log file per worker process makes tracking GC usage per JVM easier. Storm provides a mechanism for this: the ID variable is unique for each worker process on a worker node, so you can add a "%ID%" string to the GC log filename and you'll get a separate GC log file for each worker process.
Let's assume that your spouts and/or bolts are attempting to consume more memory than what has been allocated to the JVM, resulting in OOM errors and/or long GC pauses. You can address the problem in a couple of ways:
By increasing the number of worker processes being used by the topology in question
By increasing the size of your JVMs
By adding a worker process to a topology, you'll decrease the average load across all worker processes for that topology. This should result in a smaller memory footprint for each worker process (JVM), hopefully eliminating the JVM memory contention. Because increasing the size of your JVMs could require you to change the size of the machines/VMs they're running on, we recommend the "increase worker processes" solution if you can.
Swapping and balancing memory across JVMs has been one of our biggest challenges with Storm. Different topologies will have different memory usage patterns, so don't worry if you don't get it right initially; this is a never-ending process as the shape of your cluster and topologies changes. Beware when increasing the memory allocated to a JVM: as a rule of thumb, when you cross certain key points you'll notice a change in how long garbage collection takes. 500 MB, 1 GB, 2 GB, and 4 GB are all around the points where our GC time has taken a jump. It's more art than science, so bring your patience with you. There's nothing more frustrating than addressing OOM issues by increasing JVM memory size only to have it noticeably impact GC times.
The amount of memory allocated to all worker processes (JVMs) on a worker node can be changed via the worker.childopts property in each worker node's storm.yaml configuration file. This property accepts any valid JVM startup option, providing the ability to set the startup options for the initial memory allocation pool (-Xms) and maximum memory allocation pool (-Xmx) for the JVMs on the worker node. Changing this property will update all the worker processes on a particular worker node, so make sure that your worker node has enough resources for the memory increase. After updating this property, you'll need to restart the Supervisor process on the worker node to apply the change. From my old days of J2EE application tuning, I still recommend setting -Xms and -Xmx to the same value to eliminate heap management overhead. Along with being more efficient, this strategy adds the benefit of making it easier to reason about JVM memory usage, because the heap size is a fixed constant for the life of the JVM.
Worker Node Memory
Much like an individual JVM, a worker node as a whole has a limited amount of available memory. In addition to the memory needed to run your Storm worker processes (JVMs), you need memory to run the Supervisor process and any other processes on your worker node without swapping. A worker node has a fixed amount of memory that's used by its worker processes along with any other processes running on that node. If a worker node is experiencing memory contention, that worker node will be swapping. Swapping is the little death and needs to be avoided if you care about latency and throughput. This is a problem when using Storm: each worker node needs to have enough memory so that the worker processes and OS don't swap. If you want to maintain consistent performance, you must avoid swapping with Storm's JVMs. One way to keep an eye on this in Linux is the System Activity Reporter.
Use sar -S for reporting swap space utilization statistics. If you run a single worker process per worker node, it's impossible to run into contention between workers on that node. This can make maintaining consistent performance within a cluster much easier. We know of more than one development team that has opted for this approach. If possible, we advise you to seriously consider going this route.

This is a nonstarter if you aren't running in a virtualized environment; the cost is simply too high if you're running on "bare metal" with a single OS instance per physical machine. Within a virtualized environment, you'll use more resources by doing this. Assume for a moment that your OS install requires n GB of disk space and uses 2 GB of memory to run effectively. If you have eight workers running on your cluster and you assign four workers per node, you'd use 2n GB of disk and 4 GB of memory to run the OS on your cluster nodes. If you were to run a single worker per node, that would skyrocket to 8n GB of disk and 16 GB of memory. That's a fourfold increase in a rather small cluster. Imagine the additional usage that would result if you had a cluster that was 16, 32, 128, or more nodes in size. If you're running in an environment such as Amazon Web Services (AWS) where you pay per node, the costs can add up quickly. Therefore, we suggest this approach only if you're running in a private virtualized environment where the cost of hardware is relatively fixed and you have disk and memory resources to spare.

Let's assume that your worker node is swapping due to contention for that node's memory. Here are a few options:

Increase the memory available to each worker node. This would mean giving more memory to the physical machine or VM, depending on how you configured your cluster.

Lower the collective memory used by worker processes. This can be done in one of two ways.
The first is by reducing the number of worker processes per worker node. Reducing the total number of worker processes will lower the overall memory footprint of the combined remaining processes. The second is by reducing the size of your JVMs. Be careful when lowering the memory allocated to existing JVMs, though, to avoid introducing memory contention within the JVM. One safe solution is to always go the route of increasing the memory available to each machine. It's the simplest solution, and its ramifications are the easiest to understand. If you are tight on memory, lowering memory usage can work, but you open yourself up to all the problems we discussed concerning GC and OOM on a per-JVM basis.

Worker Node CPU

Worker node CPU contention occurs when the demand for CPU cycles outstrips the amount available.
This is a problem when using Storm and is one of the primary sources of
contention in a Storm cluster. If your Storm topology’s throughput is lower
than what you expect it to be, you may want to check the worker node(s) running
your topology to see if CPU contention exists. One way to keep an
eye on this in Linux is with the sar -u command for displaying real-time CPU usage of all
CPUs. Let’s assume that the throughput of your topologies is low, and based
on running the sar command, you see that CPU contention exists. To address the
problem, you have the following options:
Increasing the number of
CPUs available to the machine. This is only possible in a virtualized
environment.
Upgrading to a more powerful CPU. This is mainly an option in environments like Amazon Web Services (AWS).
Spreading the JVM load
across more worker nodes by lowering the number of worker processes per
worker node. To spread worker
process (JVM) load across more worker nodes, you need to reduce the number of
worker processes running on each worker node. Reducing the number of worker
processes per worker node results in less processing (CPU requests) being done
on each worker node. There are two scenarios you may find yourself in when
attempting this solution. The first is you have unused worker processes in your
cluster and can therefore reduce the number of worker processes on your
existing nodes, thus spreading the load. The second scenario
is where you don’t have any unused worker processes and therefore need to add
worker nodes in order to reduce the number of worker processes per worker node. Reducing the number
of worker processes per worker node is a good way to reduce the number of CPU
cycles being requested on each node. You just need to be aware of what
resources are available and in use and act appropriately in your given
scenario.

Worker Node I/O

I/O contention on a
worker node can fall under one of two categories:
Disk I/O contention,
reading from and writing to the file system
Network/socket I/O
contention, reading from and writing to a network via a socket Both types of
contention are regularly an issue for certain classes of Storm topologies. The
first step in determining if you’re experiencing either of these contentions is
to establish whether a worker node is experiencing I/O contention in general.
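One way to script such a check is with sar. This is a sketch: the inlined sample line stands in for live `sar -u 1 5` output so it is self-contained, and the 10% threshold and column positions are illustrative (sar's column layout varies by sysstat version):

```shell
# Flag a node whose %iowait exceeds 10.0. In practice you would capture
# the "Average:" line of `sar -u 1 5`; a sample line is inlined here.
sample='Average:  all  12.11  0.00  3.02  14.50  0.00  70.37'

# In this layout, field 6 is %iowait.
iowait=$(echo "$sample" | awk '{print $6}')

# Exit 0 (and warn) only when %iowait is above the threshold.
awk -v v="$iowait" 'BEGIN { exit !(v > 10.0) }' && echo "iowait high: $iowait"
# prints: iowait high: 14.50
```

Wiring a check like this into your node monitoring gives you an early warning before throughput visibly degrades.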
Once you do, you can dive down into the exact type of I/O contention your
worker node is suffering from. One way to determine if a worker node in your cluster is experiencing I/O contention is by running the sar -u command and watching the %iowait column, which reports the percentage of time the CPUs sat idle while waiting on outstanding I/O requests. A healthy topology that uses a lot of I/O shouldn't spend a lot of time waiting for the resources to become available. That's why we use 10.00% iowait as the threshold at which you start experiencing performance degradation. If you know what topologies are running on a
given worker node, you know that they use a lot of network resources or disk
I/O, and you see iowait problems, you can probably safely assume which of the two
is your issue. If you’re seeing troubling I/O contention signs, first attempt
to determine if you’re suffering from socket/network I/O contention. If you
aren’t, assume that you’re suffering from disk I/O contention. Although this
might not always be the case, it can take you a long way as you learn the tools
of the trade. If your topologies
interact over a network with external services, network/socket I/O contention
is bound to be a problem for your cluster. In our experience, the main cause
for this type of contention is that all of the ports allocated for opening
sockets are being used. Most Linux installs
will default to 1024 maximum open files/sockets per process. In an
I/O-intensive topology, it's easy to hit that limit quickly. To determine the limits of your OS, you can examine the /proc filesystem to check your process's limits. In order to do
this, you’ll first need to know your process ID. Once you do that, you can get
a listing of all limits for that process. Start by getting the PID (ps aux
| grep MyTopologyName) and then check your process limits from the /proc
filesystem. If you’re hitting
this limit, the Storm UI for your topology should display an exception in the
“Last Error” column that the max open files limit has been reached. This will
most likely be a stack trace starting with java.net.SocketException: Too
many open files. Let’s assume that your topology is experiencing reduced throughput or no throughput
at all, and you’re seeing errors for hitting the limit of open sockets. A couple of ways to
address this problem are as follows:
Increasing the number of
available ports on the worker node
Adding more worker nodes
to the cluster

For increasing the number of available ports, you'll need to edit the /etc/security/limits.conf file on most Linux distributions. These settings
will set the hard and soft limit on open files per user. The value we’re
concerned with as a Storm user is the soft limit. I don’t advise going higher
than 128k. If you do, then as a rule of thumb (until you learn more about
soft/hard limits for number of files open on Linux), I suggest setting the hard
limit to two times the value of the soft limit. Note that you need super-user
access to change limits.conf and you’re going to need to reboot the system to
make sure they take effect. Increasing the
number of worker nodes in the cluster will give you access to more ports. If
you don’t have the resources to add more physical machines or VMs, you’ll have
to try the first solution. The first real contention
issue was the number of sockets available per machine. Don’t add more workers
on other machines until you’ve increased available sockets on each node as much
as you can. Once you’ve done that, you should also look at your code. Are you opening and
closing sockets all the time? If you can keep connections open, do that.
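The limit inspection and limits.conf changes described earlier can be sketched like this (the PID lookup, user name, and limit values are all illustrative):

```shell
# Inspect the open-files limit of a running process via /proc.
# Here $$ (this shell) stands in for a worker JVM's PID, which you
# would find with something like: ps aux | grep MyTopologyName
grep 'Max open files' "/proc/$$/limits"

# Raising the limit is done in /etc/security/limits.conf -- e.g. a soft
# limit with the hard limit at twice its value, per the rule of thumb
# above (entries shown as comments; values are examples only):
#   storm  soft  nofile  65536
#   storm  hard  nofile  131072
```

After editing limits.conf you still need super-user access and a reboot, as noted above, for the new limits to take effect.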
There’s this wonderful thing calledTCP_WAIT. It’s where a TCP connection will stay open after
you close it waiting for any stray data. If you’re on a slow network link (like
many were when TCP was first designed), this is a wonderful idea that helps
prevent data corruption. If you’re on a fast modern LAN, it’ll drive you
insane. You can tune your TCP stack via various OS-specific means to lower how
long you linger in TIME_WAIT, but when you're making tons of network calls, even
that won’t be enough. Be smart: open and close connections as little as
possible. Disk I/O contention
affects how quickly you can write to disk. This could be a problem with Storm
but should be exceedingly rare. If you’re writing large volumes of data to your
logs or storing the output of calculations to files on the local filesystem, it
might be an issue, but that should be unlikely. If you have a
topology that’s writing data to disk and notice its throughput is lower than
what you’re expecting, you should check to see if the worker nodes it’s running
on are experiencing disk I/O contention. For Linux installations, you can run a
command called iotop to get a view of the disk I/O usage for the worker
nodes in question. This command displays a table of current I/O usage by processes/threads
in the system, with the most I/O-intensive processes/threads listed first. Let’s assume that you have a topology that reads/writes to/from disk, and it looks
like the worker nodes it's running on are experiencing disk I/O contention. To address this problem, you have a few options:
Write less data to disk.
This can mean cutting back within your topology. It can also mean putting
fewer worker processes on each worker node if multiple worker processes
are demanding disk on the same worker node.
Get faster disks. This
could include using a RAM disk.
If you’re writing to NFS
or some other network filesystem, stop immediately. Writing to NFS is slow
and you’re setting yourself up for disk I/O contention if you do. If you’re writing
large amounts of data to disk and you don’t have fast disks, you’re going to
have to accept it as a bottleneck. There’s not much you can do without throwing
money at the problem.

Summary
Several types of
contention exist above the topology level, so it’s helpful to be able to
monitor things like CPU, I/O, and memory usage for the operating system
your worker nodes are running on.
It is important to have
some level of familiarity with monitoring tools for the operating system
of the machines/VMs in your cluster. In Linux, these include sar, netstat, and iotop.
There’s value in knowing
common JVM startup options, such as -Xms, -Xmx, and those related to GC logging.
Although the Storm UI is
a great tool for the initial diagnosis of many types of contention, it’s
smart to have other types of monitoring at the machine/VM level to let you
know if something is awry.
Including custom
metrics/monitoring in your individual topologies will give you valuable
insights that the Storm UI may not be able to provide.
Be careful when
increasing the number of worker processes running on a worker node because
you can introduce memory and/or CPU contention at the worker node level.
Be careful when
decreasing the number of worker processes running on a worker node because
you can affect your topologies’ throughput while also introducing contention
for worker processes across your cluster.

References

Storm Applied: Strategies for real-time event processing by Sean T. Allen, Matthew Jankowski, and Peter Pathirana
10-21-2016
09:11 PM
15 Kudos
Introduction How many times have we heard that a problem can't be solved if it isn't completely understood? Yet all of us, even with a lot of experience and technical skill, still tend to jump into the solution space, bypassing key steps of the scientific approach. We just don't have enough patience and, quite often, the business is on our back to deliver a quick fix. We just want to get it done! We find ourselves like the squirrel in Ice Age, chasing the acorn with only our experience and minimal experimentation, dreaming that the acorn will fall into our lap. This article is meant to re-emphasize a repeatable approach that can be applied to tune Storm
topologies starting from understanding the problem,
forming a hypothesis, testing that hypothesis, collecting the data, analyzing
the data, and drawing conclusions based on facts rather than gut feel. Hopefully it helps to catch more acorns!

Understand the Problem

Understand the functional problem the topology solves; quite often the best tuning is executing a step differently in the workflow for the same end result, for example applying a filter first and then executing the step, rather than executing the step on everything and paying the penalty every time. You can also prioritize success, or learn to lose in order to win.

Define SLA for Business Success

This is also the time to get some sense of the business SLA and forecasted growth. Document the SLA by topology. Try to understand the reasons behind the SLA and identify opportunities for trade-offs.

Gather Data

Gather
data using Storm UI and other tools specific to the environment. Storm UI is your
first go-to tool for tuning, most of the time. Another tool to use is Storm's built-in metrics-collecting API, available since 0.9.x, which lets you build custom metrics into your topology. You may have tuned the topology, deployed it to production, everybody is happy, and two days later the issue comes back. Wouldn't it be nice to have some metrics embedded in your code, already focused on the usual suspects, e.g. external calls to a web service, a SQL query to your lookup data store, or specific customers or some other strong business entity that, due to the data and the workflow, can become the bottleneck? In a Storm topology, you have
spouts, bolts, workers that can play a role in your topology challenges. Learn about
topologies status, uptime, number of workers, executors, tasks, high-level
stats across four time windows, spout statistics, bolt statistics,
visualization of the spouts and the bolts and how they are connected, the flow
of tuples between all of the streams, and all configurations for the topology.

Define Baseline Numbers

Document statistics about how
the topology performs in production. Set up a test environment and
try that topology with different loads aligned to realistic business SLAs.

Tuning Options vs. Bottleneck
Increase parallelism (use the rebalance command) and analyze the change in capacity in the same Storm UI. If there is no improvement, focus on the bolt, which is the likely bottleneck; if you see a backlog upstream (Kafka, most likely), focus on the spout. After trying with spouts and bolts, shift focus to worker parallelism. The basic principle is to scale on a single worker with executors until adding executors no longer helps. If adjusting spout and bolt parallelism fails to provide additional benefits, play with the number of workers to see if you are now bound by the JVM you are running on and need to parallelize across JVMs. If you still don't meet the
SLA by tuning the existing spouts, bolts, and workers, it's time to start controlling the rate that flows into the topology.
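As a sketch, the parallelism adjustments above can be made on a running topology with the storm CLI, and the ingest-rate control is the topology.max.spout.pending setting (the topology and component names here are made up):

```
# Rebalance a running topology: 4 workers, 8 executors for the spout
# named "kafka-spout", waiting 30 seconds for in-flight tuples to drain.
storm rebalance myTopology -w 30 -n 4 -e kafka-spout=8

# Max spout pending can be set in storm.yaml as a default:
#   topology.max.spout.pending: 1000
# or per topology in code: conf.setMaxSpoutPending(1000);
```

Rebalancing lets you test each parallelism increment without redeploying the topology, which makes the small-increment approach described here much faster to iterate on.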
Max spout pending allows you to set a maximum number of tuples
that can be unacked at any given time. If the number of possible unacked tuples is lower than the total parallelism you've set for your topology, it could be a bottleneck. The goal, with one or many spouts, is to ensure that the maximum possible number of unacked tuples is greater than the maximum number of tuples we can process based on our parallelization, so we can feel safe saying that max spout pending isn't causing a bottleneck. Without max spout pending, tuples will continue to flow into your topology whether or not you can keep up with processing them. Max spout pending allows you to control the
ingest rate. Max spout
pending lets us erect a dam in front of our topology, apply back pressure, and
avoid being overwhelmed. Despite the optional nature of max spout pending, you
should always set it. When attempting to increase
performance to meet an SLA, increase the rate of data ingest by either
increasing spout parallelism or increasing the max spout pending. Assuming a 4x
increase in the maximum number of active tuples allowed, we’d expect to see the
speed of messages leaving our queue increase (maybe not by a factor of four,
but it’d certainly increase). If that caused the capacity metric for any of the
bolts to return to one or near one, tune the bolts again and repeat with the spouts and bolts until you hit the SLA. These methods can be applied over and over until you meet the SLAs. This effort is high, and automating the steps is desirable.

Deal with External Systems

It's easy, when interacting with external services (such as a SOA
service, database, or filesystem) to ratchet up the parallelism to a high
enough level in a topology that limits in that external service keep your
capacity from going higher. Before you start tuning parallelism in a topology
that interacts with the outside world, be positive you have good metrics on
that service. You could keep turning up the parallelism on a bolt to the point
that it brings the external service to its knees, crippling it under a mass of
traffic that it’s not designed to handle. Latency Let’s talk about one of the greatest enemies of fast code:
latency. There’s latency accessing local memory and hard drive, and accessing
another system over the network. Different interactions have different levels
of latency, and understanding the latency in your system is one of the keys to
tuning your topology. You’ll usually get fairly consistent response times and
all of a sudden those response times will vary wildly because of any number
of factors:
The
external service is having a garbage collection event.
A
network switch somewhere is momentarily overloaded.
Your
coworker wrote a runaway query that’s currently hogging most of the
database’s CPU. Something about
the data that’s likely to cause the delay. As the topology interacts with external services, let’s be smarter about our latency. We don't always need to increase parallelism and allocate more resources. We may discover after investigation that your variance is based
on the customer, or induced by an exceptional event, e.g. elections, the Olympics, etc. Certain records
are going to be slower to look up. Sometimes one of those
“fast” lookups might end up taking longer. One
strategy that has worked well with services that exhibit this problem is to
perform initial lookup attempts with a hard ceiling on timeouts (try various ceilings) and, if that fails, send the tuple to a less parallelized instance of the same
bolt that will take longer with its timeout. The end result is that the time to
process a large number of messages goes down. Your mileage
will vary with this strategy, so test before you use it. As part of your tuning
strategy, you could end up breaking a bolt into two: one for fast lookups and the other for slow lookups. At the least, let what goes fast go fast; don't back it up behind what is slow. Treat the slow ones differently and make sure they are the exception. You may even decide to address those differently and still get some value out of that data. It is up to you to determine what is more important: accepting an overall bottleneck, marking the slow items for later processing, or disregarding those producing a bottleneck. Maybe a batch approach for those is more efficient. This is a
design consideration and a reasonable trade-off most of the time. Are you willing
to accept the occasional loss of fidelity but still hit the SLA because that is
more important for your business? Is perfection helping or killing your business?

Summary
All basic timing information for a topology can be found in the
Storm UI. Metrics (both built-in and custom) are essential if you want to
have a true understanding of how your topology is operating. Establishing a baseline
set of performance numbers for your topology is the essential first step
in the tuning process. Bottlenecks are
indicated by a high capacity for a spout/bolt and can be addressed by
increasing parallelism. Increasing parallelism
is best done in small increments so that you can gain a better
understanding of the effects of each increase. Latency that is both
related to the data (intrinsic) and not related to the data (extrinsic)
can reduce your topology’s throughput and may need to be addressed. Be ready for trade-offs
References

Storm Applied: Strategies for real-time event processing by Sean T. Allen, Matthew
Jankowski, and Peter Pathirana
10-21-2016
12:06 AM
13 Kudos
Sizing

There's no simple rule of thumb for this; it's as much an art as it is a science, since it depends on the workloads and how chatty they are with your current ZKs. One way to look at this is: if you have 3 ZKs you can afford to lose one; if you have 5, you can afford to lose two. If your IT is aggressively applying security patches and other upgrades, like firmware, kernel, Java, and other packages used by Hadoop tools, and taking nodes down to do the job, then during those upgrades with 3 ZKs your ensemble runs with only two nodes, and if you are unlucky and one of them goes down, your whole cluster will go down. So, in this case 5 are better. Warning: the more ZK nodes you have, the slower ZK becomes for writes, since every write must be acknowledged by a quorum.

Placement

Zookeeper is a master node; as such, it can be collocated with
other master services. Ideally, you would not want to collocate it with another HA service. It is quite light on memory and CPU requirements, but since it is disk intensive, don't collocate it with disk-intensive services like Kafka or HDFS.

Storage Requirements

In general, Zookeeper doesn't require huge drives because it only stores metadata for the services it coordinates. It is common to use 100G to 250G for the Zookeeper data directory and logs, which is fine for many cluster deployments. Moreover, it is recommended to configure an automatic purging policy for the snapshot and log directories so that they don't end up filling all the local storage.

Dedicated or Shared?

At Yahoo!, ZooKeeper is usually deployed on
DEDICATED RHEL boxes, with dual-core processors, 2GB of RAM, and 80GB IDE hard
drives. For your
Kafka/Storm cluster, you could consider deploying ZK on DEDICATED physical
hardware (not virtual). The driving force for physical hardware or at
least for the dedicated disk is the transaction log and the high throughput
nature of Kafka and Storm. Since Kafka is usually used with Storm, have a separate Zookeeper cluster for Kafka and Storm. If Kafka and Storm do end up sharing one, please make sure that you don't put the Zookeeper cluster on the Kafka nodes; put the Zookeeper on the Storm nodes.

Caution
Rather than going to larger clusters of ZKs, it is better to split out certain services to their own ZKs when they're putting more pressure on an otherwise fairly quiet ZK cluster. It is a good thing to have a separate set of ZKs for each cluster: one quorum for Kafka, one quorum for Storm, one quorum for the rest (YARN, HBase, Hive, HDFS), and possibly a separate quorum just for HBase. The challenge is that more hardware and more administration are needed, but it pays off.

Be careful where you put that transaction log. The most performance-critical part of ZooKeeper is the transaction log. ZooKeeper must sync transactions to media before it returns a response. A dedicated transaction log device is key to consistent good performance. Putting the log on a busy device will adversely impact performance. If you only have one storage device, put trace files on NFS and increase the snapshotCount; it doesn't eliminate the problem, but it can mitigate it. ZooKeeper's transaction log must be on a dedicated device. A dedicated partition is not enough. ZooKeeper writes the log sequentially, without seeking. Sharing your log device with other processes can cause seeks and contention, which in turn can cause multi-second delays.

Do not put ZooKeeper in a situation that can cause a swap. In order for ZooKeeper to function with any sort of timeliness, it simply cannot be allowed to swap. Remember, in ZooKeeper, everything is ordered, so if one request hits the disk, all other queued requests hit the disk. Going to disk unnecessarily will almost certainly degrade your performance unacceptably. Therefore, make certain that the maximum heap size given to ZooKeeper is not bigger than the amount of real memory available to ZooKeeper. Set your Java max heap size correctly. To avoid swapping, try to set the heap size to the amount of physical memory you have, minus the amount needed by the OS and cache. The best way to determine an optimal heap size for your configuration is to run load tests.
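The dedicated-log-device, purge, and heap advice above maps to settings like the following (the paths and sizes are illustrative assumptions, not recommendations):

```
# zoo.cfg
dataDir=/var/lib/zookeeper          # snapshots
dataLogDir=/zk-txnlog/zookeeper     # transaction log on its own device
autopurge.snapRetainCount=5         # keep the 5 most recent snapshots
autopurge.purgeInterval=24          # purge every 24 hours

# conf/java.env -- cap the heap below physical RAM to avoid swapping
export JVMFLAGS="-Xms3g -Xmx3g"
```

If dataLogDir is not set, ZooKeeper writes the transaction log into dataDir, which is exactly the shared-device situation warned against above.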
If for some reason you can't, be conservative in your estimates and choose a number well below the limit that would cause your machine to swap. For example, on a 4G machine, a 3G heap is a conservative estimate to start with.

Best Practices
The ZooKeeper data directory contains the snapshot and transaction log files. It is a good practice to periodically clean up the directory if the auto-purge option is not enabled. Also, an administrator might want to keep a backup of these files, depending on the application's needs. However, since ZooKeeper is a replicated service, you need to back up the data of only one of the servers in the ensemble.

ZooKeeper uses Apache log4j as its logging infrastructure. As the logfiles grow in size, it is recommended that you set up auto-rollover of the logfiles using the built-in log4j facility for ZooKeeper logs.

The list of ZooKeeper servers used by the clients in their connection strings must match the list of ZooKeeper servers that each ZooKeeper server has. Strange behaviors might occur if the lists don't match. The server list in each Zookeeper server's configuration file should be consistent with those of the other members of the ensemble.

The ZooKeeper transaction log must be configured on a dedicated device. This is very important for achieving the best performance from ZooKeeper.

The Java heap size should be chosen with care. Swapping should never be allowed to happen on the ZooKeeper server. It is better if the ZooKeeper servers have reasonably high memory (RAM). System monitoring tools such as vmstat can be used to monitor virtual memory statistics and decide on the optimal size of memory needed, depending on the needs of the application. In any case, swapping should be avoided.

References:

https://community.hortonworks.com/questions/2498/best-practices-for-zookeeper-placement.html https://community.hortonworks.com/questions/53025/zookeeper-performance-and-metrics-when-to-resize.html https://community.hortonworks.com/questions/55868/zookeeper-on-even-master-nodes.html Apache ZooKeeper Essentials by Saurav Haloi, published by Packt Publishing, 2015