Member since
10-02-2015
76
Posts
80
Kudos Received
8
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1984 | 11-15-2016 03:28 PM | |
3396 | 11-15-2016 03:15 PM | |
2113 | 07-25-2016 08:03 PM | |
1742 | 05-11-2016 04:10 PM | |
3620 | 02-02-2016 08:09 PM |
01-17-2021
12:41 PM
Hi @vjain , To configure the BuckeCache in the descripption there is a two JVM properties. Which one to use please? : HBASE_OPTS or HBASE_REGIONSERVER_OPTS In the hbase-env.sh file for each RegionServer, or in the hbase-env.sh file supplied to Ambari, set the -XX:MaxDirectMemorySize argument forHBASE_REGIONSERVER_OPTS to the amount of direct memory you wish to allocate to HBase. In the configuration for the example discussed above, the value would be 241664m. (-XX:MaxDirectMemorySize accepts a number followed by a unit indicator; m indicates megabytes.) HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=241664m" Thanks, Helmi KHALIFA
... View more
04-20-2020
03:17 PM
Hi, I am trying to connect from local machine to a kerberized kafka cluster through python as python client, could you please let me know what all the properties to include along with bootstrap server ? consumer = KafkaConsumer('test',bootstrap_servers='XXX.ORG:XXXX', #client_id= kafka-python- + __version__, request_timeout_ms=30000, connections_max_idle_ms=9 * 60 * 1000, reconnect_backoff_ms=50, reconnect_backoff_max_ms=1000, max_in_flight_requests_per_connection=5, receive_buffer_bytes=None, send_buffer_bytes=None, #socket_options= [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], sock_chunk_bytes=4096, # undocumented experimental option sock_chunk_buffer_count=1000, # undocumented experimental option retry_backoff_ms=100, metadata_max_age_ms=300000, security_protocol='SASL_SSL', ssl_context=None, ssl_check_hostname=True, ssl_cafile=None, ssl_certfile=None, ssl_keyfile=None, ssl_password=None, ssl_crlfile=None, api_version=None, api_version_auto_timeout_ms=2000, #selector=selectors.DefaultSelector, sasl_mechanism='GSSAPI', #sasl_plain_username= None, #sasl_plain_password='XXXX', sasl_kerberos_service_name='XXXX', # metrics configs metric_reporters=[], metrics_num_samples=2, metrics_sample_window_ms=30000) Your help is appreciated. Thanks
... View more
04-14-2020
12:10 AM
you can use .repartition(1) DF..repartition(1) .....
... View more
02-03-2017
04:53 PM
1 Kudo
Word Count using Spark Streaming in Pyspark This is a WordCount example with the following Local File System as a source Calculate counts using reduceByKey and store them in a temp table Querying running counts through SQL
Setup: Define the function that sets up the StreamingContext This function has to create a new StreamingContext and set it up with all the input stream, transformation and output operations. But it must not start the StreamingContext, it is just to capture all the setup code necessary for your streaming application in one place.
from pyspark.streaming import StreamingContext
batchIntervalSeconds = 10
def creatingFunc():
ssc = StreamingContext(sc, batchIntervalSeconds)
# Set each DStreams in this context to remember RDDs it generated in the last given duration.
# DStreams remember RDDs only for a limited duration of time and releases them for garbage
# collection. This method allows the developer to specify how long to remember the RDDs (
# if the developer wishes to query old data outside the DStream computation).
ssc.remember(60)
lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
lines.pprint()
words = lines.flatMap(lambda line: line.split(","))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
def process(time, rdd):
df = sqlContext.createDataFrame(rdd)
df.registerTempTable("myCounts")
wordCounts.foreachRDD(process)
return ssc
Create a streaming context If you don't want the streaming application to restart from the checkpoint, make sure to delete the checkpoint directory
checkpointDir = "/LOCAL_DIR/DATA/"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
Start the streaming context
# This line starts the streaming context in the background.
ssc.start()
# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
-------------------------------------------
Time: 2015-10-07 20:57:00
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
-------------------------------------------
Time: 2015-10-07 20:57:10
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
Interactive query
%sql select * from myCounts
world hello
Stop the streaming context.
ssc.stop(stopSparkContext=False)
Stop all streaming contexts running in this JVM from python This is helpful if you loose the handle to the streaming context. E.g: you detached the notebook from the cluster while streaming is running. In this case, you lost the handle to the streaming context from the notebook but the streaming job is running in background. If you want to stop the streaming job, then you need to access the streaming context in the jvm directly and stop it.
if( not sc._jvm.StreamingContext.getActive().isEmpty() ):
sc._jvm.StreamingContext.getActive().get().stop(False)
... View more
Labels:
04-23-2018
05:39 AM
That's pretty useful, but only if you're storing the offsets in Zookeeper. Can someone please update this article with instructions for how to reset offsets when they are stored in Kafka?
... View more
02-02-2017
04:05 PM
1 Kudo
SparkR is an R package that provides a lightweight front end for using Apache Spark from R, thus supporting large-scale analytics on Hortonworks Data Platform (HDP) from the R language and environment. As of Spark 1.6.2, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, and aggregation on large datasets. In addition, SparkR supports distributed machine learning through MLlib. Architecture SparkR which was introduced to Spark in version1.4 consists of wrappers over DataFrames and DataFrame-based APIs. In SparkR, the APIs are similar to existing ones in R (or R packages), rather than Python/Java/Scala APIs. The reason is that SparkR is very popular is primarily because it allows users to write spark jobs while staying entirely in the R framework/model. A SparkDataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R, but with richer optimizations under the hood. SparkDataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing local R data frames. All of the examples on this page use sample data included in R or the Spark distribution and can be run using the ./bin/sparkR shell. R is very convenient for analytics and users love it. However, scalability is the main issue with R and SparkR is a means to address that. The key challenge that was addressed in implementing SparkR was having
support for invoking Spark functions on a JVM from R. Spark Driver runs the R Spark Context which passes functions to R-JVM bridge. This is responsible for launching Worker JVMs. Run SparkR example on HDP 1. Prerequisites Before you run SparkR, ensure that your cluster meets the following prerequisites:
R must be installed on all nodes. JAVA_HOME must be set on all nodes. 2. SparkR Example The following example launches SparkR and then uses R to create a people DataFrame, list part of the DataFrame, and read the DataFrame. (For more information about Spark DataFrames, see "Using the Spark DataFrame API").
Launch SparkR: $ su spark
$ cd /usr/hdp/2.5.0.0-3485/spark/bin
#$ ./sparkR Output similar to the following displays: Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.2
/_/
Spark context is available as sc, SQL context is available as sqlContext
>
From your R prompt (not the Spark shell), initialize SQLContext, create a DataFrame, and list the first few rows: sparkR> sqlContext <- sparkRSQL.init(sc)
sparkR> df <- createDataFrame(sqlContext, faithful)
sparkR> head(df) Output similar to the following displays: ...
eruptions waiting
1 3.600 79
2 1.800 54
3 3.333 74
4 2.283 62
5 4.533 85
6 2.883 55
Read the people DataFrame: sparkR> people <- read.df(sqlContext, "people.json", "json")
sparkR> Output similar to the following displays: age name
1 NA Michael
2 30 Andy
3 19 Justin head(people)
... View more
01-08-2017
01:16 AM
1 Kudo
If you are on a Mac and run into the following error when running start-all.sh: % sh start-all.sh
starting org.apache.spark.deploy.master.Master, logging to ...
localhost: ssh: connect to host localhost port 22: Connection refused
You need to enable "Remote Login" for your machine. From System Preferences, select Sharing, and then turn on Remote Login.
... View more
Labels:
12-28-2016
06:32 PM
1 Kudo
This first in a series of article lists 3 easy ways in which you can optimize your Spark code. This can be summed up as follows:
Use ReduceByKey over GroupByKey Be vary of Actions Gracefully Deal with Bad Quality Data Use ReduceByKey over GroupByKey Let's look at two different ways to compute word counts, one using reduceByKey and the other using groupByKey : val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD
.reduceByKey(_ + _)
.collect()
val wordCountsWithGroup = wordPairsRDD
.groupByKey()
.map(t => (t._1, t._2.sum))
.collect()
While both of these functions will produce the correct answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data. With reduceByKey, the pairs on the same machine with the same key are combined (by using the lamdba function passed into reduceByKey ) before the data is shuffled. Then the lamdba function is called again to reduce all the values from each partition to produce one final result. On the other hand, when calling groupByKey - all the key-value pairs are shuffled around. This is a lot of unnessary data to being transferred over the network. To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time - so if a single key has more key-value pairs than can fit in memory, an out of memory exception occurs. This will be more gracefully handled in a later release of Spark so the job can still proceed, but should still be avoided - when Spark needs to spill to disk, performance is severely impacted. You can imagine that for a much larger dataset size, the difference in the amount of data you are shuffling becomes more exaggerated and different between reduceByKey and groupByKey . Here are more functions to prefer over groupByKey : combineByKey can be used when you are combining elements but your return type differs from your input value type. foldByKey merges the values for each key using an associative function and a neutral "zero value". Be Vary of Actions If your RDD is so large that all of it's elements won't fit in memory on the drive machine, don't do this: val values = myVeryLargeRDD.collect()
Collect will attempt to copy every single element in the RDD onto the single driver program, and then run out of memory and crash. Instead, you can make sure the number of elements you return is capped by calling take or takeSample , or perhaps filtering or sampling your RDD. Similarly, be cautious of these other actions as well unless you are sure your dataset size is small enough to fit in memory: countByKey countByValue collectAsMap If you really do need every one of these values of the RDD and the data is too big to fit into memory, you can write out the RDD to files or export the RDD to a database that is large enough to hold all the data. Gracefully Deal with Bad Quality Data When dealing with vast amounts of data, a common problem is that a small amount of the data is malformed or corrupt. Using a filter transformation, you can easily discard bad inputs, or use a map transformation if it's possible to fix the bad input. Or perhaps the best option is to use a flatMap function where you can try fixing the input but fall back to discarding the input if you can't. Let's consider the json strings below as input: input_rdd = sc.parallelize(["{\"value\": 1}", # Good
"bad_json", # Bad
"{\"value\": 2}", # Good
"{\"value\": 3" # Missing an ending brace.
])
If we tried to input this set of json strings to a sqlContext, it would clearly fail due to the malformed input's. sqlContext.jsonRDD(input_rdd).registerTempTable("valueTable")
# The above command will throw an error.
Instead, let's try fixing the input with this python function: def try_correct_json(json_string):
try:
# First check if the json is okay.
json.loads(json_string)
return [json_string]
except ValueError:
try:
# If not, try correcting it by adding a ending brace.
try_to_correct_json = json_string + "}"
json.loads(try_to_correct_json)
return [try_to_correct_json]
except ValueError:
# The malformed json input can't be recovered, drop this input.
return []
Now, we can apply that function to fix our input and try again. This time we will succeed to read in three inputs: corrected_input_rdd = input_rdd.flatMap(try_correct_json)
sqlContext.jsonRDD(corrected_input_rdd).registerTempTable("valueTable")
sqlContext.sql("select * from valueTable").collect() # Returns [Row(value=1), Row(value=2), Row(value=3)]
... View more
Labels:
12-01-2016
11:18 AM
Hi All, Fixing the following issue fixed also this one: https://community.hortonworks.com/questions/68989/datanodes-status-not-consistent.html#answer-69461 Regards Alessandro
... View more