Member since: 10-02-2015

76 Posts | 80 Kudos Received | 8 Solutions

My Accepted Solutions
| Title | Views | Posted | 
|---|---|---|
|  | 2505 | 11-15-2016 03:28 PM |
|  | 3804 | 11-15-2016 03:15 PM |
|  | 2903 | 07-25-2016 08:03 PM |
|  | 2285 | 05-11-2016 04:10 PM |
|  | 4659 | 02-02-2016 08:09 PM |

02-03-2017 04:53 PM  (1 Kudo)

Word Count using Spark Streaming in PySpark

This is a word-count example with the following pieces:

- The local file system as a source
- Counts calculated with reduceByKey and stored in a temp table
- Running counts queried through SQL

Setup: define the function that sets up the StreamingContext

This function has to create a new StreamingContext and set it up with all the input streams, transformations, and output operations. It must not start the StreamingContext; it just captures all the setup code necessary for your streaming application in one place.

  from pyspark.streaming import StreamingContext
batchIntervalSeconds = 10 
def creatingFunc():
  ssc = StreamingContext(sc, batchIntervalSeconds)
  # Set each DStream in this context to remember the RDDs it generated over the
  # last given duration. DStreams remember RDDs only for a limited duration of
  # time and release them for garbage collection. This method allows the
  # developer to specify how long to remember the RDDs (if the developer wishes
  # to query old data outside the DStream computation).
  ssc.remember(60)
  
  lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
  lines.pprint()
  words = lines.flatMap(lambda line: line.split(","))
  pairs = words.map(lambda word: (word, 1))
  wordCounts = pairs.reduceByKey(lambda x, y: x + y)
  # For each batch of (word, count) pairs, convert the RDD to a DataFrame and
  # register it as a temp table so the running counts can be queried with SQL.
  def process(time, rdd):
    df = sqlContext.createDataFrame(rdd)
    df.registerTempTable("myCounts")

  wordCounts.foreachRDD(process)
  
  return ssc
   
Create a streaming context

If you don't want the streaming application to restart from the checkpoint, make sure to delete the checkpoint directory.

  checkpointDir = "/LOCAL_DIR/DATA/"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
   
Start the streaming context

# This line starts the streaming context in the background.
ssc.start()
# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
Sample output:

-------------------------------------------
Time: 2015-10-07 20:57:00
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
-------------------------------------------
Time: 2015-10-07 20:57:10
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
 
    
  
Interactive query

%sql select * from myCounts

The query returns the running counts registered so far (the words "world" and "hello" from the sample input above).
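The same query can also be issued from a Python cell instead of %sql; a minimal sketch using the sqlContext already available in the notebook:

sqlContext.sql("select * from myCounts").show()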
  
Stop the streaming context

ssc.stop(stopSparkContext=False)
    
  
Stop all streaming contexts running in this JVM from Python

This is helpful if you lose the handle to the streaming context, e.g., you detached the notebook from the cluster while streaming was running. In that case you lost the handle to the streaming context from the notebook, but the streaming job keeps running in the background. If you want to stop the streaming job, you need to access the streaming context in the JVM directly and stop it.

if not sc._jvm.StreamingContext.getActive().isEmpty():
    sc._jvm.StreamingContext.getActive().get().stop(False)
						
					
    
	
		
		
02-03-2017 04:44 PM  (3 Kudos)

If you're using Apache Kafka, you know it persists all messages on disk as a distributed commit log. You might sometimes want to take advantage of that and reprocess some of the messages. Assuming you want to reprocess all the messages currently stored on your brokers and you have set auto.offset.reset to smallest, you can simply delete your consumers' data from ZooKeeper; after restarting, your consumers will start from the beginning. But what if you forgot, or didn't want, to set auto.offset.reset in your consumers to smallest? Then you can manually set the offsets for each partition of your consumers to the smallest currently available offset.

List all topics:

kafka-topics --list --zookeeper localhost:2181

Get offsets for the topic:

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1

Set the offset manually:

$ ZOOKEEPER_HOME/zkCli
$ ls /consumers/flasfka/offsets/vital_signs
$ set /consumers/flasfka/offsets/vital_signs/0 11492949

Verify the new offset:

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1
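If your consumers store offsets in Kafka itself rather than in ZooKeeper (as newer clients do), a similar "reprocess from the beginning" effect can be had programmatically. Below is a minimal kafka-python sketch, not the ZooKeeper procedure above; the vital_signs topic, partition 0, and the flasfka group are reused purely as an illustration:

from kafka import KafkaConsumer, TopicPartition

# Assign the partition explicitly, then rewind to the earliest available offset.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='flasfka')
tp = TopicPartition('vital_signs', 0)
consumer.assign([tp])
consumer.seek_to_beginning(tp)

for msg in consumer:
    print(msg.offset, msg.value)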
						
					
    
	
		
		
02-03-2017 04:26 PM  (5 Kudos)

The Apache Ambari project is aimed at making Hadoop management simpler by developing software for provisioning, managing, and monitoring Apache Hadoop clusters. Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.

Ambari enables system administrators to:

- Provision a Hadoop cluster: Ambari provides a step-by-step wizard for installing Hadoop services across any number of hosts and handles configuration of Hadoop services for the cluster.
- Manage a Hadoop cluster: Ambari provides central management for starting, stopping, and reconfiguring Hadoop services across the entire cluster.
- Monitor a Hadoop cluster: Ambari provides a dashboard for monitoring the health and status of the Hadoop cluster. It leverages the Ambari Metrics System for metrics collection and the Ambari Alert Framework for system alerting, and will notify you when your attention is needed (e.g., a node goes down, remaining disk space is low, etc.).

Ambari also enables application developers and system integrators to easily integrate Hadoop provisioning, management, and monitoring capabilities into their own applications with the Ambari REST APIs.

Set the environment variables used in the calls below:

$ export AMBARI_USER=<FILL IN>
$ export AMBARI_PASSWD=<FILL IN>
$ export AMBARI_HOST=<FILL IN>
$ export CLUSTER_NAME=<FILL IN>
$ export SERVICE=<FILL IN>
$ export COMPONENT=<FILL IN>
$ export COMPONENT_HOST=<FILL IN>

List services related to Hive:

curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X GET "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/HIVE"

Stop the component HIVE_SERVER_INTERACTIVE:

curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo":{"context":"Stop Service"},"Body":{"ServiceInfo":{"state":"INSTALLED"}}}' "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/$SERVICE/components/$COMPONENT"

Stop all components of the service (before deleting):

curl -u $AMBARI_USER:$AMBARI_PASSWD -H "X-Requested-By: ambari" -X PUT -d '{"RequestInfo":{"context":"Stop All Components"},"Body":{"ServiceComponentInfo":{"state":"INSTALLED"}}}' "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/$SERVICE/components/$COMPONENT"

Delete a component on a host:

curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X DELETE "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/hosts/$COMPONENT_HOST/host_components/$COMPONENT"

Delete a host:

curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X DELETE "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/hosts/$COMPONENT_HOST"
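To bring the service back up after maintenance, the same services endpoint accepts a STARTED state. This is a sketch following the same pattern as the stop call above (adjust the RequestInfo context text as you like):

curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo":{"context":"Start Service"},"Body":{"ServiceInfo":{"state":"STARTED"}}}' "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/$SERVICE"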
						
					
    
	
		
		
02-02-2017 04:05 PM  (1 Kudo)

SparkR is an R package that provides a lightweight front end for using Apache Spark from R, thus supporting large-scale analytics on Hortonworks Data Platform (HDP) from the R language and environment. As of Spark 1.6.2, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, and aggregation on large datasets. In addition, SparkR supports distributed machine learning through MLlib.

Architecture

SparkR, introduced to Spark in version 1.4, consists of wrappers over DataFrames and DataFrame-based APIs. In SparkR, the APIs are similar to existing ones in R (or R packages) rather than to the Python/Java/Scala APIs. SparkR is popular primarily because it allows users to write Spark jobs while staying entirely in the R framework/model.

A SparkDataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R, but with richer optimizations under the hood. SparkDataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing local R data frames.

All of the examples on this page use sample data included in R or the Spark distribution and can be run using the ./bin/sparkR shell.

R is very convenient for analytics and users love it. However, scalability is the main issue with R, and SparkR is a means to address that.

The key challenge addressed in implementing SparkR was supporting the invocation of Spark functions on a JVM from R. The Spark driver runs the R Spark context, which passes functions to the R-JVM bridge; this in turn is responsible for launching worker JVMs.

Run a SparkR example on HDP

1. Prerequisites

Before you run SparkR, ensure that your cluster meets the following prerequisites:

- R must be installed on all nodes.
- JAVA_HOME must be set on all nodes.

2. SparkR Example

The following example launches SparkR and then uses R to create a people DataFrame, list part of the DataFrame, and read the DataFrame. (For more information about Spark DataFrames, see "Using the Spark DataFrame API".)

Launch SparkR:

$ su spark
$ cd /usr/hdp/2.5.0.0-3485/spark/bin
$ ./sparkR

Output similar to the following displays:

Welcome to
    ____              __ 
   / __/__  ___ _____/ /__ 
  _\ \/ _ \/ _ `/ __/  '_/ 
 /___/ .__/\_,_/_/ /_/\_\   version  1.6.2
    /_/ 
Spark context is available as sc, SQL context is available as sqlContext
> 

From your R prompt (not the Spark shell), initialize SQLContext, create a DataFrame, and list the first few rows:

sparkR> sqlContext <- sparkRSQL.init(sc)
sparkR> df <- createDataFrame(sqlContext, faithful)
sparkR> head(df)

Output similar to the following displays:

...
 eruptions waiting
1     3.600      79
2     1.800      54
3     3.333      74
4     2.283      62
5     4.533      85
6     2.883      55 

Read the people DataFrame and list the first few rows:

sparkR> people <- read.df(sqlContext, "people.json", "json")
sparkR> head(people)

Output similar to the following displays:

  age    name
1  NA Michael
2  30    Andy
3  19  Justin
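As a next step, the selection, filtering, and aggregation operations mentioned earlier can be tried on the faithful DataFrame created above. A short sketch against the SparkR 1.6 API (shown only as an illustration):

sparkR> # Selection and filtering: eruptions longer than 3 minutes
sparkR> head(select(filter(df, df$eruptions > 3), df$eruptions, df$waiting))
sparkR> # Aggregation: number of observations for each waiting time
sparkR> head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))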
						
					
    
	
		
		
02-02-2017 03:38 PM  (1 Kudo)

kafka-python is best used with newer brokers (0.10 or 0.9), but is backwards-compatible with older versions (to 0.8.0). Some features will only be enabled on newer brokers, however; for example, fully coordinated consumer groups -- i.e., dynamic partition assignment to multiple consumers in the same group -- require 0.9+ kafka brokers. Supporting this feature for earlier broker releases would require writing and maintaining custom leadership election and membership / health check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. See <http://kafka-python.readthedocs.org/en/master/compatibility.html> for more details.

Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or Python's inline help.

pip install kafka-python

KafkaConsumer

KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official Java client. Full support for coordinated consumer groups requires kafka brokers that support the Group APIs: kafka v0.9+. See <http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html> for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value:

>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
...     print (msg)
  >>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
  >>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)
KafkaProducer

KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official Java client. See <http://kafka-python.readthedocs.org/en/master/apidoc/KafkaProducer.html> for more details.

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
...     producer.send('foobar', b'some_message_bytes')
  >>> # Block until all pending messages are sent
>>> producer.flush()
  >>> # Block until a single message is sent (or timeout)
>>> producer.send('foobar', b'another_message').get(timeout=60)
  >>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
  >>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
  >>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
  >>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
...     producer.send('foobar', b'msg %d' % i)
Compression

kafka-python supports gzip compression/decompression natively. To produce or consume lz4-compressed messages, you must install lz4tools and xxhash (these modules may not work on Python 2.6). To enable snappy compression/decompression, install python-snappy (which also requires the snappy library). See <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install> for more information.

Protocol

A secondary goal of kafka-python is to provide an easy-to-use protocol layer for interacting with kafka brokers via the Python REPL. This is useful for testing, probing, and general experimentation. The protocol support is leveraged to enable a KafkaClient.check_version() method that probes a kafka broker and attempts to identify which version it is running (0.8.0 to 0.10).
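Following up on the Compression section above: once the optional packages are installed, the only change on the producer side is the compression_type setting. A minimal sketch (gzip works without any extra packages):

>>> # 'snappy' and 'lz4' require the optional packages noted above
>>> producer = KafkaProducer(compression_type='snappy')
>>> producer.send('foobar', b'snappy-compressed message')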
						
					
    
	
		
		
01-08-2017 01:16 AM  (1 Kudo)

If you are on a Mac and run into the following error when running start-all.sh:

% sh start-all.sh
starting org.apache.spark.deploy.master.Master, logging to ...
localhost: ssh: connect to host localhost port 22: Connection refused
  You need to enable "Remote Login" for your machine. From System Preferences, select Sharing, and then turn on Remote Login. 
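If you prefer the terminal, the same Remote Login setting can usually be toggled with the built-in systemsetup tool (requires an administrator password; shown here only as a convenience, System Preferences works just as well):

sudo systemsetup -setremotelogin on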
						
					
    
	
		
		
12-28-2016 06:32 PM  (1 Kudo)

This first article in a series lists three easy ways in which you can optimize your Spark code. They can be summed up as follows:

- Use reduceByKey over groupByKey
- Be wary of actions
- Gracefully deal with bad-quality data

Use reduceByKey over groupByKey

Let's look at two different ways to compute word counts, one using reduceByKey and the other using groupByKey:

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()
val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()

While both of these functions will produce the correct answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data.

With reduceByKey, the pairs on the same machine with the same key are combined (using the lambda function passed into reduceByKey) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition and produce one final result.

On the other hand, when calling groupByKey, all the key-value pairs are shuffled around. This is a lot of unnecessary data being transferred over the network. To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes the data out to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. This will be handled more gracefully in a later release of Spark so the job can still proceed, but it should still be avoided: when Spark needs to spill to disk, performance is severely impacted.

You can imagine that for a much larger dataset, the difference in the amount of data you are shuffling becomes even more exaggerated between reduceByKey and groupByKey.

Here are more functions to prefer over groupByKey (a short foldByKey sketch appears at the end of this post):

- combineByKey can be used when you are combining elements but your return type differs from your input value type.
- foldByKey merges the values for each key using an associative function and a neutral "zero value".

Be Wary of Actions

If your RDD is so large that all of its elements won't fit in memory on the driver machine, don't do this:

val values = myVeryLargeRDD.collect()

Collect will attempt to copy every single element in the RDD onto the single driver program, and then run out of memory and crash. Instead, make sure the number of elements you return is capped by calling take or takeSample, or perhaps by filtering or sampling your RDD.

Similarly, be cautious of these other actions as well, unless you are sure your dataset is small enough to fit in memory:

- countByKey
- countByValue
- collectAsMap

If you really do need every one of the RDD's values and the data is too big to fit into memory, you can write the RDD out to files or export it to a database that is large enough to hold all the data.

Gracefully Deal with Bad Quality Data

When dealing with vast amounts of data, a common problem is that a small amount of the data is malformed or corrupt. Using a filter transformation, you can easily discard bad inputs, or use a map transformation if it's possible to fix the bad input. Perhaps the best option is to use a flatMap function, where you can try fixing the input but fall back to discarding it if you can't.

Let's consider the JSON strings below as input:

input_rdd = sc.parallelize(["{\"value\": 1}",  # Good
                            "bad_json",  # Bad
                            "{\"value\": 2}",  # Good
                            "{\"value\": 3"  # Missing an ending brace.
                            ])

If we tried to feed this set of JSON strings to a sqlContext, it would clearly fail due to the malformed inputs.

sqlContext.jsonRDD(input_rdd).registerTempTable("valueTable")
# The above command will throw an error.

Instead, let's try fixing the input with this Python function:

import json

def try_correct_json(json_string):
  try:
    # First check if the json is okay.
    json.loads(json_string)
    return [json_string]
  except ValueError:
    try:
      # If not, try correcting it by adding an ending brace.
      try_to_correct_json = json_string + "}"
      json.loads(try_to_correct_json)
      return [try_to_correct_json]
    except ValueError:
      # The malformed json input can't be recovered, drop this input.
      return []

Now we can apply that function to fix our input and try again. This time we will succeed in reading three inputs:

corrected_input_rdd = input_rdd.flatMap(try_correct_json)
sqlContext.jsonRDD(corrected_input_rdd).registerTempTable("valueTable")
sqlContext.sql("select * from valueTable").collect()   # Returns [Row(value=1), Row(value=2), Row(value=3)] 
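
As promised above, here is a short PySpark sketch of foldByKey on the same kind of word-pair RDD; it combines values on each partition before shuffling (like reduceByKey), with 0 as the neutral "zero value". Shown only as an illustration:

word_pairs = sc.parallelize(["one", "two", "two", "three", "three", "three"]) \
               .map(lambda word: (word, 1))

# foldByKey(zeroValue, func) merges the values for each key, starting from the
# zero value, and avoids the full shuffle that groupByKey would trigger.
word_counts = word_pairs.foldByKey(0, lambda x, y: x + y).collect()
# e.g. [('one', 1), ('two', 2), ('three', 3)] (ordering may vary)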
						
					
    
	
		
		
12-27-2016 05:06 PM  (9 Kudos)

 The distributed nature of HBase, coupled with the  concepts of an ordered write log and a log-structured merge tree, makes HBase a great database for large scale data processing. Over the years, HBase has proven itself to be a reliable storage mechanism when you need random, realtime read/write access to your Big Data.  
HBase is able to deliver because of a prioritization mechanism for memory utilization and caching structures over disk I/O. The memory store, implemented as the MemStore, accumulates data edits as they're received, buffering them in memory. The block cache, an implementation of the BlockCache interface, keeps data blocks resident in memory after they're read.

OVERVIEW OF HBASE I/O

MemStore

The MemStore is important for accessing recent edits. Without the MemStore, accessing that data as it was written into the write log would require reading and deserializing entries back out of that file, at least an O(n) operation. Instead, the MemStore maintains a skiplist structure, which enjoys O(log n) access cost and requires no disk I/O. The MemStore contains just a tiny piece of the data stored in HBase, however.

Caching Options

HBase provides two different BlockCache implementations: the default on-heap LruBlockCache and the BucketCache, which is (usually) off-heap. This section discusses the benefits and drawbacks of each implementation, how to choose the appropriate option, and configuration options for each.

The following table describes several concepts related to HBase file operations and memory (RAM) caching.
 
 
 
  
| Component | Description |
|---|---|
| HFile | An HFile contains table data, indexes over that data, and metadata about the data. |
| Block | An HBase block is the smallest unit of data that can be read from an HFile. Each HFile consists of a series of blocks. (Note: an HBase block is different from an HDFS block or other underlying file system blocks.) |
| BlockCache | BlockCache is the main HBase mechanism for low-latency random read operations. BlockCache is one of two memory cache structures maintained by HBase. When a block is read from HDFS, it is cached in BlockCache. Frequent access to rows in a block causes the block to be kept in cache, improving read performance. |
| MemStore | MemStore ("memory store") is the second of two cache structures maintained by HBase. MemStore improves write performance. It accumulates data until it is full, and then writes ("flushes") the data to a new HFile on disk. MemStore serves two purposes: it increases the total amount of data written to disk in a single operation, and it retains recently written data in memory for subsequent low-latency reads. |
| Write Ahead Log (WAL) | The WAL is a log file that records all changes to data until the data is successfully written to disk (MemStore is flushed). This protects against data loss in the event of a failure before MemStore contents are written to disk. |

BlockCache and MemStore reside in random-access memory (RAM); HFiles and the Write Ahead Log are persisted to HDFS.
The read and write operations for any out-of-the-box HBase implementation can be described as follows:

- During write operations, HBase writes to the WAL and the MemStore. Data is flushed from MemStore to disk according to size limits and the flush interval.
- During read operations, HBase reads the block from BlockCache or MemStore if it is available in those caches. Otherwise, it reads from disk and stores a copy in BlockCache.

Caching Structure

By default, BlockCache resides in an area of RAM that is managed by the Java Virtual Machine ("JVM") garbage collector; this area of memory is known as "on-heap" memory or the "Java heap." The BlockCache implementation that manages the on-heap cache is called LruBlockCache.

As you will see in the example below, if you have stringent read-latency requirements and more than 20 GB of RAM available on your servers for use by HBase RegionServers, consider configuring BlockCache to use both on-heap and off-heap memory. The associated BlockCache implementation is called BucketCache. Read latencies for BucketCache tend to be less erratic than for LruBlockCache under large cache loads, because BucketCache (not JVM garbage collection) manages block cache allocation.

[Figure: HBase caching structure]

How to choose?
 There are two reasons to consider enabling one of the alternative  BlockCache  implementations. The first is simply the amount of RAM you can dedicate to the region server. Community wisdom recognizes the upper limit of the JVM heap, as far as the region server is concerned, to be somewhere between 14GB and 31GB. The precise limit usually depends on a combination of hardware profile, cluster configuration, the shape of data tables, and application access patterns.  
 The other time to consider an alternative cache is when response latency really matters. Keeping the heap down around 8-12GB allows the CMS collector to run very smoothly, which has measurable impact on the 99th percentile of response times. Given this restriction, the only choices are to explore an alternative garbage collector or take one of these off-heap implementations for a spin.  
 
Let's take a real-world example and go from there.

REAL WORLD SCENARIO

In the example that I use below, I had 3 HBase RegionServer nodes, each with ~24 TB of SSD-based storage and 512 GB of RAM. I allocated:

- 256 GB for the RegionServer process (there is additional memory available for other HDP processes)
- A workload consisting of 50% reads, 50% writes
- HBase heap size = 20 GB (20480 MB)

1. The best way to monitor HBase is through the UI that comes with every HBase installation, known as the HBase Master UI. By default the HBase Master UI is at: http://<HBASE_MASTER_HOST>:16010/master-status

2. Click on an individual region server to view the stats for that server.

3. Click on the stats and you will have all that you need to see about the block cache.

BlockCache Issues

In this particular example, we experienced a number of GC pauses, and RegionTooBusyException had started flooding my logs, so I followed the steps above to see if there was an issue with the way my HBase block cache was behaving.

The LruBlockCache is an LRU cache that contains three levels of block priority to allow for scan-resistance and in-memory ColumnFamilies:

- Single access priority: the first time a block is loaded from HDFS it normally has this priority, and it will be part of the first group to be considered during evictions. The advantage is that scanned blocks are more likely to get evicted than blocks that are getting more usage.
- Multi access priority: if a block in the previous priority group is accessed again, it upgrades to this priority. It is thus part of the second group considered during evictions.
- In-memory access priority: if the block's column family was configured to be "in-memory", it will be part of this priority regardless of the number of times it was accessed. Catalog tables are configured like this. This group is the last one considered during evictions.

As you can see (and what I noticed as well), there were a large number of blocks that were evicted. Eviction is generally fine, simply because you need to free up your cache as you continue to write data to your region server. However, in my case there was plenty of room in the cache, and technically HBase should have continued to cache the data rather than evicting any blocks.

The second problematic behavior was a very large number of cache misses (~70 million). A cache miss is a state where the data requested for processing by a component or application is not found in the cache memory. It causes execution delays by requiring the program or application to fetch the data from other cache levels or from main memory.

So clearly, we had to solve for the following issues in order to get better performance from our HBase cluster:

- Number of evicted blocks
- Number of eviction occurrences
- Number of block requests that were cache misses and set to use the block cache
- Maximize the hit ratio
Issue Resolution

In order to resolve the issues mentioned above, we need to configure and enable BucketCache. To do this, note the memory specifications and values, plus the additional values shown in the following table:
   
| Item | Description | Value or Formula | Example |
|---|---|---|---|
| A | Total physical memory for RegionServer operations: on-heap plus off-heap ("direct") memory (MB) | (hardware dependent) | 262144 |
| B | HBASE_HEAPSIZE (-Xmx): maximum size of JVM heap (MB) | Recommendation: 20480 | 20480 |
| C | -XX:MaxDirectMemorySize: amount of off-heap ("direct") memory to allocate to HBase (MB) | A - B | 262144 - 20480 = 241664 |
| Dp | hfile.block.cache.size: proportion of maximum JVM heap size (HBASE_HEAPSIZE, -Xmx) to allocate to BlockCache. The sum of this value plus hbase.regionserver.global.memstore.size must not exceed 0.8. | (proportion of reads) * 0.8 | 0.50 * 0.8 = 0.4 |
| Dm | Maximum amount of JVM heap to allocate to BlockCache (MB) | B * Dp | 20480 * 0.4 = 8192 |
| Ep | hbase.regionserver.global.memstore.size: proportion of maximum JVM heap size (HBASE_HEAPSIZE, -Xmx) to allocate to MemStore. The sum of this value plus hfile.block.cache.size must be less than or equal to 0.8. | 0.8 - Dp | 0.8 - 0.4 = 0.4 |
| F | Amount of off-heap memory to reserve for other uses (DFSClient; MB) | Recommendation: 1024 to 2048 | 2048 |
| G | Amount of off-heap memory to allocate to BucketCache (MB) | C - F | 241664 - 2048 = 239616 |
|  | hbase.bucketcache.size: total amount of memory to allocate to BucketCache, on- and off-heap (MB) | Dm + G | 264192 |
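For reference, the two on-heap proportions from the table (Dp and Ep) correspond to hbase-site.xml entries like the following. This is only a sketch using the example values above; the property names are the standard HBase ones:

<property>
  <name>hfile.block.cache.size</name>
  <value>0.4</value>
</property>
<property>
  <name>hbase.regionserver.global.memstore.size</name>
  <value>0.4</value>
</property>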
Enable BucketCache

After completing the steps in Configuring BlockCache, follow these steps to configure BucketCache.

1. In the hbase-env.sh file for each RegionServer, or in the hbase-env.sh file supplied to Ambari, set the -XX:MaxDirectMemorySize argument for HBASE_REGIONSERVER_OPTS to the amount of direct memory you wish to allocate to HBase. In the configuration for the example discussed above, the value would be 241664m. (-XX:MaxDirectMemorySize accepts a number followed by a unit indicator; m indicates megabytes.)

HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=241664m"

2. In the hbase-site.xml file, specify BucketCache size and percentage. In the example discussed above, the values would be 264192 and 0.90697674, respectively. If you choose to round the proportion, round up. This will allocate space related to rounding error to the (larger) off-heap memory area.

<property>
  <name>hbase.bucketcache.size</name>
  <value>241664</value>
</property>

3. In the hbase-site.xml file, set hbase.bucketcache.ioengine to offheap. This enables BucketCache.

<property>
  <name>hbase.bucketcache.ioengine</name>
  <value>offheap</value>
</property>

4. Restart (or rolling restart) the cluster. It can take a minute or more to allocate BucketCache, depending on how much memory you are allocating. Check the logs for error messages.

Results
And voila!

- 100% reduction in the number of evicted blocks
- 99% reduction in the number of evictions
- A 90% reduction in the number of cache misses
- Hit ratio went up 10x
						
					
    
	
		
		
11-29-2016 06:12 PM

@Alessandro Lulli Your cluster has allocated the resources that you asked for from Spark. It looks like you have a backlog of jobs that are already running; maybe there is a job stuck in the queue. Kill the previous applications and see if the job runs. You can also kill all the YARN applications and resubmit the jobs:

$ yarn application -list
$ yarn application -kill $application_id
						
					
    
	
		
		
11-22-2016 03:15 PM  (1 Kudo)

@Gobi Subramani You are looking at it wrong. The Spark context is the main entry point into Spark and the connection to a Spark cluster, and it can be used to create RDDs, accumulators, etc. on that cluster. You can run in cluster mode as well as local mode, and if you had to choose one, you would define that in the Spark context. The workers don't get the Spark context per se, but if you were to package your program into a jar, the cluster manager would be responsible for copying the jar file to the workers before it allocates tasks.
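To make that concrete, here is a minimal PySpark sketch; the master URL shown is just an illustration, and in a packaged application it is typically supplied by spark-submit rather than hard-coded:

from pyspark import SparkConf, SparkContext

# The context, not the workers, holds the cluster connection and the master setting.
conf = SparkConf().setAppName("example").setMaster("local[*]")  # or e.g. "yarn"
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(10))
print(rdd.sum())  # tasks run on the workers; the result comes back through the context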
 
						
					