02-03-2017
04:53 PM
1 Kudo
Word Count using Spark Streaming in Pyspark
This is a WordCount example with the following:
Local File System as a source
Calculate counts using reduceByKey and store them in a temp table
Querying running counts through SQL
Setup: Define the function that sets up the StreamingContext
This function has to create a new StreamingContext and set it up with all the input streams, transformations, and output operations. It must not start the StreamingContext; its only purpose is to capture all the setup code for your streaming application in one place.
from pyspark.streaming import StreamingContext

batchIntervalSeconds = 10

def creatingFunc():
    ssc = StreamingContext(sc, batchIntervalSeconds)
    # Set each DStream in this context to remember the RDDs it generated in the last given duration.
    # DStreams remember RDDs only for a limited duration of time and then release them for garbage
    # collection. This method allows the developer to specify how long to remember the RDDs
    # (if the developer wishes to query old data outside the DStream computation).
    ssc.remember(60)
    lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
    lines.pprint()
    words = lines.flatMap(lambda line: line.split(","))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    def process(time, rdd):
        # Skip empty batches: a schema cannot be inferred from an empty RDD.
        if rdd.isEmpty():
            return
        df = sqlContext.createDataFrame(rdd)
        df.registerTempTable("myCounts")

    wordCounts.foreachRDD(process)
    return ssc
Create a streaming context
If you don't want the streaming application to restart from the checkpoint, make sure to delete the checkpoint directory.
checkpointDir = "/LOCAL_DIR/DATA/"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
Start the streaming context
# This line starts the streaming context in the background.
ssc.start()
# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
-------------------------------------------
Time: 2015-10-07 20:57:00
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
-------------------------------------------
Time: 2015-10-07 20:57:10
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
Interactive query
%sql select * from myCounts
world hello
Stop the streaming context.
ssc.stop(stopSparkContext=False)
Stop all streaming contexts running in this JVM from python
This is helpful if you lose the handle to the streaming context, e.g. you detached the notebook from the cluster while streaming was running. In this case, you have lost the handle to the streaming context from the notebook, but the streaming job is still running in the background. If you want to stop the streaming job, you need to access the streaming context in the JVM directly and stop it.
if not sc._jvm.StreamingContext.getActive().isEmpty():
    sc._jvm.StreamingContext.getActive().get().stop(False)
02-03-2017
04:44 PM
3 Kudos
If you’re using Apache Kafka, you know it persists all the messages on disk as a distributed commit log. You might sometimes want to take advantage of that and reprocess some of the messages. Assuming that you want to reprocess all the messages currently stored on your brokers and you set auto.offset.reset to smallest, you can just delete your consumers’ data from Zookeeper. After restarting, your consumers should start from the beginning.
But what if you forgot or didn’t want to set auto.offset.reset in your consumers to smallest? Then you can manually set the offsets for each partition for your consumers to the smallest currently available offset.
List all topics
kafka-topics --list --zookeeper localhost:2181
Get Offsets for the topic
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1
Note that --time -1 returns the latest offset of each partition; use --time -2 to get the earliest available offset.
Set the offset manually
$ ZOOKEEPER_HOME/zkCli
$ ls /consumers/flasfka/offsets/vital_signs
$ set /consumers/flasfka/offsets/vital_signs/0 11492949
Verify the new offset
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1
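Setting the offset by hand for every partition gets tedious on topics with many partitions. The following is a minimal sketch, not part of the original procedure, that turns GetOffsetShell output into the matching zkCli set commands. It assumes the output has one topic:partition:offset line per partition with a single offset each; the helper name zk_set_commands and the second sample line are hypothetical.

```python
def zk_set_commands(offset_lines, group):
    """Turn GetOffsetShell output lines into zkCli `set` commands."""
    commands = []
    for line in offset_lines:
        line = line.strip()
        if not line:
            continue
        # Assumed format: one "topic:partition:offset" line per partition.
        topic, partition, offset = line.rsplit(":", 2)
        path = "/consumers/{}/offsets/{}/{}".format(group, topic, partition)
        commands.append("set {} {}".format(path, int(offset)))
    return commands


# The first line matches the example above; the second is made up for illustration.
sample_output = ["vital_signs:0:11492949", "vital_signs:1:11480000"]
for command in zk_set_commands(sample_output, group="flasfka"):
    print(command)
```

The printed lines can be pasted into the zkCli session, one per partition.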
02-03-2017
04:26 PM
5 Kudos
The Apache Ambari project is aimed at making Hadoop management simpler by developing software for provisioning, managing, and monitoring Apache Hadoop clusters. Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.
Ambari enables System Administrators to:
Provision a Hadoop Cluster
Ambari provides a step-by-step wizard for installing Hadoop services across any number of hosts.
Ambari handles configuration of Hadoop services for the cluster.
Manage a Hadoop Cluster
Ambari provides central management for starting, stopping, and reconfiguring Hadoop services across the entire cluster.
Monitor a Hadoop Cluster
Ambari provides a dashboard for monitoring health and status of the Hadoop cluster.
Ambari leverages Ambari Metrics System for metrics collection.
Ambari leverages Ambari Alert Framework for system alerting and will notify you when your attention is needed (e.g., a node goes down, remaining disk space is low, etc).
Ambari enables Application Developers and System Integrators to:
Easily integrate Hadoop provisioning, management, and monitoring capabilities into their own applications with the Ambari REST APIs.
$ export AMBARI_USER=<FILL IN>
$ export AMBARI_PASSWD=<FILL IN>
$ export AMBARI_HOST=<FILL IN>
$ export CLUSTER_NAME=<FILL IN>
$ export SERVICE=<FILL IN>
$ export COMPONENT=<FILL IN>
$ export COMPONENT_HOST=<FILL IN>
LIST SERVICES RELATED TO HIVE:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X GET "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/HIVE"
STOP THE COMPONENT HIVE_SERVER_INTERACTIVE:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo":{"context":"Stop Service"},"Body":{"ServiceInfo":{"state":"INSTALLED"}}}' "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/$SERVICE/components/$COMPONENT"
STOP ALL INSTANCES OF THE COMPONENT BEFORE DELETING IT:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H "X-Requested-By: ambari" -X PUT -d '{"RequestInfo":{"context":"Stop All Components"},"Body":{"ServiceComponentInfo":{"state":"INSTALLED"}}}' "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/$SERVICE/components/$COMPONENT"
DELETE A COMPONENT ON A HOST:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X DELETE "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/hosts/$COMPONENT_HOST/host_components/$COMPONENT"
DELETE A HOST:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X DELETE "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/hosts/$COMPONENT_HOST"
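The same calls can be scripted instead of typed as curl commands. Below is a minimal sketch using only the Python standard library that builds (but does not send) the PUT request for stopping a component, mirroring the URL, header, and JSON body of the curl example with the "Stop All Components" context. The function name, the host ambari.example.com, and the admin credentials are hypothetical placeholders.

```python
import base64
import json
import urllib.request


def build_stop_component_request(host, cluster, service, component, user, password):
    """Build the PUT request that sets a component to the INSTALLED (stopped) state."""
    url = "http://{}:8080/api/v1/clusters/{}/services/{}/components/{}".format(
        host, cluster, service, component)
    body = {
        "RequestInfo": {"context": "Stop All Components"},
        "Body": {"ServiceComponentInfo": {"state": "INSTALLED"}},
    }
    # Equivalent of curl's -u user:password (HTTP basic authentication).
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url, data=json.dumps(body).encode("utf-8"), method="PUT")
    request.add_header("X-Requested-By", "ambari")
    request.add_header("Authorization", "Basic " + token)
    return request


# Placeholder values for illustration only.
req = build_stop_component_request(
    "ambari.example.com", "PALEBLUEDOT", "HIVE", "HIVE_SERVER_INTERACTIVE", "admin", "admin")
print(req.get_method(), req.full_url)
# To actually send it against a reachable Ambari server: urllib.request.urlopen(req)
```

Building the request separately from sending it makes it easy to inspect the URL and payload before touching a live cluster.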
02-02-2017
04:05 PM
1 Kudo
SparkR is an R package that provides a lightweight front end for using Apache Spark from R, thus supporting large-scale analytics on Hortonworks Data Platform (HDP) from the R language and environment. As of Spark 1.6.2, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, and aggregation on large datasets. In addition, SparkR supports distributed machine learning through MLlib.
Architecture
SparkR, which was introduced to Spark in version 1.4, consists of wrappers over DataFrames and DataFrame-based APIs. In SparkR, the APIs are similar to existing ones in R (or R packages), rather than to the Python/Java/Scala APIs. SparkR is popular primarily because it allows users to write Spark jobs while staying entirely in the R framework/model.
A SparkDataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R, but with richer optimizations under the hood. SparkDataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing local R data frames. All of the examples on this page use sample data included in R or the Spark distribution and can be run using the ./bin/sparkR shell.
R is very convenient for analytics and users love it. However, scalability is the main issue with R, and SparkR is a means to address that.
The key challenge that was addressed in implementing SparkR was support for invoking Spark functions on a JVM from R. The Spark driver runs the R Spark Context, which passes functions to the R-JVM bridge. This bridge is responsible for launching Worker JVMs.
Run SparkR example on HDP
1. Prerequisites
Before you run SparkR, ensure that your cluster meets the following prerequisites:
R must be installed on all nodes.
JAVA_HOME must be set on all nodes.
2. SparkR Example
The following example launches SparkR and then uses R to create a people DataFrame, list part of the DataFrame, and read the DataFrame. (For more information about Spark DataFrames, see "Using the Spark DataFrame API").
Launch SparkR:
$ su spark
$ cd /usr/hdp/2.5.0.0-3485/spark/bin
$ ./sparkR
Output similar to the following displays:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.2
/_/
Spark context is available as sc, SQL context is available as sqlContext
>
From your R prompt (not the Spark shell), initialize SQLContext, create a DataFrame, and list the first few rows:
sparkR> sqlContext <- sparkRSQL.init(sc)
sparkR> df <- createDataFrame(sqlContext, faithful)
sparkR> head(df)
Output similar to the following displays:
...
eruptions waiting
1 3.600 79
2 1.800 54
3 3.333 74
4 2.283 62
5 4.533 85
6 2.883 55
Read the people DataFrame:
sparkR> people <- read.df(sqlContext, "people.json", "json")
sparkR> head(people)
Output similar to the following displays:
age name
1 NA Michael
2 30 Andy
3 19 Justin
02-02-2017
03:38 PM
1 Kudo
kafka-python is best used with newer brokers (0.10 or 0.9), but is backwards-compatible with older versions (to 0.8.0). Some features will only be enabled on newer brokers, however; for example, fully coordinated consumer groups -- i.e., dynamic partition assignment to multiple consumers in the same group -- require use of 0.9+ kafka brokers. Supporting this feature for earlier broker releases would require writing and maintaining custom leadership election and membership / health check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. See <http://kafka-python.readthedocs.org/en/master/compatibility.html> for more details.
Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help.
$ pip install kafka-python
KafkaConsumer
KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+. See <http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html> for API and configuration details.
The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value:
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
>>> # Deserialize msgpack-encoded values
>>> import msgpack
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)
KafkaProducer
KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. See <http://kafka-python.readthedocs.org/en/master/apidoc/KafkaProducer.html> for more details.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
...     producer.send('foobar', b'some_message_bytes')
>>> # Block until all pending messages are sent
>>> producer.flush()
>>> # Block until a single message is sent (or timeout)
>>> producer.send('foobar', b'another_message').get(timeout=60)
>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
...     producer.send('foobar', b'msg %d' % i)
Compression
kafka-python supports gzip compression/decompression natively. To produce or consume lz4 compressed messages, you must install lz4tools and xxhash (modules may not work on python2.6). To enable snappy compression/decompression install python-snappy (also requires snappy library). See <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install> for more information.
Protocol
A secondary goal of kafka-python is to provide an easy-to-use protocol layer for interacting with kafka brokers via the python repl. This is useful for testing, probing, and general experimentation. The protocol support is leveraged to enable a KafkaClient.check_version() method that probes a kafka broker and attempts to identify which version it is running (0.8.0 to 0.10).
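A JSON value_serializer on the producer only helps if the consumer applies the inverse function. The sketch below shows such a matching pair and checks the round trip locally, without a broker; the function names serialize_value and deserialize_value are hypothetical, not part of kafka-python.

```python
import json


def serialize_value(value):
    """Encode a JSON-compatible Python object as UTF-8 bytes (producer side)."""
    return json.dumps(value).encode("utf-8")


def deserialize_value(raw_bytes):
    """Decode UTF-8 JSON bytes back into a Python object (consumer side)."""
    return json.loads(raw_bytes.decode("utf-8"))


# Round trip without a broker: what goes in should come out unchanged.
payload = {"foo": "bar"}
wire_bytes = serialize_value(payload)
restored = deserialize_value(wire_bytes)
print(wire_bytes, restored)

# With a broker available, the pair could be plugged in as
# KafkaProducer(value_serializer=serialize_value) and
# KafkaConsumer(value_deserializer=deserialize_value).
```

Keeping both directions next to each other makes it harder for the producer and consumer encodings to drift apart.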
01-08-2017
01:16 AM
1 Kudo
If you are on a Mac and run into the following error when running start-all.sh:
% sh start-all.sh
starting org.apache.spark.deploy.master.Master, logging to ...
localhost: ssh: connect to host localhost port 22: Connection refused
You need to enable "Remote Login" for your machine. From System Preferences, select Sharing, and then turn on Remote Login.
12-28-2016
06:32 PM
1 Kudo
This first article in a series lists 3 easy ways in which you can optimize your Spark code. They can be summed up as follows:
Use ReduceByKey over GroupByKey
Be Wary of Actions
Gracefully Deal with Bad Quality Data
Use ReduceByKey over GroupByKey
Let's look at two different ways to compute word counts, one using reduceByKey and the other using groupByKey:
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD
.reduceByKey(_ + _)
.collect()
val wordCountsWithGroup = wordPairsRDD
.groupByKey()
.map(t => (t._1, t._2.sum))
.collect()
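The two variants above return the same counts but move different amounts of data across the network. The following pure-Python simulation, a rough sketch rather than anything Spark actually executes, counts how many records would leave each partition with and without a map-side combine. The two-partition layout is an assumption made for illustration.

```python
from collections import Counter


def shuffled_records_group_by_key(partitions):
    """groupByKey sends every (word, 1) pair across the network unchanged."""
    return sum(len(partition) for partition in partitions)


def shuffled_records_reduce_by_key(partitions):
    """reduceByKey combines pairs per key inside each partition first, so at
    most one record per distinct key leaves each partition."""
    return sum(len(Counter(word for word, _ in partition)) for partition in partitions)


def word_counts(partitions):
    """Final result, identical for both strategies."""
    totals = Counter()
    for partition in partitions:
        for word, count in partition:
            totals[word] += count
    return dict(totals)


# Assumed layout: the six pairs from the example split across two partitions.
partitions = [
    [("one", 1), ("two", 1), ("two", 1)],
    [("three", 1), ("three", 1), ("three", 1)],
]
print(shuffled_records_group_by_key(partitions))   # 6 records shuffled
print(shuffled_records_reduce_by_key(partitions))  # 3 records shuffled
print(word_counts(partitions))
```

On six records the saving is small, but it grows with the number of repeated keys per partition.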
While both of these functions will produce the correct answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data. With reduceByKey, the pairs on the same machine with the same key are combined (by using the lambda function passed into reduceByKey) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result.
On the other hand, when calling groupByKey, all the key-value pairs are shuffled around. This is a lot of unnecessary data being transferred over the network. To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out of memory exception occurs. This will be more gracefully handled in a later release of Spark so the job can still proceed, but it should still be avoided: when Spark needs to spill to disk, performance is severely impacted.
You can imagine that for a much larger dataset, the difference in the amount of data shuffled between reduceByKey and groupByKey becomes even more pronounced.
Here are more functions to prefer over groupByKey:
combineByKey can be used when you are combining elements but your return type differs from your input value type.
foldByKey merges the values for each key using an associative function and a neutral "zero value".
Be Wary of Actions
If your RDD is so large that all of its elements won't fit in memory on the driver machine, don't do this:
val values = myVeryLargeRDD.collect()
Collect will attempt to copy every single element in the RDD onto the single driver program, which will then run out of memory and crash. Instead, you can make sure the number of elements you return is capped by calling take or takeSample, or perhaps by filtering or sampling your RDD. Similarly, be cautious of these other actions as well unless you are sure your dataset is small enough to fit in memory:
countByKey
countByValue
collectAsMap
If you really do need every one of these values of the RDD and the data is too big to fit into memory, you can write out the RDD to files or export the RDD to a database that is large enough to hold all the data.
Gracefully Deal with Bad Quality Data
When dealing with vast amounts of data, a common problem is that a small amount of the data is malformed or corrupt. Using a filter transformation, you can easily discard bad inputs, or use a map transformation if it's possible to fix the bad input. Or perhaps the best option is to use a flatMap function, where you can try fixing the input but fall back to discarding the input if you can't. Let's consider the json strings below as input:
input_rdd = sc.parallelize(["{\"value\": 1}", # Good
"bad_json", # Bad
"{\"value\": 2}", # Good
"{\"value\": 3" # Missing an ending brace.
])
If we tried to input this set of json strings to a sqlContext, it would clearly fail due to the malformed inputs.
sqlContext.jsonRDD(input_rdd).registerTempTable("valueTable")
# The above command will throw an error.
Instead, let's try fixing the input with this python function:
import json

def try_correct_json(json_string):
    try:
        # First check if the json is okay.
        json.loads(json_string)
        return [json_string]
    except ValueError:
        try:
            # If not, try correcting it by adding an ending brace.
            try_to_correct_json = json_string + "}"
            json.loads(try_to_correct_json)
            return [try_to_correct_json]
        except ValueError:
            # The malformed json input can't be recovered, drop this input.
            return []
Now, we can apply that function to fix our input and try again. This time we will succeed in reading three inputs:
corrected_input_rdd = input_rdd.flatMap(try_correct_json)
sqlContext.jsonRDD(corrected_input_rdd).registerTempTable("valueTable")
sqlContext.sql("select * from valueTable").collect() # Returns [Row(value=1), Row(value=2), Row(value=3)]
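The repair logic can be checked on a laptop before it runs on a cluster. The sketch below repeats the function so that the block is self-contained and mimics flatMap with a nested list comprehension over the same four sample strings; no Spark installation is assumed.

```python
import json


def try_correct_json(json_string):
    """Return [valid_json] if the string parses (possibly after appending a
    closing brace), or [] if it cannot be recovered."""
    try:
        json.loads(json_string)
        return [json_string]
    except ValueError:
        try:
            candidate = json_string + "}"
            json.loads(candidate)
            return [candidate]
        except ValueError:
            return []


inputs = ['{"value": 1}', "bad_json", '{"value": 2}', '{"value": 3']

# flatMap equivalent: apply the function, then flatten the returned lists.
corrected = [fixed for raw in inputs for fixed in try_correct_json(raw)]
print(corrected)
# ['{"value": 1}', '{"value": 2}', '{"value": 3}']
```

Returning a list from the function is what makes it usable with flatMap: an empty list drops the record, a one-element list keeps it.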
12-27-2016
05:06 PM
9 Kudos
The distributed nature of HBase, coupled with the concepts of an ordered write log and a log-structured merge tree, makes HBase a great database for large scale data processing. Over the years, HBase has proven itself to be a reliable storage mechanism when you need random, realtime read/write access to your Big Data.
HBase is able to deliver because of a prioritization mechanism for memory utilization and caching structures over disk I/O. The memory store, implemented as the MemStore, accumulates data edits as they’re received, buffering them in memory. The block cache, an implementation of the BlockCache interface, keeps data blocks resident in memory after they’re read.
OVERVIEW OF HBASE I/O
Memstore
The MemStore is important for accessing recent edits. Without the MemStore, accessing that data as it was written into the write log would require reading and deserializing entries back out of that file, at least an O(n) operation. Instead, MemStore maintains a skiplist structure, which enjoys an O(log n) access cost and requires no disk I/O. The MemStore contains just a tiny piece of the data stored in HBase, however.
Caching Options
HBase provides two different BlockCache implementations: the default on-heap LruBlockCache and the BucketCache , which is (usually) off-heap. This section discusses benefits and drawbacks of each implementation, how to choose the appropriate option, and configuration options for each.
The following table describes several concepts related to HBase file operations and memory (RAM) caching.
Component | Description |
---|---|
HFile | An HFile contains table data, indexes over that data, and metadata about the data. |
Block | An HBase block is the smallest unit of data that can be read from an HFile. Each HFile consists of a series of blocks. (Note: an HBase block is different than an HDFS block or other underlying file system blocks.) |
BlockCache | BlockCache is the main HBase mechanism for low-latency random read operations. BlockCache is one of two memory cache structures maintained by HBase. When a block is read from HDFS, it is cached in BlockCache. Frequent access to rows in a block cause the block to be kept in cache, improving read performance. |
MemStore | MemStore ("memory store") is the second of two cache structures maintained by HBase. MemStore improves write performance. It accumulates data until it is full, and then writes ("flushes") the data to a new HFile on disk. MemStore serves two purposes: it increases the total amount of data written to disk in a single operation, and it retains recently-written data in memory for subsequent low-latency reads. |
Write Ahead Log (WAL) | The WAL is a log file that records all changes to data until the data is successfully written to disk (MemStore is flushed). This protects against data loss in the event of a failure before MemStore contents are written to disk. |
BlockCache and MemStore reside in random-access memory (RAM); HFiles and the Write Ahead Log are persisted to HDFS.
The read and write operations for any out-of-the-box HBase implementation can be described as follows:
During write operations, HBase writes to WAL and MemStore. Data is flushed from MemStore to disk according to size limits and flush interval.
During read operations, HBase reads the block from BlockCache or MemStore if it is available in those caches. Otherwise it reads from disk and stores a copy in BlockCache.
Caching Structure
By default, BlockCache resides in an area of RAM that is managed by the Java Virtual Machine ("JVM") Garbage Collector; this area of memory is known as "on-heap" memory or the "Java heap." The BlockCache implementation that manages on-heap cache is called LruBlockCache.
If you have stringent read latency requirements and you have more than 20 GB of RAM available on your servers for use by HBase RegionServers, consider configuring BlockCache to use both on-heap and off-heap memory. The associated BlockCache implementation is called BucketCache. Read latencies for BucketCache tend to be less erratic than LruBlockCache for large cache loads, because BucketCache (not JVM Garbage Collection) manages block cache allocation.
Figure below explains HBase's caching structure:
How to choose?
There are two reasons to consider enabling one of the alternative BlockCache implementations. The first is simply the amount of RAM you can dedicate to the region server. Community wisdom recognizes the upper limit of the JVM heap, as far as the region server is concerned, to be somewhere between 14GB and 31GB. The precise limit usually depends on a combination of hardware profile, cluster configuration, the shape of data tables, and application access patterns.
The other time to consider an alternative cache is when response latency really matters. Keeping the heap down around 8-12GB allows the CMS collector to run very smoothly, which has measurable impact on the 99th percentile of response times. Given this restriction, the only choices are to explore an alternative garbage collector or take one of these off-heap implementations for a spin.
Let's take a real world example and go from there.
REAL WORLD SCENARIO
In the example that I use below, I had 3 HBase region server nodes, each with ~24 TB of SSD-based storage and 512 GB of RAM. I allocated:
256 GB for the RegionServer process (there is additional memory available for other HDP processes)
Workload consists of 50% reads, 50% writes.
HBase Heap Size = 20 GB (20480 MB)
1. The best way to monitor HBase is through the UI that comes with every HBase installation. It is known as the HBase Master UI. The link to the HBase Master UI by default is: http://<HBASE_MASTER_HOST>:16010/master-status.
2. Click on the individual region server to view the stats for that server. See below:
3. Click on the stats and you will have all that you need to see about the block cache.
BlockCache Issues
In this particular example, we experienced a number of GC pauses, and RegionTooBusyException had started flooding my logs. So I followed the steps above to see if there was an issue with the way my HBase block cache was behaving.
The LruBlockCache is an LRU cache that contains three levels of block priority to allow for scan-resistance and in-memory ColumnFamilies:
Single access priority: The first time a block is loaded from HDFS it normally has this priority and it will be part of the first group to be considered during evictions. The advantage is that scanned blocks are more likely to get evicted than blocks that are getting more usage.
Multi access priority: If a block in the previous priority group is accessed again, it upgrades to this priority. It is thus part of the second group considered during evictions.
In-memory access priority: If the block’s family was configured to be "in-memory", it will be part of this priority regardless of the number of times it was accessed. Catalog tables are configured like this. This group is the last one considered during evictions.
As you can see, and as I noticed as well, a large number of blocks were evicted. Eviction is generally OK, simply because you need to free up your cache as you continue to write data to your region server. However, in my case there was plenty of room in the cache, and technically HBase should have continued to cache the data rather than evicting any blocks.
The second problematic behavior was that there was a very large number of cache misses (~70 million). A cache miss is a state where the data requested for processing by a component or application is not found in the cache memory. It causes execution delays by requiring the program or application to fetch the data from other cache levels or the main memory.
So clearly, we had to solve the following issues in order to get better performance from our HBase cluster:
Number of evicted blocks
Number of eviction occurrences
Number of block requests that were cache misses and set to use the block cache
Maximize the hit ratio
Issue Resolution
In order to resolve the issues mentioned above, we need to configure and enable BucketCache. To do this, note the memory specifications and values, plus additional values shown in the following table:
Item | Description | Value or Formula | Example |
---|---|---|---|
A | Total physical memory for RegionServer operations: on-heap plus off-heap ("direct") memory (MB) | (hardware dependent) | 262144 |
B | HBASE_HEAPSIZE (-Xmx): maximum size of JVM heap (MB) | Recommendation: 20480 | 20480 |
C | -XX:MaxDirectMemorySize: amount of off-heap ("direct") memory to allocate to HBase (MB) | A - B | 262144 - 20480 = 241664 |
Dp | hfile.block.cache.size: proportion of maximum JVM heap size (HBASE_HEAPSIZE, -Xmx) to allocate to BlockCache. The sum of this value plus hbase.regionserver.global.memstore.size must not exceed 0.8. | (proportion of reads) * 0.8 | 0.50 * 0.8 = 0.4 |
Dm | Maximum amount of JVM heap to allocate to BlockCache (MB) | B * Dp | 20480 * 0.4 = 8192 |
Ep | hbase.regionserver.global.memstore.size: proportion of maximum JVM heap size (HBASE_HEAPSIZE, -Xmx) to allocate to MemStore. The sum of this value plus hfile.block.cache.size must be less than or equal to 0.8. | 0.8 - Dp | 0.8 - 0.4 = 0.4 |
F | Amount of off-heap memory to reserve for other uses (DFSClient; MB) | Recommendation: 1024 to 2048 | 2048 |
G | Amount of off-heap memory to allocate to BucketCache (MB) | C - F | 241664 - 2048 = 239616 |
 | hbase.bucketcache.size: total amount of memory to allocate to BucketCache, on- and off-heap (MB) | Dm + G | 8192 + 239616 = 247808 |
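The formulas in the table are simple enough to script, which helps when re-running the sizing for different hardware. The sketch below applies the Value or Formula column to the example inputs (A = 262144, B = 20480, 50% reads, F = 2048); for these inputs Dm + G evaluates to 8192 + 239616 = 247808. The function name and the dictionary keys are hypothetical.

```python
def bucketcache_sizing(total_mb, heap_mb, read_fraction, reserved_offheap_mb):
    """Apply the sizing formulas from the table; all sizes are in MB."""
    direct_mb = total_mb - heap_mb                             # C = A - B
    block_cache_fraction = read_fraction * 0.8                 # Dp
    onheap_cache_mb = round(heap_mb * block_cache_fraction)    # Dm = B * Dp
    memstore_fraction = 0.8 - block_cache_fraction             # Ep = 0.8 - Dp
    offheap_cache_mb = direct_mb - reserved_offheap_mb         # G = C - F
    bucketcache_mb = onheap_cache_mb + offheap_cache_mb        # Dm + G
    return {
        "direct_mb": direct_mb,
        "block_cache_fraction": block_cache_fraction,
        "onheap_block_cache_mb": onheap_cache_mb,
        "memstore_fraction": memstore_fraction,
        "offheap_bucketcache_mb": offheap_cache_mb,
        "bucketcache_size_mb": bucketcache_mb,
    }


sizing = bucketcache_sizing(
    total_mb=262144, heap_mb=20480, read_fraction=0.50, reserved_offheap_mb=2048)
for name, value in sizing.items():
    print(name, value)
```

Changing read_fraction shifts heap between BlockCache and MemStore while keeping their combined share at 0.8, as the table requires.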
Enable BucketCache
After completing the steps in Configuring BlockCache, follow these steps to configure BucketCache.
In the hbase-env.sh file for each RegionServer, or in the hbase-env.sh file supplied to Ambari, set the -XX:MaxDirectMemorySize argument for HBASE_REGIONSERVER_OPTS to the amount of direct memory you wish to allocate to HBase. In the configuration for the example discussed above, the value would be 241664m. (-XX:MaxDirectMemorySize accepts a number followed by a unit indicator; m indicates megabytes.)
HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=241664m"
In the hbase-site.xml file, specify BucketCache size and percentage. In the example discussed above, the values would be 247808 (Dm + G = 8192 + 239616) and 0.96694215 (G divided by the BucketCache size, 239616 / 247808), respectively. If you choose to round the proportion, round up. This will allocate space related to rounding error to the (larger) off-heap memory area.
<property>
<name>hbase.bucketcache.size</name>
<value>247808</value>
</property>
In the hbase-site.xml file, set hbase.bucketcache.ioengine to offheap. This enables BucketCache.
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
</property>
Restart (or rolling restart) the cluster. It can take a minute or more to allocate BucketCache, depending on how much memory you are allocating. Check logs for error messages.
Results
And voilà!
100% reduction in the number of evicted blocks
99% reduction in the number of evictions
A reduction of 90% in the number of cache misses
Hit ratio went up 10x
11-29-2016
06:12 PM
@Alessandro Lulli Your cluster has allocated the resources that you asked for Spark. It looks like you have a backlog of jobs that are already running. Maybe there is a job that is stuck in the queue. Kill all the previous applications and see if the job runs. You can also kill all the YARN applications and resubmit the jobs.
$ yarn application -list
$ yarn application -kill $application_id
11-22-2016
03:15 PM
1 Kudo
@Gobi Subramani You are looking at it wrong. The SparkContext is the main entry point into Spark and the connection to a Spark cluster; it can be used to create RDDs, accumulators, etc. on that cluster. You can run in cluster mode as well as local mode, and you choose which one when you define the SparkContext. The workers don't get the SparkContext per se, but if you package your program into a jar, the cluster manager is responsible for copying the jar file to the workers before it allocates tasks.