Member since
10-02-2015
76
Posts
80
Kudos Received
8
Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 535 | 11-15-2016 03:28 PM
 | 1353 | 11-15-2016 03:15 PM
 | 522 | 07-25-2016 08:03 PM
 | 473 | 05-11-2016 04:10 PM
 | 635 | 02-02-2016 08:09 PM
02-03-2017
04:53 PM
1 Kudo
Word Count using Spark Streaming in PySpark
This is a WordCount example with the following flow:
- A local file system (or S3) path as the streaming source
- Counts calculated with reduceByKey and stored in a temp table
- Running counts queried through SQL
Setup: Define the function that sets up the StreamingContext. This function has to create a new StreamingContext and set it up with all of the input streams, transformations, and output operations. It must not start the StreamingContext; it only captures all the setup code necessary for your streaming application in one place.
from pyspark.streaming import StreamingContext

batchIntervalSeconds = 10

def creatingFunc():
    ssc = StreamingContext(sc, batchIntervalSeconds)
    # Set each DStream in this context to remember the RDDs it generated in the last given duration.
    # DStreams remember RDDs only for a limited duration of time and release them for garbage
    # collection. This method allows the developer to specify how long to remember the RDDs
    # (if the developer wishes to query old data outside the DStream computation).
    ssc.remember(60)

    lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
    lines.pprint()

    words = lines.flatMap(lambda line: line.split(","))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    def process(time, rdd):
        df = sqlContext.createDataFrame(rdd)
        df.registerTempTable("myCounts")

    wordCounts.foreachRDD(process)
    return ssc
Create a streaming context. If you don't want the streaming application to restart from the checkpoint, make sure to delete the checkpoint directory.
checkpointDir = "/LOCAL_DIR/DATA/"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
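One thing creatingFunc above does not do is enable checkpointing, so getActiveOrCreate will have nothing to recover from after a restart. A minimal sketch of the extra call you could add inside the setup function, reusing the names defined in this example (this addition is my assumption, not part of the original code):
def creatingFunc():
    ssc = StreamingContext(sc, batchIntervalSeconds)
    # Assumed addition: tell Spark where to write checkpoint data so that
    # StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc) can
    # recover the context from checkpointDir after a restart.
    ssc.checkpoint(checkpointDir)
    # ... the rest of the stream setup shown above goes here ...
    return ssc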
Start the streaming context
# This line starts the streaming context in the background.
ssc.start()
# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
-------------------------------------------
Time: 2015-10-07 20:57:00
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
-------------------------------------------
Time: 2015-10-07 20:57:10
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
Interactive query
%sql select * from myCounts
world hello
Stop the streaming context.
ssc.stop(stopSparkContext=False)
Stop all streaming contexts running in this JVM from Python. This is helpful if you lose the handle to the streaming context, e.g., you detached the notebook from the cluster while streaming was running. In that case you have lost the handle to the streaming context from the notebook, but the streaming job is still running in the background. To stop the streaming job, you need to access the streaming context in the JVM directly and stop it.
if not sc._jvm.StreamingContext.getActive().isEmpty():
    sc._jvm.StreamingContext.getActive().get().stop(False)
- Find more articles tagged with:
- Data Ingestion & Streaming
- How-ToTutorial
- pyspark
- spark-streaming
02-03-2017
04:44 PM
3 Kudos
If you're using Apache Kafka, you know it persists all the messages on disk as a distributed commit log. You might sometimes want to take advantage of that and reprocess some of the messages. Assuming that you want to reprocess all the messages currently stored on your brokers and you set auto.offset.reset to smallest, you can just delete your consumers' data from ZooKeeper. After restarting, your consumers should start from the beginning. But what if you forgot, or didn't want, to set auto.offset.reset in your consumers to smallest? Then you can manually set the offsets for each partition for your consumers to the smallest currently available offset.
List all topics:
kafka-topics --list --zookeeper localhost:2181
Get offsets for the topic:
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1
Set the offset manually using the ZooKeeper CLI:
$ ZOOKEEPER_HOME/zkCli
$ ls /consumers/flasfka/offsets/vital_signs
$ set /consumers/flasfka/offsets/vital_signs/0 11492949
Verify the new offset:
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1
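If your consumers commit offsets to Kafka rather than ZooKeeper, you can achieve the same rewind programmatically. A minimal sketch with the kafka-python client; the group id and broker address are placeholders, and the exact commit semantics should be checked against the kafka-python version you run:
from kafka import KafkaConsumer

consumer = KafkaConsumer('vital_signs',
                         group_id='my_group',                 # placeholder group
                         bootstrap_servers='localhost:9092',  # placeholder broker
                         enable_auto_commit=False)

# Wait until the coordinator has assigned partitions to this consumer,
# then move every assigned partition back to the earliest available offset
# and commit that position for the group.
while not consumer.assignment():
    consumer.poll(timeout_ms=500)
consumer.seek_to_beginning()
consumer.commit()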
- Find more articles tagged with:
- administration
- Data Ingestion & Streaming
- FAQ
- Kafka
- queue
02-03-2017
04:26 PM
5 Kudos
The Apache Ambari project is aimed at making Hadoop management simpler by developing software for provisioning, managing, and monitoring Apache Hadoop clusters. Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.
Ambari enables system administrators to:
- Provision a Hadoop cluster: Ambari provides a step-by-step wizard for installing Hadoop services across any number of hosts, and handles configuration of the Hadoop services for the cluster.
- Manage a Hadoop cluster: Ambari provides central management for starting, stopping, and reconfiguring Hadoop services across the entire cluster.
- Monitor a Hadoop cluster: Ambari provides a dashboard for monitoring the health and status of the Hadoop cluster. Ambari leverages the Ambari Metrics System for metrics collection and the Ambari Alert Framework for system alerting, and will notify you when your attention is needed (e.g., a node goes down, remaining disk space is low, etc.).
Ambari enables application developers and system integrators to easily integrate Hadoop provisioning, management, and monitoring capabilities into their own applications with the Ambari REST APIs.
$ export AMBARI_USER=<FILL IN>
$ export AMBARI_PASSWD=<FILL IN>
$ export AMBARI_HOST=<FILL IN>
$ export CLUSTER_NAME=<FILL IN>
$ export SERVICE=<FILL IN>
$ export COMPONENT=<FILL IN>
$ export COMPONENT_HOST=<FILL IN>
LIST SERVICES RELATED TO HIVE:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X GET "http://$AMBARI_HOST:8080/api/v1/clusters/PALEBLUEDOT/services/HIVE"
STOP THE COMPONENT HIVE_SERVER_INTERACTIVE:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X PUT -d '{"RequestInfo":{"context":"Stop Service"},"Body":{"ServiceInfo":{"state":"INSTALLED"}}}' "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/$SERVICE/components/$COMPONENT"
DELETE THE COMPONENT:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H "X-Requested-By: ambari" -X PUT -d '{"RequestInfo":{"context":"Stop All Components"},"Body":{"ServiceComponentInfo":{"state":"INSTALLED"}}}' "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/services/$SERVICE/components/$COMPONENT"
DELETE A COMPONENT ON A HOST:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X DELETE "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/hosts/$COMPONENT_HOST/host_components/$COMPONENT"
DELETE A HOST:
curl -u $AMBARI_USER:$AMBARI_PASSWD -H 'X-Requested-By: ambari' -X DELETE "http://$AMBARI_HOST:8080/api/v1/clusters/$CLUSTER_NAME/hosts/$COMPONENT_HOST"
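The same REST calls can be scripted from Python with the requests library. This is a hedged sketch, not Ambari documentation: the cluster name, credentials, and the assumption that the service state is returned under ServiceInfo/state are illustrative.
import requests

AMBARI = "http://ambari.example.com:8080"   # hypothetical Ambari host
AUTH = ("admin", "admin")                   # hypothetical credentials
HEADERS = {"X-Requested-By": "ambari"}

# Read the current state of the HIVE service (mirrors the GET example above).
resp = requests.get(f"{AMBARI}/api/v1/clusters/MYCLUSTER/services/HIVE",
                    auth=AUTH, headers=HEADERS)
resp.raise_for_status()
print(resp.json()["ServiceInfo"]["state"])

# Stop the service by setting its desired state to INSTALLED (mirrors the PUT example above).
body = {"RequestInfo": {"context": "Stop Service"},
        "Body": {"ServiceInfo": {"state": "INSTALLED"}}}
resp = requests.put(f"{AMBARI}/api/v1/clusters/MYCLUSTER/services/HIVE",
                    auth=AUTH, headers=HEADERS, json=body)
print(resp.status_code)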
- Find more articles tagged with:
- Ambari
- api
- Cloud & Operations
- FAQ
- hadoop
02-02-2017
04:26 PM
Repo Description
This demo is built for the Hortonworks HDP 2.3 Sandbox and is based on the Hortonworks Twitter Demo.
Purpose: Monitor the Twitter stream for the provided hashtags and act on unexpected increases in tweet volume.
Ingest: Listen for Twitter streams related to hashtags input in the NiFi Garden Hose (GetHTTP) processor.
Processing: Monitor tweets for unexpected volume; volume thresholds are managed in HBase.
Persistence: HDFS (for future batch processing), Hive (for interactive query), HBase (for realtime alerts), Solr/Banana (for search and reports/dashboards).
Refine: Update threshold values based on historical analysis of tweet volumes.
Demo setup: Either download and start the prebuilt VM, or start the HDP 2.3 sandbox and run the provided scripts to set up the demo.
Repo Info
Github Repo URL https://github.com/vedantja/hdp_nifi_twitter_demo
Github account name vedantja
Repo name hdp_nifi_twitter_demo
- Find more articles tagged with:
- ambari-extensions
- Data Ingestion & Streaming
02-02-2017
04:23 PM
Repo Description
The application simulates typical patient metadata at a clinic.
Repo Info
Github Repo URL https://github.com/vedantja/nodejs-phoenix-app.git
Github account name vedantja
Repo name nodejs-phoenix-app.git
- Find more articles tagged with:
- ambari-extensions
- Data Ingestion & Streaming
- HBase
- nodejs
- Phoenix
02-02-2017
04:05 PM
1 Kudo
SparkR is an R package that provides a lightweight front end for using Apache Spark from R, thus supporting large-scale analytics on Hortonworks Data Platform (HDP) from the R language and environment. As of Spark 1.6.2, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, and aggregation on large datasets. In addition, SparkR supports distributed machine learning through MLlib.
Architecture
SparkR, which was introduced in Spark 1.4, consists of wrappers over DataFrames and DataFrame-based APIs. In SparkR, the APIs are similar to existing ones in R (or R packages), rather than to the Python/Java/Scala APIs. SparkR is popular primarily because it allows users to write Spark jobs while staying entirely in the R framework/model. A SparkDataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R, but with richer optimizations under the hood. SparkDataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing local R data frames. All of the examples on this page use sample data included in R or the Spark distribution and can be run using the ./bin/sparkR shell. R is very convenient for analytics and users love it; however, scalability is the main issue with R, and SparkR is a means to address that. The key challenge addressed in implementing SparkR was supporting the invocation of Spark functions on a JVM from R. The Spark driver runs the R SparkContext, which passes functions to the R-JVM bridge; this bridge is responsible for launching worker JVMs.
Run a SparkR example on HDP
1. Prerequisites
Before you run SparkR, ensure that your cluster meets the following prerequisites:
R must be installed on all nodes. JAVA_HOME must be set on all nodes.
2. SparkR Example
The following example launches SparkR and then uses R to create a people DataFrame, list part of the DataFrame, and read the DataFrame. (For more information about Spark DataFrames, see "Using the Spark DataFrame API".)
Launch SparkR:
$ su spark
$ cd /usr/hdp/2.5.0.0-3485/spark/bin
$ ./sparkR
Output similar to the following displays:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.2
/_/
Spark context is available as sc, SQL context is available as sqlContext
>
From your R prompt (not the Spark shell), initialize SQLContext, create a DataFrame, and list the first few rows: sparkR> sqlContext <- sparkRSQL.init(sc)
sparkR> df <- createDataFrame(sqlContext, faithful)
sparkR> head(df) Output similar to the following displays: ...
eruptions waiting
1 3.600 79
2 1.800 54
3 3.333 74
4 2.283 62
5 4.533 85
6 2.883 55
Read the people DataFrame: sparkR> people <- read.df(sqlContext, "people.json", "json")
sparkR> head(people)
Output similar to the following displays:
age name
1 NA Michael
2 30 Andy
3 19 Justin
- Find more articles tagged with:
- Data Science & Advanced Analytics
- data-science
- How-ToTutorial
- sparkr
02-02-2017
03:42 PM
3 Kudos
@Beau Piccart We plan to make Spark 2.1 GA in the HDP 2.6 release, in the March 2017 time frame.
02-02-2017
03:38 PM
1 Kudo
kafka-python is best used with newer brokers (0.10 or 0.9), but is backwards-compatible with older versions (to 0.8.0). Some features will only be enabled on newer brokers, however; for example, fully coordinated consumer groups -- i.e., dynamic partition assignment to multiple consumers in the same group -- requires use of 0.9+ kafka brokers. Supporting this feature for earlier broker releases would require writing and maintaining custom leadership election and membership / health check code (perhaps using zookeeper or consul). For older brokers, you can achieve something similar by manually assigning different partitions to each consumer instance with config management tools like chef, ansible, etc. This approach will work fine, though it does not support rebalancing on failures. See <http://kafka-python.readthedocs.org/en/master/compatibility.html> for more details. Please note that the master branch may contain unreleased features. For release documentation, please see readthedocs and/or python's inline help. >>> pip install kafka-python
KafkaConsumer KafkaConsumer is a high-level message consumer, intended to operate as similarly as possible to the official java client. Full support for coordinated consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+. See <http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html> for API and configuration details. The consumer iterator returns ConsumerRecords, which are simple namedtuples that expose basic message attributes: topic, partition, offset, key, and value: >>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
... print (msg)
>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
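To use the coordinated consumer groups described above (0.9+ brokers), pass a group_id so the brokers balance partitions across all consumers in the group. A minimal sketch; the topic, group, and broker address are placeholders:
>>> from kafka import KafkaConsumer
>>> # group_id enables broker-coordinated partition assignment on 0.9+ brokers;
>>> # auto_offset_reset='earliest' starts from the beginning when the group has no committed offsets
>>> consumer = KafkaConsumer('my_favorite_topic',
...                          group_id='my_consumer_group',
...                          bootstrap_servers='localhost:9092',
...                          auto_offset_reset='earliest')
>>> for msg in consumer:
...     print(msg.topic, msg.partition, msg.offset, msg.value)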
KafkaProducer KafkaProducer is a high-level, asynchronous message producer. The class is intended to operate as similarly as possible to the official java client. See <http://kafka-python.readthedocs.org/en/master/apidoc/KafkaProducer.html> for more details. >>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
... producer.send('foobar', b'some_message_bytes')
>>> # Block until all pending messages are sent
>>> producer.flush()
>>> # Block until a single message is sent (or timeout)
>>> producer.send('foobar', b'another_message').get(timeout=60)
>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)
Compression kafka-python supports gzip compression/decompression natively. To produce or consume lz4 compressed messages, you must install lz4tools and xxhash (modules may not work on python2.6). To enable snappy compression/decompression install python-snappy (also requires snappy library). See <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install> for more information. Protocol A secondary goal of kafka-python is to provide an easy-to-use protocol layer for interacting with kafka brokers via the python repl. This is useful for testing, probing, and general experimentation. The protocol support is leveraged to enable a KafkaClient.check_version() method that probes a kafka broker and attempts to identify which version it is running (0.8.0 to 0.10).
- Find more articles tagged with:
- How-ToTutorial
- Kafka
- python
- Sandbox & Learning
- streaming
01-08-2017
01:16 AM
1 Kudo
If you are on a Mac and run into the following error when running start-all.sh: % sh start-all.sh
starting org.apache.spark.deploy.master.Master, logging to ...
localhost: ssh: connect to host localhost port 22: Connection refused
You need to enable "Remote Login" for your machine. From System Preferences, select Sharing, and then turn on Remote Login.
- Find more articles tagged with:
- Data Science & Advanced Analytics
- FAQ
- Spark
12-28-2016
06:32 PM
1 Kudo
This first article in a series lists 3 easy ways in which you can optimize your Spark code. They can be summed up as follows:
- Use reduceByKey over groupByKey
- Be wary of actions
- Gracefully deal with bad-quality data
Use ReduceByKey over GroupByKey
Let's look at two different ways to compute word counts, one using reduceByKey and the other using groupByKey:
val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()
val wordCountsWithGroup = wordPairsRDD
  .groupByKey()
  .map(t => (t._1, t._2.sum))
  .collect()
While both of these functions will produce the correct answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data. With reduceByKey, the pairs on the same machine with the same key are combined (using the lambda function passed into reduceByKey) before the data is shuffled. Then the lambda function is called again to reduce all the values from each partition to produce one final result. On the other hand, when calling groupByKey, all the key-value pairs are shuffled around. This is a lot of unnecessary data being transferred over the network. To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time, so if a single key has more key-value pairs than can fit in memory, an out-of-memory exception occurs. This will be handled more gracefully in a later release of Spark so the job can still proceed, but it should still be avoided: when Spark needs to spill to disk, performance is severely impacted. You can imagine that for a much larger dataset, the difference in the amount of data shuffled between reduceByKey and groupByKey becomes even more exaggerated. Here are more functions to prefer over groupByKey: combineByKey can be used when you are combining elements but your return type differs from your input value type; foldByKey merges the values for each key using an associative function and a neutral "zero value".
Be Wary of Actions
If your RDD is so large that all of its elements won't fit in memory on the driver machine, don't do this:
val values = myVeryLargeRDD.collect()
collect will attempt to copy every single element in the RDD onto the single driver program, and then run out of memory and crash. Instead, you can make sure the number of elements you return is capped by calling take or takeSample, or perhaps by filtering or sampling your RDD. Similarly, be cautious of these other actions as well unless you are sure your dataset is small enough to fit in memory: countByKey, countByValue, collectAsMap. If you really do need every one of these values of the RDD and the data is too big to fit into memory, you can write out the RDD to files or export the RDD to a database that is large enough to hold all the data.
Gracefully Deal with Bad Quality Data
When dealing with vast amounts of data, a common problem is that a small amount of the data is malformed or corrupt. Using a filter transformation, you can easily discard bad inputs, or use a map transformation if it's possible to fix the bad input. Perhaps the best option is to use a flatMap function, where you can try fixing the input but fall back to discarding it if you can't. Let's consider the JSON strings below as input:
input_rdd = sc.parallelize(["{\"value\": 1}", # Good
"bad_json", # Bad
"{\"value\": 2}", # Good
"{\"value\": 3" # Missing an ending brace.
])
If we tried to input this set of JSON strings to a sqlContext, it would clearly fail due to the malformed inputs.
sqlContext.jsonRDD(input_rdd).registerTempTable("valueTable")
# The above command will throw an error.
Instead, let's try fixing the input with this Python function:
import json

def try_correct_json(json_string):
    try:
        # First check if the json is okay.
        json.loads(json_string)
        return [json_string]
    except ValueError:
        try:
            # If not, try correcting it by adding an ending brace.
            try_to_correct_json = json_string + "}"
            json.loads(try_to_correct_json)
            return [try_to_correct_json]
        except ValueError:
            # The malformed json input can't be recovered; drop this input.
            return []
Now, we can apply that function to fix our input and try again. This time we will succeed in reading in three inputs:
corrected_input_rdd = input_rdd.flatMap(try_correct_json)
sqlContext.jsonRDD(corrected_input_rdd).registerTempTable("valueTable")
sqlContext.sql("select * from valueTable").collect() # Returns [Row(value=1), Row(value=2), Row(value=3)]
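As a companion to the "Be Wary of Actions" section above, here is a minimal PySpark sketch of capping what is returned to the driver instead of calling collect(); it reuses the myVeryLargeRDD name from that section:
# Pull back only a bounded amount of data instead of the whole RDD.
first_hundred = myVeryLargeRDD.take(100)                       # at most 100 elements
small_sample = myVeryLargeRDD.takeSample(False, 100, seed=42)  # 100 random elements, no replacement
total_count = myVeryLargeRDD.count()                           # a single number is always safe to return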
- Find more articles tagged with:
- best-practices
- Data Science & Advanced Analytics
- FAQ
- Spark
12-27-2016
05:06 PM
8 Kudos
The distributed nature of HBase, coupled with the concepts of an ordered write log and a log-structured merge tree, makes HBase a great database for large scale data processing. Over the years, HBase has proven itself to be a reliable storage mechanism when you need random, realtime read/write access to your Big Data.
HBase is able to deliver because of a prioritization mechanism for memory utilization and caching structures over disk I/O. The memory store, implemented as the MemStore, accumulates data edits as they are received, buffering them in memory. The block cache, an implementation of the BlockCache interface, keeps data blocks resident in memory after they are read.
OVERVIEW OF HBASE I/O
Memstore
The MemStore is important for accessing recent edits. Without the MemStore, accessing that data as it was written into the write log would require reading and deserializing entries back out of that file, at least an O(n) operation. Instead, MemStore maintains a skiplist structure, which enjoys an O(log n) access cost and requires no disk I/O. The MemStore contains just a tiny piece of the data stored in HBase, however.
Caching Options
HBase provides two different BlockCache implementations: the default on-heap LruBlockCache and the BucketCache , which is (usually) off-heap. This section discusses benefits and drawbacks of each implementation, how to choose the appropriate option, and configuration options for each.
The following table describes several concepts related to HBase file operations and memory (RAM) caching.
Component | Description
---|---
HFile | An HFile contains table data, indexes over that data, and metadata about the data.
Block | An HBase block is the smallest unit of data that can be read from an HFile. Each HFile consists of a series of blocks. (Note: an HBase block is different from an HDFS block or other underlying file system blocks.)
BlockCache | BlockCache is the main HBase mechanism for low-latency random read operations. BlockCache is one of two memory cache structures maintained by HBase. When a block is read from HDFS, it is cached in BlockCache. Frequent access to rows in a block causes the block to be kept in cache, improving read performance.
MemStore | MemStore ("memory store") is the second of two cache structures maintained by HBase. MemStore improves write performance. It accumulates data until it is full, and then writes ("flushes") the data to a new HFile on disk. MemStore serves two purposes: it increases the total amount of data written to disk in a single operation, and it retains recently written data in memory for subsequent low-latency reads.
Write Ahead Log (WAL) | The WAL is a log file that records all changes to data until the data is successfully written to disk (MemStore is flushed). This protects against data loss in the event of a failure before MemStore contents are written to disk.
BlockCache and MemStore reside in random-access memory (RAM); HFiles and the Write Ahead Log are persisted to HDFS.
The read and write operations for any OOTB HBase implementation can be described as follows:
During write operations, HBase writes to the WAL and the MemStore. Data is flushed from the MemStore to disk according to size limits and the flush interval. During read operations, HBase reads the block from BlockCache or MemStore if it is available in those caches; otherwise it reads from disk and stores a copy in BlockCache.
Caching Structure
By default, BlockCache resides in an area of RAM that is managed by the Java Virtual Machine ("JVM") Garbage Collector; this area of memory is known as “on-heap" memory or the "Java heap." The BlockCache implementation that manages on-heap cache is called LruBlockCache.
As you are going to see in the example below, if you have stringent read latency requirements and you have more than 20 GB of RAM available on your servers for use by HBase RegionServers, consider configuring BlockCache to use both on-heap and off-heap memory, as shown below. The associated BlockCache implementation is called BucketCache. Read latencies for BucketCache tend to be less erratic than LruBlockCache for large cache loads, because BucketCache (not JVM Garbage Collection) manages block cache allocation.
Figure below explains HBase's caching structure:
How to choose?
There are two reasons to consider enabling one of the alternative BlockCache implementations. The first is simply the amount of RAM you can dedicate to the region server. Community wisdom recognizes the upper limit of the JVM heap, as far as the region server is concerned, to be somewhere between 14GB and 31GB. The precise limit usually depends on a combination of hardware profile, cluster configuration, the shape of data tables, and application access patterns.
The other time to consider an alternative cache is when response latency really matters. Keeping the heap down around 8-12GB allows the CMS collector to run very smoothly, which has measurable impact on the 99th percentile of response times. Given this restriction, the only choices are to explore an alternative garbage collector or take one of these off-heap implementations for a spin.
Let's take a real world example and go from there.
REAL WORLD SCENARIO
In the example that I use below, I had 3 HBase RegionServer nodes, each with ~24 TB of SSD-based storage and 512 GB of RAM. I allocated 256 GB for the RegionServer process (there is additional memory available for other HDP processes). The workload consists of 50% reads and 50% writes. HBase heap size = 20 GB (20480 MB).
1. The best way to monitor HBase is through the UI that comes with every HBase installation, known as the HBase Master UI. By default, the link to the HBase Master UI is: http://<HBASE_MASTER_HOST>:16010/master-status.
2. Click on the individual region server to view the stats for that server. See below:
3. Click on the stats and you will have all that you need to see about the block cache.
BlockCache Issues
In this particular example, we experienced a number of GC pauses, and RegionTooBusyException had started flooding my logs. So I followed the steps above to see if there was an issue with the way my HBase block cache was behaving. The LruBlockCache is an LRU cache that contains three levels of block priority to allow for scan-resistance and in-memory ColumnFamilies:
- Single access priority: The first time a block is loaded from HDFS it normally has this priority, and it will be part of the first group to be considered during evictions. The advantage is that scanned blocks are more likely to get evicted than blocks that are getting more usage.
- Multi access priority: If a block in the previous priority group is accessed again, it upgrades to this priority. It is thus part of the second group considered during evictions.
- In-memory access priority: If the block's family was configured to be "in-memory", it will be part of this priority regardless of the number of times it was accessed. Catalog tables are configured like this. This group is the last one considered during evictions.
As you can see, and what I noticed as well, there was a large number of blocks that were evicted. Eviction is generally OK, simply because you need to free up your cache as you continue to write data to your RegionServer. However, in my case there was plenty of room in the cache, and technically HBase should have continued to cache the data rather than evicting any blocks. The second problematic behavior was that there was a very large number of cache misses (~70 million). A cache miss is a state where the data requested for processing by a component or application is not found in the cache memory. It causes execution delays by requiring the program or application to fetch the data from other cache levels or the main memory. So clearly, we had to solve for the following issues in order to get better performance from our HBase cluster:
- Number of evicted blocks
- Number of eviction occurrences
- Number of block requests that were cache misses and set to use the block cache
- Maximize the hit ratio
Issue Resolution
In order to resolve the issues mentioned above, we need to configure and enable BucketCache. To do this, note the memory specifications and values, plus the additional values shown in the following table:
Item | Description | Value or Formula | Example
---|---|---|---
A | Total physical memory for RegionServer operations: on-heap plus off-heap ("direct") memory (MB) | (hardware dependent) | 262144
B | HBASE_HEAPSIZE (-Xmx): maximum size of JVM heap (MB) | Recommendation: 20480 | 20480
C | -XX:MaxDirectMemorySize: amount of off-heap ("direct") memory to allocate to HBase (MB) | A - B | 262144 - 20480 = 241664
Dp | hfile.block.cache.size: proportion of maximum JVM heap size (HBASE_HEAPSIZE, -Xmx) to allocate to BlockCache. The sum of this value plus hbase.regionserver.global.memstore.size must not exceed 0.8. | (proportion of reads) * 0.8 | 0.50 * 0.8 = 0.4
Dm | Maximum amount of JVM heap to allocate to BlockCache (MB) | B * Dp | 20480 * 0.4 = 8192
Ep | hbase.regionserver.global.memstore.size: proportion of maximum JVM heap size (HBASE_HEAPSIZE, -Xmx) to allocate to MemStore. The sum of this value plus hfile.block.cache.size must be less than or equal to 0.8. | 0.8 - Dp | 0.8 - 0.4 = 0.4
F | Amount of off-heap memory to reserve for other uses (DFSClient; MB) | Recommendation: 1024 to 2048 | 2048
G | Amount of off-heap memory to allocate to BucketCache (MB) | C - F | 241664 - 2048 = 239616
 | hbase.bucketcache.size: total amount of memory to allocate to BucketCache, on- and off-heap (MB) | Dm + G | 264192
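To make the arithmetic concrete, this small Python sketch reproduces the example column for rows C through G from the three inputs used in this scenario (the numbers are this cluster's values, not general recommendations):
# Inputs from the example above (sizes in MB).
total_physical_mb = 262144        # A: RAM available for RegionServer operations
hbase_heapsize_mb = 20480         # B: JVM heap (-Xmx)
read_proportion = 0.50            # workload is 50% reads
dfs_client_reserve_mb = 2048      # F: off-heap memory reserved for the DFSClient

max_direct_memory_mb = total_physical_mb - hbase_heapsize_mb            # C = A - B    -> 241664
hfile_block_cache_size = read_proportion * 0.8                          # Dp           -> 0.4
onheap_blockcache_mb = int(hbase_heapsize_mb * hfile_block_cache_size)  # Dm = B * Dp  -> 8192
memstore_size = 0.8 - hfile_block_cache_size                            # Ep = 0.8 - Dp -> 0.4
bucketcache_offheap_mb = max_direct_memory_mb - dfs_client_reserve_mb   # G = C - F    -> 239616

print(max_direct_memory_mb, hfile_block_cache_size, onheap_blockcache_mb,
      memstore_size, bucketcache_offheap_mb)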
Enable BucketCache
After completing the steps in Configuring BlockCache, follow these steps to configure BucketCache.
In the hbase-env.sh file for each RegionServer, or in the hbase-env.sh file supplied to Ambari, set the -XX:MaxDirectMemorySize argument for HBASE_REGIONSERVER_OPTS to the amount of direct memory you wish to allocate to HBase. In the configuration for the example discussed above, the value would be 241664m. (-XX:MaxDirectMemorySize accepts a number followed by a unit indicator; m indicates megabytes.)
HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=241664m"
In the hbase-site.xml file, specify the BucketCache size and percentage. In the example discussed above, the values would be 264192 and 0.90697674, respectively. If you choose to round the proportion, round up; this will allocate space related to rounding error to the (larger) off-heap memory area.
<property>
<name>hbase.bucketcache.size</name>
<value>241664</value>
</property>
In the hbase-site.xml file, set hbase.bucketcache.ioengine to offheap. This enables BucketCache.
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
</property>
Restart (or rolling restart) the cluster. It can take a minute or more to allocate BucketCache, depending on how much memory you are allocating. Check logs for error messages.
Results
And Voila!
- 100% reduction in the number of evicted blocks
- 99% reduction in the number of evictions
- 90% reduction in the number of cache misses
- Hit ratio went up 10x
- Find more articles tagged with:
- Cloud & Operations
- configuration
- FAQ
- HBase
- hbasestorage
11-29-2016
06:12 PM
@Alessandro Lulli Your cluster has allocated the resources that you have asked for Spark. It looks like you have a backlog of jobs that are already running. Maybe there is a job that is stuck in the queue. Kill all the previous applications and see if the job runs. You can also kill all the YARN applications and resubmit the jobs.
$ yarn application -list
$ yarn application -kill $application_id
11-29-2016
06:06 PM
@Ganesan Vetri Please refer to this article: https://nicolasmaillard.com/2016/02/06/remote-debugging-201-spark/
11-22-2016
03:15 PM
1 Kudo
@Gobi Subramani You are looking at it the wrong way. The SparkContext is the main entry point into Spark and is the connection to a Spark cluster; it can be used to create RDDs, accumulators, etc. on that cluster. You can run in both cluster and local mode, and you define which one in the Spark context. The workers don't get the SparkContext per se, but if you were to package your program into a jar, the cluster manager would be responsible for copying the jar file to the workers before it allocates tasks.
11-22-2016
03:06 PM
2 Kudos
@Fernando Lopez Bello It is recommended that you run Spark in either YARN cluster or YARN client mode. You will need to install the Spark client on one of the machines. You can also install the Thrift server for connecting external applications, but that's optional. See the link below: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.3/bk_installing_manually_book/content/ch_installing_spark_chapter.html
11-21-2016
03:26 PM
@Gobi Subramani The driver program is the process that runs the main() function of the application and creates the SparkContext. The cluster manager then acquires resources on the cluster. After this, an executor process is launched on the resources acquired by the cluster manager. Tasks are then sent to the individual executors for execution.
11-16-2016
03:09 AM
By default, zeppelin.spark.maxResult is set to 1000, which means that Spark SQL will return only 1000 results regardless of the size of the dataset. If your notebook hangs, you should restart the Zeppelin service from Ambari. You can then navigate to your notebook and delete the paragraph manually.
11-16-2016
02:56 AM
Can you paste the output of the stderr log?
11-16-2016
02:06 AM
@Gundrathi babu You should use this package: https://spark-packages.org/package/HyukjinKwon/spark-xml
val selectedData = df.select("author", "_id")
selectedData.write
.format("com.databricks.spark.xml")
.option("rootTag", "books")
.option("rowTag", "book")
.save("newbooks.xml")
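The snippet above assumes df has already been loaded from XML. For completeness, a hedged PySpark sketch of reading with the same package; the package version and file name are illustrative, not taken from the question:
# Start PySpark with the package on the classpath, for example:
#   pyspark --packages com.databricks:spark-xml_2.10:0.4.1   (version is illustrative)
df = sqlContext.read \
    .format("com.databricks.spark.xml") \
    .option("rowTag", "book") \
    .load("books.xml")
df.select("author", "_id").show()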
11-15-2016
04:44 PM
@Zeeshan Ahmed This needs to be done after the "remove version 1.3.1" step. I personally have not tried it, but you can download the version of Spark you are looking for from here: https://github.com/apache/ambari/tree/2ad42074f1633c5c6f56cf979bdaa49440457566/ambari-server/src/main/resources/common-services/SPARK Create a directory called SPARK in /var/lib/ambari-server/resources/stacks/HDP/2.3/services/ and copy the downloaded contents into this directory. Restart the Ambari server and you should see this version of Spark as an option in the 'Add a service' menu in Ambari. Follow the steps to install the service.
11-15-2016
03:28 PM
@Zeeshan Ahmed Stop and delete the Spark Ambari service:
$ curl -u admin:admin -H "X-Requested-By: ambari" -X PUT -d '{"RequestInfo":{"context":"Stop Service"},"Body":{"ServiceInfo":{"state":"INSTALLED"}}}' http://AMBARI-URL:8080/api/v1/clusters/CLUSTERNAME/services/SPARK
$ curl -u admin:admin -H "X-Requested-By: ambari" -X DELETE http://AMBARI-URL:8080/api/v1/clusters/CLUSTERNAME/services/SPARK
Stop the Spark 1.3.1 history server:
$ su - spark -c "/usr/hdp/current/spark-client/sbin/stop-history-server.sh"
Remove Spark 1.3.1:
$ yum erase "spark*"
On the node where you want the Spark 1.4.1 History Server and Client to run:
$ su - root
$ wget -nv http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.3.2.0-2950/hdpbn.repo -O /etc/yum.repos.d/Spark141TP.repo
$ yum install spark_2_3_2_0_2950-master -y
$ conf-select create-conf-dir --package spark --stack-version 2.3.2.0-2950 --conf-version 0
$ cp /etc/spark/2.3.0.0-2950/0/* /etc/spark/2.3.2.0-2950/0/
$ conf-select set-conf-dir --package spark --stack-version 2.3.2.0-2950 --conf-version 0
$ hdp-select set spark-client 2.3.2.0-2950
$ hdp-select set spark-historyserver 2.3.2.0-2950
Validate the Spark installation. As the spark user, run the SparkPi example:
$ su - spark
$ cd /usr/hdp/current/spark-client
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10
11-15-2016
03:15 PM
@Anchika Agarwal
Assuming that reading and writing data from Teradata works like MySQL or PostgreSQL, you will need to include the JDBC driver for Teradata on the Spark classpath.
$ SPARK_CLASSPATH=teradata-jdbc.jar bin/spark-shell
Use the following code in Spark shell. Modify and pass all necessary parameters
scala> val jdbcUsername = "USER_NAME"
scala> val jdbcPassword = "PASSWORD"
scala> val jdbcHostname = "HOSTNAME"
scala> val jdbcPort = port_num
scala> val jdbcDatabase ="DATABASE"
scala> val jdbcUrl = s"jdbc:teradata://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}?user=${jdbcUsername}&password=${jdbcPassword}"
scala> val connectionProperties = new java.util.Properties()
scala> Class.forName("com.teradata.jdbc.Driver")
scala> import java.sql.DriverManager
scala> val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
scala> connection.isClosed()
scala> // Assumed step: load a source table over JDBC and register it as "jdbcDF" before writing it back out
scala> val jdbcDF = sqlContext.read.jdbc(jdbcUrl, "SOURCE_TABLE_NAME", connectionProperties)
scala> jdbcDF.registerTempTable("jdbcDF")
scala> sqlContext.table("jdbcDF").withColumnRenamed("table", "table_number").write.jdbc(jdbcUrl, "tablename", connectionProperties)
10-22-2016
07:40 PM
What version of Tableau are you using? Spark connection only works with Tableau 9.2 and later.
10-12-2016
10:58 PM
1 Kudo
@mohamed sabri marnaoui Can you paste the full stack trace and the code you are trying to run? You can get the Spark job from the YARN Resource Manager UI: go to Ambari -> Yarn -> QuickLinks -> Resource Manager UI. SparkContext can shut down for many different reasons, including code errors.
09-22-2016
01:49 PM
4 Kudos
1. Challenges managing LAS files
Siloed datasets: Working with disparate, complex datasets under a traditional analysis model limits innovation and does not allow for the speed required for unconventionals.
LAS file volume: A single well could have tens or hundreds of LAS files, making it difficult to provide a consolidated view for analysis. Extrapolating this volume out across thousands of wells requires an automated approach.
Manual QC process: Identifying out-of-range data is time consuming and challenging even for experienced geoscientists and petrophysicists.
Management and storage is expensive: What if cost could be reduced from $23/GB to $0.19/GB? 55 GB could then cost $10 instead of $1,200, a delta of 1-2 orders of magnitude.
Download Sample Data Set
The wellbook concept is about a single view of an oil well and its history, something akin to a "Facebook wall" for oil wells. This repo is built from data collected and made available by the North Dakota Industrial Commission. I used the wellindex.csv file to obtain a list of well file numbers (file_no), scraped their respective Production, Injection, and Scout Ticket web pages and any available LAS-format well log files, and loaded them into HDFS (/user/dev/wellbook/) for analysis. To avoid the HDFS small-files problem I used the Apache Mahout seqdirectory tool to combine my text files into SequenceFiles: the keys are the filenames and the values are the contents of each text file. Then I used a combination of Hive queries and the pyquery Python library for parsing relevant fields out of the raw HTML pages.
List of tables:
- wellbook.wells -- well metadata including geolocation and owner
- wellbook.well_surveys -- borehole curve
- wellbook.production -- how much oil, gas, and water was produced for each well on a monthly basis
- wellbook.auctions -- how much was paid for each parcel of land at auction
- wellbook.injections -- how much fluid and gas was injected into each well (for enhanced oil recovery and disposal purposes)
- wellbook.log_metadata -- metadata for each LAS well log file
- wellbook.log_readings -- sensor readings for each depth step in all LAS well log files
- wellbook.log_key -- map of log mnemonics to their descriptions
- wellbook.formations -- manually annotated map of well depths to rock formations
- wellbook.formations_key -- descriptions of rock formations
- wellbook.water_sites -- metadata for water quality monitoring stations in North Dakota
2. Watch the video to get started: Automated Analysis of LAS Files
3. Join with Production / EOR / Auction data (Power BI): Get a 360-degree view of the well <Hive tables - Master> a. Predictive analytics (linear regression) b. Visualize the data using YARN-ready applications
4. Dynamic well logs: Query for multiple mnemonic readings per well or multiple wells in a given region. Normalize and graph data for specific depth steps on the fly.
5. Dynamic time warping: Run the algorithm per well or for all wells and all mnemonics and visualize the results to know what readings belong to the same curve class. Using supervised machine learning, enable automatic bucketing of mnemonics belonging to the same curve class.
Build on your own
Clone the git repo below and follow the steps in the Readme to create your own demo.
$ git clone https://github.com/vedantja/wellbook.git
For more questions, please contact Vedant Jain. Special thanks to Randy Gelhausen and Ofer Mendelevitch for the work and help put into this.
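Since the scraped files are stored as SequenceFiles (filename as key, file contents as value), a minimal PySpark sketch for reading them back might look like the following; the subdirectory name and the line-count "parsing" step are illustrative assumptions:
# Each record from mahout seqdirectory output is (filename, file contents).
raw_files = sc.sequenceFile("/user/dev/wellbook/las_files")   # assumed subdirectory

# Keep only LAS files and count lines per file as a trivial stand-in for real parsing.
line_counts = (raw_files
               .filter(lambda kv: kv[0].lower().endswith(".las"))
               .mapValues(lambda text: len(text.splitlines())))
print(line_counts.take(5))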
- Find more articles tagged with:
- Data Ingestion & Streaming
- datascience
- demo
- dtw
- Hive
- How-ToTutorial
- oilandgas
- python
- Spark
09-12-2016
12:40 AM
The FlowFile Repository is where NiFi keeps track of the state of what it knows about a given FlowFile that is presently active in the flow. The implementation of the repository is pluggable. The default approach is a persistent Write-Ahead Log located on a specified disk partition. Apart from the fact that NiFi itself lives in a JVM, there is no memory involvement. The processor retrieves the FlowFiles and metadata from the disk.
09-08-2016
07:24 PM
8 Kudos
HBase supports several different compression algorithms which can be enabled on a ColumnFamily. Data block encoding attempts to limit duplication of information in keys, taking advantage of some of the fundamental designs and patterns of HBase, such as sorted row keys and the schema of a given table. Compressors reduce the size of large, opaque byte arrays in cells, and can significantly reduce the storage space needed to store uncompressed data.
Compressors and data block encoding can be used together on the same ColumnFamily.
Changes Take Effect Upon Compaction. If you change compression or encoding for a ColumnFamily, the changes take effect during compaction.
Some codecs take advantage of capabilities built into Java, such as GZip compression. Others rely on native libraries. Native libraries may be available as part of Hadoop, such as LZ4. In this case, HBase only needs access to the appropriate shared library. Other codecs, such as Google Snappy, need to be installed first. Some codecs are licensed in ways that conflict with HBase's license and cannot be shipped as part of HBase.
This article discusses common codecs that are used and tested with HBase. No matter what codec you use, be sure to test that it is installed correctly and is available on all nodes in your cluster.
Block Compressors
none
Snappy
LZO
LZ4
GZ
LZO
Unfortunately, HBase cannot ship with LZO because of the licensing issues; HBase is Apache-licensed, LZO is GPL. Therefore LZO install is to be done post-HBase install. See the
Using LZO Compression wiki page for how to make LZO work with HBase.
A common problem users run into when using LZO is that while the initial setup of the cluster runs smoothly, a month goes by and some sysadmin goes to add a machine to the cluster, only to find they have forgotten to do the LZO fixup on the new machine. In versions since HBase 0.90.0, we should fail in a way that makes it plain what the problem is, but maybe not.
GZIP
GZIP will generally compress better than LZO, though it is slower. For some setups, better compression may be preferred. Java will use Java's GZIP unless the native Hadoop libs are available on the CLASSPATH; in that case it will use native compressors instead. (If the native libs are NOT present, you will see lots of "Got brand-new compressor" reports in your logs.)
SNAPPY
If snappy is installed, HBase can make use of it (courtesy of
hadoop-snappy [32] ).
Build and install snappy on all nodes of your cluster (see below)
Use CompressionTest to verify snappy support is enabled and the libs can be loaded ON ALL NODES of your cluster:
$ hbase org.apache.hadoop.hbase.util.CompressionTest hdfs://host/path/to/hbase snappy
Create a column family with snappy compression and verify it in the hbase shell:
$ hbase> create 't1', { NAME => 'cf1', COMPRESSION => 'SNAPPY' }
hbase> describe 't1'
In the output of the "describe" command, you need to ensure it lists "COMPRESSION => 'SNAPPY'"
Installation
You will find the snappy library file under the .libs directory from your Snappy build (for example /home/hbase/snappy-1.0.5/.libs/). The file is called libsnappy.so.1.x.x, where 1.x.x is the version of the snappy code you are building. You can either copy this file into your hbase directory under the name libsnappy.so, or simply create a symbolic link to it. The second file you need is the hadoop native library. You will find this file in your hadoop installation directory under lib/native/Linux-amd64-64/ or lib/native/Linux-i386-32/. The file you are looking for is libhadoop.so.1.x.x. Again, you can simply copy this file or link to it under the name libhadoop.so. At the end of the installation, you should have both libsnappy.so and libhadoop.so links or files present in lib/native/Linux-amd64-64 or lib/native/Linux-i386-32. To point HBase at snappy support, set the following in hbase-env.sh:
export HBASE_LIBRARY_PATH=/pathtoyourhadoop/lib/native/Linux-amd64-64
In /pathtoyourhadoop/lib/native/Linux-amd64-64 you should have something like:
libsnappy.a
libsnappy.so
libsnappy.so.1
libsnappy.so.1.1.2
Data Block Encoding Types Prefix - Often, keys are very similar. Specifically, keys often share a common prefix and only differ near the end. For instance, one key might be RowKey:Family:Qualifier0 and the next key might be RowKey:Family:Qualifier1 . In Prefix encoding, an extra column is added which holds the length of the prefix shared between the current key and the previous key. Assuming the first key here is totally different from the key before, its prefix length is 0. The second key's prefix length is 23 , since they have the first 23 characters in common.
Obviously if the keys tend to have nothing in common, Prefix will not provide much benefit. Diff - Diff encoding expands upon Prefix encoding. Instead of considering the key sequentially as a monolithic series of bytes, each key field is split so that each part of the key can be compressed more efficiently. Two new fields are added: timestamp and type. If the ColumnFamily is the same as the previous row, it is omitted from the current row. If the key length, value length or type are the same as the previous row, the field is omitted. In addition, for increased compression, the timestamp is stored as a Diff from the previous row's timestamp, rather than being stored in full. Given the two row keys in the Prefix example, and given an exact match on timestamp and the same type, neither the value length, or type needs to be stored for the second row, and the timestamp value for the second row is just 0, rather than a full timestamp.
Diff encoding is disabled by default because writing and scanning are slower but more data is cached. Fast Diff - Fast Diff works similar to Diff, but uses a faster implementation. It also adds another field which stores a single bit to track whether the data itself is the same as the previous row. If it is, the data is not stored again. Fast Diff is the recommended codec to use if you have long keys or many columns. The data format is nearly identical to Diff encoding, so there is not an image to illustrate it. Prefix Tree encoding was introduced as an experimental feature in HBase 0.96. It provides similar memory savings to the Prefix, Diff, and Fast Diff encoder, but provides faster random access at a cost of slower encoding speed. Prefix Tree may be appropriate for applications that have high block cache hit ratios. It introduces new 'tree' fields for the row and column. The row tree field contains a list of offsets/references corresponding to the cells in that row. This allows for a good deal of compression. Which Compressor or Data Block Encoder To Use?
The compression or codec type to use depends on the characteristics of your data. Choosing the wrong type could cause your data to take more space rather than less, and can have performance implications. In general, you need to weigh your options between smaller size and faster compression/decompression. Following are some general guidelines, expanded from a discussion at Documenting Guidance on compression and codecs. If you have long keys (compared to the values) or many columns, use a prefix encoder. FAST_DIFF is recommended, as more testing is needed for Prefix Tree encoding. If the values are large (and not precompressed, such as images), use a data block compressor.
Use GZIP for cold data, which is accessed infrequently. GZIP compression uses more CPU resources than Snappy or LZO, but provides a higher compression ratio. Use Snappy or LZO for hot data, which is accessed frequently. Snappy and LZO use fewer CPU resources than GZIP, but do not provide as high of a compression ratio. In most cases, enabling Snappy or LZO by default is a good choice, because they have a low performance overhead and provide space savings.
Before Snappy was made available by Google in 2011, LZO was the default. Snappy has similar qualities to LZO but has been shown to perform better.
- Find more articles tagged with:
- compression
- FAQ
- Hadoop Core
- HBase
08-04-2016
11:03 AM
12 Kudos
This article is applicable for anyone deploying a Kafka cluster for production use. It points out certain high level points that you should be thinking about before deploying your cluster.
It covers important configurations for the producer, broker, and consumer, plus tips for performance tuning.
Important Client / Producer Configurations
The most important producer configurations:
compression
The compression type for all data generated by the producer. The default is none (i.e., no compression). Valid values are none, gzip, snappy, or lz4. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
Name: compression.type Default: none
sync vs async production
Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer will attempt to accumulate data in memory and to send out larger batches in a single request.
Name: producer.type Default: sync
batch size (for async producers)
A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully, as we will always allocate a buffer of the specified batch size in anticipation of additional records.
Name: batch.size Default: 200
maximum message size
This is the largest message size Kafka will allow to be appended to a topic. Note that if you increase this size you must also increase your consumers' fetch size so they can fetch messages this large. Account for this when doing disk sizing (average message size * retention period * replication factor).
Name: max.message.bytes Default: 1,000,000
acks
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:
acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, then the record will be lost.
acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
Name: acks Default: 1
To see the full list of producer configs: http://kafka.apache.org/documentation.html#producerconfigs
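As a concrete illustration, here is how a few of these knobs map onto the kafka-python producer used elsewhere in these posts. This is a hedged sketch: the broker address and topic are placeholders, and batch_size/linger_ms are the new-producer equivalents of the older producer.type/batch.size settings mentioned above.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',   # placeholder broker
    compression_type='gzip',              # compression.type
    acks='all',                           # wait for all in-sync replicas
    batch_size=32768,                     # bytes per batch; batching drives throughput
    linger_ms=10)                         # wait up to 10 ms to fill a batch

producer.send('my_topic', b'some_message_bytes')
producer.flush()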
Brokers:
The important configurations are the following:
compression.type
Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed', which is equivalent to no compression, and 'producer', which means retain the original compression codec set by the producer.
Name: compression.type Default: producer
num.replica.fetchers
Number of fetcher threads used to replicate messages from a source broker. Increasing this value can increase the degree of I/O parallelism in the follower broker.
Name: num.replica.fetchers Default: 1
offsets.topic.replication.factor
The replication factor for the offsets topic (set higher to ensure availability). To ensure that the effective replication factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor at the time of the first request for the offsets topic. If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor).
Name: offsets.topic.replication.factor Default: 3
Credits: http://www.michael-noll.com/blog/2013/03/13/running-a-multi-broker-apache-kafka-cluster-on-a-single-node/
Number of partitions for a topic
The number of topics and partitions impacts how much can be stored in the page cache. A topic/partition is the unit of parallelism in Kafka: partitions drive the parallelism of consumers, and throughput requirements drive the number of partitions on a topic.
Formula:
P = throughput from a producer to a single partition (measure with https://github.com/apache/kafka/blob/trunk/bin/kafka-producer-perf-test.sh)
C = throughput from a single partition to a consumer (measure with https://github.com/apache/kafka/blob/trunk/bin/kafka-consumer-perf-test.sh)
T = target throughput
Required number of partitions = max(T/P, T/C)
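A quick back-of-the-envelope helper for that formula; the throughput numbers in the example call are made up for illustration, not measurements from this article:
import math

def required_partitions(target_mb_s, producer_mb_s_per_partition, consumer_mb_s_per_partition):
    """Required partitions = max(T/P, T/C), rounded up."""
    return math.ceil(max(target_mb_s / producer_mb_s_per_partition,
                         target_mb_s / consumer_mb_s_per_partition))

# Example: target 500 MB/s, one partition sustains 40 MB/s in and 80 MB/s out.
print(required_partitions(500, 40, 80))   # -> 13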
CONSUMERS:
max.partition.fetch.bytes
The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be #partitions * max.partition.fetch.bytes . This size must be at least as large as the message.max.bytes or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.
fetch.message.max.bytes
The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. The fetch request size must be at least as large as message.max.bytes, or else it is possible for the producer to send messages larger than the consumer can fetch.
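With the new-consumer API in kafka-python, the corresponding setting is max_partition_fetch_bytes. A hedged sketch; the topic, broker, and 2 MB limit are placeholders:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic',
    bootstrap_servers='localhost:9092',
    # Must be at least as large as the broker's message.max.bytes, otherwise an
    # oversized message can wedge the consumer on one partition.
    max_partition_fetch_bytes=2 * 1024 * 1024)

for msg in consumer:
    print(msg.offset, len(msg.value))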
Performance Tuning
General
num.io.threads should be greater than the number of disks dedicated for Kafka. I strongly recommend starting with the same number as the number of disks.
num.network.threads: adjust based on the number of producers plus the number of consumers plus the replication factor.
Message size affects network bandwidth. For a higher-performance Kafka cluster, use 10 GbE cards.
Kernel tuning
While most Kafka deployments contain a lot of RAM, you can get comparable performance with spinning disks depending on the type of IOPS you need.
Give huge page cache & avoid disks if you are going to be doing random IO operations.
Kafka broker will always write to page cache (OS) first
Messages are flushed based on several configurations
– http://kafka.apache.org/documentation.html
Flush message to disk controlled by log.flush.interval.message
Defaults to Long.MAX_VALUE which is very big
The number of messages written to a log partition before we force an fsync on the log
Name: log.flush.interval.message
The per-topic override for log.flush.interval.messages
Name: log.flush.interval.ms.per.topic
OS will flush (pdflush) regardless of kafka params
https://en.wikipedia.org/wiki/Page_cache
Default flush configurations will cover most use cases as the durability is provided by replication
Source: http://queue.acm.org/detail.cfm?id=1563874
Disk throughput & durability
If you enable auto commit enable.auto.commit , add longer intervals for auto.commit.interval.ms (default is 500 ms)
Since Kafka has replication, the redundancy provided by RAID can also be provided at the application level.
Isolate disk and only use to store the application data
Using multi disk with multi-directory often resulted in better performance than using RAID
RAID can potentially do a better job of load balancing the data at the low level. But the major downside of RAID is usually a big performance hit for write throughput, and it reduces the available disk space. Another potential benefit of RAID is the ability to tolerate disk failures. However, rebuilding the RAID array is so I/O intensive that it effectively disables the server, so this does not provide much real availability improvement.
Java/JVM tuning
Minimize GC pauses by using the Oracle JDK; it includes the new G1 garbage-first collector.
Kafka Heap Size
From an HCC article: by default the kafka-broker JVM heap is set to 1 GB; this can be increased using the Ambari kafka-env template. When you are sending large messages, JVM garbage collection can be an issue. Try to keep the Kafka heap size below 4 GB.
Example: In kafka-env.sh add following settings.
export KAFKA_HEAP_OPTS="-Xmx16g -Xms16g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
Kafka Deployment Example
8-core dual socket (16 CPUs)
64-128 GB of RAM
8-16GB Kafka JVM
Rest for OS + Page Cache
8x2TB disk
EXT4
XFS may handle locking during fsync better; EXT4 is mostly used in the field.
SAS or SSD preferred
Based on Retention period
Account for Broker and # of partitions it will handle
JBOD – Assuming you will use replication
Optional
RAID10
The primary downside of RAID is that it is usually a big performance hit for write throughput and reduces the available disk space.
10GigE Bonded NICs for extreme performance
LinkedIn Published Benchmarks
SOURCE: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
Single producer thread, no replication
821,557 records/sec(78.3 MB/sec)
Single producer thread, 3x asynchronous replication
786,980 records/sec(75.1 MB/sec)
- Find more articles tagged with:
- Data Ingestion & Streaming
- FAQ
- Kafka
- realtime
07-25-2016
08:03 PM
1 Kudo
@Timothy Spann There is no officially supported processor to schedule VORA jobs using NiFi. However, a VORA agent communicates directly with the Spark client when running in YARN mode. You can write your program in Python or Scala to invoke the VORA classes, and then call those scripts through spark-submit in NiFi using the ExecuteCommand processor.
07-22-2016
02:19 PM
I created my own Docker image with NiFi 0.6.1. I then ran it using the following command:
docker run -i -p 8080:8080 -t vedantja/nificentos7 /bin/bash
The error occurs when I go to localhost:8080/nifi.