Introduction

This is a continuation of the Apache Storm Tuning Approach for Squirrels article that I published a couple of weeks ago.

A topology has to coexist on a Storm cluster with a variety of other topologies. Some of those topologies will burn CPU doing heavy calculations; others will consume large amounts of network bandwidth.

No one sane can claim to provide a silver bullet for how to set up your cluster for the best performance. However, I’d like to share a few recipes and guidelines for dealing with issues as they arise.

Below I list six categories of contention, describing common problems and strategies to alleviate or eliminate them. I hope this article at least provides a good map for a deeper dive into Storm tuning; each category of contention is probably worth an article of its own.

Worker Processes in a Cluster

A Storm cluster is installed with a fixed number of available worker processes across all worker nodes. Each time you deploy a new topology to the cluster, you specify how many worker processes that topology should consume. The number of worker processes a topology requests is specified in the code that builds and submits the topology to the Storm cluster.

It is possible to deploy a topology that requests a certain number of worker processes but can’t acquire them because they’ve all been assigned to existing topologies. This problem is easy to detect: look at the cluster summary page of the Storm UI and check the free slots (slots correspond to worker processes). It’s important to always be aware of the resources available in your cluster when deploying new topologies. If you ignore what’s available, you can easily affect every topology in the cluster by deploying something that consumes too many resources.
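
To make that concrete, here is a minimal sketch of the submission code, assuming Storm 1.x package names (org.apache.storm; older releases used backtype.storm) and a placeholder spout; the call that requests worker processes is Config.setNumWorkers():

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.testing.TestWordSpout;
    import org.apache.storm.topology.TopologyBuilder;

    public class WorkerCountExample {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // Placeholder spout so the topology is valid; your real spouts and bolts go here.
            builder.setSpout("words", new TestWordSpout(), 1);

            Config conf = new Config();
            // Request two worker processes (slots) from the cluster for this topology.
            conf.setNumWorkers(2);

            StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
        }
    }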

Let’s assume that you notice a topology isn’t processing any data or has a sudden drop in throughput, and zero free slots are available. You have a fixed number of worker processes that can be allocated to the topologies requesting them. You can address this problem with these strategies:

  • Decreasing the number of worker processes in use by existing topologies
  • Increasing the total number of worker processes in the cluster

Decreasing the number of worker processes in use by existing topologies is the quickest and easiest way to free up slots for other topologies in your cluster. But this may or may not be possible depending on the SLAs for your existing topologies. If you can reduce the number of worker processes being used by a topology without violating the SLA, then go for it.

If your SLAs don’t allow you to reduce the number of slots being used by any of the topologies in your cluster, you’ll have to add new worker processes to the cluster.

There are two ways to increase the total number of worker processes in the cluster. One is by adding more worker processes to your worker nodes. But this won’t work if your worker nodes don’t have the resources to support additional JVMs. If this is the case, you’ll need to add more worker nodes to your cluster, thus adding to the pool of worker processes.

Adding new worker nodes has the least impact on existing topologies, because adding worker processes to existing nodes has the potential to cause other types of contention that must then be addressed.

Topology Worker Nodes and Processes

Let’s assume that you have a problematic topology and need to identify the worker nodes and worker processes that topology is executing on.

The way to do this is by looking at the Storm UI. You could start checking out the Bolts section to see if anything looks amiss. Having identified the problematic bolt, you now want to see more details about what’s happening with that bolt. To do so, click on that bolt’s name in the UI to get a more detailed view for that bolt. From here, turn your attention to the Executors and Errors section for the individual bolt.

The Executors section for an individual bolt is of particular interest; this tells you which worker nodes and worker processes the bolt is executing on. From here, given the type of contention being experienced, you can take the necessary steps to identify and solve the problem at hand.

Though a great tool, the Storm UI may not always show you what you need. This is where additional monitoring can help. This can come in the form of monitoring the health of individual worker nodes or custom metrics in your bolt’s code to give you a deeper insight into how well the bolt is performing. The bottom line here is you shouldn’t rely solely on the Storm UI. Put other measures in place to make sure you have coverage everywhere.

Now that you’ve identified the contention, suppose you’d like to change the number of worker processes running on a worker node.

The number of worker processes running on a worker node is defined by the supervisor.slots.ports property in each worker node’s storm.yaml configuration file. This property defines the ports that each worker process will use to listen for messages.

To increase the number of worker processes that can be run on a worker node, add a port to this list for each worker process to be added. The opposite holds true for decreasing the number of worker processes: remove a port for each worker process to be removed. After updating this property, you’ll need to restart the Supervisor process on the worker node to apply the change. Upon restarting, Nimbus will be aware of the updated configuration and send messages to only the ports defined in this list.
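
For illustration, a worker node configured with four slots would carry an entry like the following in its storm.yaml (6700-6703 are the conventional default ports):

    supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703
        # adding another port here (for example, 6704) would allow a fifth worker process on this node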

Another thing to consider is the number of worker nodes in your cluster. If widespread changes are needed, updating the configuration and restarting the Supervisor process across tens or even hundreds of nodes is a tedious, time-consuming task, so use configuration-management tools such as Puppet, Chef, or Ansible to automate it.

Worker Process Memory

Just as you install a Storm cluster with a fixed number of worker processes, you also set up each worker process (JVM) with a fixed amount of memory it can grow to use. The amount of memory limits the number of threads (executors) that can be launched on that JVM—each thread takes a certain amount of memory (the default is 1 MB on a 64-bit Linux JVM).

JVM contention can be a problem on a per-topology basis. The combination of memory used by your bolts, spouts, threads, and so forth might exceed that allocated to the JVMs they’re running on.

JVM contention usually manifests itself as out-of-memory (OOM) errors and/or excessively long garbage collection (GC) pauses. OOM errors will appear in the Storm logs and Storm UI, usually as a stack trace starting with java.lang.OutOfMemoryError: Java heap space.

Gaining visibility into GC issues requires a little more setup, but it’s something that’s easily supported by both the JVM and Storm configuration. The JVM offers startup options for tracking and logging GC usage, and Storm provides a way to specify JVM startup options for your worker processes. The worker.childopts property in storm.yaml is where you’d specify these JVM options.

One interesting item to note is the value for the -Xloggc setting. Remember that you can have multiple worker processes per worker node. The worker.childopts property applies to all worker processes on a node, so specifying a regular log filename would produce one log file for all the worker processes combined. A separate log file per worker process makes tracking GC usage per JVM much easier. Storm provides a mechanism for this: the ID variable is unique for each worker process on a worker node, so adding a "%ID%" string to the GC log filename gives you a separate GC log file for each worker process.
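
As a sketch, GC logging for a Java 8-era worker JVM could be wired up in storm.yaml like this (the heap size, flag selection, and log path are illustrative only):

    worker.childopts: "-Xmx768m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/var/log/storm/gc-worker-%ID%.log"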

Let’s assume that your spouts and/or bolts are attempting to consume more memory than what has been allocated to the JVM, resulting in OOM errors and/or long GC pauses.

You can address the problem in a couple of ways:

  • By increasing the number of worker processes being used by the topology in question
  • By increasing the size of your JVMs

By adding a worker process to a topology, you’ll decrease the average load across all worker processes for that topology. This should result in a smaller memory footprint for each worker process (JVM), hopefully eliminating the JVM memory contention.

Because increasing the size of your JVMs could require you to change the size of the machines/VMs they’re running on, we recommend the “increase worker processes” solution if you can.

Swapping and balancing memory across JVMs has been one of our biggest challenges with Storm. Different topologies will have different memory usage patterns. Don’t worry if you don’t get it right initially. This is a never-ending process as the shape of your cluster and topologies changes.

Beware when increasing the memory allocated to a JVM; as a rule of thumb, when you cross certain key points you’ll notice a change in how long garbage collection takes—500 MB, 1 GB, 2 GB, and 4 GB are all around the points when our GC time has taken a jump. It’s more art than science, so bring your patience with you. There’s nothing more frustrating than addressing OOM issues by increasing JVM memory size only to have it noticeably impact GC times.

The amount of memory allocated to all worker processes (JVMs) on a worker node can be changed via the worker.childopts property in each worker node’s storm.yaml configuration file. This property accepts any valid JVM startup option, providing the ability to set the startup options for the initial memory allocation pool (-Xms) and maximum memory allocation pool (-Xmx) for the JVMs on the worker node. Changing this property will update all the worker processes on a particular worker node, so make sure that your worker node has enough resources for the memory increase. After updating this property, you’ll need to restart the Supervisor process on the worker node to apply the change.

From my old days of tuning J2EE applications, I still recommend setting -Xms and -Xmx to the same value to eliminate heap-resizing overhead. Along with being more efficient, this strategy makes it easier to reason about JVM memory usage, because the heap size is a fixed constant for the life of the JVM.
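
Putting the two recommendations together, a minimal sketch of a worker.childopts setting with a 1 GB heap pinned via matching -Xms and -Xmx values (the size itself is only an example, not a recommendation):

    worker.childopts: "-Xms1024m -Xmx1024m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/var/log/storm/gc-worker-%ID%.log"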

Worker Node Memory

Much like how an individual JVM has a limited amount of available memory, so does a worker node as a whole. In addition to the memory needed to run your Storm worker processes (JVMs), you need memory to run the Supervisor process and any other processes on your worker node without swapping.

A worker node has a fixed amount of memory that’s being used by its worker processes along with any other processes running on that worker node.

If a worker node is experiencing memory contention, that worker node will be swapping. Swapping is the little death and needs to be avoided if you care about latency and throughput. This is a problem when using Storm; each worker node needs to have enough memory so that the worker processes and OS don’t swap. If you want to maintain consistent performance, you must avoid swapping with Storm’s JVMs.

One way to keep an eye on this on Linux is with the System Activity Reporter (sar). Use sar -S to report swap-space utilization statistics.
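
For example, the following samples swap usage at a 5-second interval (the interval and count are arbitrary); sustained non-zero swap activity on a worker node is the warning sign:

    # report swap-space utilization every 5 seconds, 6 samples
    sar -S 5 6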

If you run a single worker process per worker node, it’s impossible to run into contention between workers on that node. This can make maintaining consistent performance within a cluster much easier. We know of more than one development team that has opted for this approach. If possible, we advise you to seriously consider going this route.

This is a nonstarter if you aren’t running in a virtualized environment. The cost is simply too high to do this if you’re running on “bare metal” with a single OS instance per physical machine. Within a virtualized environment, you’ll use more resources by doing this. Assume for a moment that your OS install requires n GB of disk space and uses 2 GB of memory to run effectively. If you have eight workers running on your cluster and you assign four workers per node, you’d use 2 × n GB of disk and 4 GB of memory to run the OS on your cluster nodes. If you were to run a single worker per node, that would skyrocket to 8 × n GB of disk and 16 GB of memory. That’s a fourfold increase in a rather small cluster. Imagine the additional usage that would result if you had a cluster that was 16, 32, 128, or more nodes in size. If you’re running in an environment such as Amazon Web Services (AWS) where you pay per node, the costs can add up quickly. Therefore, we suggest this approach only if you’re running in a private virtualized environment where the cost of hardware is relatively fixed and you have disk and memory resources to spare.

Let’s assume that your worker node is swapping due to contention for that node’s memory.

Here are a few options:

  • Increase the memory available to each worker node. This would mean giving more memory to the physical machine or VM, depending on how you configured your cluster.
  • Lower the collective memory used by worker processes.

Lowering the collective memory used by worker processes can be done in one of two ways. The first is by reducing the number of worker processes per worker node. Reducing the total number of worker processes will lower the overall memory footprint of the combined remaining processes.

The second way is by reducing the size of your JVMs. Be careful when lowering memory allocated to existing JVMs, though, to avoid introducing memory contention within the JVM.

One safe solution is to always go the route of increasing the memory available to each machine. It’s the simplest solution and its resulting ramifications are the easiest to understand. If you are tight on memory, lowering memory usage can work, but you open yourself up to all the problems we discussed concerning GC and OOM on a per-JVM basis.

Worker Node CPU

Worker node CPU contention occurs when the demand for CPU cycles outstrips the amount available. This is a problem when using Storm and is one of the primary sources of contention in a Storm cluster. If your Storm topology’s throughput is lower than what you expect it to be, you may want to check the worker node(s) running your topology to see if CPU contention exists.

One way to keep an eye on this in Linux is with the sar -u command for displaying real-time CPU usage of all CPUs.
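
For example (again, the interval and count are arbitrary); consistently low %idle together with high %user or %system across samples points to CPU contention:

    # report CPU utilization every 5 seconds, 6 samples; watch the %user, %system and %idle columns
    sar -u 5 6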

Let’s assume that the throughput of your topologies is low, and based on running the sar command, you see that CPU contention exists.

To address the problem, you have the following options:

  • Increasing the number of CPUs available to the machine. This is only possible in a virtualized environment.
  • Upgrading to a more powerful CPU, for example by moving to a larger instance type in an environment such as Amazon Web Services (AWS).
  • Spreading the JVM load across more worker nodes by lowering the number of worker processes per worker node.

To spread worker process (JVM) load across more worker nodes, you need to reduce the number of worker processes running on each worker node. Reducing the number of worker processes per worker node results in less processing (CPU requests) being done on each worker node. There are two scenarios you may find yourself in when attempting this solution. The first is you have unused worker processes in your cluster and can therefore reduce the number of worker processes on your existing nodes, thus spreading the load.

The second scenario is where you don’t have any unused worker processes and therefore need to add worker nodes in order to reduce the number of worker processes per worker node.

Reducing the number of worker processes per worker node is a good way to reduce the number of CPU cycles being requested on each node. You just need to be aware of what resources are available and in use and act appropriately in your given scenario.

Worker Node I/O

I/O contention on a worker node can fall under one of two categories:

  • Disk I/O contention, reading from and writing to the file system
  • Network/socket I/O contention, reading from and writing to a network via a socket

Both types of contention are regularly an issue for certain classes of Storm topologies. The first step in determining if you’re experiencing either of these contentions is to establish whether a worker node is experiencing I/O contention in general. Once you do, you can dive down into the exact type of I/O contention your worker node is suffering from.

One way to determine whether a worker node in your cluster is experiencing I/O contention is by running the sar -u command again and watching the %iowait column, which reports the percentage of time the CPUs sat idle while waiting on outstanding I/O requests.

A healthy topology that uses a lot of I/O shouldn’t spend much time waiting for those resources to become available. That’s why we use 10.00% iowait as the rough threshold at which you start experiencing performance degradation.

If you know which topologies are running on a given worker node, you know whether they lean heavily on the network or on disk I/O, and you see iowait problems, you can make a reasonably safe assumption about which of the two is your issue. If you’re seeing troubling signs of I/O contention, first attempt to determine whether you’re suffering from socket/network I/O contention. If you aren’t, assume that you’re suffering from disk I/O contention. Although this might not always be the case, it will take you a long way as you learn the tools of the trade.

If your topologies interact over a network with external services, network/socket I/O contention is bound to be a problem for your cluster. In our experience, the main cause for this type of contention is that all of the ports allocated for opening sockets are being used.

Most Linux installs default to a maximum of 1024 open files/sockets per process. In an I/O-intensive topology, it’s easy to hit that limit quickly. To determine the limits on your OS, you can examine the /proc filesystem to check your process’s limits. To do this, you first need to know your process ID; once you have it, you can get a listing of all limits for that process. Start by getting the PID (ps aux | grep MyTopologyName) and then check the process limits from the /proc filesystem, as sketched below.
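
A quick sketch of that check (12345 stands in for whatever PID the ps command returns):

    # find the worker process running your topology
    ps aux | grep MyTopologyName

    # inspect its open-files limit (replace 12345 with the PID found above)
    grep "Max open files" /proc/12345/limits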

If you’re hitting this limit, the Storm UI for your topology should display an exception in the “Last Error” column that the max open files limit has been reached. This will most likely be a stack trace starting with java.net.SocketException: Too many open files.

Let’s assume that your topology is experiencing reduced throughput or no throughput at all, and you’re seeing errors for hitting the limit of open sockets.

A couple of ways to address this problem are as follows:

  • Increasing the number of available ports on the worker node
  • Adding more worker nodes to the cluster

To increase the number of available ports, you’ll need to edit the /etc/security/limits.conf file on most Linux distributions. These settings set the hard and soft limits on open files per user. The value we’re concerned with as Storm users is the soft limit, and I don’t advise going higher than 128k. Whatever soft limit you choose, as a rule of thumb (until you learn more about soft and hard limits for open files on Linux), set the hard limit to two times the value of the soft limit. Note that you need super-user access to change limits.conf, and you’ll need to reboot the system to make sure the changes take effect.
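
A sketch of what such an entry might look like for a dedicated storm user (the user name and the values are examples; the hard limit follows the two-times rule of thumb above):

    # /etc/security/limits.conf
    # <domain>   <type>   <item>    <value>
    storm        soft     nofile    65536
    storm        hard     nofile    131072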

Increasing the number of worker nodes in the cluster will give you access to more ports. If you don’t have the resources to add more physical machines or VMs, you’ll have to try the first solution.

In our experience, the first real contention issue to surface was the number of sockets available per machine. Don’t add more workers on other machines until you’ve increased the available sockets on each node as much as you can. Once you’ve done that, you should also look at your code.

Are you opening and closing sockets all the time? If you can keep connections open, do that. There’s this wonderful thing called TIME_WAIT: a TCP connection stays around after you close it, waiting for any stray data. If you’re on a slow network link (like many were when TCP was first designed), this is a wonderful idea that helps prevent data corruption. On a fast modern LAN, it’ll drive you insane. You can tune your TCP stack via various OS-specific means to lower how long connections linger in TIME_WAIT, but when you’re making tons of network calls, even that won’t be enough. Be smart: open and close connections as little as possible.
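
A rough way to see whether lingering connections are eating into your socket budget is to count them with netstat (one of the tools listed in the summary below):

    # count TCP connections currently sitting in TIME_WAIT on this worker node
    netstat -ant | grep TIME_WAIT | wc -l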

Disk I/O contention affects how quickly you can write to disk. This could be a problem with Storm but should be exceedingly rare. If you’re writing large volumes of data to your logs or storing the output of calculations to files on the local filesystem, it might be an issue, but that should be unlikely.

If you have a topology that’s writing data to disk and notice its throughput is lower than what you’re expecting, you should check to see if the worker nodes it’s running on are experiencing disk I/O contention. For Linux installations, you can run a command called iotop to get a view of the disk I/O usage for the worker nodes in question. This command displays a table of current I/O usage by processes/threads in the system, with the most I/O-intensive processes/threads listed first.
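
For example (the -o flag limits the output to processes actually doing I/O; iotop typically needs root privileges):

    # interactively show only the processes/threads currently performing disk I/O
    sudo iotop -o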

Let’s assume that you have a topology that reads/writes to/from disk, and it looks like the worker nodes it’s running on are experiencing disk I/O contention.

To address this problem, you have the following options:

  • Write less data to disk. This can mean cutting back within your topology. It can also mean putting fewer worker processes on each worker node if multiple worker processes are demanding disk on the same worker node.
  • Get faster disks. This could include using a RAM disk.
  • If you’re writing to NFS or some other network filesystem, stop immediately. Writing to NFS is slow and you’re setting yourself up for disk I/O contention if you do.

If you’re writing large amounts of data to disk and you don’t have fast disks, you’re going to have to accept it as a bottleneck. There’s not much you can do without throwing money at the problem.

Summary

  • Several types of contention exist above the topology level, so it’s helpful to be able to monitor things like CPU, I/O, and memory usage for the operating system your worker nodes are running on.
  • It is important to have some level of familiarity with monitoring tools for the operating system of the machines/VMs in your cluster. In Linux, these include sar, netstat, and iotop.
  • There’s value in knowing common JVM startup options, such as -Xms, -Xmx, and those related to GC logging.
  • Although the Storm UI is a great tool for the initial diagnosis of many types of contention, it’s smart to have other types of monitoring at the machine/VM level to let you know if something is awry.
  • Including custom metrics/monitoring in your individual topologies will give you valuable insights that the Storm UI may not be able to.
  • Be careful when increasing the number of worker processes running on a worker node because you can introduce memory and/or CPU contention at the worker node level.
  • Be careful when decreasing the number of worker processes running on a worker node because you can affect your topologies’ throughput while also introducing contention for worker processes across your cluster.

References

Storm Applied: Strategies for Real-Time Event Processing, by Sean T. Allen, Matthew Jankowski, and Peter Pathirana (Manning Publications)
