01-02-2018
09:27 PM
1 Kudo
For certain large environments, it's very easy for the Spark History Server (SHS) to get overwhelmed by the large number of applications being posted and the number of users / developers viewing history data. Each Spark job writes an artifact called the history file, which is what the SHS parses and serves via its UI. The size of this file has a huge impact on SHS load; note that the size of the history file is determined by the number of events generated by the application (a small executor heartbeat interval, for example, produces many more events).

Workaround: If you still want to analyze performance issues in these large history files, one option is to download them and browse them from a locally hosted SHS instance. To run this:

1. Download Spark 1.6 from https://spark.apache.org/downloads.html
2. Unpack it
3. Create a directory to hold the logs called spark-logs
4. Create a properties file called test.properties
5. Inside test.properties add spark.history.fs.logDirectory=<path to the spark-logs directory>
6. Run <spark download>/sbin/start-history-server.sh --properties-file <path to test.properties>
7. Open a web browser and visit http://localhost:18080

Once done, you can download Spark history files from HDFS and copy them into this directory. The running Spark History Server will dynamically load the files as they are made available in the spark-logs directory.
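Putting the steps together, a minimal sketch looks like this (the Spark tarball name, local paths, and the HDFS event log location /spark-history are placeholders / assumptions; adjust them to your environment):

# unpack the download and create a local directory to hold history files
tar -xzf spark-1.6.3-bin-hadoop2.6.tgz
mkdir -p /tmp/spark-logs
# point the local SHS at that directory
echo "spark.history.fs.logDirectory=file:///tmp/spark-logs" > /tmp/test.properties
# start the local History Server and browse http://localhost:18080
spark-1.6.3-bin-hadoop2.6/sbin/start-history-server.sh --properties-file /tmp/test.properties
# copy a history file down from HDFS; the SHS picks it up automatically
hdfs dfs -copyToLocal /spark-history/<application_id> /tmp/spark-logs/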
11-01-2017
12:15 AM
Abstract:
Nimbus metrics are critical for operations as well as development teams to monitor the performance and stability of Storm applications / topologies. Most production environments already have a metrics / operations monitoring system such as Solr, Elasticsearch, or a time-series database. This post shows how you can use collectd to forward these metrics to your metrics environment of choice and alert on them.
Solution:
Collectd is a standard metrics collection tool that runs natively on Linux operating systems. It's capable of capturing a wide variety of metrics; you can find more information on collectd here: https://collectd.org/
To capture Storm Nimbus metrics, here's a collectd plugin that needs to be compiled and built with Maven: https://github.com/srotya/storm-collectd. Simply run:
mvn clean package assembly:single
In addition, you will need to install collectd and ensure that it has Java plugin capability. Here's a great post on how to do that:
http://blog.asquareb.com/blog/2014/06/09/enabling-java-plugin-for-collectd/ (Please note that the JAR="/path/to/jar" and JAVAC="/path/to/javac" variables need to be set to valid paths before you can run it.)
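For reference, those two variables are expected to point at the jar and javac tools of the JDK you are building against; the exact paths depend on your JDK install, so treat the values below purely as a hypothetical example:

# adjust to your actual JDK location
JAR="/usr/lib/jvm/java-8-openjdk/bin/jar"
JAVAC="/usr/lib/jvm/java-8-openjdk/bin/javac"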
Once installed, you will need to configure collectd as follows. (Don't forget to also configure an output / write plugin, otherwise the collected metrics won't be shipped anywhere; see the example after the snippet below.)
LoadPlugin java
<Plugin "java">
  # The required JVM argument is the classpath, e.g.:
  # JVMArg "-Djava.class.path=/installpath/collectd/share/collectd/java"
  # Since version 4.8.4 (commit c983405) the API and GenericJMX plugin are
  # provided as .jar files.
  JVMArg "-Djava.class.path=<ABSOLUTE PATH>/lib/collectd-api.jar:<ABSOLUTE PATH>/target/storm-collectd-0.0.1-SNAPSHOT-jar-with-dependencies.jar"
  LoadPlugin "com.srotya.collectd.storm.StormNimbusMetrics"
  <Plugin "storm">
    address "http://localhost:8084/"
    kerberos false
    jaas "<PATH TO JAAS CONF>"
  </Plugin>
</Plugin>
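The output side depends entirely on your metrics environment; any collectd write plugin (write_graphite, write_http, network, etc.) will do. As one hedged example, a Graphite-compatible sink could be configured roughly like this (host, port, and prefix are assumptions):

LoadPlugin write_graphite
<Plugin write_graphite>
  <Node "metrics">
    Host "graphite.example.com"
    Port "2003"
    Protocol "tcp"
    Prefix "collectd."
    StoreRates true
  </Node>
</Plugin>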
10-31-2017
11:59 PM
2 Kudos
Problem: If you have an AD/LDAP environment and are using HDP with Ranger, it's critical to review the case in which usernames and group ids are stored in your directory services environment. Ranger authorization is case sensitive; if the username / group id doesn't match the one returned from the directory (AD/LDAP), authorization will be denied.

Solution: To solve this problem Ranger offers 2 parameters that can be set via Ambari. This should ideally be done at install time to avoid the need to re-sync all users. The Ranger usersync properties for case conversion are:
ranger.usersync.ldap.username.caseconversion and ranger.usersync.ldap.groupname.caseconversion. You can set these properties to lower or upper; this makes sure that Ranger stores usernames and groups in the specified case in its local database, so that when users log in their authorization lookups will match correctly.
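For example, to normalize everything to lower case (a common choice; use upper instead if that is how your directory stores names), the two usersync properties would look like this:

ranger.usersync.ldap.username.caseconversion=lower
ranger.usersync.ldap.groupname.caseconversion=lower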
01-05-2017
06:23 PM
5 Kudos
General Recommendations
- Only fail tuples if there's a recoverable exception
- Use an exception / error logging bolt / stream and send the output to a Kafka topic so all logs are consolidated in one place
- Be careful about supervisor saturation / overallocation, i.e. running more threads than available CPU resources
- Configure parallelism based on throughput requirements (use the throughput equation below to calculate it)
- Be aware of data spikes and plan for them in your topology / use cases / operations
- If using Kafka, set up Kafka producer-consumer lag analysis and a dashboard
- Benchmark topologies to establish "healthy" operational numbers for individual bolt performance

## Troubleshooting topology performance issues
- Check the topology stats from the Nimbus UI.
- Look for failures (in the spout or a specific bolt); if there are any, that module is having issues.
- If the topology has a Kafka error bolt, open a console consumer to look at the error logs from Kafka.
- Find out which supervisor the worker is running on and start tailing the topology logs (log files are named by the name of the topology and the deployment number; you can find this from the Nimbus UI).

If you are seeing lag building, there are 2 major reasons for it:
- a topology component is slow (bolt or spout)
- the end sink is having issues; again, sink-specific exceptions will be present in the output of the error bolt; follow the instructions above to see the logs

Topology issues due to slow bolts

Topologies can have performance issues because some bolts are running slow. A bolt can run slow due to 2 root causes:
1. the system it interacts with is slow
2. the topology received a data spike

Slow bolts are usually the ones interacting with external systems, e.g. lookup bolts or writer bolts. Such bolts are slow because their external systems are experiencing transient performance slowdowns, e.g. Elasticsearch recovery, an HBase region split, a major GC, etc.

**How to find the root cause of a slow bolt?** Analyze the capacity numbers and execute latencies; a slow bolt is one with capacity > 1.0, meaning there aren't enough bolt instances for the amount of data being processed.

The next step is to find out whether the root cause is an external system slowdown or a data spike. If execute latencies are reasonable, i.e. around your normal value, the external system is not the bottleneck and the root cause is a data spike.

Troubleshooting an external system bottleneck

The external system bottleneck should be resolved before topology performance can be restored. Sometimes it may be beneficial to pause or stop the topology in order to recover the external system, e.g. for topology-based data ingestion. ``Note: Sometimes a data spike can trigger the external system slowdown``

Troubleshooting a data spike

Data spikes can be resolved by adding more parallelism to the topology. This implies a balanced addition of executors and workers, i.e. incrementally adding more parallelism to the specific bolt and analyzing the capacity numbers. This process should be repeated until the bolt capacity is below 1.0. ``Note: Add more executors only when the bolt latencies are low or the code scales linearly``

Calculating current throughput vs. expected throughput is critical for determining the correct configuration for parallelism and worker count (see the worked example at the end of this article).

**Throughput = Executor count * 1000 / (process latency) * Capacity**

``Note: Adding more executors doesn't solve the problem if the supervisors are saturated``

Adding more workers to the topology to evenly balance the load across supervisors is critical for good performance. If more executors are scheduled on a worker than there is CPU capacity for, the result is CPU resource contention, leading to cascading performance effects across the other topology workers running on that supervisor. A quick way to analyze overallocation is to sum the number of executors in the cluster and divide by the total number of CPU cores across all supervisors. If this number is greater than 2 you have overallocated resources; however, this may not always cause performance issues, since overallocating executors for IO-bound bolts is sometimes required for performance and parallelism.

Known Issues

Ackers

Ackers in Storm may become a bottleneck if there are too few of them. They may also become a bottleneck if there aren't enough ackers per worker, because of network issues (seen in AWS multi-AZ deployments).

Tuple Failures

Tuple failures happen for 3 reasons:

1. Tuple timeouts

Tuple timeouts are the trickiest to resolve. They can happen for any of the following reasons:
- A slow sink combined with a bad max spout pending setting
- Micro-batching with untuned batch sizes (data is not coming in fast enough for your batches to fill and there is no time-based acking)
- An untuned max spout pending setting
- An incorrect tuple message timeout (default 30 seconds)

2. Bolt exceptions

Bolt exceptions will show up in the topology worker logs.

3. Explicit failures in the code, i.e. calling the collector.fail() method

Explicit failures will be shown in the Nimbus UI, in the failure counts for the respective bolts.

Saturated Workers

Storm's concept of slots only limits saturation and overallocation of memory, not of CPU / threads. This has to be managed manually by engineers: be careful not to over-provision the number of threads (executors) / workers.

Symptoms: the topology will appear to run slow without any tuple failures, but lag will continue to build.
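To make the throughput equation concrete, here is a purely illustrative worked example (assuming the process latency reported by the Nimbus UI is in milliseconds): a bolt with 4 executors, a process latency of 20 ms, and a capacity of 0.8 is currently pushing roughly 4 * (1000 / 20) * 0.8 = 160 tuples per second. At capacity 1.0 those same 4 executors would top out around 200 tuples per second, so if the incoming rate is, say, 320 tuples per second, lag will build; sustaining that rate needs roughly 320 / (1000 / 20) = 6.4, i.e. at least 7 executors, provided the external system and the supervisors' CPU can absorb the extra parallelism.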
11-23-2016
06:39 PM
@Constantin Stanca thank you for the feedback. I have reworded the point and added some explanations; please let me know your thoughts.
11-19-2016
09:24 PM
6 Kudos
The objective of this article is to have a discussion and capture critical points to consider when developing a streaming application using Storm, Spark, or any other streaming technology. The intent is to fold additional points from discussion and comments into this article so it evolves into a comprehensive guideline for stream engineering. Here are a few points synthesized from my experience developing streaming applications using Storm and Flume:

1. Select the Streaming Application's SLA requirements

Most streaming applications can be classified by their SLA requirements into:
- At-most once (data drops are acceptable)
- At-least once (data drops are NOT acceptable)
- Exactly once (idempotent computations)

These requirements tend to heavily drive design decisions such as whether or not to ack tuples (in Storm), or whether downsampling is acceptable (sensor data processing).

2. Minimal and careful data replays

In guaranteed-processing SLA use cases, data replay must be minimized by application logic to avoid situations like replay loops and heavy back pressure. This situation is seen in poorly designed topologies where incorrect exception handling leads to a single datapoint being replayed infinitely.

3. Minimize processing latencies

The latency of processing each individual event/tuple adds up to the cumulative processing latency, therefore the streaming application must be engineered for low latency, using appropriate technologies for local in-memory caching and micro-batching where possible to minimize network latencies and amortize their cost over several calls.

4. Tradeoffs between throughput and latency

Performance tradeoffs are usually between throughput and latency and can be tuned using micro-batching (Storm Trident or Spark Streaming). A micro-batching approach may use time-based or size-based batches, both of which have caveats.

Size-based micro-batching may add latency, since the buffer must fill up before processing is applied or transfers are executed, and it is therefore subject to event velocity. This model can't be used if there are hard latency limits for the application; however, if a sustained minimum event rate (velocity) is guaranteed, then micro-batching can be applied while preserving acceptable latencies.

Time-based micro-batching is subject to stability and performance issues if spikes (in volume and velocity) are not accounted for. Time-based micro-batching can satisfy the hard latency constraints of a use case.

A hybrid model of micro-batching can also be deployed which uses both size-based and time-based batching, with hard limits on both, to guarantee low latency and high throughput while providing stability.

5. Aggregation bottlenecks

Use cases where streaming aggregations are performed must:
- account for event volume at the pivot point of aggregation (the aggregation key) to avoid bottlenecks and pipeline back pressure
- account for the distribution of event volume across aggregation keys

6. Polling and Event Sourcing

Polling and Event Sourcing are two prominent design patterns for updating configuration and logic in a streaming pipeline. Logic may include, but is not limited to: dynamic filtering, caching for data enrichment, and machine learning models.

In a polling-based design, these updates are polled for at a pre-determined frequency in an out-of-band fashion (a separate update thread). In an event-sourcing-based design, the updates are delivered to the processing component as events, and the processing component has a separate code branch (an if statement) to handle this kind of event and apply the logic update. The event-sourcing approach allows for a lock-free design, whereas a polling-based design requires locking (at some point) and concurrent data structures to guarantee consistency.

7. Error Handling

Error handling must be given thorough design thought, as incorrect error handling will lead to application downtime and performance issues. Recoverable errors, such as network unavailability or a failover, may allow for replays; unrecoverable errors, however, must be written to a separate stream without blocking the primary execution pipeline. NiFi handles this very seamlessly using Success and Failure streams.