11-22-2016 10:00 PM
@ambud.sharma Could you expand on point #4? Latency measures the amount of time between the start of an action and its completion; throughput is the total number of such actions that occur in a given amount of time. I doubt that lower latency directly causes a throughput decrease. There is more to it. By the logic of #4, NYSE trading would run better in batch mode. I believe you are omitting some other factor in the correlation, for example RESOURCES held constant, or per-tuple overhead, etc. Please elaborate to increase the value of this article.
11-16-2016 09:59 PM
11 Kudos
Introduction

This article is a continuation of the Geo-spatial Queries with Hive Using ESRI Geometry Libraries article published a few months ago.

Objective

Demonstrate how to use a Hive context and invoke the built-in ESRI UDFs for Hive from Spark SQL.

Pre-requisites

- HDP 2.4.2
- Steps documented in Geo-spatial Queries with Hive Using ESRI Geometry Libraries

Steps

1. Launch spark-shell with --jars as its parameter:

   spark-shell --jars /home/spark/esri/esri-geometry-api.jar,/home/spark/esri/spatial-sdk-hive-1.1.1-SNAPSHOT.jar

I placed the dependency jars in /home/spark/esri, but you can store them in HDFS or on the local filesystem and grant proper privileges to your Spark user.

2. Instantiate sqlContext:

   val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);

3. From spark-shell, define temporary functions:

   sqlContext.sql("""create temporary function st_point as 'com.esri.hadoop.hive.ST_Point'""");
   sqlContext.sql("""create temporary function st_x as 'com.esri.hadoop.hive.ST_X'""");

4. From spark-shell, invoke your UDF:

   sqlContext.sql("""from geospatial.demo_shape_point select st_x(st_point(shape))""").show;

Note: geospatial is the Hive database where the demo_shape_point table was created.

Conclusion

The Esri Geometry API for Java and the Spatial Framework for Hadoop can be used by developers building geometry functions for geo-spatial applications not only with Hive but also with Spark.
11-03-2016 07:59 PM
13 Kudos
Introduction

This is a continuation of the Apache Storm Tuning Approach for Squirrels article that I published a couple of weeks ago.

A topology is going to have to coexist on a Storm cluster with a variety of other topologies. Some of those topologies will burn CPU doing heavy calculations; others will consume large amounts of network bandwidth. No one sane can claim to provide a silver bullet for how to set up your cluster for the best performance. However, I'd like to share a few recipes and guidelines for dealing with issues as they arise. Below I list six categories of contention, describing common problems and strategies to alleviate or eliminate each one. I hope this article at least provides a good map for a deep dive into Storm tuning; each type of contention is probably worth an article of its own.

Worker Processes in a Cluster

A Storm cluster is installed with a fixed number of available worker processes across all worker nodes. Each time you deploy a new topology to the cluster, you specify how many worker processes that topology should consume. The number of worker processes a topology requests is specified in the code for building and submitting your topology to the Storm cluster. It is possible that you could deploy a topology that requires a certain number of worker processes but can't acquire them because they've all been assigned to existing topologies. This problem is easy to detect: look at the cluster summary page of the Storm UI and identify the free slots. Slots correspond to worker processes. It's important to always be aware of the resources available in your cluster when deploying new topologies. If you ignore what's available, you can easily affect every topology in your cluster by deploying something that consumes too many resources. Let's assume that you notice a topology isn't processing any data or has a sudden drop in throughput, and zero free slots are available. You have a fixed number of worker processes that can be allocated to the topologies requesting them. You can address this problem with these strategies:
- Decreasing the number of worker processes in use by existing topologies
- Increasing the total number of worker processes in the cluster

Decreasing the number of worker processes in use by existing topologies is the quickest and easiest way to free up slots for other topologies in your cluster. But this may or may not be possible, depending on the SLAs for your existing topologies. If you can reduce the number of worker processes being used by a topology without violating the SLA, then go for it. If your SLAs don't allow you to reduce the number of slots being used by any of the topologies in your cluster, you'll have to add new worker processes to the cluster. There are two ways to increase the total number of worker processes in the cluster. One is by adding more worker processes to your worker nodes. But this won't work if your worker nodes don't have the resources to support additional JVMs. If this is the case, you'll need to add more worker nodes to your cluster, thus adding to the pool of worker processes. Adding new worker nodes has the least impact on existing topologies, because adding worker processes to existing nodes has the potential to cause other types of contention that must then be addressed.

Topology Worker Nodes and Processes

Let's assume that you have a problematic topology and need to identify the worker nodes and worker processes that topology is executing on. The way to do this is by looking at the Storm UI. You could start by checking out the Bolts section to see if anything looks amiss. Having identified the problematic bolt, you now want to see more details about what's happening with it. To do so, click on that bolt's name in the UI to get a more detailed view. From there, turn your attention to the Executors and Errors sections for the individual bolt. The Executors section is of particular interest; it tells you which worker nodes and worker processes the bolt is executing on.
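As an aside, the slots mentioned earlier come from each node's storm.yaml: the supervisor.slots.ports property lists one listening port per worker process the node offers. A minimal sketch (these four ports are Storm's conventional defaults):

```yaml
# storm.yaml on a worker node: four ports = four worker-process slots
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
```

Each port added to (or removed from) this list adds (or removes) one slot on that node.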
From here, given the type of contention being experienced, you can take the necessary steps to identify and solve the problem at hand. Though a great tool, the Storm UI may not always show you what you need. This is where additional monitoring can help. This can come in the form of monitoring the health of individual worker nodes or custom metrics in your bolt's code to give you a deeper insight into how well the bolt is performing. The bottom line here is that you shouldn't rely solely on the Storm UI; put other measures in place to make sure you have coverage everywhere. Now that you have identified the contention, you'd like to change the number of worker processes running on a worker node. The number of worker processes running on a worker node is defined by the supervisor.slots.ports property in each worker node's storm.yaml configuration file. This property defines the ports that each worker process will use to listen for messages. To increase the number of worker processes that can be run on a worker node, add a port to this list for each worker process to be added. The opposite holds true for decreasing the number of worker processes: remove a port for each worker process to be removed. After updating this property, you'll need to restart the Supervisor process on the worker node to apply the change. Upon restarting, Nimbus will be aware of the updated configuration and send messages only to the ports defined in this list. Another thing to consider is the number of worker nodes in your cluster. If widespread changes are needed, updating the configuration and restarting the Supervisor process across tens or even hundreds of nodes is a tedious and time-consuming task, so use configuration-management tools like Puppet, Chef, or Ansible.

Worker Process Memory

Just as you install
a Storm cluster with a fixed number of worker processes, you also set up each
worker process (JVM) with a fixed amount of memory it can grow to use. The
amount of memory limits the number of threads (executors) that can be launched
on that JVM—each thread takes a certain amount of memory (the default is 1 MB on
a 64-bit Linux JVM). JVM contention can
be a problem on a per-topology basis. The combination of memory used by your
bolts, spouts, threads, and so forth might exceed that allocated to the JVMs
they’re running on. JVM contention
usually manifests itself as out-of-memory (OOM) errors and/or excessively long
garbage collection (GC) pauses. OOM errors will appear in the Storm logs and
Storm UI, usually as a stack trace starting with java.lang.OutOfMemoryError: Java heap space. Gaining visibility
into GC issues requires a little more setup, but it’s something that’s easily
supported by both the JVM and Storm configuration. The JVM offers startup
options for tracking and logging GC usage, and Storm provides a way to specify
JVM startup options for your worker processes. The worker.childopts property in storm.yaml is where you'd specify these
JVM options. One interesting item to note is the value for
the -Xloggc setting.
Remember you can have multiple worker processes per worker node. The worker.childopts property
applies to all worker processes on a node, so specifying a regular log filename
would produce one log file for all the worker processes combined. A separate
log file per worker process would make tracking GC usage per JVM easier. Storm
provides a mechanism for logging a specific worker process; the ID variable is
unique for each worker process on a worker node. Therefore, you can add a "%ID%" string to
the GC log filename and you’ll get a separate GC log file for each worker
process. Let’s assume that your
spouts and/or bolts are attempting to consume more memory than what has been
allocated to the JVM, resulting in OOM errors and/or long GC pauses. You can address the
problem in a couple of ways:
- By increasing the number of worker processes being used by the topology in question
- By increasing the size of your JVMs

By adding a worker process to a topology, you'll decrease the average load across all worker processes for that topology. This should result in a smaller memory footprint for each worker process (JVM), hopefully eliminating the JVM memory contention. Because increasing
the size of your JVMs could require you to change the size of the machines/VMs
they’re running on, we recommend the “increase worker processes” solution if
you can. Swapping and
balancing memory across JVMs has been one of our biggest challenges with Storm.
Different topologies will have different memory usage patterns. Don’t worry if
you don’t get it right initially. This is a never-ending process as the shape
of your cluster and topologies changes. Beware when
increasing the memory allocated to a JVM; as a rule of thumb, when you cross
certain key points you’ll notice a change in how long garbage collection
takes—500 MB, 1 GB, 2 GB, and 4 GB are all around the points when our GC time
has taken a jump. It’s more art than science, so bring your patience with you.
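The GC-logging setup described earlier can be sketched in storm.yaml; this is a hedged example, with paths and heap sizes purely illustrative (the %ID% placeholder expands per worker process):

```yaml
# storm.yaml: JVM startup options applied to every worker process on this node.
# %ID% expands to the worker's port, so each JVM writes its own GC log.
worker.childopts: "-Xms1024m -Xmx1024m -verbose:gc -Xloggc:/var/log/storm/gc-worker-%ID%.log"
```

Setting -Xms equal to -Xmx here is a deliberate choice, not a requirement; it keeps the heap size constant for the life of the JVM.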
There’s nothing more frustrating than addressing OOM issues by increasing JVM
memory size only to have it noticeably impact GC times. The amount of memory allocated to all worker processes (JVMs) on a worker node can be changed in the worker.childopts property in each worker node's storm.yaml configuration file. This property accepts any valid JVM startup option, providing the ability to set the startup options for the initial memory allocation pool (-Xms) and maximum memory allocation pool (-Xmx) for the JVMs on the worker node. Changing this property will update all the worker processes on a particular worker node. Make sure that your worker node has enough resources for the memory increase. After updating this property, you'll need to restart the Supervisor process on the worker node to apply the change. From my old days of J2EE application tuning, I still recommend setting -Xms and -Xmx to the same value to eliminate heap-resizing overhead. Along with being more efficient, this strategy adds the benefit of making it easier to reason about JVM memory usage, because the heap size is a fixed constant for the life of the JVM.

Worker Node Memory

Much like how an individual JVM has a limited amount of available memory, so does a worker node as a whole. In addition to the memory needed to run your Storm worker processes (JVMs), you need memory to run the Supervisor process and any other processes on your worker node without swapping. A worker node has a fixed amount of memory that's being used by its worker processes along with any other processes running on that worker node. If a worker node is experiencing memory contention, that worker node will be swapping. Swapping is the little death and needs to be avoided if you care about latency and throughput. This is a problem when using Storm; each worker node needs to have enough memory so that the worker processes and OS don't swap. If you want to maintain consistent performance, you must avoid swapping with Storm's JVMs. One way to keep an eye on this in Linux is using the System Activity Reporter.
Use sar -S for reporting swap space utilization statistics. If you run a single worker process per worker node, it’s impossible to run into contention between workers on that node. This can make maintaining consistent performance within a cluster much easier. We know of more than one development team that has opted for this approach. If possible, we advise you to seriously consider going this route. This is a nonstarter if you aren’t running in a virtualized environment. The cost is simply too high to do this if you’re running on “bare metal” with a single OS instance per physical machine. Within a virtualized environment, you’ll use more resources by doing this. Assume for a moment that your OS install requires n GB of disk space and uses 2 GB of memory to run effectively. If you have eight workers running on your cluster and you assign four workers per node, you’d use n * 2 GB of disk and 4 GB of memory to run the OS on your cluster nodes. If you were to run a single worker per node, that would skyrocket to n * 8 GB of disk and 16 GB of memory. That’s a fourfold increase in a rather small cluster. Imagine the additional usage that would result if you had a cluster that was 16, 32, 128, or more nodes in size. If you’re running in an environment such as Amazon Web Services (AWS) where you pay per node, the costs can add up quickly. Therefore, we suggest this approach only if you’re running in a private virtualized environment where the cost of hardware is relatively fixed and you have disk and memory resources to spare. Let’s assume that your worker node is swapping due to contention for that node’s memory. Here are a few options: Increase the memory available to each worker node. This would mean giving more memory to the physical machine or VM, depending on how you configured your cluster. Lower the collective memory used by worker processes. Lowering the collective memory used by worker processes can be done in one of two ways.
The first is by reducing the number of worker processes per worker node. Reducing the total number of worker processes will lower the overall memory footprint of the combined remaining processes. The second way is by reducing the size of your JVMs. Be careful when lowering memory allocated to existing JVMs, though, to avoid introducing memory contention within the JVM. One safe solution is to always go the route of increasing the memory available to each machine. It’s the simplest solution and its resulting ramifications are the easiest to understand. If you are tight on memory, lowering memory usage can work, but you open yourself up to all the problems we discussed concerning GC and OOM on a per-JVM basis.

Worker Node CPU

Worker node CPU
contention occurs when the demand for CPU cycles outstrips the amount available.
This is a problem when using Storm and is one of the primary sources of
contention in a Storm cluster. If your Storm topology’s throughput is lower
than what you expect it to be, you may want to check the worker node(s) running
your topology to see if CPU contention exists. One way to keep an
eye on this in Linux is with the sar -u command for displaying real-time CPU usage of all
CPUs. Let’s assume that the throughput of your topologies is low, and based
on running the sar command, you see that CPU contention exists. To address the
problem, you have the following options:
- Increasing the number of CPUs available to the machine. This is only possible in a virtualized environment.
- Upgrading to a more powerful CPU (in an Amazon Web Services (AWS) type of environment).
- Spreading the JVM load across more worker nodes by lowering the number of worker processes per worker node.

To spread worker
process (JVM) load across more worker nodes, you need to reduce the number of
worker processes running on each worker node. Reducing the number of worker
processes per worker node results in less processing (CPU requests) being done
on each worker node. There are two scenarios you may find yourself in when
attempting this solution. The first is you have unused worker processes in your
cluster and can therefore reduce the number of worker processes on your
existing nodes, thus spreading the load. The second scenario
is where you don’t have any unused worker processes and therefore need to add
worker nodes in order to reduce the number of worker processes per worker node. Reducing the number
of worker processes per worker node is a good way to reduce the number of CPU
cycles being requested on each node. You just need to be aware of what
resources are available and in use and act appropriately in your given
scenario.

Worker Node I/O

I/O contention on a
worker node can fall under one of two categories:
- Disk I/O contention: reading from and writing to the filesystem
- Network/socket I/O contention: reading from and writing to a network via a socket

Both types of
contention are regularly an issue for certain classes of Storm topologies. The
first step in determining if you’re experiencing either of these contentions is
to establish whether a worker node is experiencing I/O contention in general.
Once you do, you can dive down into the exact type of I/O contention your
worker node is suffering from. One way to
determine if a worker node in your cluster is experiencing I/O contention is by
running the sar -u command and watching the %iowait column, the share of time the CPU spends waiting on I/O. A healthy topology that uses a lot of I/O
shouldn’t spend a lot of time waiting for the resources to become available.
That’s why we use 10.00% as the threshold at which you start experiencing
performance degradation. If you know what topologies are running on a
given worker node, you know that they use a lot of network resources or disk
I/O, and you see iowait problems, you can probably safely assume which of the two
is your issue. If you’re seeing troubling I/O contention signs, first attempt
to determine if you’re suffering from socket/network I/O contention. If you
aren’t, assume that you’re suffering from disk I/O contention. Although this
might not always be the case, it can take you a long way as you learn the tools
of the trade. If your topologies
interact over a network with external services, network/socket I/O contention
is bound to be a problem for your cluster. In our experience, the main cause
for this type of contention is that all of the ports allocated for opening
sockets are being used. Most Linux installs
will default to 1024 maximum open files/sockets per process. In an
I/O-intensive topology, it’s easy to hit that limit quickly. To determine the
limits of your OS, you can examine the /proc filesystem to check your process's limits. In order to do
this, you’ll first need to know your process ID. Once you do that, you can get
a listing of all limits for that process. Start by getting the PID (ps aux
| grep MyTopologyName) and then check your process limits from the /proc
filesystem. If you’re hitting
this limit, the Storm UI for your topology should display an exception in the
“Last Error” column that the max open files limit has been reached. This will
most likely be a stack trace starting with java.net.SocketException: Too
many open files. Let’s assume that your topology is experiencing reduced throughput or no throughput
at all, and you’re seeing errors for hitting the limit of open sockets. A couple of ways to
address this problem are as follows:
- Increasing the number of available ports on the worker node
- Adding more worker nodes to the cluster

For increasing the
number of available ports, you’ll need to edit the /etc/security/limits.conf file on most Linux distributions. These settings
will set the hard and soft limit on open files per user. The value we’re
concerned with as a Storm user is the soft limit. I don’t advise going higher
than 128k. If you do, then as a rule of thumb (until you learn more about
soft/hard limits for number of files open on Linux), I suggest setting the hard
limit to two times the value of the soft limit. Note that you need super-user
access to change limits.conf and you’re going to need to reboot the system to
make sure they take effect. Increasing the
number of worker nodes in the cluster will give you access to more ports. If
you don’t have the resources to add more physical machines or VMs, you’ll have
to try the first solution. The first real contention
issue was the number of sockets available per machine. Don’t add more workers
on other machines until you’ve increased available sockets on each node as much
as you can. Once you’ve done that, you should also look at your code. Are you opening and
closing sockets all the time? If you can keep connections open, do that.
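Tying this back to the open-files limit: you can inspect a running process's effective limits through /proc. A sketch, shown against the current shell as a stand-in (find your worker's PID with ps aux | grep as described above):

```shell
# Inspect the effective open-files limit of a process via /proc.
# $$ is the current shell's PID; substitute your worker process's PID.
grep "Max open files" /proc/$$/limits
```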
There’s this wonderful thing called TIME_WAIT. It’s where a TCP connection will stay open after
you close it waiting for any stray data. If you’re on a slow network link (like
many were when TCP was first designed), this is a wonderful idea that helps
prevent data corruption. If you’re on a fast modern LAN, it’ll drive you
insane. You can tune your TCP stack via various OS-specific means to lower how
long you linger in TIME_WAIT, but when you’re making tons of network calls, even
that won’t be enough. Be smart: open and close connections as little as
possible. Disk I/O contention
affects how quickly you can write to disk. This could be a problem with Storm
but should be exceedingly rare. If you’re writing large volumes of data to your
logs or storing the output of calculations to files on the local filesystem, it
might be an issue, but that should be unlikely. If you have a
topology that’s writing data to disk and notice its throughput is lower than
what you’re expecting, you should check to see if the worker nodes it’s running
on are experiencing disk I/O contention. For Linux installations, you can run a
command called iotop to get a view of the disk I/O usage for the worker
nodes in question. This command displays a table of current I/O usage by processes/threads
in the system, with the most I/O-intensive processes/threads listed first. Let’s assume that you have a topology that reads/writes to/from disk, and it looks
like the worker nodes it’s running on are experiencing disk I/O contention. To address this
problem, you can:

- Write less data to disk. This can mean cutting back within your topology. It can also mean putting fewer worker processes on each worker node if multiple worker processes are demanding disk on the same worker node.
- Get faster disks. This could include using a RAM disk.
- If you're writing to NFS or some other network filesystem, stop immediately. Writing to NFS is slow and you're setting yourself up for disk I/O contention if you do.

If you're writing
large amounts of data to disk and you don’t have fast disks, you’re going to
have to accept it as a bottleneck. There’s not much you can do without throwing
money at the problem.

Summary
- Several types of contention exist above the topology level, so it's helpful to be able to monitor things like CPU, I/O, and memory usage for the operating system your worker nodes are running on.
- It is important to have some level of familiarity with monitoring tools for the operating system of the machines/VMs in your cluster. In Linux, these include sar, netstat, and iotop.
- There's value in knowing common JVM startup options, such as -Xms, -Xmx, and those related to GC logging.
- Although the Storm UI is a great tool for the initial diagnosis of many types of contention, it's smart to have other types of monitoring at the machine/VM level to let you know if something is awry.
- Including custom metrics/monitoring in your individual topologies will give you valuable insights that the Storm UI may not be able to.
- Be careful when increasing the number of worker processes running on a worker node, because you can introduce memory and/or CPU contention at the worker node level.
- Be careful when decreasing the number of worker processes running on a worker node, because you can affect your topologies' throughput while also introducing contention for worker processes across your cluster.

References

Storm Applied: Strategies for real-time event processing by Sean T. Allen, Matthew Jankowski, and Peter Pathirana
10-28-2016 05:41 PM
@cduby At step 5, you changed the GC mode. The recommended GC for LLAP is G1GC. Is there a reason for the change you proposed?
10-28-2016 02:47 AM
@cduby At step 6 you need sudo to execute ./tpcds-build.sh. Data generation at the next step works fine as the hdfs user.
10-21-2016 09:11 PM
15 Kudos
Introduction

How many times have we heard that a problem can't be solved if it isn't completely understood? Yet all of us, even with a lot of experience and technical skill, still tend to jump into the solution space, bypassing key steps of the scientific approach. We just don't have enough patience and, quite often, the business is on our back to deliver a quick fix. We just want to get it done! We find ourselves like that squirrel in Ice Age, chasing the acorn with just our experience and minimal experiment, dreaming that the acorn will fall into our lap. This article is meant to re-emphasize a repeatable approach that can be applied to tune Storm topologies: understanding the problem, forming a hypothesis, testing that hypothesis, collecting the data, analyzing the data, and drawing conclusions based on facts rather than empiricism. Hopefully it helps you catch more acorns!

Understand the Problem

Understand the functional problem resolved by the topology; quite often the best tuning is executing a step differently in the workflow for the same end result, for example applying a filter first and then executing the step, rather than executing the step on everything and paying the penalty every time. You can also prioritize success, or learn to lose in order to win.

Define SLA for Business Success

This is also the time to get some sense of the business SLA and forecasted growth. Document the SLA by topology. Try to understand the reasons behind the SLA and identify opportunities for trade-offs.

Gather Data

Gather
data using the Storm UI and other tools specific to the environment. The Storm UI is your first go-to tool for tuning, most of the time. Another tool to use is Storm's built-in metrics-collecting API, available since 0.9.x, which helps you build metrics into your topology. You may have tuned the topology and deployed it to production, everybody is happy, and two days later the issue comes back. Wouldn't it be nice to have some metrics embedded in your code, already focused on the usual suspects, e.g. external calls to a web service or SQL against your lookup data store, specific customers, or some other strong business entity that, due to the data and the workflow, can become the bottleneck? In a Storm topology, you have spouts, bolts, and workers that can each play a role in your topology's challenges. Learn about topology status, uptime, number of workers, executors, tasks, high-level stats across four time windows, spout statistics, bolt statistics, visualization of the spouts and bolts and how they are connected, the flow of tuples between all of the streams, and all configurations for the topology.

Define Baseline Numbers

Document statistics about how the topology performs in production. Set up a test environment and try the topology with different loads aligned to realistic business SLAs.

Tuning Options vs. Bottleneck
Increase parallelism (using the rebalance command, e.g. storm rebalance topology-name -e component=parallelism) and analyze the change in capacity in the same Storm UI. If there is no improvement, focus on the bolt; that is the likely bottleneck. If you see a backlog in your upstream (Kafka, most likely), focus on the spout. After trying with spouts and bolts, shift focus to the workers' parallelism. The basic principle is to scale on a single worker with executors until increasing executors no longer helps. If adjusting spout and bolt parallelism fails to provide additional benefits, play with the number of workers to see if you are now bound by the single JVM you were running on and need to parallelize across JVMs. If you still don't meet the SLA by tuning the existing spouts, bolts, and workers, it's time to start controlling the rate that flows into the topology.
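One concrete knob for this is max spout pending, which can be set per topology in code via Config.setMaxSpoutPending or in configuration. A sketch, with the value purely illustrative and workload-dependent:

```yaml
# Maximum number of unacked tuples allowed in flight per spout task;
# tune this against your topology's total parallelism
topology.max.spout.pending: 500
```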
Max spout pending allows you to set a maximum number of tuples
that can be unacked at any given time. If the number of possible unacked tuples is lower than the total
parallelism you’ve set for your topology, then it could be a bottleneck. The goal
is with one or many spouts to assure that the maximum possible unacked tuples is
greater than the maximum number of tuples we can process based on our
parallelization so we can feel safe saying that max spout pending isn’t causing
a bottleneck. Without Max spout pending, tuples will continue to flow into your topology whether or
not you can keep up with processing them. Max spout pending allows you to control the
ingest rate. Max spout
pending lets us erect a dam in front of our topology, apply back pressure, and
avoid being overwhelmed. Despite the optional nature of max spout pending, you
should always set it. When attempting to increase
performance to meet an SLA, increase the rate of data ingest by either
increasing spout parallelism or increasing the max spout pending. Assuming a 4x
increase in the maximum number of active tuples allowed, we’d expect to see the
speed of messages leaving our queue increase (maybe not by a factor of four,
but it’d certainly increase). If that caused the capacity metric for any of the
bolts to return to one or near it, tune the bolts again and repeat with the
spout and bolt until you hit the SLA. These methods can be applied over and over until you meet the
SLAs. This effort is high, and automating steps is desirable.

Deal with External Systems

It’s easy when interacting with external services (such as a SOA
service, database, or filesystem) to ratchet up the parallelism to a high
enough level in a topology that limits in that external service keep your
capacity from going higher. Before you start tuning parallelism in a topology
that interacts with the outside world, be positive you have good metrics on
that service. You could keep turning up the parallelism on a bolt to the point
that it brings the external service to its knees, crippling it under a mass of
traffic that it’s not designed to handle.

Latency

Let’s talk about one of the greatest enemies of fast code:
latency. There’s latency accessing local memory and hard drive, and accessing
another system over the network. Different interactions have different levels
of latency, and understanding the latency in your system is one of the keys to
tuning your topology. You’ll usually get fairly consistent response times, and
then all of a sudden those response times will vary wildly because of any number
of factors:
The
external service is having a garbage collection event.
A
network switch somewhere is momentarily overloaded.
Your
coworker wrote a runaway query that’s currently hogging most of the
database’s CPU. Something about
the data itself that’s likely to cause the delay. As the topology interacts with external services, let’s be smarter about latency. We don’t always need to increase parallelism and allocate more resources. We may discover after investigation that the variance is based
on the customer, or induced by an exceptional event, e.g. elections, the Olympics, etc. Certain records
are going to be slower to look up. Sometimes one of those
“fast” lookups might end up taking longer. One
strategy that has worked well with services that exhibit this problem is to
perform initial lookup attempts with a hard ceiling on timeouts (experiment with various
ceilings) and, if that fails, send the tuple to a less parallelized instance of the same
bolt that is allowed a longer timeout. The end result is that the time to
process a large number of messages goes down. Your mileage
will vary with this strategy, so test before you use it. As part of your tuning
strategy, you could end up breaking a bolt in two: one for fast lookups and
the other for slow lookups. Let what goes fast go fast; don’t back
it up behind a slow lookup. Treat the slow ones differently and make sure they are
the exception. You may even decide to handle them separately and still get
some value out of that data. It is up to you to determine what is more important:
accepting an overall bottleneck, marking the slow records for later processing, or disregarding those that produce a
bottleneck. Maybe a batch approach for those is more efficient. This is a
design consideration and a reasonable trade-off most of the time. Are you willing
to accept the occasional loss of fidelity but still hit the SLA, because that is
more important for your business? Is perfection helping or killing your business? Summary
All basic timing information for a topology can be found in the
Storm UI. Metrics (both built-in and custom) are essential if you want to
have a true understanding of how your topology is operating. Establishing a baseline
set of performance numbers for your topology is the essential first step
in the tuning process. Bottlenecks are
indicated by a high capacity for a spout/bolt and can be addressed by
increasing parallelism. Increasing parallelism
is best done in small increments so that you can gain a better
understanding of the effects of each increase. Latency that is both
related to the data (intrinsic) and not related to the data (extrinsic)
can reduce your topology’s throughput and may need to be addressed. Be ready for trade-offs.
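As an illustration of the fast/slow lookup strategy discussed earlier, here is a plain-Java sketch (not Storm code; all names, delays and the timeout ceiling are illustrative assumptions): attempt the lookup under a hard ceiling and, on expiry, route the work to a slower fallback path, just as a tuple would be sent to a less parallelized bolt.

```java
import java.util.concurrent.*;

// A plain-Java sketch (not Storm code) of the strategy above: try the
// lookup under a hard timeout ceiling; on expiry, route to a slower
// fallback path, as a tuple would be sent to a less parallelized bolt.
// All names and the delays/ceilings are illustrative assumptions.
public class TimeoutFallback {

    // Daemon threads so the simulated lookup pool never keeps the JVM alive.
    static final ExecutorService pool = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Stand-in for an external-service lookup; delayMs simulates latency.
    static String fastLookup(String key, long delayMs) throws Exception {
        Thread.sleep(delayMs);
        return "value-for-" + key;
    }

    // Attempt the fast path under a hard ceiling; fall back on timeout.
    static String lookup(String key, long delayMs, long ceilingMs) {
        Future<String> f = pool.submit(() -> fastLookup(key, delayMs));
        try {
            return f.get(ceilingMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            f.cancel(true);                        // give up on the fast path
            return "routed-to-slow-bolt:" + key;   // would emit to a slow bolt
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(lookup("a", 5, 500));   // fast lookup succeeds
        System.out.println(lookup("b", 500, 50));  // ceiling hit, rerouted
    }
}
```

The fast path stays fast because slow lookups are cancelled and handed off rather than being allowed to back it up.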
References Storm Applied: Strategies for real-time event processing by Sean T. Allen, Matthew
Jankowski, and Peter Pathirana
... View more
Labels:
10-21-2016
12:06 AM
13 Kudos
Sizing There's no simple rule of thumb for this; it's as much an art as it is a science, since it depends on the workloads and how chatty they are with your current ZKs. One way to look at this is: if you have 3 ZKs you can
afford to lose one; if you have 5, you can afford to lose two. If your IT is aggressively
applying security patches and other upgrades, like firmware, kernel, Java,
other packages used by Hadoop tools, and taking nodes down to do the job, then
during those upgrades with 3 ZKs, your ZK ensemble runs with only two nodes, and if you
are unlucky and one of them goes down, then your whole cluster will go down.
So, in this case 5 are better. Warning: the more ZK nodes you have, the slower
the ZK becomes for writes. Placement Zookeeper is a master node, as such it can be collocated with
other master services. Ideally, you would not want to collocate it with an HA service. It is quite light on memory and CPU requirements, but since is disk intensive, don't collocate it with disk-intensive services like Kafka or HDFS. Storage Requirements In general, Zookeeper doesn't actually require huge drives because it will only store metadata information for many services, It is common to use 100G to 250G for zookeeper data directory and logs which is fine of many cluster deployments. Moreover, it is recommended to set configuration for automatic purging policy of snapshots and logs directories so that it doesn't end up by filling all the local storage. Dedicated or Shared? At Yahoo!, ZooKeeper is usually deployed on
DEDICATED RHEL boxes, with dual-core processors, 2GB of RAM, and 80GB IDE hard
drives. For your
Kafka/Storm cluster, you could consider deploying ZK on DEDICATED physical
hardware (not virtual). The driving force for physical hardware or at
least for the dedicated disk is the transaction log and the high throughput
nature of Kafka and Storm. Since Kafka is usually used with Storm, have a separate
Zookeeper cluster for Kafka and Storm. If Kafka and Storm are sharing one, please make
sure that you don’t put the Zookeeper cluster on the Kafka nodes. Put
the Zookeeper nodes on the Storm nodes instead. Caution
Rather than going to larger clusters of ZKs, it is better to split out certain services to their own ZKs when they're putting more pressure on an otherwise fairly quiet ZK cluster. It is a good thing to have a separate set of ZKs for each cluster: one quorum for Kafka, one quorum for Storm, one quorum for the rest (YARN, HBase, Hive, HDFS), possibly separate zookeepers for HBase. The challenge is that more hardware and more administration are needed, but it pays off. Be careful where you put that transaction log. The most performance-critical part of ZooKeeper is the transaction log. ZooKeeper must sync transactions to media before it returns a response. A dedicated transaction log device is key to consistent good performance. Putting the log on a busy device will adversely impact performance. If you only have one storage device, put trace files on NFS and increase the snapshotCount; it doesn't eliminate the problem, but it can mitigate it. ZooKeeper's transaction log must be on a dedicated device. A dedicated partition is not enough. ZooKeeper writes the log sequentially, without seeking. Sharing your log device with other processes can cause seeks and contention, which in turn can cause multi-second delays. Do not put ZooKeeper in a situation that can cause a swap. In order for ZooKeeper to function with any sort of timeliness, it simply cannot be allowed to swap. Remember, in ZooKeeper, everything is ordered, so if one request hits the disk, all other queued requests hit the disk. Going to disk unnecessarily will almost certainly degrade your performance unacceptably. Therefore, make certain that the maximum heap size given to ZooKeeper is not bigger than the amount of real memory available to ZooKeeper. Set your Java max heap size correctly. To avoid swapping, try to set the heap size to the amount of physical memory you have, minus the amount needed by the OS and cache. The best way to determine an optimal heap size for your configurations is to run load tests.
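As a back-of-the-envelope sketch of that heap-sizing rule (the reserve value below is an assumption for illustration, not a recommendation; load tests remain the right way to size):

```java
// A minimal sketch of the heap-sizing rule above. The reserve value is an
// assumption for illustration, not a recommendation; measure with load tests.
public class ZkHeap {
    // Conservative heap: physical RAM minus what the OS and cache need.
    static long conservativeHeapMb(long physicalMb, long osAndCacheReserveMb) {
        long heapMb = physicalMb - osAndCacheReserveMb;
        if (heapMb <= 0) throw new IllegalArgumentException("machine too small");
        return heapMb;
    }
    public static void main(String[] args) {
        // A 4G machine with ~1G reserved leaves roughly a 3G heap to start from.
        System.out.println(conservativeHeapMb(4096, 1024) + " MB");
    }
}
```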
If for some reason you can't, be conservative in your estimates and choose a number well below the limit that would cause your machine to swap. For example, on a 4G machine, a 3G heap is a conservative estimate to start with. Best Practices
The ZooKeeper data directory contains the snapshot and transactional log files. It is a good practice to periodically clean up the directory if the auto-purge option is not enabled. Also, an administrator might want to keep a backup of these files, depending on the application needs. However, since ZooKeeper is a replicated service, we need to back up the data of only one of the servers in the ensemble. ZooKeeper uses Apache log4j as its logging infrastructure. As the logfiles grow bigger in size, it is recommended that you set up auto-rollover of the logfiles using the built-in log4j feature for ZooKeeper logs. The list of ZooKeeper servers used by the clients in their connection strings must match the list of ZooKeeper servers that each ZooKeeper server has. Strange behaviors might occur if the lists don't match. The server lists in each ZooKeeper server configuration file should be consistent with those of the other members of the ensemble. The ZooKeeper transaction log must be configured on a dedicated device. This is very important to achieve the best performance from ZooKeeper. The Java heap size should be chosen with care. Swapping should never be allowed to happen on the ZooKeeper server. It is better if the ZooKeeper servers have a reasonably large amount of memory (RAM). System monitoring tools such as vmstat can be used to monitor virtual memory statistics and decide on the optimal size of memory needed, depending on the needs of the application. In any case, swapping should be avoided. References: https://community.hortonworks.com/questions/2498/best-practices-for-zookeeper-placement.html https://community.hortonworks.com/questions/53025/zookeeper-performance-and-metrics-when-to-resize.html https://community.hortonworks.com/questions/55868/zookeeper-on-even-master-nodes.html Apache ZooKeeper Essentials by Saurav Haloi, published by Packt Publishing, 2015
... View more
10-20-2016
09:00 PM
@Eyad Garelnabi Documentation may need to be updated, but with the new kernels, swappiness does not need to be set to 0. Read this article from @emaxwell: https://community.hortonworks.com/articles/33522/swappiness-setting-recommendation.html
... View more
10-08-2016
01:13 AM
16 Kudos
Introduction The term “Load Testing” has evolved over the years, however
the core meaning still comes down to making sure that your system can handle a
predefined number of users at the same time. The load test is usually run over
an extended period of time in a staggered manner, slowly ramping up the number
of users until you hit a predefined maximum that’s usually based on projected
usage levels extrapolated from access logs, with a buffer for surges. The goal of load testing is to determine the number of users that the
system can typically cope with; this is called the system’s “concurrency level”
and gives you a hard number to work with when dealing with capacity planning, performance and
optimization, stability, SLAs etc. JMeter was originally designed
for testing Web Applications but has since expanded to other test functions,
including databases via JDBC. As such, it has the ability to
execute SQL queries against a given JDBC driver. JMeter allows you to define
queries to execute against a Hive table. An instance of JMeter can run multiple
threads of queries in parallel, with multiple instances of JMeter capable of
spreading clients across many nodes. The queries can also be parameterized with
pseudo-random data in order to simulate all types of queries to a table. JMeter automates the execution of the queries in parallel.
The results of the queries that JMeter ran are also aggregated and analyzed
together to provide an overall view into the performance. Mean and median are
provided for a simple insight, as well as 90th, 95th, 99th and 99.9th
percentiles to understand the execution tail. This approach is extremely useful for executing read-heavy
workloads. JMeter Setup for Hive Load Testing These steps have been tested on HDP 2.4.2 and OSX and should work similarly on other
Unix-like systems. Step 1. Download, Install
and Setup JMeter
Pre-requisites: http://jmeter.apache.org/usermanual/get-started.html Download JMeter: http://mirror.symnds.com/software/Apache//jmeter/binaries/apache-jmeter-3.0.tgz and unzip it to your preferred location Add the required Hive and logging jars from my
repo (https://github.com/cstanca1/jmeter-hive-hdp) to your $JMETER_HOME/lib/ext. Start JMeter from the $JMETER_HOME/bin directory by running jmeter on Unix-like systems
In the jdbc connection configuration page set the following:
Auto Commit = true
Database URL = jdbc:hive://hive_ip_address:10000/default
JDBC Driver Class = org.apache.hadoop.hive.jdbc.HiveDriver
Note: Setting AutoCommit=true in the JMeter configuration is a must; Hive does not support AutoCommit=false. Step 2: Building a Database Test Plan To build a database test plan, consult Instructions to build a database test plan. That example uses the MySQL driver; however, the
same approach is applicable to Hive. You will have to provide the Hive database
URL. Note: If you use HiveServer2, the URL uses hive2 instead of hive. The plan includes adding users, JDBC requests
and a listener to view/store the test results. Step 3: Build a JMeter Dashboard JMeter supports dashboard report generation
to get graphs and statistics from a test plan. To build a dashboard follow Instructions to build a JMeter dashboard. This dashboard should include a request summary graph showing
the success and failed transaction percentage, a statistics table providing a
summary of all metrics per transaction including 3 configurable percentiles, an
error table providing a summary of all errors and their proportion in the total
requests, and a zoomable chart where you can check/uncheck every transaction to
show/hide it for response times over time, bytes throughput over time,
latencies over time, hits per second, response codes per second, transactions
per second, response time vs Request per second, latency vs Request per second,
response times percentiles, active threads over time, times vs threads,
response time distribution. Step 4: Run JMeter To run JMeter, run the jmeter script (for Unix). These
scripts are found in the bin directory. There are some additional scripts in the bin directory that you may find
useful: jmeter - run JMeter (in GUI mode by default). Defines some JVM
settings which may not work for all JVMs. jmeter-server - start JMeter in server mode (calls jmeter script with
appropriate parameters) jmeter.sh - very basic JMeter script (You may need to adapt JVM options
like memory settings). mirror-server.sh - runs the JMeter Mirror Server in non-GUI mode shutdown.sh - run the Shutdown client to stop a non-GUI instance
gracefully stoptest.sh - run the Shutdown client to stop a non-GUI instance abruptly It may be necessary to edit the jmeter shell script if some of the JVM
options are not supported by the JVM you are using. The JVM_ARGS
environment variable can be used to override or set additional JVM options, which will override the HEAP settings in the script. For
example: JVM_ARGS="-Xms1024m -Xmx1024m" jmeter -t test.jmx [etc.] Findings 1) I recently executed a Hive load test with JMeter and learned that a maximum of 10 connections is possible per YARN queue. That was a big eye opener, since the requirement was for 50 concurrent connections. Obviously, multiple queries can be submitted per connection, but also, as resources are available, multiple YARN queues can be created and used to increase the number of connections. 2) Creating multiple queues to meet the 50-concurrent-connections requirement led to another finding: response time and scalability were impacted dramatically. If you assume N connections and M concurrent executions per connection, the more connections, the more overhead: more resources to do less, and to do it more slowly. For example, assume N = 10 and M = 1000. That would be 10,000 concurrent queries. For N = 1 and M = 10000, that would also be 10,000 concurrent queries. The same queries, with the same resources allocated overall, had a significantly better response time with fewer queues. As such, unless there is another reason for multiple queues, my advice would be to limit a tenant application to one queue and cap the number of connections per queue, so as to reuse connections that are already open and make better use of existing resources. 3) Always question requirements for so many open connections. There is always a proxy application that can use a single connection to serve multiple requests from multiple users.
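The aggregate statistics JMeter reports for these runs (mean, median and tail percentiles of query latency) can be reproduced from raw sample times. The sketch below uses a simple nearest-rank percentile, which may differ slightly from JMeter's own calculation, and the sample data is made up:

```java
import java.util.Arrays;

// Reproduces the aggregate statistics JMeter reports (mean, median,
// 90th/95th/99th percentiles) from raw latency samples. Uses a simple
// nearest-rank percentile, which may differ slightly from JMeter's own
// interpolation. The sample latencies below are made up.
public class LatencyStats {
    static double mean(long[] ms) {
        return Arrays.stream(ms).average().orElse(0);
    }

    // Nearest-rank percentile: the value at ceil(p/100 * n) in sorted order.
    static long percentile(long[] ms, double p) {
        long[] s = ms.clone();
        Arrays.sort(s);
        int rank = (int) Math.ceil(p / 100.0 * s.length);
        return s[Math.max(0, rank - 1)];
    }

    public static void main(String[] args) {
        long[] samples = {120, 95, 310, 105, 98, 2400, 110, 101, 99, 130};
        System.out.println("mean=" + mean(samples));            // 356.8
        System.out.println("p50=" + percentile(samples, 50));   // 105
        System.out.println("p90=" + percentile(samples, 90));   // 310
        System.out.println("p99=" + percentile(samples, 99));   // 2400, the tail
    }
}
```

Note how a single slow outlier barely moves the median but dominates the 99th percentile; this is why the execution tail matters for SLAs.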
... View more
08-12-2016
11:03 PM
3 Kudos
Introduction This article is not meant to show how to install or create a
“Hello World” NiFi data flow, but how to resolve a data filtering problem with
NiFi, providing two approaches: using a filter list as a file on disk, which could be
static or dynamic, and a list stored in a distributed cache populated from the
same file. The amount of data used was minimal and simplistic, so no
performance difference can be perceived; however, at scale, where memory is
available, a caching implementation should perform better. This article assumes some familiarity with NiFi: knowing
what a processor or a queue is and how to set the basic configurations for a
processor or a queue, and also how to visualize the data at various steps throughout
the flow, and how to start and stop processors. Since you are somewhat familiar with NiFi, you probably know
how to install it and start it, however, I will provide a quick refresher
below. Pre-requisites For this demo, I used the latest version of Nifi available
at the date of working on this demo, 0.6.1.
This version was not part of the HDP 2.4.2 which was available at the
time of this demo, it has also 0.5.1. HDP 2.5 was just launched last month at
the Hadoop Summit in Santa Clara. If you wanted for your OSX installation to use brew install nifi that will only install nifi 0.5.1 which
does not have some of the features needed for the demo, e.g.
PutDistributedCacheMap or FetchDistributedCacheMap. Instead, use the following steps: A reference about downloading and installing on Linux and Mac is here. You can download Nifi 0.6.1 from any of the sites listed
there, for example: https://www.apache.org/dyn/closer.lua?path=/nifi/0.6.1/nifi-0.6.1-bin.tar.gz I prefer wget and installing my apps to /opt cd /opt
wget http://supergsego.com/apache/nifi/0.6.1/nifi-0.6.1-bin.tar.gz That will download a 421.19MB file tar -xvf nifi-0.6.1-bin.tar.gz
ls -l and here is your /opt/nifi-0.6.1 cd /opt/nifi-0.6.1/bin That is your NIFI_HOME. ./nifi.sh start Open a browser and type: http://localhost:8080/nifi You will need to import the NiFi .xml template posted in my github
repo, mentioned earlier. Clone it to your local folder of preference, assuming that you have a
git client installed: git clone https://github.com/cstanca1/nifi-filter.git After importing the model, instantiate it. It will show as the following: Required Changes In order for the template to work for your specific folder structure, you will need to make a few changes to tell GetFile processor (right-click on Get File
processor header, View Configuration, Properties tab, Input Directory, from where
to go to get the data). Keep in mind that the GetFile processor once started it
will read the file and delete it. If you want to re-feed it for test, you just
have to drop it again in the same folder and it will re-ingest it. You can also
place multiple files of the same structure in that folder and they will be
ingested all and every line. In real-life, GetFile can be replaced with a
different processor capable to read from an actual log. For this demo, I used a
static file as an input. Also, enable and start the DistributedMapCacheServer Controller Service. This is required for the put and fetch
distributed cache. The DistributedMapCacheServer can be started just like any other Controller Service, by configuring it to be valid and hitting the "start" button. The unique thing about the DistributedMapCacheServer is that processors work with the cache by utilizing a DistributedMapCacheClientService. So you will create both a Server and a Client Service. Then configure the processor to use the Client Service. Next start both the server and the service. Finally start the processor. Test Data For your test, you can use the two files
checked-in to the git repo that you just cloned locally: macaddresses-blacklist.txt and macaddresses-log.txt. macaddresses-blacklist.txt is a list of
blacklisted mac addresses which will be used to filter the incoming stream fed
by macaddresses-log.txt using GetFile ingest, line by line. To understand what happens step by step, I
suggest starting each processor and inspecting the queue and data lineage. Populating
DistributedMapCache is performed in the flow presented on the right side of the
model that you imported at the previous step. The filtering flow, via ScanAttribute or
FetchDistributedMapCache: Use of the GetFile, SplitText and ExtractText processors is well
documented, and a basic Google search will return several good examples; however, a
good example of how to use FetchDistributedMapCache and PutDistributedMapCache
is not that well documented. That was the main reason to write this article. I
could not find another good reference. I am sure others felt the same way and
hopefully this helps. ScanAttribute
Approach Before starting this processor, you need to right click on
its header, choose Configuration and go to Properties tab and change the
Dictionary file to be your macaddress-blacklist.txt. This is a clone of the
same file you use in the GetFile processor, but I suggest putting it in a separate
folder so that GetFile will not ingest and delete it after use. This needs to
be permanent, like a lookup file on disk. SplitText is used to split the file line by line. You can
check this property by right-clicking the header of the processor and choosing the “Properties”
tab. Line Split Count is set to 1. ExtractText processor uses a custom regex to extract the mac
address from macaddresses-log.txt. You can
find it in Properties, the last property in the list. The ScanAttribute processor sets the Dictionary File to the
folder/file of choice. In this demo, I used macaddresses-blacklist.txt file
included in the repo that you cloned at one of the previous steps. DistributedCache
Approach The right branch of the flow on the left uses the
distributed cache populated by the flow on the right of the model. Inspect each
processor by checking its properties. They are similar to
the first half of the flow on the left, except for the use of the
PutDistributedMapCache processor, which sets the Cache Entry Identifier for
the mac.address value. I’ll refer only to the consumption of the mac.address
property value set by PutDistributedMapCache. Set the Cache Entry Identifier to the same mac.address. Please note that the DistributedMapCacheClientService is
enabled. You can achieve that by clicking on the NiFi Flow Settings icon (fourth in
the right corner), then "Controller Services". Learning Technique Don’t forget to start all processors and inspect the queues.
My approach is to start processors one at a time in the order of the data
flow and processing and check all the stats and lineage on connection queues.
This is what I love about NiFi, it is so easy to test and learn. Credit Thanks to Simon Ball for taking a few minutes of his time to
review the model on DistributedCache approach. Conclusion Dynamic filtering has large applicability in any type of simple event processing. I am sure that there are
many other ways to skin the same cat with NiFi. Enjoy!
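For readers who prefer code to boxes and arrows, the filtering the flow performs is conceptually equivalent to this plain-Java sketch (the data and names are made up; NiFi, of course, does this with processors and a distributed cache rather than an in-memory set):

```java
import java.util.*;
import java.util.regex.*;

// A plain-Java sketch of what the flow does conceptually: load the
// blacklist once (the distributed cache), extract the MAC address from
// each log line (ExtractText's regex), and keep only blacklisted hits.
// The sample data below is made up for illustration.
public class MacFilter {
    static final Pattern MAC =
        Pattern.compile("([0-9a-fA-F]{2}(?::[0-9a-fA-F]{2}){5})");

    // Return the blacklisted MAC addresses found in the log lines.
    static List<String> blacklistedMacs(List<String> logLines, Set<String> blacklist) {
        List<String> hits = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = MAC.matcher(line);
            if (m.find() && blacklist.contains(m.group(1).toLowerCase())) {
                hits.add(m.group(1).toLowerCase());
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        Set<String> blacklist = Set.of("00:1a:2b:3c:4d:5e");
        List<String> log = List.of(
            "connect 00:1A:2B:3C:4D:5E ok",
            "connect aa:bb:cc:dd:ee:ff ok");
        System.out.println(blacklistedMacs(log, blacklist)); // [00:1a:2b:3c:4d:5e]
    }
}
```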
... View more
Labels: