Member since
10-02-2015
76
Posts
80
Kudos Received
8
Solutions
01-17-2021
12:41 PM
Hi @vjain , To configure the BuckeCache in the descripption there is a two JVM properties. Which one to use please? : HBASE_OPTS or HBASE_REGIONSERVER_OPTS In the hbase-env.sh file for each RegionServer, or in the hbase-env.sh file supplied to Ambari, set the -XX:MaxDirectMemorySize argument forHBASE_REGIONSERVER_OPTS to the amount of direct memory you wish to allocate to HBase. In the configuration for the example discussed above, the value would be 241664m. (-XX:MaxDirectMemorySize accepts a number followed by a unit indicator; m indicates megabytes.) HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=241664m" Thanks, Helmi KHALIFA
... View more
04-20-2020
03:17 PM
Hi, I am trying to connect from local machine to a kerberized kafka cluster through python as python client, could you please let me know what all the properties to include along with bootstrap server ? consumer = KafkaConsumer('test',bootstrap_servers='XXX.ORG:XXXX', #client_id= kafka-python- + __version__, request_timeout_ms=30000, connections_max_idle_ms=9 * 60 * 1000, reconnect_backoff_ms=50, reconnect_backoff_max_ms=1000, max_in_flight_requests_per_connection=5, receive_buffer_bytes=None, send_buffer_bytes=None, #socket_options= [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], sock_chunk_bytes=4096, # undocumented experimental option sock_chunk_buffer_count=1000, # undocumented experimental option retry_backoff_ms=100, metadata_max_age_ms=300000, security_protocol='SASL_SSL', ssl_context=None, ssl_check_hostname=True, ssl_cafile=None, ssl_certfile=None, ssl_keyfile=None, ssl_password=None, ssl_crlfile=None, api_version=None, api_version_auto_timeout_ms=2000, #selector=selectors.DefaultSelector, sasl_mechanism='GSSAPI', #sasl_plain_username= None, #sasl_plain_password='XXXX', sasl_kerberos_service_name='XXXX', # metrics configs metric_reporters=[], metrics_num_samples=2, metrics_sample_window_ms=30000) Your help is appreciated. Thanks
... View more
02-03-2017
04:53 PM
1 Kudo
Word Count using Spark Streaming in Pyspark This is a WordCount example with the following Local File System as a source Calculate counts using reduceByKey and store them in a temp table Querying running counts through SQL
Setup: Define the function that sets up the StreamingContext This function has to create a new StreamingContext and set it up with all the input stream, transformation and output operations. But it must not start the StreamingContext, it is just to capture all the setup code necessary for your streaming application in one place.
from pyspark.streaming import StreamingContext
batchIntervalSeconds = 10
def creatingFunc():
ssc = StreamingContext(sc, batchIntervalSeconds)
# Set each DStreams in this context to remember RDDs it generated in the last given duration.
# DStreams remember RDDs only for a limited duration of time and releases them for garbage
# collection. This method allows the developer to specify how long to remember the RDDs (
# if the developer wishes to query old data outside the DStream computation).
ssc.remember(60)
lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
lines.pprint()
words = lines.flatMap(lambda line: line.split(","))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
def process(time, rdd):
df = sqlContext.createDataFrame(rdd)
df.registerTempTable("myCounts")
wordCounts.foreachRDD(process)
return ssc
Create a streaming context If you don't want the streaming application to restart from the checkpoint, make sure to delete the checkpoint directory
checkpointDir = "/LOCAL_DIR/DATA/"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
Start the streaming context
# This line starts the streaming context in the background.
ssc.start()
# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
-------------------------------------------
Time: 2015-10-07 20:57:00
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
-------------------------------------------
Time: 2015-10-07 20:57:10
-------------------------------------------
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
hello,world
...
Interactive query
%sql select * from myCounts
world hello
Stop the streaming context.
ssc.stop(stopSparkContext=False)
Stop all streaming contexts running in this JVM from python This is helpful if you loose the handle to the streaming context. E.g: you detached the notebook from the cluster while streaming is running. In this case, you lost the handle to the streaming context from the notebook but the streaming job is running in background. If you want to stop the streaming job, then you need to access the streaming context in the jvm directly and stop it.
if( not sc._jvm.StreamingContext.getActive().isEmpty() ):
sc._jvm.StreamingContext.getActive().get().stop(False)
... View more
Labels:
04-23-2018
05:39 AM
That's pretty useful, but only if you're storing the offsets in Zookeeper. Can someone please update this article with instructions for how to reset offsets when they are stored in Kafka?
... View more
02-02-2017
04:05 PM
1 Kudo
SparkR is an R package that provides a lightweight front end for using Apache Spark from R, thus supporting large-scale analytics on Hortonworks Data Platform (HDP) from the R language and environment. As of Spark 1.6.2, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, and aggregation on large datasets. In addition, SparkR supports distributed machine learning through MLlib. Architecture SparkR which was introduced to Spark in version1.4 consists of wrappers over DataFrames and DataFrame-based APIs. In SparkR, the APIs are similar to existing ones in R (or R packages), rather than Python/Java/Scala APIs. The reason is that SparkR is very popular is primarily because it allows users to write spark jobs while staying entirely in the R framework/model. A SparkDataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R, but with richer optimizations under the hood. SparkDataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing local R data frames. All of the examples on this page use sample data included in R or the Spark distribution and can be run using the ./bin/sparkR shell. R is very convenient for analytics and users love it. However, scalability is the main issue with R and SparkR is a means to address that. The key challenge that was addressed in implementing SparkR was having
support for invoking Spark functions on a JVM from R. Spark Driver runs the R Spark Context which passes functions to R-JVM bridge. This is responsible for launching Worker JVMs. Run SparkR example on HDP 1. Prerequisites Before you run SparkR, ensure that your cluster meets the following prerequisites:
R must be installed on all nodes. JAVA_HOME must be set on all nodes. 2. SparkR Example The following example launches SparkR and then uses R to create a people DataFrame, list part of the DataFrame, and read the DataFrame. (For more information about Spark DataFrames, see "Using the Spark DataFrame API").
Launch SparkR: $ su spark
$ cd /usr/hdp/2.5.0.0-3485/spark/bin
#$ ./sparkR Output similar to the following displays: Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.2
/_/
Spark context is available as sc, SQL context is available as sqlContext
>
From your R prompt (not the Spark shell), initialize SQLContext, create a DataFrame, and list the first few rows: sparkR> sqlContext <- sparkRSQL.init(sc)
sparkR> df <- createDataFrame(sqlContext, faithful)
sparkR> head(df) Output similar to the following displays: ...
eruptions waiting
1 3.600 79
2 1.800 54
3 3.333 74
4 2.283 62
5 4.533 85
6 2.883 55
Read the people DataFrame: sparkR> people <- read.df(sqlContext, "people.json", "json")
sparkR> Output similar to the following displays: age name
1 NA Michael
2 30 Andy
3 19 Justin head(people)
... View more
Labels:
01-08-2017
01:16 AM
1 Kudo
If you are on a Mac and run into the following error when running start-all.sh: % sh start-all.sh
starting org.apache.spark.deploy.master.Master, logging to ...
localhost: ssh: connect to host localhost port 22: Connection refused
You need to enable "Remote Login" for your machine. From System Preferences, select Sharing, and then turn on Remote Login.
... View more
Labels:
12-28-2016
06:32 PM
1 Kudo
This first in a series of article lists 3 easy ways in which you can optimize your Spark code. This can be summed up as follows:
Use ReduceByKey over GroupByKey Be vary of Actions Gracefully Deal with Bad Quality Data Use ReduceByKey over GroupByKey Let's look at two different ways to compute word counts, one using reduceByKey and the other using groupByKey : val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD
.reduceByKey(_ + _)
.collect()
val wordCountsWithGroup = wordPairsRDD
.groupByKey()
.map(t => (t._1, t._2.sum))
.collect()
While both of these functions will produce the correct answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data. With reduceByKey, the pairs on the same machine with the same key are combined (by using the lamdba function passed into reduceByKey ) before the data is shuffled. Then the lamdba function is called again to reduce all the values from each partition to produce one final result. On the other hand, when calling groupByKey - all the key-value pairs are shuffled around. This is a lot of unnessary data to being transferred over the network. To determine which machine to shuffle a pair to, Spark calls a partitioning function on the key of the pair. Spark spills data to disk when there is more data shuffled onto a single executor machine than can fit in memory. However, it flushes out the data to disk one key at a time - so if a single key has more key-value pairs than can fit in memory, an out of memory exception occurs. This will be more gracefully handled in a later release of Spark so the job can still proceed, but should still be avoided - when Spark needs to spill to disk, performance is severely impacted. You can imagine that for a much larger dataset size, the difference in the amount of data you are shuffling becomes more exaggerated and different between reduceByKey and groupByKey . Here are more functions to prefer over groupByKey : combineByKey can be used when you are combining elements but your return type differs from your input value type. foldByKey merges the values for each key using an associative function and a neutral "zero value". Be Vary of Actions If your RDD is so large that all of it's elements won't fit in memory on the drive machine, don't do this: val values = myVeryLargeRDD.collect()
Collect will attempt to copy every single element in the RDD onto the single driver program, and then run out of memory and crash. Instead, you can make sure the number of elements you return is capped by calling take or takeSample , or perhaps filtering or sampling your RDD. Similarly, be cautious of these other actions as well unless you are sure your dataset size is small enough to fit in memory: countByKey countByValue collectAsMap If you really do need every one of these values of the RDD and the data is too big to fit into memory, you can write out the RDD to files or export the RDD to a database that is large enough to hold all the data. Gracefully Deal with Bad Quality Data When dealing with vast amounts of data, a common problem is that a small amount of the data is malformed or corrupt. Using a filter transformation, you can easily discard bad inputs, or use a map transformation if it's possible to fix the bad input. Or perhaps the best option is to use a flatMap function where you can try fixing the input but fall back to discarding the input if you can't. Let's consider the json strings below as input: input_rdd = sc.parallelize(["{\"value\": 1}", # Good
"bad_json", # Bad
"{\"value\": 2}", # Good
"{\"value\": 3" # Missing an ending brace.
])
If we tried to input this set of json strings to a sqlContext, it would clearly fail due to the malformed input's. sqlContext.jsonRDD(input_rdd).registerTempTable("valueTable")
# The above command will throw an error.
Instead, let's try fixing the input with this python function: def try_correct_json(json_string):
try:
# First check if the json is okay.
json.loads(json_string)
return [json_string]
except ValueError:
try:
# If not, try correcting it by adding a ending brace.
try_to_correct_json = json_string + "}"
json.loads(try_to_correct_json)
return [try_to_correct_json]
except ValueError:
# The malformed json input can't be recovered, drop this input.
return []
Now, we can apply that function to fix our input and try again. This time we will succeed to read in three inputs: corrected_input_rdd = input_rdd.flatMap(try_correct_json)
sqlContext.jsonRDD(corrected_input_rdd).registerTempTable("valueTable")
sqlContext.sql("select * from valueTable").collect() # Returns [Row(value=1), Row(value=2), Row(value=3)]
... View more
Labels:
09-22-2016
01:49 PM
4 Kudos
1 Challenges managing LAS files Siloed datasets Working with disparate, complex datasets under a traditional analysis model limits innovation and does not allow for the speed required for unconventionals LAS File Volume A single well could have 10s or 100s of LAS files making it difficult to provide a consolidated view for analysis Extrapolating this volume out across 1000s of wells requires an automated approach Manual QC process Identifying out of range data is time consuming and challenging even for experienced geoscientists and petrophysicists Management and storage is expensive What if cost could be reduced from $23/Gb to $.19/Gb; $55 GB could cost $1,200 or $10 Delta is 1-2 orders of magnitude Download Sample Data Set The wellbook concept is about a single view of an oil well and its history- something akin to a "Facebook Wall" for oil wells. This repo is built from data collected and made available by the North Dakota Industrial Commission. I used the wellindex.csv file to obtain a list of well file numbers (file_no), scraped their respective Production, Injection, Scout Ticket web pages, any available LAS format well logfiles, and loaded them into HDFS (/user/dev/wellbook/) for analysis. To avoid the HDFS small files problem I used the Apache Mahout seqdirectory tool for combining my textfiles into SequenceFiles: the keys are the filenames and the values are the contents of each textfile. Then I used a combination of Hive queries and the pyquery Python library for parsing relevant fields out of the raw HTML pages. List of Tables: wellbook.wells -- well metadata including geolocation and owner wellbook.well_surveys -- borehole curve wellbook.production -- how much oil, gas, and water was produced for each well on a monthly basis wellbook.auctions -- how much was paid for each parcel of land at auction wellbook.injections -- how much fluid and gas was injected into each well (for enhanced oil recovery and disposal purposes) wellbook.log_metadata -- metadata for each LAS well log file wellbook.log_readings -- sensor readings for each depth step in all LAS well log files
wellbook.log_key -- map of log mnemonics to their descriptions wellbook.formations -- manually annotated map of well depths to rock formations wellbook.formations_key -- Descriptions of rock formations
wellbook.water_sites -- metadata for water quality monitoring stations in North Dakota 2 Watch video to get started Automated Analysis of LAS Files 3 Join with Production / EOR / Auction data (Power BI) Get a 360-degree view of the well <Hive tables - Master> a. Predictive Analytics (Linear Regression) b. Visualize the data using Yarn Ready applications 4 Dynamic Well Logs Query for multiple mnemonic readings per well or multiple wells in a given region. Normalize and graph data for specific depth steps on the fly. 5 Dynamic Time Warping Run the algorithm per well or for all wells and all mnemonics and visualize the results to know what readings belong to the same curve class. Using supervised machine learning, enable automatic bucketing of mnemonics belonging to the same curve class. Build on your own Clone the git below and follow the steps in Readme to create your own demo. $ git clone https://github.com/vedantja/wellbook.git For more questions, please contact Vedant Jain. Special thanks to Randy Gelhausen and Ofer Mendelevitch for the work and help put into this.
... View more
02-13-2017
08:14 PM
If you have long keys (compared to the values) or many columns, use a prefix encoder. FAST_DIFF is recommended
Sorry, this post is few months old, the above sentence mean it is recommended to use FAST_DIFF over PREFIX (not PREFIX_TREE) right?
... View more