12-20-2016
03:14 AM
13 Kudos
Pre-requisites
Hortonworks Data Platform 2.5 on CentOS 7.2
Python distribution that comes with HDP 2.5 - Python 2.7.5
Download and install pip
#wget https://bootstrap.pypa.io/get-pip.py
#python get-pip.py
Install add-on package
#pip install requests
Start Python CLI (default version)
#python
Import pre-reqs
>>>import requests
>>>import json
>>>import sys
Environment Variables
Set the Ambari domain variable to the IP address or FQDN of your Ambari node.
>>>AMBARI_DOMAIN = '127.0.0.1'
Set the Ambari port, Ambari user and password variables to match your specifics.
>>>AMBARI_PORT = '8080'
>>>AMBARI_USER_ID = 'admin'
>>>AMBARI_USER_PW = 'admin'
Set the following variable to the IP address or FQDN of your ResourceManager node.
>>>RM_DOMAIN = '127.0.0.1'
Set the Resource Manager port variable.
>>>RM_PORT = '8088'
Ambari REST API Call Examples
Let's find Cluster Name, Cluster Version, Stack and Stack Version:
>>>restAPI='/api/v1/clusters'
>>>url="http://"+AMBARI_DOMAIN+":"+AMBARI_PORT+restAPI
>>>r=requests.get(url, auth=(AMBARI_USER_ID, AMBARI_USER_PW))
>>>json_data=json.loads(r.text)
>>>CLUSTER_NAME = json_data["items"][0]["Clusters"]["cluster_name"]
>>>print(CLUSTER_NAME)
>>>CLUSTER_VERSION = json_data["items"][0]["Clusters"]["version"]
>>>print(CLUSTER_VERSION)
>>>STACK = CLUSTER_VERSION.split('-')[0]
>>>print(STACK)
>>>STACK_VERSION = CLUSTER_VERSION.split('-')[1]
>>>print(STACK_VERSION)
>>>CLUSTER_INFO=json_data
>>>print(CLUSTER_INFO)
Let's find the HDP stack repository:
>>>restAPI = "/api/v1/stacks/"+STACK+"/versions/"+STACK_VERSION+"/operating_systems/redhat7/repositories/"+CLUSTER_VERSION
>>>url = "http://"+AMBARI_DOMAIN+":"+AMBARI_PORT+restAPI
>>>r = requests.get(url, auth=(AMBARI_USER_ID, AMBARI_USER_PW))
>>>json_data=json.loads(r.text)
>>>print(json_data)
>>>REPOSITORY_NAME=json_data["Repositories"]["latest_base_url"]
>>>print(REPOSITORY_NAME)
A more elegant approach is to create utility functions. See my repo: https://github.com/cstanca1/HDP-restAPI/ The restAPIFunctions.py script in the repo defines a number of useful functions that I have collected over time. Run restAPIFunctions.py. The same example presented above can now be implemented with a single call that returns CLUSTER_NAME, CLUSTER_VERSION and CLUSTER_INFO using the getClusterVersionAndName() function:
>>>CLUSTER_NAME,CLUSTER_VERSION,CLUSTER_INFO = getClusterVersionAndName()
>>>print(CLUSTER_NAME)
>>>print(CLUSTER_VERSION)
>>>print(CLUSTER_INFO)
Resource Manager REST API Call Examples
>>>RM_INFO=getResourceManagerInfo()
>>>RM_SCHEDULER_INFO=getRMschedulerInfo()
>>>print(RM_INFO)
>>>print(RM_SCHEDULER_INFO)
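Under the hood, the Resource Manager helpers call the YARN ResourceManager REST API directly. As a rough illustration (the actual implementation in the repo may differ), a function like getResourceManagerInfo() can be sketched as:
import requests, json

RM_DOMAIN = '127.0.0.1'
RM_PORT = '8088'

def getResourceManagerInfo():
    # Standard YARN ResourceManager REST endpoint for cluster information
    url = "http://" + RM_DOMAIN + ":" + RM_PORT + "/ws/v1/cluster/info"
    r = requests.get(url)
    return json.loads(r.text)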
Other Functions
These are other functions included in the restAPIFunctions.py script:
getServiceActualConfigurations()
getClusterRepository()
getAmbariHosts()
getResourceManagerInfo()
getRMschedulerInfo()
getAppsSummary()
getNodesSummary()
getServiceConfigTypes()
getResourceManagerMetrics()
getCheckClusterForRollingUpgrades()
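For reference, a helper along the lines of getClusterVersionAndName() only needs to wrap the /api/v1/clusters call shown earlier; the sketch below is an illustration based on this article, not the exact code from the repo:
import requests, json

AMBARI_DOMAIN = '127.0.0.1'
AMBARI_PORT = '8080'
AMBARI_USER_ID = 'admin'
AMBARI_USER_PW = 'admin'

def getClusterVersionAndName():
    # Same /api/v1/clusters call as above, wrapped so it can be reused
    url = "http://" + AMBARI_DOMAIN + ":" + AMBARI_PORT + "/api/v1/clusters"
    r = requests.get(url, auth=(AMBARI_USER_ID, AMBARI_USER_PW))
    json_data = json.loads(r.text)
    cluster_name = json_data["items"][0]["Clusters"]["cluster_name"]
    cluster_version = json_data["items"][0]["Clusters"]["version"]
    return cluster_name, cluster_version, json_data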
12-18-2016
10:17 PM
13 Kudos
Background
Tungsten became the default in Spark 1.5 and can be enabled in earlier versions by setting spark.sql.tungsten.enabled to true (or disabled in later versions by setting it to false). Even without Tungsten, Spark SQL uses a columnar storage format with Kryo serialization to minimize storage cost.
Goal
The goal of Project Tungsten is to improve Spark execution by optimizing Spark jobs for CPU and memory efficiency (as opposed to network and disk I/O, which are considered fast enough).
Scope
Tungsten focuses on the hardware architecture of the platform Spark runs on, including but not limited to the JVM, LLVM, GPU, NVRAM, etc.
Optimization Features
Off-Heap Memory Management, using a binary in-memory data representation (the Tungsten row format) and managing memory explicitly
Cache Locality, i.e. cache-aware computations with cache-aware layouts for high cache hit rates
Whole-Stage Code Generation (aka CodeGen)
Design Improvements
Tungsten includes specialized in-memory data structures tuned for the type of operations required by Spark, improved code generation, and a specialized wire protocol. Tungsten's representation is substantially smaller than objects serialized using the Java or even Kryo serializers. Because Tungsten does not depend on Java objects, both on-heap (in the JVM) and off-heap allocations are supported. Not only is the format more compact, serialization times can be substantially faster than with native serialization. If you use off-heap storage, it is important to leave enough room in your containers for the off-heap allocations - you can get an approximate idea of how much from the web UI. Tungsten's data structures are also designed with the kind of processing for which they are used in mind. The classic example is sorting, a common and expensive operation: the on-wire representation is implemented so that sorting can be done without having to deserialize the data again. By avoiding the memory and GC overhead of regular Java objects, Tungsten is able to process larger data sets than the same hand-written aggregations.
Benefits
The following Spark jobs will benefit from Tungsten:
DataFrames: Java, Scala, Python, R
Spark SQL queries
Some RDD API programs, via general serialization and compression optimizations
Next Steps
In the future, Tungsten may make it more feasible to use certain non-JVM libraries. For many simple operations the cost of using BLAS, or similar linear algebra packages, from the JVM is dominated by the cost of copying the data off-heap.
References
Project Tungsten: Bringing Apache Spark Closer to Bare Metal
High Performance Spark by Holden Karau and Rachel Warren
Slides: Deep Dive into Project Tungsten - Josh Rosen
Video: Deep Dive into Project Tungsten: Bringing Spark Closer to Bare Metal - Josh Rosen (Databricks)
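A quick way to see Tungsten at work is to inspect the physical plan of a DataFrame aggregation. The following PySpark snippet is a minimal sketch (not from the references above), assuming a Spark 1.5/1.6-era pyspark shell and an illustrative in-memory DataFrame:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)  # sc is provided by the pyspark shell
# Tungsten is on by default in Spark 1.5+; set explicitly here only for illustration
sqlContext.setConf("spark.sql.tungsten.enabled", "true")

df = sqlContext.createDataFrame([(1, "a"), (2, "b"), (3, "a")], ["id", "key"])
# The physical plan shows Tungsten operators (e.g. TungstenAggregate/TungstenExchange in 1.5)
df.groupBy("key").count().explain(True)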
12-17-2016
01:33 AM
11 Kudos
Introduction
Many organizations have come to rely on Hadoop for dealing with the ever-increasing quantities of data that they gather. Today, it is clear what problems Hadoop can solve; however, cloud is still not the first choice for Hadoop deployments. Pros and cons of Hadoop in the cloud have been shared across multiple blogs and books, but the question keeps coming up in discussions with enterprises considering Hadoop in the cloud. Thus, I thought it would be useful to collate a few pros and cons, as well as mention a pragmatic hybrid-cloud approach for organizations that have made significant investments on-prem. For organizations adopting Hadoop for the first time, cloud is probably a better bet, especially if they don't have a lot of IT expertise and a great stream of revenue exists that needs to be exploited immediately.
Pro Cloud
Lack of space. You don't have space to keep racks of physical servers, along with the necessary power and cooling.
Flexibility. It is much easier to reorganize instances, or expand or contract your footprint, for changing business needs. Everything is controlled through cloud provider APIs and web consoles. Changes can be scripted and put into effect manually or even automatically and dynamically based on current conditions.
New usage patterns. Cloud providers abstract computing resources such that they are not tied to physical configurations, which means they can be managed in ways that are otherwise impractical. For example, individuals could have their own instances, clusters, and even networks to work with, without much managerial overhead. The overall budget for CPU cores in your cloud provider account can be concentrated in a set of large instances, a larger set of smaller instances, or some mixture, and can even change over time. When an instance malfunctions, instead of troubleshooting what went wrong, you can just tear it down and replace it.
Worldwide availability. The largest cloud providers have data centers around the world. You can use resources close to where you work, or close to where your customers are, for the best performance. You can set up redundant clusters, or even entire computing environments, in multiple data centers, so that if local problems occur in one data center, you can shift to working elsewhere.
Data retention restrictions. If you have data that is required by law to be stored within specific geographic areas, you can keep it in clusters that are hosted in data centers in those areas.
Cloud provider features. Each major cloud provider offers an ecosystem of features to support the core functions of computing, networking, and storage. To use those features most effectively, your clusters should run in the cloud provider as well.
Capacity. Very few customers tax the infrastructure of a major cloud provider. You can establish large systems in the cloud that are not nearly as easy to put together, not to mention maintain, on-prem.
Pro On-Prem
Simplicity. Cloud providers start you off with reasonable defaults, but then it is up to you to figure out how all of their features work and when they are appropriate. It takes a lot of experience to become proficient at picking the right types of instances and arranging networks properly.
High levels of control. Beyond the general geographic locations of cloud provider data centers and the hardware specifications that providers reveal for their resources, it is not possible to have exacting, precise control over your cloud architecture. You cannot tell exactly where the physical devices sit, or what the devices near them are doing, or how data across them shares the same physical network. When the cloud provider has internal problems such as network outages, there's not much you can do but wait.
Unique hardware needs. You cannot have cloud providers attach specialized peripherals or dongles to their hardware for you. If your application requires resources that exceed what a cloud provider offers, you will need to host that part on-prem, away from your Hadoop clusters.
Saving money. For one thing, you are still paying for the resources you use. The hope is that the economy of scale that a cloud provider can achieve makes it more economical for you to pay to "rent" their hardware than to run your own. You will also still need people who understand system administration and networking to take care of your cloud infrastructure. Inefficient architectures can cost a lot of money in storage and data transfer costs, or in instances that are running idle.
Best of Both
Instead of running your clusters and associated applications completely in the cloud or completely on-prem, the overall system is split between the two - a hybrid cloud. Data channels are established between the cloud and on-prem worlds to connect the components needed to perform work.
Examples
Suppose there is a large, existing on-prem data processing system, perhaps using Hadoop clusters, which works well. In order to expand its capacity for running new analyses, rather than adding more on-prem hardware, Hadoop clusters can be created in the cloud. Data needed for the analyses is copied up to the cloud clusters where it is analyzed, and the results are sent back on-prem. The cloud clusters can be brought up and torn down in response to demand, which helps to keep costs lower.
Now assume the management of vast amounts of incoming data that needs to be centralized and processed. To avoid having one single choke point where all of the raw data is sent, a set of cloud clusters can share the load, perhaps each in a geographic location convenient to where the data is generated. These clusters can perform pre-processing of the data, such as cleaning and summarization, to reduce the work that the final centralized system must perform.
References
Moving Hadoop to the Cloud by Bill Havanki, published by O'Reilly Media, Inc., 2017.
11-30-2016
02:57 PM
Thank you, Constantin!
11-24-2016
02:10 AM
10 Kudos
Behavior
The cells returned to the client are normally filtered based on the table configuration; however, when using the RAW => true parameter, you can retrieve all of the versions kept by HBase, unless there was a major compaction or a flush to disk in the meantime.
Demonstration
Create a table with a single column family:
create 't1', 'f1'
Configure it to retain a maximum version count of 3:
alter 't1',NAME=>'f1',VERSIONS=>3
Perform 4 puts:
put 't1','r1','f1:c1',1
put 't1','r1','f1:c1',2
put 't1','r1','f1:c1',3
put 't1','r1','f1:c1',4
Scan with RAW=>true. I used VERSIONS as 100 as a catch-all; it could have been anything greater than 3 (the number of versions set previously). Unless specified, only the latest version is returned by the scan command.
scan 't1',{RAW=>true,VERSIONS=>100}
The above scan returns all four versions.
ROW COLUMN+CELL
r1 column=f1:c1,timestamp=1479950685181, value=4
r1 column=f1:c1,timestamp=1479950685155, value=3
r1 column=f1:c1,timestamp=1479950685132, value=2
r1 column=f1:c1,timestamp=1479950627736, value=1
Flush to disk:
flush 't1'
Then scan:
scan 't1',{RAW=>true,VERSIONS=>100}
Three versions are returned.
ROW COLUMN+CELL
r1 column=f1:c1,timestamp=1479952079260, value=4
r1 column=f1:c1,timestamp=1479952079234, value=3
r1 column=f1:c1,timestamp=1479952079209, value=2
Do four more puts:
put 't1','r1','f1:c1',5
put 't1','r1','f1:c1',6
put 't1','r1','f1:c1',7
put 't1','r1','f1:c1',8
Flush to disk:
flush 't1'
Scan:
scan 't1',{RAW=>true,VERSIONS=>100}
Six versions are returned:
ROW COLUMN+CELL
r1 column=f1:c1,timestamp=1479952349970, value=8
r1 column=f1:c1,timestamp=1479952349925, value=7
r1 column=f1:c1,timestamp=1479952349895, value=6
r1 column=f1:c1,timestamp=1479952079260, value=4
r1 column=f1:c1,timestamp=1479952079234, value=3
r1 column=f1:c1,timestamp=1479952079209, value=2
Force major compaction:
major_compact 't1'
Scan:
scan 't1',{RAW=>true,VERSIONS=>100}
Three versions are returned:
ROW COLUMN+CELL
r1 column=f1:c1,timestamp=1479952349970, value=8
r1 column=f1:c1,timestamp=1479952349925, value=7
r1 column=f1:c1,timestamp=1479952349895, value=6
Conclusion
When deciding how many versions to retain, it is best to treat that number as the minimum version count available at a given time and not as a constant. Until a flush to disk and a major compaction occur, the number of versions available can be higher than the number configured for the table.
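If you need to read multiple versions programmatically rather than from the shell, the sketch below uses the happybase Python client (an assumption - it is not part of this article and requires the HBase Thrift server to be running). Note that, unlike a RAW => true scan, it returns at most the number of versions the table is configured to keep:
import happybase

connection = happybase.Connection('127.0.0.1')  # HBase Thrift server host (adjust as needed)
table = connection.table('t1')

# Fetch up to 3 versions of column f1:c1 for row r1, with timestamps
for value, ts in table.cells('r1', 'f1:c1', versions=3, include_timestamp=True):
    print(ts, value)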
12-18-2016
09:15 PM
@Kaliyug Antagonist I like Tom's suggestion and will try it myself. Otherwise, if you wish to create your local cluster with Vagrant: https://community.hortonworks.com/articles/39156/setup-hortonworks-data-platform-using-vagrant-virt.html I use Eclipse and a Vagrant cluster. I share a folder between my local machine and the cluster, where I place the output jars and then submit them for execution. I followed the instructions published here: https://community.hortonworks.com/articles/43269/intellij-eclipse-usage-against-hdp-25-sandbox.html I am not sure why you are against the idea of using the sandbox. The code you develop can at most be tested functionally, locally. I get that you want more debugging capabilities locally, but true load testing still needs to happen in a full-scale environment.
11-16-2016
09:59 PM
11 Kudos
Introduction
This article is a continuation of the Geo-spatial Queries with Hive Using ESRI Geometry Libraries article published a few months ago.
Objective
Demonstrate how to use a Hive context to invoke the built-in ESRI UDFs for Hive from Spark SQL.
Pre-requisites
HDP 2.4.2
Steps documented in Geo-spatial Queries with Hive Using ESRI Geometry Libraries
Steps
1. Launch spark-shell with --jars as its parameter:
spark-shell --jars /home/spark/esri/esri-geometry-api.jar,/home/spark/esri/spatial-sdk-hive-1.1.1-SNAPSHOT.jar
I placed the dependency jars in the /home/spark/esri path, but you can store them in HDFS or on the local filesystem and grant proper privileges to your spark user.
2. Instantiate sqlContext:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
3. From spark-shell, define temporary functions:
sqlContext.sql("""create temporary function st_point as 'com.esri.hadoop.hive.ST_Point'""");
sqlContext.sql("""create temporary function st_x as 'com.esri.hadoop.hive.ST_X'""");
4. From spark-shell, invoke your UDF:
sqlContext.sql("""from geospatial.demo_shape_point select st_x(st_point(shape))""").show;
Note: geospatial is the Hive database where the demo_shape_point table was created.
Conclusion
The Esri Geometry API for Java and the Spatial Framework for Hadoop can be used by developers building geometry functions for various geo-spatial applications with Spark as well, not only Hive.
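The same UDF registration also works from pyspark (a sketch not covered in this article, assuming Spark 1.6 on HDP 2.4.2 and the jars passed with --jars exactly as in step 1, e.g. pyspark --jars /home/spark/esri/esri-geometry-api.jar,/home/spark/esri/spatial-sdk-hive-1.1.1-SNAPSHOT.jar):
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)  # sc is provided by the pyspark shell
sqlContext.sql("create temporary function st_point as 'com.esri.hadoop.hive.ST_Point'")
sqlContext.sql("create temporary function st_x as 'com.esri.hadoop.hive.ST_X'")
sqlContext.sql("select st_x(st_point(shape)) from geospatial.demo_shape_point").show()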
12-16-2016
09:12 PM
@Gurpreet Singh @Greg Keys has provided the link you requested (ref REL_SUCCESS): http://funnifi.blogspot.com/2016/02/executescript-processor-hello-world.html
11-03-2016
07:59 PM
13 Kudos
Introduction
This is a continuation of the Apache Storm Tuning Approach for Squirrels article that I published a couple of weeks ago. A topology is going to have to coexist on a
Storm cluster with a variety of other topologies. Some of those topologies will
burn CPU doing heavy calculations, others will consume large amounts of network
bandwidth. No one sane can claim to provide a silver
bullet for how to set up your cluster for the best performance. However, I’d
like to share a few recipes and guidelines for dealing with issues as they
arise. I have listed below 6 categories of contention, describing common problems and strategies to alleviate or eliminate resource contention. I hope this article at least provides a good map for a deep dive into Storm tuning; each type of contention is probably worth an article of its own.
Worker Processes in a Cluster
A Storm cluster is installed with a fixed number of available worker processes across all worker nodes. Each time you deploy a new topology to the cluster, you specify how many worker processes that topology should consume. The number of worker processes a topology requests is specified in the code for building and submitting your topology to the Storm cluster. It is possible that you could deploy a topology that requires a certain number of worker processes but you can't acquire the needed worker processes because they've all been assigned to existing topologies. This problem is easy to detect; it can be found by looking at the cluster summary page of the Storm UI and identifying the free slots. Slots correspond to worker processes.
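If you would rather check this programmatically than eyeball the Storm UI, the UI also exposes a REST API; the following Python sketch (host and port are assumptions for your environment, not from the book) reads the slot counts from the cluster summary endpoint:
import requests

STORM_UI = "http://127.0.0.1:8080"  # Storm UI host:port (adjust for your cluster)

summary = requests.get(STORM_UI + "/api/v1/cluster/summary").json()
print("total slots:", summary["slotsTotal"])
print("used slots:", summary["slotsUsed"])
print("free slots:", summary["slotsFree"])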
It's important to always be aware of the resources available in your cluster when deploying new topologies. If you ignore what's available within your cluster, you can easily affect every topology in your cluster by deploying something that consumes too many resources. Let's assume that you notice a topology isn't processing any data or has a sudden drop in throughput and zero free slots are available. You have a fixed number of worker processes that can be allocated to the topologies requesting them. You can address this problem with these strategies:
Decreasing the number of worker processes in use by existing topologies
Increasing the total number of worker processes in the cluster
Decreasing the number of worker processes in use by existing topologies is the quickest and easiest way to free up slots for other topologies in your cluster. But this may or may not be possible depending on the SLAs for your existing topologies. If you can reduce the number of worker processes being used by a topology without violating the SLA, then go for it. If your SLAs don't allow you to reduce the number of slots being used by any of the topologies in your cluster, you'll have to add new worker processes to the cluster. There are two ways to increase the total number of worker processes in the cluster. One is by adding more worker processes to your worker nodes, but this won't work if your worker nodes don't have the resources to support additional JVMs. If this is the case, you'll need to add more worker nodes to your cluster, thus adding to the pool of worker processes. Adding new worker nodes has the least impact on existing topologies, because adding worker processes to existing nodes has the potential to cause other types of contention that must then be addressed.
Topology Worker Nodes and Processes
Let's assume that you have a problematic topology and need to identify the worker nodes and worker processes that topology is executing on. The way to do this is by looking at the Storm UI. You could start by checking out the Bolts section to see if anything looks amiss. Having identified the problematic bolt, you now want to see more details about what's happening with it. To do so, click on that bolt's name in the UI to get a more detailed view. From here, turn your attention to the Executors and Errors sections for the individual bolt. The Executors section is of particular interest; it tells you which worker nodes and worker processes the bolt is executing on. From here, given the type of contention being experienced, you can take the necessary steps to identify and solve the problem at hand. Though a great tool, the Storm UI may not always show you what you need. This is where additional monitoring can help, whether in the form of monitoring the health of individual worker nodes or custom metrics in your bolt's code that give you deeper insight into how well the bolt is performing. The bottom line is that you shouldn't rely solely on the Storm UI; put other measures in place to make sure you have coverage everywhere. Now that you have identified the contention, you'd like to change the number of worker processes running on a worker node. The number of worker processes running on a worker node is defined by the supervisor.slots.ports property in each worker node's storm.yaml configuration file. This property defines the ports that each worker process will use to listen for messages. To increase the number of worker processes that can be run on a worker node, add a port to this list for each worker process to be added. The opposite holds true for decreasing the number of worker processes: remove a port for each worker process to be removed. After updating this property, you'll need to restart the Supervisor process on the worker node to apply the change. Upon restarting, Nimbus will be aware of the updated configuration and send messages only to the ports defined in this list. Another thing to consider is the number of worker nodes you have in your cluster.
If widespread changes are needed, updating the configuration and restarting the Supervisor process across tens or even hundreds of nodes is a tedious and time-consuming task, so use configuration management tools like Puppet, Chef, or Ansible.
Worker Process Memory
Just as you install
a Storm cluster with a fixed number of worker processes, you also set up each
worker process (JVM) with a fixed amount of memory it can grow to use. The
amount of memory limits the number of threads (executors) that can be launched
on that JVM—each thread takes a certain amount of memory (the default is 1 MB on
a 64-bit Linux JVM). JVM contention can
be a problem on a per-topology basis. The combination of memory used by your
bolts, spouts, threads, and so forth might exceed that allocated to the JVMs
they’re running on. JVM contention
usually manifests itself as out-of-memory (OOM) errors and/or excessively long
garbage collection (GC) pauses. OOM errors will appear in the Storm logs and
Storm UI, usually as a stack trace starting with java.lang.OutOfMemoryError: Java heap space. Gaining visibility
into GC issues requires a little more setup, but it’s something that’s easily
supported by both the JVM and Storm configuration. The JVM offers startup
options for tracking and logging GC usage, and Storm provides a way to specify
JVM startup options for your worker processes. The worker.childopts property in storm.yaml is where you'd specify these
JVM options. One interesting item to note is the value for
the -Xloggc setting.
Remember you can have multiple worker processes per worker node. The worker.childopts property
applies to all worker processes on a node, so specifying a regular log filename
would produce one log file for all the worker processes combined. A separate
log file per worker process would make tracking GC usage per JVM easier. Storm
provides a mechanism for logging a specific worker process; the ID variable is
unique for each worker process on a worker node. Therefore, you can add a "%ID%" string to
the GC log filename and you’ll get a separate GC log file for each worker
process. Let’s assume that your
spouts and/or bolts are attempting to consume more memory than what has been
allocated to the JVM, resulting in OOM errors and/or long GC pauses. You can address the
problem in a couple of ways:
By increasing the number
of worker processes being used by the topology in question
By increasing the size
of your JVMs
By adding a worker
process to a topology, you’ll decrease the average load across all worker
processes for that topology. This should result in a smaller memory footprint
for each worker process (JVM), hopefully eliminating the JVM memory contention. Because increasing
the size of your JVMs could require you to change the size of the machines/VMs
they’re running on, we recommend the “increase worker processes” solution if
you can. Swapping and
balancing memory across JVMs has been one of our biggest challenges with Storm.
Different topologies will have different memory usage patterns. Don’t worry if
you don’t get it right initially. This is a never-ending process as the shape
of your cluster and topologies changes. Beware when
increasing the memory allocated to a JVM; as a rule of thumb, when you cross
certain key points you’ll notice a change in how long garbage collection
takes—500 MB, 1 GB, 2 GB, and 4 GB are all around the points when our GC time
has taken a jump. It’s more art than science, so bring your patience with you.
There’s nothing more frustrating than addressing OOM issues by increasing JVM
memory size only to have it noticeably impact GC times. The amount of memory allocated to all worker processes (JVMs) on a worker node can be changed in theworker.childoptsproperty in each worker node’s storm.yaml configuration file. This property accepts any valid JVM startup option, providing the ability to set the startup options for the initial memory allocation pool (-Xms) and maximum memory allocation pool (-Xmx) for the JVMs on the worker node. Changing this property will update all the worker processes on a particular worker node. Make sure that your worker node has enough resources for the memory increase. After updating this property, you’ll need to restart the Supervisor process on the worker node to apply the change. From my old days of J2EE applications tuning, I still recommend setting –Xms and –Xmx to the same value to eliminate heap management overhead. Along with being more efficient, this strategy adds the benefit of making it easier to reason about JVM memory usage, because the heap size is a fixed constant for the life of the JVM. Worker Node Memory Much like how an individual JVM has a limited amount of available memory, so does a worker node as a whole. In addition to the memory needed to run your Storm worker processes (JVMs), you need memory to run the Supervisor process and any other processes on your worker node without swapping. A worker node has a fixed amount of memory that’s being used by its worker processes along with any other processes running on that worker node. If a worker node is experiencing memory contention, that worker node will be swapping.Swapping is the little death and needs to be avoided if you care about latency and throughput. This is a problem when using Storm; each worker node needs to have enough memory so that the worker processes and OS don’t swap. If you want to maintain consistent performance, you must avoid swapping with Storm’s JVMs. One way to keep an eye on this in Linux is using System Activity Reporter. Use sar -S for reporting swap space utilization statistics. If you run a single worker process per worker node, it’s impossible to run into contention between workers on that node. This can make maintaining consistent performance within a cluster much easier. We know of more than one development team that has opted for this approach. If possible, we advise you to seriously consider going this route. This is a nonstarter if you aren’t running in a virtualized environment. The cost is simply too high to do this if you’re running on “bare metal” with a single OS instance per physical machine. Within a virtualized environment, you’ll use more resources by doing this. Assume for a moment that your OS install requires n GB of disk space and uses 2 GB of memory to run effectively. If you have eight workers running on your cluster and you assign four workers per node, you’d use n* 2 GB of disk and 4 GB of memory to run the OS on your cluster nodes. If you were to run a single worker per node, that would skyrocket to n* 8 GB of disk and 16 GB of memory. That’s a fourfold increase in a rather small cluster. Imagine the additional usage that would result if you had a cluster that was 16, 32, 128, or more nodes in size. If you’re running in an environment such as Amazon Web Services (AWS) where you pay per node, the costs can add up quickly. Therefore, we suggest this approach only if you’re running in a private virtualized environment where the cost of hardware is relatively fixed and you have disk and memory resources to spare. 
Let's assume that your worker node is swapping due to contention for that node's memory. Here are a few options:
Increase the memory available to each worker node. This would mean giving more memory to the physical machine or VM, depending on how you configured your cluster.
Lower the collective memory used by worker processes. This can be done in one of two ways. The first is by reducing the number of worker processes per worker node; reducing the total number of worker processes will lower the overall memory footprint of the combined remaining processes. The second is by reducing the size of your JVMs. Be careful when lowering memory allocated to existing JVMs, though, to avoid introducing memory contention within the JVM.
One safe solution is to always go the route of increasing the memory available to each machine. It's the simplest solution and its resulting ramifications are the easiest to understand. If you are tight on memory, lowering memory usage can work, but you open yourself up to all the problems we discussed concerning GC and OOM on a per-JVM basis.
Worker Node CPU
Worker node CPU
contention occurs when the demand for CPU cycles outstrips the amount available.
This is a problem when using Storm and is one of the primary sources of
contention in a Storm cluster. If your Storm topology’s throughput is lower
than what you expect it to be, you may want to check the worker node(s) running
your topology to see if CPU contention exists. One way to keep an
eye on this in Linux is with the sar -u command for displaying real-time CPU usage of all
CPUs. Let’s assume that the throughput of your topologies is low, and based
on running the sar command, you see that CPU contention exists. To address the
problem, you have the following options:
Increasing the number of
CPUs available to the machine. This is only possible in a virtualized
environment.
Upgrading to a more
powerful CPU (Amazon Web Services (AWS) type of environment).
Spreading the JVM load
across more worker nodes by lowering the number of worker processes per
worker node.
To spread worker
process (JVM) load across more worker nodes, you need to reduce the number of
worker processes running on each worker node. Reducing the number of worker
processes per worker node results in less processing (CPU requests) being done
on each worker node. There are two scenarios you may find yourself in when
attempting this solution. The first is you have unused worker processes in your
cluster and can therefore reduce the number of worker processes on your
existing nodes, thus spreading the load. The second scenario
is where you don’t have any unused worker processes and therefore need to add
worker nodes in order to reduce the number of worker processes per worker node. Reducing the number
of worker processes per worker node is a good way to reduce the number of CPU
cycles being requested on each node. You just need to be aware of what
resources are available and in use and act appropriately in your given
scenario.
Worker Node I/O
I/O contention on a
worker node can fall under one of two categories:
Disk I/O contention,
reading from and writing to the file system
Network/socket I/O
contention, reading from and writing to a network via a socket
Both types of
contention are regularly an issue for certain classes of Storm topologies. The
first step in determining if you’re experiencing either of these contentions is
to establish whether a worker node is experiencing I/O contention in general.
Once you do, you can dive down into the exact type of I/O contention your
worker node is suffering from. One way to
determine if a worker node in your cluster is experiencing I/O contention is by
running the sar -u command for displaying real-time CPU usage. A healthy topology that uses a lot of I/O
shouldn’t spend a lot of time waiting for the resources to become available.
That’s why we use 10.00% as the threshold at which you start experiencing
performance degradation. If you know what topologies are running on a
given worker node, you know that they use a lot of network resources or disk
I/O, and you see iowait problems, you can probably safely assume which of the two
is your issue. If you’re seeing troubling I/O contention signs, first attempt
to determine if you’re suffering from socket/network I/O contention. If you
aren’t, assume that you’re suffering from disk I/O contention. Although this
might not always be the case, it can take you a long way as you learn the tools
of the trade. If your topologies
interact over a network with external services, network/socket I/O contention
is bound to be a problem for your cluster. In our experience, the main cause
for this type of contention is that all of the ports allocated for opening
sockets are being used. Most Linux installs
will default to 1024 maximum open files/sockets per process. In an
I/O-intensive topology, it’s easy to hit that limit quickly. To determine the
limits of your OS, you can
examine the /proc filesystem to check your processes limits. In order to do
this, you’ll first need to know your process ID. Once you do that, you can get
a listing of all limits for that process. Start by getting the PID (ps aux
| grep MyTopologyName) and then check your process limits from the /proc
filesystem.
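The same check can be scripted (a sketch, not from the book) once you have the worker process's PID:
def open_file_limit(pid):
    # Each line of /proc/<pid>/limits lists a limit name, soft limit, hard limit and units
    with open("/proc/%d/limits" % pid) as f:
        for line in f:
            if line.startswith("Max open files"):
                parts = line.split()
                # e.g. ['Max', 'open', 'files', '1024', '4096', 'files']
                return int(parts[3]), int(parts[4])

print(open_file_limit(12345))  # soft and hard limits for an example PID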
If you're hitting this limit, the Storm UI for your topology should display an exception in the
“Last Error” column that the max open files limit has been reached. This will
most likely be a stack trace starting with java.net.SocketException: Too
many open files.
Let's assume that your topology is experiencing reduced throughput or no throughput
at all, and you’re seeing errors for hitting the limit of open sockets. A couple of ways to
address this problem are as follows:
Increasing the number of
available ports on the worker node
Adding more worker nodes
to the cluster
For increasing the number of available ports, you'll need to edit the /etc/security/limits.conf file on most Linux distributions. These settings
will set the hard and soft limit on open files per user. The value we’re
concerned with as a Storm user is the soft limit. I don’t advise going higher
than 128k. If you do, then as a rule of thumb (until you learn more about
soft/hard limits for number of files open on Linux), I suggest setting the hard
limit to two times the value of the soft limit. Note that you need super-user
access to change limits.conf and you’re going to need to reboot the system to
make sure they take effect. Increasing the
number of worker nodes in the cluster will give you access to more ports. If
you don’t have the resources to add more physical machines or VMs, you’ll have
to try the first solution. The first real contention
issue was the number of sockets available per machine. Don’t add more workers
on other machines until you’ve increased available sockets on each node as much
as you can. Once you’ve done that, you should also look at your code. Are you opening and
closing sockets all the time? If you can keep connections open, do that.
There's this wonderful thing called TCP_WAIT. It's where a TCP connection will stay open after
you close it waiting for any stray data. If you’re on a slow network link (like
many were when TCP was first designed), this is a wonderful idea that helps
prevent data corruption. If you’re on a fast modern LAN, it’ll drive you
insane. You can tune your TCP stack via various OS-specific means to lower how
long you linger in TCP_WAIT, but when you're making tons of network calls, even
that won’t be enough. Be smart: open and close connections as little as
possible. Disk I/O contention
affects how quickly you can write to disk. This could be a problem with Storm
but should be exceedingly rare. If you’re writing large volumes of data to your
logs or storing the output of calculations to files on the local filesystem, it
might be an issue, but that should be unlikely. If you have a
topology that’s writing data to disk and notice its throughput is lower than
what you’re expecting, you should check to see if the worker nodes it’s running
on are experiencing disk I/O contention. For Linux installations, you can run a
command called iotop to get a view of the disk I/O usage for the worker
nodes in question. This command displays a table of current I/O usage by processes/threads
in the system, with the most I/O-intensive processes/threads listed first. Let’s assume that you have a topology that reads/writes to/from disk, and it looks
like the worker nodes it’s running on are experiencing disk I/O contention. To address this
problem, you have a few options:
Write less data to disk.
This can mean cutting back within your topology. It can also mean putting
fewer worker processes on each worker node if multiple worker processes
are demanding disk on the same worker node.
Get faster disks. This
could include using a RAM disk.
If you’re writing to NFS
or some other network filesystem, stop immediately. Writing to NFS is slow
and you’re setting yourself up for disk I/O contention if you do. If you’re writing
large amounts of data to disk and you don’t have fast disks, you’re going to
have to accept it as a bottleneck. There’s not much you can do without throwing
money at the problem.
Summary
Several types of
contention exist above the topology level, so it’s helpful to be able to
monitor things like CPU, I/O, and memory usage for the operating system
your worker nodes are running on.
It is important to have
some level of familiarity with monitoring tools for the operating system
of the machines/VMs in your cluster. In Linux, these include sar, netstat, and iotop.
There’s value in knowing
common JVM startup options, such as -Xms, -Xmx, and those related to GC logging.
Although the Storm UI is
a great tool for the initial diagnosis of many types of contention, it’s
smart to have other types of monitoring at the machine/VM level to let you
know if something is awry.
Including custom
metrics/monitoring in your individual topologies will give you valuable
insights that the Storm UI may not be able to provide.
Be careful when
increasing the number of worker processes running on a worker node because
you can introduce memory and/or CPU contention at the worker node level.
Be careful when
decreasing the number of worker processes running on a worker node because
you can affect your topologies’ throughput while also introducing contention
for worker processes across your cluster.
References
Storm Applied: Strategies for real-time event processing by Sean T. Allen, Matthew Jankowski, and Peter Pathirana
10-27-2016
04:39 AM
@Constantin Stanca thanks for your response. Yes, this is for one time fix only. Cheers