Member since 09-23-2015
88 Posts
109 Kudos Received
1 Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
7059 | 08-24-2016 09:13 PM |
01-06-2016 08:59 PM
Super helpful - thanks!
01-06-2016 04:04 PM
2 Kudos
Two questions on Solr:
- Sizing for Solr cores (collections): If I need to index 1 TB of data via Solr, do we have any knowledge of how large the Solr data footprint would be? At what point does it make sense to store the cores in HDFS vs. on local disk on the Solr server?
- Response latency expectations: Will Solr always return indexed fields quickly (< 1 s) regardless of the data size? Or should we think of it more like HBase, where fast results depend on memory cache strategy and overall data size?
Labels:
- Apache Solr
12-16-2015 06:56 PM
1 Kudo
Does anyone know of a way to view the statistics which are created after the command "analyze table [myTable] compute statistics;" is executed? Referenced from here: http://hortonworks.com/blog/5-ways-make-hive-queries-run-faster/
Labels:
- Apache Hive
12-12-2015 07:48 AM
41 Kudos
Please find the attached "Storm/Kafka Best Practices Guide". This document is intended to be an unofficial guide to developing and deploying streaming applications using Storm and Kafka. It is not the official Hortonworks documentation, but a collection of best practices from teams implementing Storm and Kafka in production.
Pre-Reading
If you are new to Storm development or operations, please review the Tutorial under the Storm Documentation “Read these first” section. That tutorial covers the basics of Storm components and operations.

Development
The Storm developer has ample opportunity to make the topology fast, efficient, and easier for the Operations team to run in production. This section discusses best practices from the developer’s perspective to achieve these goals.

Developing for highest Performance
- Keep the tuple processing code as light as possible. Write the execute() method in your bolts so that it runs as quickly as possible. This has a multiplier effect when you deploy at scale.
- For bolt-local caches, consider using Google’s Guava cache if the memory requirements are small (less than 1 GB). For caches which share data across multiple bolts, consider using HBase, Phoenix, Redis, or Memcached.
- Externalize as much of your Storm topology configuration as possible to a properties file that is read when the topology is launched. This allows operations to turn knobs to improve performance without recompiling code.

Allowing for ease of re-deployment
Occasionally a live topology may need to be suspended, removed, modified, and redeployed against the live stream of messages in order to change its core behavior. To allow for redeploying an updated topology without having to replay messages from Kafka, the developer should take the following steps when writing the topology.

When using KafkaSpout and preparing to deploy to production, ensure the following:
- In your SpoutConfig, “id” and “zkroot” do NOT change after redeploying the new version of the topology. Storm uses “zkroot” and “id” to store the topic offset in ZooKeeper.
- KafkaConfig.forceFromStart is set to false. KafkaSpout stores the offsets in ZooKeeper. Be very careful during redeployment: if forceFromStart is set to true in the KafkaConfig of the KafkaSpout (which can be the case when you first deploy the topology), the spout will ignore the stored ZooKeeper offsets. Setting forceFromStart to true is useful in development when you need to test the same messages multiple times, but make sure you set it to false when deploying to production, or else you will re-process already consumed messages each time you restart your topology.
- In some cases you may want to run multiple versions of the same or similar topologies. If so, you may specify a different client ID for each topology version. Kafka will treat different client IDs as different consumer applications and will track their topic offsets independently of one another.
- Consider writing your topology so that the KafkaConfig.forceFromStart value is read from a properties file when your topology’s main() method executes. This allows your administrators to control whether the Kafka messages are replayed.

How to redeploy an updated topology
The following steps allow you to deactivate and redeploy an existing topology without losing its current place in the message queue, which avoids having to replay messages:
1. Deactivate the topology: storm deactivate [topology name]
2. Kill the topology: storm kill [topology name]
3. Deploy the new version of the topology with the same SpoutConfig “id” and “zkroot” values.

To deploy a Storm topology and force it to replay messages from the beginning of the message queue:
- Deactivate the topology
- Kill the topology
Option 1: Remove ZKRoot for your consumer group
- Run the ZooKeeper CLI: /usr/hdp/current/zookeeper-client/bin/zkCli.sh
- Navigate to the zkroot: ls /storm/consumers/[my_consumer_id]/offsets/[my_topic]
- Remove the ZooKeeper node for the zkroot: rmr /storm/[value of zkRoot]
- Start the topology as normal
Option 2: start topology starting from 0 offset
Start topology with Kafka Spout config.forceFromStart=true. This is most often set via a command-line parameter passed into the application.

Initial Storm Setup
Storm Supervisors are recommended to run under “Supervision”. This means that the Storm worker processes will be automatically restarted if they fail for any reason.

To configure Storm for operating under supervision:
- Follow the documentation located here: Documentation - How To Configure Storm for Supervision
- Before starting the Ambari server and Storm back up, make the following change manually on all hosts. This works around a bug in the storm script whose fix is targeted for HDP 2.2.3:
- cd /usr/hdp/current/storm-supervisor/bin/storm.distro
- Back up this file: cp /usr/hdp/current/storm-storm/bin/storm.distro /usr/hdp/current/storm-storm/bin/storm.distro.orig
- Then edit this file to add the word "exec" to the last line, so that it looks like this: exec $PYTHON ${STORM_BIN_DIR}/storm.py $@
- ambari-server start
- service supervisord restart
- Start Storm through Ambari

Test that this is working:
[root@c6401 storm]# supervisorctl status
storm-supervisor RUNNING pid 8691, uptime 0:01:34

Sizing and Resizing Storm Resources
Control the number of workers per topology (post deployment) using “storm rebalance”
- Existing workers are not killed to allow new topology submission.
- If a new topology asks for resources that cannot be allocated, the new topology will fail and existing ones will continue to run.
- You cannot change these JVM options for already running topologies.

Example:
# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

Control the number of workers per machine
The most impactful performance setting in Storm standalone is the number of worker slots available per machine. This setting is controlled with the parameter “supervisor.slots.ports”. Each worker is assigned a port to use for communication, and you can control the number of workers by assigning a new port. This is somewhat analogous to controlling the number of containers which run on YARN.

Size of workers (in memory)
We recommend setting worker.childopts via Ambari similar to the following:
-Xmx2048m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled -Djava.net.preferIPv4Stack=true
Since workers are each allocated their own JVM, you can run many of them in order to better utilize the available memory. You can also change the worker settings per topology during submission by setting worker child opts with “-c topology.worker.childopts”. Workers are per topology, so existing workers for existing topologies will continue to run as is.

Supervisor memory settings
Large memory allocations for Supervisors are not useful, as the Supervisor does not do much of the heavy lifting of Storm execution. Its responsibilities are to download the topology jar when it receives an assignment, and to launch and monitor the worker processes. You can allocate more JVM memory to the workers, where the topology code executes. But even for workers, allocating a huge JVM heap is not a good idea: the GC can trigger a stop-the-world collection, which can prevent workers from sending heartbeats in time, so that nimbus/supervisor consider the worker dead and restart it.

How to auto-scale a topology while running in Production?
There is no native support in Storm itself for auto-scaling. Work is ongoing in the community under this JIRA: https://issues.apache.org/jira/browse/STORM-594.

Kafka Tuning Recommendations
Kafka Brokers per Server
Recommend 1 Kafka broker per server. Kafka is not only disk-intensive but can also be network-intensive, so if you run multiple brokers on a single host, network I/O can become the bottleneck. Running a single broker per host across a cluster will also give you better availability.

Increase Disks allocated to Kafka Broker
Kafka parallelism is largely driven by the number of disks and partitions per topic. From the Kafka documentation: “We recommend using multiple drives to get good throughput and not sharing the same drives used for Kafka data with application logs or other OS filesystem activity to ensure good latency. As of 0.8 you can format and mount each drive as its own directory. If you configure multiple data directories partitions will be assigned round-robin to data directories. Each partition will be entirely in one of the data directories. If data is not well balanced among partitions this can lead to load imbalance between disks.”

Number of Threads
Make sure you set num.io.threads to at least the number of disks you are going to use; by default it is 8. It can be higher than the number of disks. Set num.network.threads higher based on the number of concurrent producers, consumers, and the replication factor.

Number of partitions
Ideally you want to assign the default number of partitions (num.partitions) to at least n-1 servers. This breaks up the write workload and allows for greater parallelism on the consumer side. Remember that Kafka guarantees total ordering within a partition, not across multiple partitions, so make sure you partition intelligently on the producer side to parcel up units of work that might span multiple messages/events.

Message Size
Kafka is designed for small messages. I recommend avoiding Kafka for larger messages. If that is not avoidable, there are several ways to go about sending larger messages, such as 1 MB. Use compression: if the original message is JSON, XML, or text, compression is the best option to reduce its size. Large messages will affect your performance and throughput. Check your topic partitions and replica.fetch.size to make sure it does not exceed your physical RAM.

Large Messages
Another approach is to break the message into smaller chunks and use the same message key so that all chunks are sent to the same partition. This way you are sending small messages, and these can be re-assembled on the consumer side.
Broker side:
- message.max.bytes defaults to 1000000. This indicates the maximum size of a message that a Kafka broker will accept.
- replica.fetch.max.bytes defaults to 1MB. This has to be bigger than message.max.bytes, otherwise brokers will not be able to replicate messages.
Consumer side:
- fetch.message.max.bytes defaults to 1MB. This indicates the maximum size of a message that a consumer can read. This should be equal to or larger than message.max.bytes.

Kafka Heap Size
By default the kafka-broker JVM heap is set to 1 GB; this can be increased using the Ambari kafka-env template. When you are sending large messages, JVM garbage collection can be an issue. Try to keep the Kafka heap size below 4 GB.
Example: in kafka-env.sh, add the following settings (the heap values shown are an example; adjust -Xmx and -Xms to your chosen heap size).
export KAFKA_HEAP_OPTS="-Xmx16g -Xms16g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"

Dedicated Zookeeper
Have a separate ZooKeeper cluster dedicated to Storm/Kafka operations. This will improve Storm/Kafka’s performance for writing offsets to ZooKeeper, since it will not be competing with HBase or other components for read/write access.

ZK on separate nodes from Kafka Broker
Do not install ZK nodes on the same node as a Kafka broker if you want optimal Kafka performance. Both Kafka and ZK are disk I/O intensive.

Disk Tuning sections
Please review the filesystem tuning parameters in the Kafka documentation. Disable THP (Transparent Huge Pages) according to the documentation. Either the ext4 or the xfs filesystem is recommended for performance.

Minimal replication
If you are doing replication, start with 2x rather than 3x for Kafka clusters larger than 3 machines. Alternatively, use 2x even on a 3-node cluster if you are able to reprocess upstream from your source.

Avoid Cross Rack Kafka deployments
Avoid cross-rack Kafka deployments for now until Kafka 0.8.2 - see: https://issues.apache.org/jira/browse/KAFKA-1215

Kafka Monitoring via JMX
Kafka exposes a JMX interface for monitoring the operation of the Kafka broker process. In order to connect via JMX:
- Modify kafka-env.sh to add the line: export JMX_PORT=9999
- Restart the Kafka broker
- Connect using the JMX client of your choice. Example service URL: service:jmx:rmi:///jndi/rmi://[your_Kafka_host]:9999/jmxrmi

Storm and Kafka Performance Testing
Recommendations on End to End testing
End-to-end testing of the Storm/Kafka ecosystem will occasionally require producing messages to Kafka, processing messages in Storm, and then writing downstream to a data store such as HBase or Hive. In this scenario you will first want to test each discrete component’s performance before testing the full end-to-end performance times. For example, you will want to test each of the following individually:
- Kafka performance when producing (writing) directly to Kafka
- Storm performance with a dummy spout which creates fake messages and writes downstream to your data store
- Storm performance with a live Kafka spout which reads messages, connected to a bolt which only acks the messages from Kafka.

Storm Monitoring and Alerting
Ambari 2.0 includes pre-built alerts for Storm services. These can be configured to send email or SNMP notifications when triggered.

Custom monitoring for status of individual topologies
From HDP 2.2 onward there is a REST API on the Storm UI. One approach I would recommend is to call /api/v1/topology/summary to get each topology and its status (ACTIVE is good). More info on that API: https://github.com/apache/storm/blob/master/STORM-UI-REST-API.md. If you are configuring Nagios to monitor, don’t make these calls too frequently, as each call goes to nimbus.
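As a rough illustration of the status check described above, the following Python sketch fetches /api/v1/topology/summary and lists topologies whose status is not ACTIVE. The host name, port, and function names are placeholders invented for this example, not values from the guide. The field names ("topologies", "name", "status") are assumed to match the Storm UI REST API document linked above, so check them against your Storm version.

```python
import json
from urllib.request import urlopen

# Hypothetical Storm UI address (placeholder); replace with your own host and port.
STORM_UI = "http://storm-ui.example.com:8744"


def fetch_summary(base_url=STORM_UI, timeout=10):
    """Fetch and parse the topology summary from the Storm UI REST API."""
    with urlopen(base_url + "/api/v1/topology/summary", timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def inactive_topologies(summary):
    """Return the names of topologies whose status is not ACTIVE.

    Assumes the response holds a "topologies" list whose entries carry
    "name" and "status" fields.
    """
    return [
        t.get("name", "<unnamed>")
        for t in summary.get("topologies", [])
        if t.get("status") != "ACTIVE"
    ]
```

A monitoring job could call inactive_topologies(fetch_summary()) on a schedule and alert on a non-empty result. Keep the polling interval generous, since every call is served by nimbus.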
To check whether topologies have stopped processing, one option is to write a script which calls Nimbus via the REST API, stores the current total tuples (overall or at the component level), and then on the next call checks whether the total has increased from the previous state.
Example Script - please find the example below of how to automate checking for topologies which stop processing tuples: https://gist.github.com/wesfloyd/e662f1d166440f58b...

How to measure and scale when the number of workers (parallelism) for a given spout or bolt is undersized? Should we be able to infer this from Storm UI stats?
You can monitor the topology from the Storm UI and check its capacity at the topology or component (bolt or spout) level. This is a utilization ratio: if it is equal to or above 1.00, then that component is the bottleneck, and depending on the type of spout or bolt you might want to increase its parallelism. In the case of KafkaSpout, increasing parallelism means you need to increase the number of topic partitions.

Benchmarking
Running pre-established benchmarks can be a very helpful way to test scaling your cluster without having to develop a Storm topology from scratch.
- Storm Benchmark tools authored by Taylor Goetz - https://github.com/ptgoetz/storm-benchmark
  Tip: Modify the pom.xml file “storm.version” attribute to match your preferred version of Storm. HDP 2.2 currently ships with Storm version 0.9.3
- Storm Benchmark authored by Manu Zhang - https://github.com/manuzhang/storm-benchmark

Failure Scenarios

Storm
Scenario | Impact | Monitoring Needs | Recovery Actions |
---|---|---|---|
Nimbus service goes down | Running topologies continue to run. No new topologies can be submitted. No lifecycle management activities (get stats, balance, kill etc) can be performed on running topologies | Monitor nimbus process using either mechanism below: 1. nc -vz <nimbus host> <nimbus port> 2. Ganglia indicating no metrics captured from nimbus in last few minutes | ‘supervisord’ is set to auto restart nimbus when it goes down |
Nimbus Host goes down | Running topologies continue to run. No new topologies can be submitted. No lifecycle management activities (get stats, balance, kill etc) can be performed on running topologies | -- same as above -- | Need to bring back nimbus Host. In case of hardware failure it can be started on another host as long as the new host is assigned the same hostname / IP address as before (physically or virtually) |
Storm supervisor failure | Host with supervisor running is not available for processing | -- same as above but use supervisor host and port -- | Storm is self healing. It will reschedule the work to other available supervisors and existing topologies will continue to work |
Worker process failure | JVM of worker process and all associated memory is lost | | Storm is self healing. Supervisor will relaunch the worker process and topology will continue to run. State maintained by topology components (if any) would need to be recreated |
Log dirs full | Workers will fail | Per host per disk monitoring for utilization. Alert if disk usage goes beyond 80% | -- same as above -- Additionally disk monitors should be set to alert on disk usage going beyond 70% |
UI service goes down | Will not be able to see UI of running topologies in cluster. However running topologies continue to run. New topologies can be submitted and managed. | Monitor UI process using either mechanism below: 1. nc -vz <storm ui host> <storm ui port> 2. Ganglia indicating no metrics captured from UI service in last few minutes | ‘supervisord’ is set to auto restart Storm UI when it goes down |
REST service goes down | Will not be able to get metrics of running topologies in cluster. However running topologies continue to run. New topologies can be submitted and managed. | -- same as above -- | ‘supervisord’ should be set to auto restart REST server when it goes down |
Logviewer UI goes down | Will not be able to see logs in UI. However running topologies continue to run. New topologies can be submitted and managed. | -- same as above -- Logviewer may be disabled in production | ‘supervisord’ should be set to auto restart logviewer service when it goes down |
Topology fails with Kafka Offset Out of Sync err | Topology stalls | Topology acked events in last few minutes. If it is zero, topology is stalled and needs to be investigated. Proactively compare the event ingest rate in Kafka with event processing rate in Storm per topology/topic. Alert if rate of ingest is higher than the rate of processing. | Topology is running slower than ingest rate. Need to tune topology. Temporarily, the ZK node storing the Kafka spout offset (usually /storm/topology/<topology name>) needs to be cleaned and the topology restarted. Consider increasing Kafka retention period. |

Kafka
Scenario | Impact | Monitoring Needs | Recovery Actions |
---|---|---|---|
Disk failure | Broker dies. All topics get replicated to other available brokers. Topics which have a replication factor of only one and have any partition residing on this disk would suffer data loss. Broker going down would result in lowered throughput of the system | Per host per disk utilization monitoring. These metrics are already captured in Ganglia. Alert if no disk stats received for more than a few minutes | Kafka recovers data by replication, so no immediate action is needed. Recovery needs disk replacement and restart of the Broker. Alternatively that specific broker’s configuration can be modified to exclude the bad disk and the Broker can be restarted |
Disk full | -- same as above -- | -- same as above -- Alert if disk utilization goes beyond 80% | -- same as above -- Alerts should be set to monitor disk usage above 80% |
Broker dead | -- same as above -- | Monitor broker process using either mechanism below: 1. nc -vz <host> <broker port> 2. Ganglia indicating no metrics captured from broker in last few minutes | Monitors like ‘monit’ or ‘supervisord’ can be set to auto restart broker on failure |
Host down | -- same as above -- | -- same as above -- Additionally Ganglia host level metrics | Need host reboot and broker restart. Host and process monitors should alert on unreachable hosts and unresponsive processes. |

Zookeeper
Scenario | Impact | Monitoring Needs | Recovery Actions |
---|---|---|---|
ZKeeper slowness | Kafka and/or Storm services may see timeouts. They will retry. | ZK 4 letter command such as ‘stat’ | Need to identify the slowness reason (disk, n/w, workload etc) and then rectify |
ZKeeper quorum availability | ZK Service is unavailable | ZK 4 letter command such as ‘ruok’ | Need to start ZK services on zk hosts to meet quorum requirements |
Disk full | -- same as above -- | Disk level stats. Alert if disk utilization goes beyond 80% | ZK is HA enabled and failure of a ZK node does not impact availability of ZK as long as the available nodes meet the quorum. Set up a log retention period for all ZK nodes to auto rotate and clean old logs. |

Maintenance
Storm Upgrade Paths and Maintenance Outages
Neither Kafka nor Storm currently allows rolling upgrades without service interruption. However, the instructions below can be followed to achieve minimal downtime during upgrade and maintenance windows.
Default upgrade approach for major version changes:
1. Shut down (kill) all Storm topologies
2. Shut down the Storm cluster
3. Upgrade the Storm software via Ambari or manually, or apply any patches needed
4. Start the Storm cluster
5. Re-submit the Storm topologies
Storm Upgrade for Minor Patches/Fixes:
1. Apply new software on all Storm nodes
2. Restart supervisors, workers, nimbus, etc. one by one in rolling fashion.
Storm Upgrade with less downtime using Storm on YARN
1. Stand up a second Storm cluster using Storm on YARN
2. Start topologies on the new cluster (in inactive mode)
3. Deactivate all old topologies and then kill them
4. Activate topologies on the new cluster
5. Shut down the old Storm topology
6. Shut down the old Storm cluster

Kafka Upgrade Paths and Maintenance Outages
Default upgrade approach:
1. Shut down external Kafka producers (shut down access to the Kafka cluster, or block proxy producers if in place)
2. Let the Storm topology process all backlog data in the Kafka topics
3. Shut down the Storm topology
4. Shut down the Kafka cluster
5. Upgrade the Kafka software or apply any patches needed
6. Restart the Kafka brokers
7. Restart the Storm topology
8. Restart the Kafka producers
Kafka Upgrade for Minor Patches/Fixes:
1. Apply new software on all Kafka nodes
2. Restart brokers one by one in rolling fashion.

References
Storm Documentation (docs.hortonworks.com)
Includes additional best practices for initial development, performance debugging, and parallelism: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/Storm_UG_v22/index.html#Item1.4
Web links
- Scaling Apache Storm - http://www.slideshare.net/ptgoetz/scaling-apache-storm-strata-hadoopworld-2014#45
- Understanding the Parallelism of a Storm Topology - http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/#how-to-change-the-parallelism-of-a-running-topology
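Returning to the Large Messages recommendation in the Kafka tuning section, the chunking approach can be sketched in plain Python, independent of any Kafka client. The chunk layout (key, index, total, data) and the function names are assumptions made for this illustration only. A real producer would send every chunk with the same message key so that all chunks reach the same partition, and the consumer would buffer chunks until the full set has arrived.

```python
from typing import List, NamedTuple


class Chunk(NamedTuple):
    """One piece of a larger message (hypothetical layout for this sketch)."""
    key: str      # identical for every chunk, so all land in one partition
    index: int    # position of this chunk within the original message
    total: int    # number of chunks the message was split into
    data: bytes


def split_message(key: str, payload: bytes, chunk_size: int) -> List[Chunk]:
    """Split payload into chunks of at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # An empty payload still produces one (empty) chunk.
    pieces = [payload[i:i + chunk_size]
              for i in range(0, len(payload), chunk_size)] or [b""]
    return [Chunk(key, i, len(pieces), piece) for i, piece in enumerate(pieces)]


def reassemble(chunks: List[Chunk]) -> bytes:
    """Rebuild the original payload; chunks may arrive in any order."""
    if not chunks:
        raise ValueError("no chunks to reassemble")
    total = chunks[0].total
    by_index = {c.index: c.data for c in chunks}
    if set(by_index) != set(range(total)):
        raise ValueError("incomplete chunk set")
    return b"".join(by_index[i] for i in range(total))
```

In practice chunk_size would be chosen below the broker's message.max.bytes so that each chunk is accepted as an ordinary small message.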
12-07-2015 03:15 PM
2 Kudos
MapReduce allowed us to set a unique job name with mapred.job.name. This allowed us to easily track jobs after execution via searching for this name in JobTracker UI and logs. With Tez there doesn't appear to be a tez.job.name which can be set. Is there another similar config setting we can use to set a unique Tez job name/tag? Related Jira: https://issues.apache.org/jira/browse/HIVE-12357 Thanks
Labels:
- Apache Tez
12-02-2015 09:15 PM
7 Kudos
Wanted to see how many people have clusters where the HDFS DataNodes are running on XFS vs Ext4 filesystems? I'm trying to get a sense for which filesystem is chosen most often in the wild. Feel free to comment if you have a preference for one vs the other. Thanks!
Labels:
- Apache Hadoop
11-19-2015 08:57 PM
If we set a custom value for dfs.permissions.superusergroup do we need to add any other Hadoop services to that group in order for the cluster to function normally? For example, if we set "dfs.permissions.superusergroup=blueManGroup" would we need to add service accounts such as hive, hdfs, oozie, etc. to the "blueManGroup"?
Couldn't find the answer in the Hadoop documentation: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsPermissionsGuide.html#The_Super-User
Labels:
- Apache Hadoop
11-19-2015 03:36 PM
1 Kudo
Can HDP support clusters which have a mix of operating system releases? For example, can it support the following scenario:
- Some nodes in the cluster running RHEL 6.4
- Some nodes in the cluster running RHEL 7.0
(Assuming this example is running HDP 2.3)
Or do we need to limit the cluster to using OSes of the same release, e.g. should all nodes run the same major and minor OS release? I'm assuming we must use OSes of the same product family, such as all RHEL, all CentOS, or all SUSE, etc. Thanks!
Labels:
- Hortonworks Data Platform (HDP)
11-19-2015 03:25 PM
1 Kudo
Thanks Andrew. As an alternative, would it be safe to deploy two running instances of NCM on a single server, perhaps binding them to separate ports? Or could conflicts arise?
11-18-2015 10:08 PM
1 Kudo
Is it possible to configure an NCM to manage multiple distinct sets of slaves (nodes)? For example, could one NCM coordinate the following scenario:
- Flow1 deployed to Nodes 1-3 by NodeControlManager 1
- Flow2 deployed to Nodes 4-6 by NodeControlManager 2
Does NCM have the ability to selectively deploy flows to separate nodes, effectively creating separate logical sets of NiFi cluster workers (nodes)? In summary can we have ... "One NCM to rule them all and in the darkness bind them"
Labels:
- Apache NiFi
- Cloudera DataFlow (CDF)