Member since: 03-16-2016
Posts: 707
Kudos Received: 1752
Solutions: 203
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 3732 | 09-21-2018 09:54 PM
 | 4725 | 03-31-2018 03:59 AM
 | 1490 | 03-31-2018 03:55 AM
 | 1701 | 03-31-2018 03:31 AM
 | 3767 | 03-27-2018 03:46 PM
11-16-2016
10:27 PM
4 Kudos
@Gundrathi babu The package at https://spark-packages.org/package/HyukjinKwon/spark-xml has moved to Databricks: use https://github.com/databricks/spark-xml for Spark 2.0, or https://github.com/databricks/spark-xml/tree/branch-0.3 for older versions. Start the shell like this (check the proper version of the spark-xml package for your Spark and Scala versions):
spark-shell --packages com.databricks:spark-xml:0.1.1-s_2.10
+++ If this helped, please vote/accept best answer.
11-16-2016
09:59 PM
11 Kudos
Introduction
This article is a continuation of the Geo-spatial Queries with Hive Using ESRI Geometry Libraries article published a few months ago.
Objective
Demonstrate how to use a Hive context and invoke built-in ESRI UDFs for Hive from Spark SQL.
Pre-requisites
HDP 2.4.2
Steps documented in Geo-spatial Queries with Hive Using ESRI Geometry Libraries
Steps
1. Launch spark-shell with --jars as its parameter:
spark-shell --jars /home/spark/esri/esri-geometry-api.jar,/home/spark/esri/spatial-sdk-hive-1.1.1-SNAPSHOT.jar
I placed the dependency jars in the /home/spark/esri path, but you can store them in HDFS or on the local filesystem and grant proper privileges to your spark user.
2. Instantiate sqlContext:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
3. From spark-shell, define temporary functions:
sqlContext.sql("""create temporary function st_point as 'com.esri.hadoop.hive.ST_Point'""");
sqlContext.sql("""create temporary function st_x as 'com.esri.hadoop.hive.ST_X'""");
4. From spark-shell, invoke your UDF:
sqlContext.sql("""from geospatial.demo_shape_point select st_x(st_point(shape))""").show;
Note: geospatial is the Hive database where the demo_shape_point table was created.
Conclusion
The Esri Geometry API for Java and the Spatial Framework for Hadoop can be used by developers building geometry functions for various geo-spatial applications with Spark, not only Hive.
11-15-2016
12:11 AM
@Marcia Hon You mention that you installed R, but your error points to rstudio-server. That is a different piece of software. I don't think that error is related to your R installation, but to your rstudio-server, which points to R.
11-15-2016
12:08 AM
4 Kudos
@Marcia Hon Look at the response to this question: https://community.hortonworks.com/questions/63705/installing-r-as-ambari-service-on-hortonworks-sand.html
11-11-2016
05:24 PM
5 Kudos
@Daniel Scheiner It is neither recommended nor supported to have NiFi co-existing on the same infrastructure with HDP 2.5, especially using the same Ambari instance. There are ways to install it on the same infrastructure, but not using the Ambari instance for HDP 2.5. +++ If any response from this thread helped, please vote and accept best answer.
11-11-2016
05:21 PM
5 Kudos
@Daniel Scheiner QlikView overrides the execution engine at runtime. It uses a cost-based optimizer, and based on its stats, running the query with MR makes more sense. Reach out to your QlikView support to learn the reasoning. If this response helped, please vote and accept it as a best answer.
11-03-2016
07:59 PM
13 Kudos
Introduction
This is a continuation of the Apache Storm Tuning Approach for Squirrels article that I published a couple of weeks ago.
A topology has to coexist on a Storm cluster with a variety of other topologies. Some of those topologies will burn CPU doing heavy calculations; others will consume large amounts of network bandwidth. No one sane can claim to provide a silver bullet for how to set up your cluster for the best performance. However, I’d like to share a few recipes and guidelines for dealing with issues as they arise.
Below I list six categories of contention, describing common problems and strategies to alleviate or eliminate resource contention. I hope that this article at least provides a good map for a deep dive into Storm tuning. Each type of resource contention probably deserves an article of its own.
Worker Processes in a Cluster
A Storm cluster is installed with a fixed number of available worker processes across all worker nodes. Each time you deploy a new topology to the cluster, you specify how many worker processes that topology should consume. The number of worker processes a topology requests is specified in the code for building and submitting your topology to the Storm cluster.
It is possible that you could deploy a topology that requires a certain number of worker processes but you can’t acquire the needed worker processes because they’ve all been assigned to existing topologies. This problem is easy to detect; it can be found by looking at the cluster summary page of the Storm UI and identifying the free slots. Slots correspond to worker processes.
It’s important to always be aware of the resources available in your cluster when deploying new topologies. If you ignore what’s available within your cluster, you can easily affect every topology in your cluster by deploying something that consumes too many resources.
Let’s assume that you notice a topology isn’t processing any data or has a sudden drop in throughput and zero free slots are available. You have a fixed number of worker processes that can be allocated to the topologies requesting them. You can address this problem with these strategies:
Decreasing the number of worker processes in use by existing topologies
Increasing the total number of worker processes in the cluster
Decreasing the number of worker processes in use by existing topologies is the quickest and easiest way to free up slots for other topologies in your cluster. But this may or may not be possible depending on the SLAs for your existing topologies. If you can reduce the number of worker processes being used by a topology without violating the SLA, then go for it. If your SLAs don’t allow you to reduce the number of slots being used by any of the topologies in your cluster, you’ll have to add new worker processes to the cluster.
There are two ways to increase the total number of worker processes in the cluster. One is by adding more worker processes to your worker nodes. But this won’t work if your worker nodes don’t have the resources to support additional JVMs. If this is the case, you’ll need to add more worker nodes to your cluster, thus adding to the pool of worker processes. Adding new worker nodes has the least impact on existing topologies, because adding worker processes to existing nodes has the potential to cause other types of contention that must then be addressed.
Topology Worker Nodes and Processes
Let’s assume that you have a problematic topology and need to identify the worker nodes and worker processes that topology is executing on. The way to do this is by looking at the Storm UI. You could start by checking the Bolts section to see if anything looks amiss. Having identified the problematic bolt, you now want to see more details about what’s happening with that bolt. To do so, click on that bolt’s name in the UI to get a more detailed view for that bolt. From here, turn your attention to the Executors and Errors sections for the individual bolt. The Executors section for an individual bolt is of particular interest; this tells you which worker nodes and worker processes the bolt is executing on.
From here, given the type of contention being experienced, you can take the necessary steps to identify and solve the problem at hand.
Though a great tool, the Storm UI may not always show you what you need. This is where additional monitoring can help. This can come in the form of monitoring the health of individual worker nodes or custom metrics in your bolt’s code to give you a deeper insight into how well the bolt is performing. The bottom line here is you shouldn’t rely solely on the Storm UI. Put other measures in place to make sure you have coverage everywhere.
Now that you have identified the contention, you'd like to change the number of worker processes running on a worker node. The number of worker processes running on a worker node is defined by the supervisor.slots.ports property in each worker node’s storm.yaml configuration file. This property defines the ports that each worker process will use to listen for messages. To increase the number of worker processes that can be run on a worker node, add a port to this list for each worker process to be added. The opposite holds true for decreasing the number of worker processes: remove a port for each worker process to be removed. After updating this property, you’ll need to restart the Supervisor process on the worker node to apply the change. Upon restarting, Nimbus will be aware of the updated configuration and send messages to only the ports defined in this list.
Another thing to consider is the number of worker nodes you have in your cluster. If widespread changes are needed, updating the configuration and restarting the Supervisor process across hundreds or even tens of nodes is a tedious and time-consuming task. So try to use tools such as Puppet, Chef, or Ansible.
Worker Process Memory
Just as you install a Storm cluster with a fixed number of worker processes, you also set up each worker process (JVM) with a fixed amount of memory it can grow to use. The amount of memory limits the number of threads (executors) that can be launched on that JVM; each thread takes a certain amount of memory (the default is 1 MB on a 64-bit Linux JVM).
JVM contention can be a problem on a per-topology basis. The combination of memory used by your bolts, spouts, threads, and so forth might exceed that allocated to the JVMs they’re running on.
JVM contention usually manifests itself as out-of-memory (OOM) errors and/or excessively long garbage collection (GC) pauses. OOM errors will appear in the Storm logs and Storm UI, usually as a stack trace starting with java.lang.OutOfMemoryError: Java heap space.
Gaining visibility into GC issues requires a little more setup, but it’s something that’s easily supported by both the JVM and Storm configuration. The JVM offers startup options for tracking and logging GC usage, and Storm provides a way to specify JVM startup options for your worker processes. The worker.childopts property in storm.yaml is where you’d specify these JVM options. One interesting item to note is the value for the -Xloggc setting.
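As an illustration of the -Xloggc setting, here is a minimal shell sketch of how a GC log filename containing a %ID% placeholder would expand to one file per worker. The log directory and the worker IDs 6700 and 6701 are assumptions chosen for the example, and the sed call only mimics the substitution that Storm performs itself when it launches a worker. The GC flags shown are HotSpot options for Java 8 and earlier.

```shell
#!/bin/sh
# Hypothetical value for worker.childopts in storm.yaml (the path is an
# assumption, not taken from the article). -Xloggc, -XX:+PrintGCDetails and
# -XX:+PrintGCTimeStamps are HotSpot GC-logging flags for Java 8 and earlier.
CHILDOPTS='-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/var/log/storm/gc-worker-%ID%.log'

# Mimic the per-worker substitution: replace every %ID% with the given worker ID.
expand_childopts() {
    printf '%s\n' "$CHILDOPTS" | sed "s/%ID%/$1/g"
}

# Print only the GC log path that a given worker would write to.
gc_log_for_worker() {
    expand_childopts "$1" | sed 's/.*-Xloggc:\([^ ]*\).*/\1/'
}

for id in 6700 6701; do
    echo "worker $id -> $(gc_log_for_worker "$id")"
done
```

With a plain filename instead of the placeholder, both workers in this sketch would resolve to the same path.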
Remember you can have multiple worker processes per worker node. The worker.childopts property applies to all worker processes on a node, so specifying a regular log filename would produce one log file for all the worker processes combined. A separate log file per worker process would make tracking GC usage per JVM easier. Storm provides a mechanism for logging a specific worker process; the ID variable is unique for each worker process on a worker node. Therefore, you can add a "%ID%" string to the GC log filename and you’ll get a separate GC log file for each worker process.
Let’s assume that your spouts and/or bolts are attempting to consume more memory than what has been allocated to the JVM, resulting in OOM errors and/or long GC pauses. You can address the problem in a couple of ways:
By increasing the number of worker processes being used by the topology in question
By increasing the size of your JVMs
By adding a worker process to a topology, you’ll decrease the average load across all worker processes for that topology. This should result in a smaller memory footprint for each worker process (JVM), hopefully eliminating the JVM memory contention.
Because increasing the size of your JVMs could require you to change the size of the machines/VMs they’re running on, we recommend the “increase worker processes” solution if you can.
Swapping and balancing memory across JVMs has been one of our biggest challenges with Storm. Different topologies will have different memory usage patterns. Don’t worry if you don’t get it right initially. This is a never-ending process as the shape of your cluster and topologies changes.
Beware when increasing the memory allocated to a JVM; as a rule of thumb, when you cross certain key points you’ll notice a change in how long garbage collection takes: 500 MB, 1 GB, 2 GB, and 4 GB are all around the points when our GC time has taken a jump. It’s more art than science, so bring your patience with you. There’s nothing more frustrating than addressing OOM issues by increasing JVM memory size only to have it noticeably impact GC times.
The amount of memory allocated to all worker processes (JVMs) on a worker node can be changed in the worker.childopts property in each worker node’s storm.yaml configuration file. This property accepts any valid JVM startup option, providing the ability to set the startup options for the initial memory allocation pool (-Xms) and maximum memory allocation pool (-Xmx) for the JVMs on the worker node. Changing this property will update all the worker processes on a particular worker node. Make sure that your worker node has enough resources for the memory increase. After updating this property, you’ll need to restart the Supervisor process on the worker node to apply the change.
From my old days of J2EE application tuning, I still recommend setting -Xms and -Xmx to the same value to eliminate heap management overhead. Along with being more efficient, this strategy adds the benefit of making it easier to reason about JVM memory usage, because the heap size is a fixed constant for the life of the JVM.
Worker Node Memory
Much like how an individual JVM has a limited amount of available memory, so does a worker node as a whole. In addition to the memory needed to run your Storm worker processes (JVMs), you need memory to run the Supervisor process and any other processes on your worker node without swapping.
A worker node has a fixed amount of memory that’s being used by its worker processes along with any other processes running on that worker node. If a worker node is experiencing memory contention, that worker node will be swapping. Swapping is the little death and needs to be avoided if you care about latency and throughput. This is a problem when using Storm; each worker node needs to have enough memory so that the worker processes and OS don’t swap. If you want to maintain consistent performance, you must avoid swapping with Storm’s JVMs.
One way to keep an eye on this in Linux is with the System Activity Reporter (sar).
Use sar -S for reporting swap space utilization statistics.
If you run a single worker process per worker node, it’s impossible to run into contention between workers on that node. This can make maintaining consistent performance within a cluster much easier. We know of more than one development team that has opted for this approach. If possible, we advise you to seriously consider going this route.
This is a nonstarter if you aren’t running in a virtualized environment. The cost is simply too high to do this if you’re running on “bare metal” with a single OS instance per physical machine. Within a virtualized environment, you’ll use more resources by doing this. Assume for a moment that your OS install requires n GB of disk space and uses 2 GB of memory to run effectively. If you have eight workers running on your cluster and you assign four workers per node, you’d use n * 2 GB of disk and 4 GB of memory to run the OS on your cluster nodes. If you were to run a single worker per node, that would skyrocket to n * 8 GB of disk and 16 GB of memory. That’s a fourfold increase in a rather small cluster. Imagine the additional usage that would result if you had a cluster that was 16, 32, 128, or more nodes in size. If you’re running in an environment such as Amazon Web Services (AWS) where you pay per node, the costs can add up quickly. Therefore, we suggest this approach only if you’re running in a private virtualized environment where the cost of hardware is relatively fixed and you have disk and memory resources to spare.
Let’s assume that your worker node is swapping due to contention for that node’s memory. Here are a few options:
Increase the memory available to each worker node. This would mean giving more memory to the physical machine or VM, depending on how you configured your cluster.
Lower the collective memory used by worker processes.
Lowering the collective memory used by worker processes can be done in one of two ways. The first is by reducing the number of worker processes per worker node. Reducing the total number of worker processes will lower the overall memory footprint of the combined remaining processes. The second way is by reducing the size of your JVMs. Be careful when lowering memory allocated to existing JVMs, though, to avoid introducing memory contention within the JVM.
One safe solution is to always go the route of increasing the memory available to each machine. It’s the simplest solution and its resulting ramifications are the easiest to understand. If you are tight on memory, lowering memory usage can work, but you open yourself up to all the problems we discussed concerning GC and OOM on a per-JVM basis.
Worker Node CPU
Worker node CPU contention occurs when the demand for CPU cycles outstrips the amount available.
This is a problem when using Storm and is one of the primary sources of
contention in a Storm cluster. If your Storm topology’s throughput is lower
than what you expect it to be, you may want to check the worker node(s) running
your topology to see if CPU contention exists. One way to keep an
eye on this in Linux is with the sar -u command for displaying real-time CPU usage of all
CPUs. Let’s assume that the throughput of your topologies is low, and based
on running the sar command, you see that CPU contention exists. To address the
problem, you have the following options:
Increasing the number of
CPUs available to the machine. This is only possible in a virtualized
environment.
Upgrading to a more
powerful CPU (Amazon Web Services (AWS) type of environment).
Spreading the JVM load
across more worker nodes by lowering the number of worker processes per
worker node. To spread worker
process (JVM) load across more worker nodes, you need to reduce the number of
worker processes running on each worker node. Reducing the number of worker
processes per worker node results in less processing (CPU requests) being done
on each worker node. There are two scenarios you may find yourself in when
attempting this solution. The first is you have unused worker processes in your
cluster and can therefore reduce the number of worker processes on your
existing nodes, thus spreading the load. The second scenario
is where you don’t have any unused worker processes and therefore need to add
worker nodes in order to reduce the number of worker processes per worker node. Reducing the number
of worker processes per worker node is a good way to reduce the number of CPU
cycles being requested on each node. You just need to be aware of what
resources are available and in use and act appropriately in your given
scenario.
Worker Node I/O
I/O contention on a worker node can fall under one of two categories:
Disk I/O contention,
reading from and writing to the file system
Network/socket I/O
contention, reading from and writing to a network via a socket Both types of
contention are regularly an issue for certain classes of Storm topologies. The
first step in determining if you’re experiencing either of these contentions is
to establish whether a worker node is experiencing I/O contention in general.
Once you do, you can dive down into the exact type of I/O contention your
worker node is suffering from. One way to
determine if a worker node in your cluster is experiencing I/O contention is by
running the sar -u command, which displays real-time CPU usage, and watching the %iowait column. A healthy topology that uses a lot of I/O
shouldn’t spend a lot of time waiting for the resources to become available.
That’s why we use an %iowait of 10.00% as the threshold at which you start experiencing
performance degradation. If you know what topologies are running on a
given worker node, you know that they use a lot of network resources or disk
I/O, and you see iowait problems, you can probably safely assume which of the two
is your issue. If you’re seeing troubling I/O contention signs, first attempt
to determine if you’re suffering from socket/network I/O contention. If you
aren’t, assume that you’re suffering from disk I/O contention. Although this
might not always be the case, it can take you a long way as you learn the tools
of the trade. If your topologies
interact over a network with external services, network/socket I/O contention
is bound to be a problem for your cluster. In our experience, the main cause
for this type of contention is that all of the ports allocated for opening
sockets are being used. Most Linux installs
will default to 1024 maximum open files/sockets per process. In an
I/O-intensive topology, it’s easy to hit that limit quickly. To determine the
limits of your OS, you can
examine the /proc filesystem to check your process’s limits. In order to do
this, you’ll first need to know your process ID. Once you do that, you can get
a listing of all limits for that process. Start by getting the PID (ps aux
| grep MyTopologyName) and then check your process limits in /proc/<PID>/limits. If you’re hitting
this limit, the Storm UI for your topology should display an exception in the
“Last Error” column that the max open files limit has been reached. This will
most likely be a stack trace starting with java.net.SocketException: Too
many open files. Let’s assume that your topology is experiencing reduced throughput or no throughput
at all, and you’re seeing errors for hitting the limit of open sockets. A couple of ways to
address this problem are as follows:
Increasing the number of
available ports on the worker node
Adding more worker nodes
to the cluster
For increasing the
number of available ports, you’ll need to edit the /etc/security/limits.conf file on most Linux distributions. These settings
will set the hard and soft limit on open files per user. The value we’re
concerned with as a Storm user is the soft limit. I don’t advise going higher
than 128k. If you do, then as a rule of thumb (until you learn more about
soft/hard limits for number of files open on Linux), I suggest setting the hard
limit to two times the value of the soft limit. Note that you need super-user
access to change limits.conf and you’re going to need to reboot the system to
make sure they take effect. Increasing the
number of worker nodes in the cluster will give you access to more ports. If
you don’t have the resources to add more physical machines or VMs, you’ll have
to try the first solution. The first real contention
issue was the number of sockets available per machine. Don’t add more workers
on other machines until you’ve increased available sockets on each node as much
as you can. Once you’ve done that, you should also look at your code. Are you opening and
closing sockets all the time? If you can keep connections open, do that.
There’s this wonderful thing called TIME_WAIT. It’s the state in which a TCP connection lingers after
you close it, waiting for any stray data. If you’re on a slow network link (like
many were when TCP was first designed), this is a wonderful idea that helps
prevent data corruption. If you’re on a fast modern LAN, it’ll drive you
insane. You can tune your TCP stack via various OS-specific means to lower how
long you linger in TIME_WAIT, but when you’re making tons of network calls, even
that won’t be enough. Be smart: open and close connections as little as
possible. Disk I/O contention
affects how quickly you can write to disk. This could be a problem with Storm
but should be exceedingly rare. If you’re writing large volumes of data to your
logs or storing the output of calculations to files on the local filesystem, it
might be an issue, but that should be unlikely. If you have a
topology that’s writing data to disk and notice its throughput is lower than
what you’re expecting, you should check to see if the worker nodes it’s running
on are experiencing disk I/O contention. For Linux installations, you can run a
command called iotop to get a view of the disk I/O usage for the worker
nodes in question. This command displays a table of current I/O usage by processes/threads
in the system, with the most I/O-intensive processes/threads listed first. Let’s assume that you have a topology that reads/writes to/from disk, and it looks
like the worker nodes it’s running on are experiencing disk I/O contention. To address this
problem, you have the following options:
Write less data to disk.
This can mean cutting back within your topology. It can also mean putting
fewer worker processes on each worker node if multiple worker processes
are demanding disk on the same worker node.
Get faster disks. This
could include using a RAM disk.
If you’re writing to NFS
or some other network filesystem, stop immediately. Writing to NFS is slow
and you’re setting yourself up for disk I/O contention if you do. If you’re writing
large amounts of data to disk and you don’t have fast disks, you’re going to
have to accept it as a bottleneck. There’s not much you can do without throwing
money at the problem.
Summary
Several types of
contention exist above the topology level, so it’s helpful to be able to
monitor things like CPU, I/O, and memory usage for the operating system
your worker nodes are running on.
It is important to have
some level of familiarity with monitoring tools for the operating system
of the machines/VMs in your cluster. In Linux, these include sar, netstat, and iotop.
There’s value in knowing
common JVM startup options, such as -Xms, -Xmx, and those related to GC logging.
Although the Storm UI is
a great tool for the initial diagnosis of many types of contention, it’s
smart to have other types of monitoring at the machine/VM level to let you
know if something is awry.
Including custom
metrics/monitoring in your individual topologies will give you valuable
insights that the Storm UI may not be able to.
Be careful when
increasing the number of worker processes running on a worker node because
you can introduce memory and/or CPU contention at the worker node level.
Be careful when
decreasing the number of worker processes running on a worker node because
you can affect your topologies’ throughput while also introducing contention
for worker processes across your cluster.
References
Storm Applied: Strategies for real-time event processing by Sean T. Allen, Matthew Jankowski, and Peter Pathirana
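The open-files check described under Worker Node I/O can be sketched in shell as follows. This is a minimal sketch, assuming a Linux host with /proc mounted; the function name is made up for this example, and it reads the limits file from standard input so it can be pointed at any process.

```shell
#!/bin/sh
# Read the contents of a /proc/<PID>/limits file on stdin and print the
# soft limit for open files (the 4th whitespace-separated field of the
# "Max open files" row; the 5th field is the hard limit).
soft_open_files_limit() {
    awk '/^Max open files/ { print $4 }'
}

# Example: inspect the limits of the current shell, if /proc is available.
if [ -r "/proc/$$/limits" ]; then
    echo "soft open-files limit: $(soft_open_files_limit < "/proc/$$/limits")"
fi
```

For a Storm worker, replace $$ with the PID found via ps aux | grep MyTopologyName.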
11-01-2016
03:32 PM
1 Kudo
@Hanife Shaik This is a long discussion for a single question in HCC. Thanks for the additional clarification, but the response is the same one I provided earlier: HBase. I already covered Cassandra vs. HBase. It may be true that Cassandra allows faster writes than HBase and that HBase allows faster scans, but at the end of the day, within your Hadoop ecosystem HBase is better supported, and you can take advantage of HBase snapshots and TTL capabilities, which will prove key for your analytics. Here is the approach, at a high level: Your web app writes to HBase. HBase keeps versions of your data and provides TTL (time to live) capabilities. As such, an insert/update/delete is just another version of your data. You can then use HBase snapshots for analytics using Spark and Phoenix. HBase snapshots do not impact region servers, so your writes are not impacted and you have a nice isolation between your fast and many writes (OLTP) and the reads needed for analytics (OLAP).
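A rough sketch of the table setup behind this approach, written as a shell function that emits HBase shell commands. The table name web_events, the column family d, the snapshot name, and the VERSIONS and TTL values are assumptions picked for illustration; create and snapshot are standard HBase shell commands.

```shell
#!/bin/sh
# Emit HBase shell commands for a table that keeps several versions per cell
# and expires data via TTL (in seconds; 2592000 s = 30 days), followed by a
# snapshot that analytics jobs can read. All names and values are hypothetical.
hbase_setup_commands() {
    cat <<'EOF'
create 'web_events', {NAME => 'd', VERSIONS => 3, TTL => 2592000}
snapshot 'web_events', 'web_events_snap'
EOF
}

# Preview the commands; to run them, pipe into the HBase shell instead:
#   hbase_setup_commands | hbase shell
hbase_setup_commands
```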
11-01-2016
03:24 PM
4 Kudos
@Hanife Shaik This question seems to be a duplicate of this question: https://community.hortonworks.com/questions/64251/can-we-use-hbasecassandra-as-database-for-aspnet-w.html#answer-64317
11-01-2016
01:25 AM
9 Kudos
@Hanife Shaik Based on the data growth you mentioned, you definitely need to pick a database that can deal with big data. I would like to point out that if you plan to use a Hadoop ecosystem to take advantage of other tools like Spark, Storm, Kafka, Hive, HDFS, etc., the logical choice would be HBase. Don't forget that all the data that your web application generates may be subject to machine learning for a recommendation engine that may add even more value. Having the capacity to store so much data in HDFS and the power of Spark to process it could be a differentiator for your business. You will see how valuable those tools are and how important it is to have solid integration in a single platform like Hortonworks Data Platform.
Rather than going with some exotic framework, I suggest using REST: http://hbase.apache.org/book.html#_rest. If you Google, you will see a few frameworks for .NET, but none is consistently developed. However, REST is the universal method to decouple systems and technologies.
@Umair Khan mentions Redis. While Redis is good for low-latency key-value in-memory processing, it is not the usual choice for data growth like yours, 120-160 TB in 2 years. I can't imagine a Redis Cluster dealing with 120-160 TB, not to mention year 3 at 240-320 TB. Redis could be used as an accelerator for some of the use cases applicable to your application.
+++ If any response in this thread helped with your question, please vote and accept best answer.
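To make the REST suggestion concrete, here is a minimal shell sketch of reading one row through the HBase REST server with curl. It assumes a REST server has been started (for example with hbase rest start) and is listening on its default port 8080; the host name, table, and row key are hypothetical.

```shell
#!/bin/sh
# Build the URL of a row resource on the HBase REST server: /<table>/<row>.
hbase_row_url() {
    # $1 = base URL, $2 = table, $3 = row key
    printf '%s/%s/%s\n' "$1" "$2" "$3"
}

# Fetch a row as JSON. In the JSON representation, row keys, column names
# and cell values come back base64-encoded.
hbase_get_row() {
    curl -s -H "Accept: application/json" "$(hbase_row_url "$1" "$2" "$3")"
}

# Hypothetical host, table and row key; this only prints the URL.
hbase_row_url "http://hbase-rest.example.com:8080" "web_events" "user42"
# To issue the request against a running REST server:
#   hbase_get_row "http://hbase-rest.example.com:8080" "web_events" "user42"
```

Any client with an HTTP library, including a .NET application, can issue the same request.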