Member since
10-12-2015
63
Posts
56
Kudos Received
13
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 24489 | 02-10-2017 12:35 AM |
| | 1774 | 02-09-2017 11:00 PM |
| | 1153 | 02-08-2017 04:48 PM |
| | 2813 | 01-23-2017 03:11 PM |
| | 4640 | 11-22-2016 07:33 PM |
07-23-2016
11:05 PM
1 Kudo
Use a broadcast variable for the smaller table and join it to the larger table. This implements a broadcast join, which is the same idea as a map-side join. It saves you quite a bit of network I/O and time because the larger table never has to be shuffled.
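Here is a minimal pure-Python sketch of the broadcast (map-side) join idea. It assumes plain lists of (key, value) pairs stand in for RDDs, so it runs without Spark. The small table becomes a lookup dict. In PySpark you would ship that dict once per executor with `sc.broadcast(...)` and read it through `.value` inside a `flatMap` over the large RDD. The function name and sample data are hypothetical.

```python
def broadcast_hash_join(large, small):
    """Inner-join two lists of (key, value) pairs the way a map-side join does."""
    # Build the lookup table once from the small side; this is what gets broadcast.
    lookup = {}
    for key, value in small:
        lookup.setdefault(key, []).append(value)

    # Stream the large side record by record; the large side is never shuffled.
    joined = []
    for key, big_value in large:
        for small_value in lookup.get(key, []):
            joined.append((key, (big_value, small_value)))
    return joined


orders = [(1, "order-a"), (2, "order-b"), (3, "order-c")]  # large table (illustrative)
states = [(1, "AK"), (3, "WY")]                            # small table (illustrative)
print(broadcast_hash_join(orders, states))
# [(1, ('order-a', 'AK')), (3, ('order-c', 'WY'))]
```

Building the dict on the small side is what keeps the approach cheap. The broadcast copy only has to fit in each executor's memory once, and every partition of the large table is joined locally.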
07-01-2016
02:19 PM
11 Kudos
Spark is notoriously knobby when it comes to tuning applications and requesting resources. This added freedom gives us an incredible amount of control and flexibility when running our applications, but it also gives us an opportunity to make a lot of mistakes.

This guide is for Spark 1.5 and earlier. Spark 1.6 introduces a new setting that allows for a less rigid mapping of the memory areas.

There are a lot of knobs in Spark to tune. Knowing what each one does will let you maximize your memory usage and understand why you're doing what you're doing. Most people think that when they request 10g for a worker, their application has 10g of memory for caching, but it's actually quite a bit more complex than that. A Spark executor has several memory areas allocated, and knowing what these areas are for will help you choose the best allocation of resources. An executor has two regions: room for caching, and room for pre-shuffle aggregations. Depending on the workload, we can tune these to use a minimal amount of resources when running our application.

Let's take a look at a picture that sums up our executor at a high level. When we request a 10gb executor, YARN will allocate a JVM on a worker node that looks something like the picture on the left.

The first thing that sticks out is that we have a 10gb JVM, but Spark has a "safety" region. This is to help prevent OOMs. The usable portion is 90% of total executor memory. It is controlled internally rather than exposed as a documented setting, and the configuration is named spark.storage.safetyFraction. So our 10gb executor has 9gb of usable space.

The next thing we see is two distinct boxes: shuffle and storage. These are fractions of the JVM heap, each scaled by its own safety fraction. They are controlled by two configs, spark.storage.memoryFraction and spark.shuffle.memoryFraction, which default to 60% and 20%. So with a 10gb executor, we have 90% * 60%, or 5.4gb, for "storage." That means each 10gb executor has 5.4gb set aside for caching data. If the data doesn't fit entirely into memory, it may spill to disk or cache only what it can, depending on the storage level. This is discussed later on.

There is another internal config named spark.shuffle.safetyFraction, which defaults to 80% (i.e., 80% of the JVM heap). This is also there to help avoid OOMs. The area labeled "shuffle" is used for pre-reduce aggregations. For example, when a reduceByKey happens, the reduce happens locally first, and the shuffle memory is used to store the results.
If the results of the local reduceByKey don't fit into this area, Spark will start spilling to disk. In this example, we have 80% * 20%, or 1.6gb, per executor for pre-reduce aggregations.

Now that we know what our memory is allocated for, we need to talk a little about caching data before we can talk about sizing the executors. In Spark, when the developer calls .persist or .cache on an RDD, the next action will cause the data to be persisted or cached in memory. rdd.cache() is equivalent to rdd.persist(StorageLevel.MEMORY_ONLY). There are several storage levels to choose from, and the level decides what happens if the RDD doesn't fit into memory entirely. When working with Spark in Scala and deciding to persist an RDD, these are the main choices for how to cache our data:

- MEMORY_ONLY -> This stores our data as deserialized Java objects. It's not very space efficient, because the Java object for an int or a string carries a lot of extra overhead, but it is the fastest. Partitions that don't fit are not cached and are recomputed when they're needed. If your data can be completely cached with MEMORY_ONLY, choose that and move on.
- MEMORY_ONLY_SER -> This stores data in a serialized format. It lets us fit substantially more data into memory. The small cost is that data must be serialized/deserialized when used, so this level is a bit more CPU intensive. This is typically a very good choice for caching data.
- MEMORY_AND_DISK_SER -> This also stores data in a serialized format and has the same advantages as the previous level. Anything that doesn't fit in memory is spilled to local disk in serialized form. This is a good choice if there has been heavy processing, or any joins/reduces, prior to caching, since recomputing those partitions would be expensive.
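The difference between the three levels is what happens to partitions that don't fit. A toy pure-Python model makes this concrete. It assumes partitions are cached greedily in order until the storage region is full, and it ignores Spark's LRU eviction of other cached blocks. The sizes are hypothetical. Serialized levels are given smaller, made-up serialized sizes to reflect the space savings described above.

```python
# Toy model only: partitions are cached in order until the storage region is full.
# Real Spark can also evict older cached blocks (LRU), which this sketch ignores.
SPILLING_LEVELS = {"MEMORY_AND_DISK_SER"}
NON_SPILLING_LEVELS = {"MEMORY_ONLY", "MEMORY_ONLY_SER"}


def place_partitions(partition_sizes_mb, storage_mb, level):
    """Return 'memory', 'disk' or 'recompute' for each partition."""
    if level not in SPILLING_LEVELS | NON_SPILLING_LEVELS:
        raise ValueError(f"unsupported level: {level}")
    used = 0
    placement = []
    for size in partition_sizes_mb:
        if used + size <= storage_mb:
            used += size
            placement.append("memory")
        elif level in SPILLING_LEVELS:
            placement.append("disk")       # spilled to local disk, read back later
        else:
            placement.append("recompute")  # not cached; rebuilt from lineage when needed
    return placement


# 540mb of storage memory (illustrative sizes in mb).
print(place_partitions([200, 200, 200], 540, "MEMORY_ONLY"))          # ['memory', 'memory', 'recompute']
print(place_partitions([120, 120, 120], 540, "MEMORY_ONLY_SER"))      # ['memory', 'memory', 'memory']
print(place_partitions([300, 300, 300], 540, "MEMORY_AND_DISK_SER"))  # ['memory', 'disk', 'disk']
```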
**NOTE: the default serializer in Spark is the Java serializer, and it's not great. I would highly suggest using the Kryo serializer for all Spark applications.**

There are several more storage levels, but the three above fit most use cases. OFF_HEAP storage is becoming popular and looks like it will be the default in Spark 2.0, but as of 1.5 it is still experimental. The only way to tell how much memory your cached data is taking up is the Storage tab in the Spark UI. The Spark UI will be very important to us when trying to accurately size executors.

Now onto the fun part of the article: sizing executors. Unfortunately, there is no blanket statement that covers all workloads. Let's start with some general rules of thumb:

- The JVM heap should be no larger than 64gb, and 40gb seems to be the sweet spot for the maximum size. The limit is due to garbage collection: if the executor gets too big, GC will kill your application's performance. Newer collectors like G1GC are available, but this guide assumes we're using the legacy one.
- An executor should use no more than 4 cores. Through trial and error, it seems we get a full return on investment up to 4 cores. Taking 1 core as 100% throughput, 4 cores give about 400%. At 5 cores, throughput only seems to reach about 430-450%, so it doesn't make sense to add that extra core.
- Fewer large executors are generally better than many small executors, because the more executors we have, the more data we need to pass around the network. There is one caveat: it is sometimes hard to get 4 cores and 40gb of resources from YARN on a single node. If the cluster is highly utilized, it may take longer for the job to start.
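These rules of thumb can be turned into a quick per-node calculation. The sketch picks the fewest (and therefore largest) executors that respect both the 4-core and 40gb caps. It assumes the whole node is available to Spark and ignores YARN memory overhead and OS headroom. The node sizes below are hypothetical, so treat the result as a starting point, not a recommendation.

```python
def ceil_div(a, b):
    """Integer ceiling division."""
    return -(-a // b)


def size_executors(node_cores, node_mem_gb, max_cores=4, max_heap_gb=40):
    """Fewest (and therefore largest) executors per node that respect both caps."""
    count = max(1, ceil_div(node_cores, max_cores), ceil_div(node_mem_gb, max_heap_gb))
    return {
        "executors": count,
        "cores_each": node_cores // count,
        "heap_gb_each": node_mem_gb // count,
    }


print(size_executors(16, 128))  # {'executors': 4, 'cores_each': 4, 'heap_gb_each': 32}
print(size_executors(8, 48))    # {'executors': 2, 'cores_each': 4, 'heap_gb_each': 24}
```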
The rules of thumb are good for executor sizing, but the actual workload should drive the resource request. There are several types of workloads. Let's break them down into ETL, data analysis with caching, and machine learning.

ETL. In ETL there are typically few reduces, and many jobs are map-only. Because of this, it is actually much better to request many smaller executors to help with data locality. In addition, ETL rarely caches data, so we can remove the storage portion of the executors entirely. A good starting point for ETL is to request many 4gb executors with 2 cores. Set spark.storage.memoryFraction to 0 and spark.shuffle.memoryFraction to 1. This lets the executor do only what we need, which is provide space for joins and reduces. We also care about the total size of the dataset involved. A good starting point is 1gb of RAM per gb of data on disk. A common spark-submit string for a 40gb dataset may look something like this:

spark-submit --class MyClass --executor-memory 4g --executor-cores 2 --num-executors 10 --conf spark.storage.memoryFraction=0 --conf spark.shuffle.memoryFraction=1 /path/to/app.jar

Data Analysis with Caching. In data analysis, the same datasets are often queried several times. In this case, we want to take advantage of caching and limit HDFS reads. Data analysis with caching leans more on our rules of thumb. A good starting point for total executor memory is 2gb of RAM per gb of dataset. A sample spark-submit string for a 40gb dataset may look like this:
spark-submit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 /path/to/app.jar

Machine Learning. Machine learning can leverage the same idea as data analysis with caching, with one small exception. When building models, the application driver is under more pressure because it does some of the work. Adding more memory/cores to the driver will help the application run more smoothly. Increasing driver memory is only required if you're receiving an OOM error, though. A sample spark-submit string for a 40gb dataset may look like this:
spark-submit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 --conf spark.driver.memory=2g --conf spark.driver.cores=2 /path/to/app.jar

All of these are general starting points. The most important step, once the job is running, is to go to the Spark UI and monitor memory usage. The Storage tab will be instrumental in determining the amount of resources your application is using. To show an example, I'm going to start a spark-shell using the following command:

spark-shell --master yarn-client --executor-memory 1g --num-executors 2

The first thing we notice is that each executor has 530mb of Storage Memory, even though I requested 1gb. If we do the math, 1gb * .9 (safety) * .6 (storage) gives 540mb, which is pretty close to 530mb.

Storage memory and shuffle write are the two things we need to monitor most closely when first tuning an application. Shuffle write corresponds to the amount of data that was spilled to disk prior to a shuffle operation. Storage memory is the amount of memory used/available on each executor for caching. These two columns should help us decide whether our executors are too large or too small. If we have a large amount of shuffle write, we should probably increase spark.shuffle.memoryFraction to reduce the intermediate writes to disk. If we're using all of our cache, we might want to increase the size of the executors.

This should give you a good glimpse into memory management with Spark, how to size executors initially, and how to calibrate them further. One last thing to note: spark.storage.memoryFraction defaults to 0.6, which corresponds to the Old Gen. Never make this number larger unless you also increase the size of the Old Gen.
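The memory arithmetic used throughout this post (Spark 1.5 and earlier) fits in a few lines. This is a sketch of the formula as described above, not Spark's own code. The defaults mirror the four settings discussed, and sizes are in GB, treating 1gb as roughly 1000mb the way the 540mb estimate does.

```python
def legacy_executor_memory(heap_gb,
                           storage_fraction=0.6,  # spark.storage.memoryFraction
                           storage_safety=0.9,    # spark.storage.safetyFraction
                           shuffle_fraction=0.2,  # spark.shuffle.memoryFraction
                           shuffle_safety=0.8):   # spark.shuffle.safetyFraction
    """Storage and shuffle region sizes (in GB) under the legacy memory model."""
    storage = heap_gb * storage_safety * storage_fraction
    shuffle = heap_gb * shuffle_safety * shuffle_fraction
    return round(storage, 3), round(shuffle, 3)


print(legacy_executor_memory(10))  # (5.4, 1.6): the 10gb executor
print(legacy_executor_memory(1))   # (0.54, 0.16): ~540mb of storage, as in the spark-shell example
print(legacy_executor_memory(4, storage_fraction=0, shuffle_fraction=1))  # (0.0, 3.2): the ETL settings
```

The last call shows what the ETL configuration buys. All of a 4gb executor's safety-adjusted space goes to shuffle, and none goes to caching.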
06-30-2016
09:36 PM
2 Kudos
Spark uses an optimizer under the hood, so there will be no performance difference.
06-05-2016
11:55 PM
The attributes are stored in the Hive metastore, and Spark queries the metastore to get table/file information. Pinging the Hive metastore isn't really an expensive operation. In fact, when you create a DataFrame from a Hive table in your application, the metastore is pinged as soon as the DataFrame is created, not when an action is run on it. We only need to ping the metastore the first time we create the DataFrame, not for every subsequent query on it.

The second part of your question is about predicate pushdown and partition pruning. In Hive, data can be partitioned by a field. Let's say we have a file full of information about customers, and we want to create a Hive table from it partitioned by the state the customer is located in. Hive is going to put all customers from a particular state in a single directory, so the structure looks something like this:

.../database_name/cust
.../database_name/cust/STATE=AK
.../database_name/cust/STATE=AL
....
.../database_name/cust/STATE=WY

Hive will put all customers with a state of Alaska in the AK directory. Now that we know how the data is divided up, Spark will leverage that. So if you wrote a query like df = hc.sql("SELECT * FROM cust WHERE state = 'AK'"), Spark will only read from the directory database_name/cust/STATE=AK. This saves a significant amount of time, as we only have to read a smaller subset of the data. Catalyst combines the semantics of the query (i.e., the SELECT statement) with the information in the metastore (the underlying data is partitioned by state). That combination tells the application to only read data in the database_name/cust/STATE=AK directory.
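Partition pruning boils down to skipping every directory whose partition value can't match the predicate. The toy sketch below does this with plain strings. It assumes a hypothetical file list under the cust table. Real Spark gets the partition list from the metastore rather than by parsing file paths, so this only illustrates the effect.

```python
def partition_values(path):
    """Parse Hive-style 'column=value' directory names out of a file path."""
    return dict(part.split("=", 1) for part in path.split("/") if "=" in part)


def prune(paths, column, value):
    """Keep only files whose partition directory matches column=value."""
    return [p for p in paths if partition_values(p).get(column) == value]


files = [  # hypothetical file layout under the cust table
    "database_name/cust/STATE=AK/part-00000",
    "database_name/cust/STATE=AK/part-00001",
    "database_name/cust/STATE=AL/part-00000",
    "database_name/cust/STATE=WY/part-00000",
]
print(prune(files, "STATE", "AK"))
# ['database_name/cust/STATE=AK/part-00000', 'database_name/cust/STATE=AK/part-00001']
```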
05-23-2016
06:33 PM
2 Kudos
Long story short, you don't have enough RAM. The minimum recommendation is 8gb of RAM, but try running it with 4gb. I've gotten older versions of HDP to start up with 4gb of RAM, though they were incredibly slow. If you can get it started with 4gb, you can navigate to Ambari and turn off a lot of the tools you don't want to use, which should make the user experience a little better.

One other possible solution, if you are using a Windows machine, is to ensure that virtualization is enabled in your BIOS settings. To do so, you will need to restart your computer and bring up the BIOS settings before the OS finishes loading (I think you need to hold F10 or F12 on startup). In the BIOS, look for device configuration. In that section there should be a setting with a name like "enable virtualization".
05-17-2016
06:36 PM
1 Kudo
The input path you used, "/tmp/rawpanda.json", corresponds to a location in HDFS. If the file is actually sitting on your local filesystem, you should use "file:///tmp/rawpanda.json" instead. Also, one gotcha with reading JSON files in Spark is that each entire record needs to be on a single line, instead of the pretty-printed, exploded view. Otherwise the JSON reader can't parse it successfully. You can test whether the JSON records are being read correctly by running the following bit of code: df2.show(1). If the first column is something like _corrupt_record, the records are most likely not formed correctly.
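To see why pretty-printed JSON fails, here is a line-by-line reader in pure Python. It only mimics the one-record-per-line rule and is not Spark's JSON reader. Lines that don't parse are collected separately, loosely like Spark's `_corrupt_record` column. The field names are hypothetical.

```python
import json


def read_json_lines(text):
    """Parse one JSON record per line; collect lines that fail to parse."""
    records, corrupt = [], []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            corrupt.append(line)
    return records, corrupt


one_per_line = '{"name": "happy", "pandas": 1}\n{"name": "sad", "pandas": 2}'
pretty = json.dumps({"name": "happy", "pandas": 1}, indent=2)  # one record over 4 lines

good, bad = read_json_lines(one_per_line)
print(len(good), len(bad))  # 2 0
good, bad = read_json_lines(pretty)
print(len(good), len(bad))  # 0 4
```

The same single record parses fine on one line, but every fragment of the pretty-printed version is rejected.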
05-09-2016
04:19 PM
Good questions. The ".sql" before the query is the transformation in this case; "Select * from customers" is telling the .sql transformation what to do. You can definitely include more advanced queries in the .sql transformation. Spark will parse the SQL query and convert it into Spark code, so all data will still be processed by Spark.
05-09-2016
12:33 AM
2 Kudos
1. query is going to be a DataFrame (an RDD with a schema attached to it).
2. Once the DataFrame is created, you can apply RDD transformations like map. You can also apply DataFrame operations to it, like query.select("name") if name was a column in the original table. When you create a DataFrame from a Hive table, Spark uses the metastore attached to Hive to get information about the file, like its location and format. Spark doesn't actually use Hive for any processing. The SQL query passed in is processed (through an optimizer) and turned into Spark code. When working with DataFrames, you can transform the data if you'd like. A DataFrame is just like any other RDD, except that it has a schema attached to it. The action in your case is indeed collect(), though show() is the DataFrame action that will give you a much nicer table to look at.
3. Spark SQL + Hive still has the concept of lazy evaluation. Lazy evaluation just means that Spark keeps track of all the transformations you did, and reads/processes the data through those transformations only when you call an action.
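Lazy evaluation can be illustrated with a toy dataset class. Transformations only record a step, and the source is read when an action such as collect() runs. This is a hypothetical model for intuition, not how Spark implements DataFrames. The class and data names are made up.

```python
class LazyDataset:
    """Records transformations and only runs them when an action is called."""

    def __init__(self, read_source, steps=()):
        self._read_source = read_source  # callable that actually reads the data
        self._steps = tuple(steps)

    def map(self, fn):  # transformation: returns a new plan, reads nothing
        return LazyDataset(self._read_source, self._steps + (("map", fn),))

    def filter(self, pred):  # transformation: returns a new plan, reads nothing
        return LazyDataset(self._read_source, self._steps + (("filter", pred),))

    def collect(self):  # action: reads the source and applies every recorded step
        results = []
        for item in self._read_source():
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                results.append(item)
        return results


reads = []


def read_customers():
    reads.append("read")  # log each time the "table" is actually read
    return [{"name": "ann", "state": "AK"}, {"name": "bob", "state": "WY"}]


query = LazyDataset(read_customers).filter(lambda r: r["state"] == "AK").map(lambda r: r["name"])
print(reads)            # []  (no data has been read yet)
print(query.collect())  # ['ann']
print(reads)            # ['read']
```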
04-28-2016
06:00 PM
1 Kudo
The foreach action in Spark works like a map that is forced to run: because it is an action, the function is actually executed on the executors. Foreach is useful for a couple of operations in Spark. It is required when you want to guarantee that an accumulator's value is correct. It can also be used when you want to move data to an external system, like a database, though foreachPartition is typically used for that operation.
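The reason foreachPartition is preferred for writes is connection reuse. The function receives an iterator over a whole partition, so one connection can serve many rows. The sketch below simulates this in pure Python with a hypothetical connection class and a plain list standing in for the database table. In PySpark the same function would be passed to `rdd.foreachPartition(...)`, and the table would live in the external system rather than in a local list.

```python
class HypotheticalDbConnection:
    """Stand-in for a real database client; counts how many connections are opened."""
    opened = 0

    def __init__(self, table):
        HypotheticalDbConnection.opened += 1
        self.table = table

    def insert(self, row):
        self.table.append(row)

    def close(self):
        pass  # a real client would release the connection here


def write_partition(rows, table):
    """The kind of function you would hand to rdd.foreachPartition."""
    conn = HypotheticalDbConnection(table)  # one connection per partition, not per row
    try:
        for row in rows:
            conn.insert(row)
    finally:
        conn.close()


partitions = [[1, 2, 3], [4, 5]]  # stand-in for an RDD with two partitions
table = []
for part in partitions:  # Spark would run these calls on the executors
    write_partition(iter(part), table)

print(HypotheticalDbConnection.opened, table)  # 2 [1, 2, 3, 4, 5]
```

A per-record foreach would have opened five connections here instead of two.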
04-01-2016
05:15 PM
2 Kudos
I think your Scala versions are clashing with your Databricks spark-csv package. You're most likely running Scala 2.10, so try updating the artifact that's being used to build your application to:

groupId: com.databricks
artifactId: spark-csv_2.10
version: 1.4.0