Created on 12-12-2015 07:48 AM
Please find the attached "Storm/Kafka Best Practices Guide". This document is intended to be an unofficial guide to developing and deploying streaming applications using Storm and Kafka. This is not the official Hortonworks documentation, but meant to be a collection of best practices from teams implementing Storm and Kafka in Production.
Pre-Reading
If you are new to Storm development or operations, please review the Tutorial under the “Read these first” section of the Storm documentation. The tutorial covers the basics of Storm components and operations.
Storm developers have many opportunities to make a topology fast, efficient, and easier for the operations team to run in production. This section discusses best practices from the developer’s perspective for achieving these goals.
Occasionally, a live topology may need to be suspended, removed, modified, and redeployed against the live stream of messages in order to change its core behavior. To be able to redeploy an updated topology without having to replay messages from Kafka, the developer should take the following steps when writing the topology:
When using KafkaSpout and preparing to deploy to production, ensure the following:
KafkaSpout stores its offsets in ZooKeeper. Be very careful during redeployment: if forceFromStart is set to true in the KafkaConfig of the KafkaSpout (which can be the case when you first deploy the topology), the spout will ignore the offsets stored in ZooKeeper. Setting forceFromStart to true is useful in development when you need to test the same messages multiple times, but make sure you set it to false when deploying to production; otherwise you’ll re-process already consumed messages each time you restart your topology. In some cases, you may want to run multiple versions of the same or similar topologies. If so, you may specify a different client ID for each topology version. Different client IDs are treated as different consumer applications, and their topic offsets are tracked independently of one another.
Consider writing your topology so that the KafkaConfig.forceFromStart value is read from a properties file when your Topology’s main() method executes. This will allow your administrators to control whether the Kafka messages are replayed or not.
The following steps will allow you to deactivate and redeploy an existing topology without losing its current place in the message queue. This will help avoid having to replay messages:
To deploy a Storm topology and force it to replay messages from the beginning of the message queue:
The Storm supervisor daemons are recommended to run under “supervision”: a process supervisor such as supervisord automatically restarts them if they fail for any reason. Instructions for setting up supervision follow.
To configure Storm to run under supervision, follow the documentation located here:
Documentation - How To Configure Storm for Supervision
Before starting the Ambari server and Storm back up, make the following change manually on all hosts to work around a bug in the storm script (the fix is targeted for HDP 2.2.3):
- cd /usr/hdp/current/storm-supervisor/bin
- Back up the script: cp storm.distro storm.distro.orig
- Then edit storm.distro and add the word "exec" at the start of the last line, so that it looks like this:
exec $PYTHON ${STORM_BIN_DIR}/storm.py $@
- ambari-server start
- service supervisord restart
- Start Storm through Ambari
Test that this is working:
[root@c6401 storm]# supervisorctl status
storm-supervisor RUNNING pid 8691, uptime 0:01:34
Control the number of workers per topology (post deployment) using “storm rebalance”
Existing workers are not killed to make room for a newly submitted topology. If a new topology requests resources that cannot be allocated, the new topology will fail and the existing ones will continue to run. You cannot change the JVM options of topologies that are already running.
Example:
# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
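If rebalancing is driven from an automation or monitoring script, the command above can be assembled programmatically. A minimal sketch in Python, assuming the storm CLI is on the PATH of the machine running the script; build_rebalance_cmd and rebalance are hypothetical helpers that use only the -n and -e options shown in the example.

```python
import shlex
import subprocess
from typing import Dict, List, Optional


def build_rebalance_cmd(topology: str,
                        num_workers: Optional[int] = None,
                        executors: Optional[Dict[str, int]] = None) -> List[str]:
    """Assemble a `storm rebalance` command (hypothetical helper).

    Uses only the options from the example above:
      -n <workers>              new number of worker processes
      -e <component>=<count>    new executor count for one spout or bolt
    """
    cmd = ["storm", "rebalance", topology]
    if num_workers is not None:
        cmd += ["-n", str(num_workers)]
    for component, count in (executors or {}).items():
        cmd += ["-e", f"{component}={count}"]
    return cmd


def rebalance(topology: str, **kwargs) -> None:
    """Run the command; requires the storm CLI on PATH."""
    subprocess.run(build_rebalance_cmd(topology, **kwargs), check=True)


if __name__ == "__main__":
    # Prints the same command as the example above without running it.
    print(shlex.join(build_rebalance_cmd(
        "mytopology", num_workers=5,
        executors={"blue-spout": 3, "yellow-bolt": 10})))
```

Returning an argument list rather than a shell string avoids quoting problems when component names come from configuration.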
Control the number of workers per machine
The most impactful performance setting in standalone Storm is the number of worker slots available per machine. This setting is controlled by the parameter “supervisor.slots.ports”. Each worker is assigned its own port for communication, so you control the number of workers per machine by adding or removing ports in this list. This is somewhat analogous to controlling the number of containers that run on a YARN node.
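Because each entry in supervisor.slots.ports is one worker slot, the length of that list is the number of workers a machine can host. The sketch below generates the storm.yaml entry for a chosen slot count. It assumes consecutive ports starting at 6700, the range Storm's default configuration uses for worker slots; on an Ambari-managed cluster the same list would be entered through Ambari instead.

```python
from typing import List


def slot_ports(num_slots: int, base_port: int = 6700) -> List[int]:
    """One port per worker slot, allocated consecutively from base_port."""
    if num_slots < 1:
        raise ValueError("a supervisor needs at least one worker slot")
    return [base_port + i for i in range(num_slots)]


def slots_yaml(num_slots: int, base_port: int = 6700) -> str:
    """Render the supervisor.slots.ports setting as a storm.yaml fragment."""
    lines = ["supervisor.slots.ports:"]
    lines += [f"    - {port}" for port in slot_ports(num_slots, base_port)]
    return "\n".join(lines)


if __name__ == "__main__":
    # Four slots, i.e. at most four worker JVMs on this machine.
    print(slots_yaml(4))
```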
Size of workers (in memory):
We recommend setting worker.childopts via Ambari similar to the following:
-Xmx2048m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true
Since each worker runs in its own JVM, you can run several of them to make better use of the available memory.
You can also change the worker settings for an individual topology at submission time by setting topology.worker.childopts (for example, with “-c topology.worker.childopts”). Workers belong to a single topology, so the existing workers of already-running topologies continue to run as they are.
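To decide how many workers (and therefore slot ports) a host can carry, a rough estimate is the host memory minus what must be reserved for other processes, divided by what one worker JVM really uses. Note that -Xmx bounds only the heap, so the per-worker figure should include non-heap overhead. The reserved and overhead values below are illustrative assumptions, not recommendations from this guide, and workers_that_fit is a hypothetical helper.

```python
def workers_that_fit(host_mem_mb: int,
                     reserved_mb: int,
                     worker_heap_mb: int,
                     per_worker_overhead_mb: int = 512) -> int:
    """Estimate how many worker JVMs fit in one host's memory.

    ASSUMPTIONS (illustrative only):
      reserved_mb            memory kept for the OS, the supervisor daemon and
                             any other services co-located on the host
      per_worker_overhead_mb non-heap JVM memory (thread stacks, class metadata,
                             direct buffers) that -Xmx does not cover
    """
    usable = host_mem_mb - reserved_mb
    per_worker = worker_heap_mb + per_worker_overhead_mb
    if usable <= 0 or per_worker <= 0:
        return 0
    return usable // per_worker


if __name__ == "__main__":
    # 32 GB host, 4 GB reserved, -Xmx2048m workers as in the settings above:
    # (32768 - 4096) // (2048 + 512) = 28672 // 2560 = 11 workers.
    print(workers_that_fit(32768, 4096, 2048))
```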
Supervisor memory settings
Large memory allocations for supervisors are not useful, because the supervisor does not do much of the heavy lifting of Storm execution. Its responsibilities are to download the topology jar when it receives an assignment, and to launch and monitor worker processes. Allocate more JVM memory to the workers instead, since that is where the topology code executes. Even for workers, however, allocating a huge heap is not a good idea: a long stop-the-world GC pause can keep a worker from sending its heartbeats, so Nimbus/the supervisor concludes that the worker is dead and restarts it.
How to auto-scale a topology while running in Production?
Storm has no native support for auto-scaling. Work on it is ongoing in the community under this JIRA: https://issues.apache.org/jira/browse/STORM-594.
Kafka exposes a JMX interface for monitoring the Kafka broker process. To connect via JMX:
End-to-end testing of the Storm/Kafka ecosystem will occasionally require producing messages to Kafka, processing them in Storm, and then writing downstream to a data store such as HBase or Hive. In this scenario, first test each discrete component’s performance before testing the full end-to-end performance. For example, you will want to test each of the following individually:
Ambari 2.0
Includes pre-built alerts for Storm services. These can be configured to send email or SNMP notifications when triggered.
Custom monitoring for status of individual topologies
Starting with HDP 2.2, the Storm UI exposes a REST API. One approach I would recommend is to call /api/v1/topology/summary to get each topology and its status (ACTIVE is good). More information on the API: https://github.com/apache/storm/blob/master/STORM-UI-REST-API.md. If you configure Nagios to monitor this, don’t make these calls too frequently, because each call goes to Nimbus.
To check whether topologies have stopped processing, one option is to write a script that calls the REST API (which in turn queries Nimbus), stores the current total tuple count (for the topology or at the component level), and on the next call checks whether the total has increased since the previous call.
Example script: the following shows how to automate checking for topologies that stop processing tuples:
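A minimal sketch of such a Nagios-style status check in Python, using only the standard library. It assumes the response shape described in STORM-UI-REST-API.md (a "topologies" list whose entries carry "name" and "status"); check the field names against the Storm version you run. STORM_UI is a hypothetical address, and the exit codes follow the usual Nagios plugin convention (0 = OK, 2 = CRITICAL).

```python
import json
import urllib.request
from typing import Dict, Tuple

# Hypothetical Storm UI address; replace with your UI host and port.
STORM_UI = "http://storm-ui.example.com:8744"


def get_json(path: str, base_url: str = STORM_UI, timeout: float = 10.0) -> dict:
    """GET a Storm UI REST endpoint and decode its JSON body."""
    with urllib.request.urlopen(base_url + path, timeout=timeout) as resp:
        return json.load(resp)


def inactive_topologies(summary: dict) -> Dict[str, str]:
    """Map topology name -> status for every topology that is not ACTIVE."""
    return {t["name"]: t["status"]
            for t in summary.get("topologies", [])
            if t.get("status") != "ACTIVE"}


def nagios_result(summary: dict) -> Tuple[int, str]:
    """Turn a decoded /api/v1/topology/summary body into (exit code, message)."""
    bad = inactive_topologies(summary)
    if bad:
        details = ", ".join(f"{name}={status}" for name, status in sorted(bad.items()))
        return 2, "CRITICAL - " + details
    return 0, "OK - all topologies ACTIVE"


# Usage from a cron job or Nagios command (needs a reachable Storm UI):
#   code, message = nagios_result(get_json("/api/v1/topology/summary"))
#   print(message); raise SystemExit(code)
```

Keeping the HTTP call separate from the evaluation makes the check easy to test, and the polling interval stays under your control so Nimbus is not queried too often.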
https://gist.github.com/wesfloyd/e662f1d166440f58b...
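The store-and-compare approach described above can be sketched as follows; this is not a reproduction of the linked script. It assumes the topology detail endpoint (/api/v1/topology/<id>) returns a "topologyStats" list containing an entry whose "window" is ":all-time", as documented in STORM-UI-REST-API.md, and it keeps the previous count in a local JSON file whose location is up to you. Two caveats: acked only increases when acking is enabled, so for unanchored topologies compare emitted or transferred instead; and a restarted topology counts from zero again, so one false alert right after a restart is expected.

```python
import json
from pathlib import Path
from typing import Optional


def all_time_acked(topology_detail: dict) -> int:
    """All-time acked count from a decoded /api/v1/topology/<id> response."""
    for stats in topology_detail.get("topologyStats", []):
        if stats.get("window") == ":all-time":
            return int(stats.get("acked") or 0)
    raise KeyError("no ':all-time' window in topologyStats")


def is_stalled(state_file: Path, topology_id: str, current: int) -> Optional[bool]:
    """Compare a counter with the value saved on the previous run, then save it.

    Returns None on the first run (nothing to compare against), True if the
    counter has not increased since the previous run, False otherwise.
    """
    state = json.loads(state_file.read_text()) if state_file.exists() else {}
    previous = state.get(topology_id)
    state[topology_id] = current
    state_file.write_text(json.dumps(state))
    if previous is None:
        return None
    return current <= previous
```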
How do we measure and scale when the number of workers (parallelism) for a given spout or bolt is undersized? Can we infer this from Storm UI stats?
You can monitor the topology from the Storm UI and check the Capacity reported for its components. Capacity is a ratio in which 1.00 corresponds to 100%: if it is equal to or above 1.00, that component is the bottleneck, and depending on the type of spout or bolt, you may want to increase its parallelism. In the case of KafkaSpout, increasing parallelism means you also need to increase the number of topic partitions.
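Finding saturated bolts can be automated against the same REST API. This sketch assumes the topology detail response contains a "bolts" list whose entries carry "boltId" and a "capacity" value serialized as a string (for example "0.512"), per STORM-UI-REST-API.md; the UI reports this value for bolts over the last 10-minute window. The bolt names in the example are hypothetical.

```python
from typing import List, Tuple


def bottleneck_bolts(topology_detail: dict,
                     threshold: float = 1.0) -> List[Tuple[str, float]]:
    """Return (boltId, capacity) pairs at or above `threshold`, worst first.

    Capacity is roughly the fraction of the measurement window the bolt's
    executors spent executing tuples, so values near 1.0 mean the bolt is
    saturated and is a candidate for more parallelism.
    """
    hot = []
    for bolt in topology_detail.get("bolts", []):
        capacity = float(bolt.get("capacity") or 0.0)
        if capacity >= threshold:
            hot.append((bolt["boltId"], capacity))
    return sorted(hot, key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    sample = {"bolts": [{"boltId": "parse", "capacity": "0.312"},
                        {"boltId": "hbase-writer", "capacity": "1.047"}]}
    print(bottleneck_bolts(sample))  # [('hbase-writer', 1.047)]
```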
Benchmarking
Running pre-established benchmarks can be a very helpful way to test scaling your cluster without having to develop a Storm topology from scratch.
Storm Benchmark tools authored by Taylor Goetz - https://github.com/ptgoetz/storm-benchmark
Tip: Modify the pom.xml file “storm.version” attribute to match your preferred version of Storm. HDP 2.2 currently ships with Storm version 0.9.3
Storm Benchmark authored by Manu Zhang - https://github.com/manuzhang/storm-benchmark
Scenario: Nimbus service goes down
- Impact: Running topologies continue to run. No new topologies can be submitted. No lifecycle management activities (get stats, rebalance, kill, etc.) can be performed on running topologies.
- Monitoring needs: Monitor the Nimbus process using either of the mechanisms below:
  1. nc -vz <nimbus host> <nimbus port>
  2. Ganglia indicating no metrics captured from Nimbus in the last few minutes
- Recovery actions: ‘supervisord’ is set to auto-restart Nimbus when it goes down.

Scenario: Nimbus host goes down
- Impact: Running topologies continue to run. No new topologies can be submitted. No lifecycle management activities (get stats, rebalance, kill, etc.) can be performed on running topologies.
- Monitoring needs: Same as above.
- Recovery actions: Bring the Nimbus host back. In case of hardware failure, Nimbus can be started on another host as long as the new host is assigned the same hostname/IP address as before (physically or virtually).

Scenario: Storm supervisor failure
- Impact: The host running the supervisor is not available for processing.
- Monitoring needs: Same as above, but use the supervisor host and port.
- Recovery actions: Storm is self-healing. It will reschedule the work to other available supervisors, and existing topologies will continue to work.

Scenario: Worker process failure
- Impact: The worker process JVM and all associated memory are lost.
- Recovery actions: Storm is self-healing. The supervisor will relaunch the worker process and the topology will continue to run. State maintained by topology components (if any) would need to be recreated.

Scenario: Log dirs full
- Impact: Workers will fail.
- Monitoring needs: Per-host, per-disk utilization monitoring. Alert if disk usage goes beyond 80%.
- Recovery actions: Same as above. Additionally, disk monitors should be set to alert when disk usage goes beyond 70%.

Scenario: UI service goes down
- Impact: You will not be able to see the UI of running topologies in the cluster. However, running topologies continue to run, and new topologies can be submitted and managed.
- Monitoring needs: Monitor the UI process using either of the mechanisms below:
  1. nc -vz <storm ui host> <storm ui port>
  2. Ganglia indicating no metrics captured from the UI service in the last few minutes
- Recovery actions: ‘supervisord’ is set to auto-restart the Storm UI when it goes down.

Scenario: REST service goes down
- Impact: You will not be able to get metrics of running topologies in the cluster. However, running topologies continue to run, and new topologies can be submitted and managed.
- Monitoring needs: Same as above.
- Recovery actions: ‘supervisord’ should be set to auto-restart the REST server when it goes down.

Scenario: Logviewer UI goes down
- Impact: You will not be able to see logs in the UI. However, running topologies continue to run, and new topologies can be submitted and managed.
- Monitoring needs: Same as above.
- Recovery actions: The logviewer may be disabled in production. ‘supervisord’ should be set to auto-restart the logviewer service when it goes down.

Scenario: Topology fails with a Kafka offset out-of-sync error
- Impact: The topology stalls.
- Monitoring needs: Track the topology’s acked events over the last few minutes; if the count is zero, the topology is stalled and needs to be investigated. Proactively compare the event ingest rate in Kafka with the event processing rate in Storm per topology/topic, and alert if the ingest rate is higher than the processing rate.
- Recovery actions: The topology is running slower than the ingest rate and needs to be tuned. As a temporary measure, the ZK node storing the Kafka spout offset (usually under /storm/topology/<topology name>) needs to be cleaned and the topology restarted. Consider increasing the Kafka retention period.
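The stall and ingest-versus-processing checks for the Kafka offset scenario can be expressed as one small rule. In this sketch, the two counts are assumed to come from your own collectors over the same time window (for example, messages written to the topic from Kafka broker metrics, and tuples acked from the Storm UI REST API). RateSample and check_topology are hypothetical names, and comparing the two counts only makes sense if each Kafka message becomes one spout tuple.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RateSample:
    """Counts collected over the same window for one topology/topic pair."""
    topology: str
    window_secs: float
    kafka_messages_in: int   # messages written to the topic during the window
    storm_acked: int         # tuples the topology acked during the window


def check_topology(sample: RateSample) -> List[str]:
    """Return alert messages; an empty list means the topology looks healthy."""
    if sample.window_secs <= 0:
        raise ValueError("window_secs must be positive")
    ingest = sample.kafka_messages_in / sample.window_secs
    processed = sample.storm_acked / sample.window_secs
    if sample.kafka_messages_in > 0 and sample.storm_acked == 0:
        return [f"{sample.topology}: nothing acked in the last "
                f"{sample.window_secs:.0f}s while messages keep arriving; "
                f"topology may be stalled"]
    if ingest > processed:
        return [f"{sample.topology}: ingest {ingest:.1f} msg/s exceeds "
                f"processing {processed:.1f} tuples/s; the spout is falling "
                f"behind and may hit an out-of-range offset once retention expires"]
    return []
```

An idle topic (nothing ingested, nothing acked) produces no alert, which avoids paging on quiet periods.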
Scenario: Disk failure
- Impact: The broker dies. Topic partitions that have replicas on other available brokers continue to be served from those replicas. Topics with a replication factor of 1 that have any partition residing on this disk would suffer data loss. The broker going down results in lower throughput for the system.
- Monitoring needs: Per-host, per-disk utilization monitoring. These metrics are already captured in Ganglia. Alert if no disk stats are received for more than a few minutes.
- Recovery actions: Kafka recovers data through replication, so no immediate action is needed. Recovery requires replacing the disk and restarting the broker. Alternatively, that specific broker’s configuration can be modified to exclude the bad disk, and the broker can be restarted.

Scenario: Disk full
- Impact: Same as above.
- Monitoring needs: Same as above. Alert if disk utilization goes beyond 80%.
- Recovery actions: Same as above. Alerts should be set to monitor disk usage above 80%.

Scenario: Broker dead
- Impact: Same as above.
- Monitoring needs: Monitor the broker process using either of the mechanisms below:
  1. nc -vz <host> <broker port>
  2. Ganglia indicating no metrics captured from the broker in the last few minutes
- Recovery actions: Monitors like ‘monit’ or ‘supervisord’ can be set to auto-restart the broker on failure.

Scenario: Host down
- Impact: Same as above.
- Monitoring needs: Same as above, plus Ganglia host-level metrics.
- Recovery actions: Reboot the host and restart the broker. Host and process monitors should alert on unreachable hosts and unresponsive processes.
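The two most common checks in the Storm and Kafka failure tables above, a TCP liveness probe equivalent to nc -vz and a disk-utilization threshold, can be scripted with the Python standard library alone. A minimal sketch: the 80% default mirrors the threshold used in the tables, and the host, port and path in the usage comment are placeholders.

```python
import shutil
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds, like `nc -vz`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, DNS failure or timeout
        return False


def disk_used_pct(path: str) -> float:
    """Percentage (used / total) of the filesystem that holds `path`."""
    usage = shutil.disk_usage(path)
    return 100.0 * usage.used / usage.total


def disk_alert(path: str, threshold_pct: float = 80.0) -> bool:
    """True when the filesystem holding `path` is above the threshold."""
    return disk_used_pct(path) > threshold_pct


# Usage (placeholders):
#   port_open("<broker host>", <broker port>)
#   disk_alert("/path/to/kafka-logs")
```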
Scenario: ZooKeeper slowness
- Impact: Kafka and/or Storm services may see timeouts. They will retry.
- Monitoring needs: ZK four-letter commands such as ‘stat’.
- Recovery actions: Identify the reason for the slowness (disk, network, workload, etc.) and rectify it.

Scenario: ZooKeeper quorum availability
- Impact: The ZK service is unavailable.
- Monitoring needs: ZK four-letter commands such as ‘ruok’.
- Recovery actions: Start the ZK services on the ZK hosts to meet the quorum requirements.

Scenario: Disk full
- Impact: Same as above.
- Monitoring needs: Disk-level stats. Alert if disk utilization goes beyond 80%.
- Recovery actions: ZK is HA-enabled: the failure of a ZK node does not impact ZK availability as long as the remaining nodes still form a quorum (a majority of the ensemble). Set up a log retention period on all ZK nodes to auto-rotate and clean old logs.
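The ZooKeeper four-letter-word checks above can be scripted with a plain TCP socket. A minimal sketch, assuming the default client port 2181 and that the commands are permitted on your ZooKeeper version (newer releases only answer commands listed in 4lw.commands.whitelist). Note that ‘ruok’ returning imok only shows that the server process is running; to confirm that the server is part of a working quorum, look at the Mode: line of the ‘stat’ output (leader, follower or standalone). quorum_size illustrates the majority rule behind ZooKeeper's HA behavior.

```python
import socket
from typing import Optional


def zk_command(host: str, cmd: str, port: int = 2181, timeout: float = 5.0) -> str:
    """Send a four-letter command and return the reply; the server closes the
    connection after answering, so read until EOF."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(cmd.encode("ascii"))
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8", errors="replace")


def zk_is_ok(host: str, port: int = 2181) -> bool:
    """True if the server answers 'ruok' with 'imok' (process is up)."""
    try:
        return zk_command(host, "ruok", port).strip() == "imok"
    except OSError:
        return False


def parse_mode(stat_output: str) -> Optional[str]:
    """Extract leader/follower/standalone from 'stat' output, else None."""
    for line in stat_output.splitlines():
        if line.startswith("Mode:"):
            return line.split(":", 1)[1].strip()
    return None


def quorum_size(ensemble_size: int) -> int:
    """Servers that must be up for the ensemble to serve requests (a majority).

    A 3-node ensemble needs 2 and survives 1 failure; 5 nodes need 3.
    """
    return ensemble_size // 2 + 1
```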
Neither Kafka nor Storm currently allows rolling upgrades without service interruption. However, the instructions below can be followed to achieve minimal downtime during upgrade and maintenance windows.
Default upgrade approach for major version changes:
Storm Upgrade for Minor Patches/Fixes:
Storm Upgrade with less downtime using Storm on YARN
Default upgrade approach:
Kafka Upgrade for Minor Patches/Fixes:
Includes additional best practices for initial development, performance debugging, and parallelism:
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/Storm_UG_v22/index.html#Item1.4
Created on 12-12-2015 07:48 AM
very valuable content! great work.
Created on 12-12-2015 07:48 AM
Great content. For developer productivity we should ask @dkumar@hortonworks.com to share the in-memory dev stack of ZK, Kafka, Storm, the one they are using in the labs and training.
Created on 12-12-2015 07:48 AM
Here is the mini cluster project.
Here is Dhruv's testing project:
Created on 12-15-2015 02:56 AM
Great article Wes!
Created on 02-24-2017 08:40 PM
Great article Wes!