Member since: 10-12-2015
Posts: 63
Kudos Received: 56
Solutions: 13
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 19465 | 02-10-2017 12:35 AM
 | 741 | 02-09-2017 11:00 PM
 | 478 | 02-08-2017 04:48 PM
 | 1515 | 01-23-2017 03:11 PM
 | 2883 | 11-22-2016 07:33 PM
03-06-2017
06:29 PM
1 Kudo
There isn't really an RDD equivalent of "foreachRDD" in core Spark, but there is foreachPartition, which lets you apply a function to each partition of the RDD. In Spark Streaming, foreachRDD lets you apply a function to each RDD in the DStream.
//Core Spark
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
//Spark Streaming
foreachRDD(foreachFunc: (RDD[T]) ⇒ Unit): Unit
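For context, a minimal sketch of how the two are typically used (this assumes an existing SparkContext sc and StreamingContext ssc, and println stands in for a real sink):
// Core Spark: the function runs once per partition on the executors
val rdd = sc.parallelize(1 to 100, 4)
rdd.foreachPartition { iter =>
  iter.foreach(record => println(record))
}

// Spark Streaming: the function runs for each micro-batch RDD of the DStream
val stream = ssc.socketTextStream("localhost", 9999)
stream.foreachRDD { rdd =>
  rdd.foreachPartition(iter => iter.foreach(println))
}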
02-10-2017
12:39 AM
Dinesh, I recreated your scenario (in spark-shell) and it works fine for me. Are you running this in Zeppelin by chance? I've noticed that Zeppelin sometimes doesn't create the HiveContext correctly, so to make sure you're working against the right context, run the following code:
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
// your code here
This creates a new HiveContext, and it should fix your problem. I think the pointer to your sqlContext is being lost for some reason, so by recreating it we guarantee that the temp table is registered to a sqlContext and that the same sqlContext is the one being queried. Let me know if that fixes your problem, or if you have more issues.
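As a quick sanity check, a sketch (with a hypothetical JSON input and table name) of registering and querying the temp table through the same context:
val df = sqlContext.read.json("/tmp/customers.json")   // hypothetical input
df.registerTempTable("customers")
// query through the exact same sqlContext that registered the table
sqlContext.sql("SELECT * FROM customers LIMIT 10").show()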
02-10-2017
12:35 AM
Definitely possible! Here is some sample code:
gsam.join(input_file, (gsam("CCKT_NO") === input_file("ckt_id")) && (gsam("SEV_LVL") === 3), "inner")
Note the && between the two conditions. You can chain as many conditions as you'd like.
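A self-contained sketch of the same idea, with made-up column names and data:
import sqlContext.implicits._

val left = Seq((1, "a", 3), (2, "b", 1)).toDF("id", "name", "sev")
val right = Seq((1, "x"), (2, "y")).toDF("ckt_id", "detail")
// join on the key columns AND an extra filter condition, as an inner join
val joined = left.join(right, (left("id") === right("ckt_id")) && (left("sev") === 3), "inner")
joined.show()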
02-09-2017
11:00 PM
1 Kudo
You need to enable checkpointing. It will record the last-read Kafka offsets and start reading from there on restart. Here's the code you need to accomplish that:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

def createContext(sc: SparkContext, checkpointDir: String, duration: Int): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(duration))
  ssc.checkpoint(checkpointDir)
  ssc
}
// getOrCreate expects a function that builds the context when no checkpoint exists
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(sc, checkpointDir, duration))
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
02-08-2017
04:48 PM
1 Kudo
There is a JdbcRDD for this:
new JdbcRDD(sc: SparkContext, getConnection: () ⇒ Connection, sql: String, lowerBound: Long, upperBound: Long, numPartitions: Int, mapRow: (ResultSet) ⇒ T = JdbcRDD.resultSetToObjectArray)(implicit arg0: ClassTag[T])
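A rough usage sketch; the JDBC URL, table, and bound values are placeholders, the driver jar must be on the classpath, and the query must contain the two ? placeholders for the partition bounds:
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val jdbcRdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password"),
  "SELECT id, name FROM customers WHERE id >= ? AND id <= ?",
  1, 1000000, 10,                        // lowerBound, upperBound, numPartitions
  rs => (rs.getLong(1), rs.getString(2)) // map each ResultSet row to a tuple
)
jdbcRdd.take(5).foreach(println)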
01-23-2017
03:11 PM
1 Kudo
Use the mapValues API. I made an example of doing what you want below. You'll have to update the ListBuffer to use the types you have, but it looks like it does what you want.
import scala.collection.mutable.ListBuffer

val rdd1 = sc.parallelize(Array((1,2),(2,3),(1,3),(2,4)))
val gRdd = rdd1.groupByKey()
val indxRdd = gRdd.mapValues(a => {
  val b = a.toArray
  var indx = 2
  val lb = new ListBuffer[(Int, Int)]
  for (i <- 0 to b.size - 1) {
    lb.append((b(i), indx))
    indx += 1
  }
  lb.toArray
})
indxRdd.collectAsMap()
res8: scala.collection.Map[Int,Array[(Int, Int)]] =
Map(
2 -> Array((3,2), (4,3)),
1 -> Array((2,2), (3,3))
)
11-30-2016
11:25 PM
I've seen this issue quite often when folks are first setting up their cluster. Make sure you have NodeManagers running on all of your data nodes. In addition, check the YARN configuration for the maximum container size; if it is less than what you are requesting, resources will never get allocated. Finally, check the default number of executors and try specifying --num-executors 2. If you request more resources than your cluster has (more vcores or RAM), you will get this issue.
11-22-2016
07:33 PM
2 Kudos
To add to what @Scott Shaw said, the biggest thing we'd be looking for initially is data skew, and we can look at a couple of things to help determine this. The first is the input size. With input size, we can completely ignore the min and look at the 25th, median, and 75th percentiles. In your job they are fairly close together, and the max is never dramatically more than the median. If the max and 75th percentile were very large, we would definitely be seeing data skew. Another indicator of data skew is the task duration. Again, ignore the minimum; we're inevitably going to get a small partition for one reason or another. Focus on the 25th, median, 75th, and max. In a perfect world the separation between the four would be tiny, so while 6s, 10s, 11s, and 17s may look significantly different, those values are actually relatively close. The only cause for concern would be when the 75th percentile and max are quite a bit greater than the 25th percentile and median. When I say significant, I'm talking about most tasks taking ~30s and the max taking 10 minutes; that would be a clear indicator of data skew.
11-22-2016
07:24 PM
1 Kudo
Sorry for the late response. I'm not sure of a way to broadcast the data without collecting it to the driver first. Because we assume the broadcasted table is small, the collect and broadcast to and from the driver should be fairly quick. You would have about the same network traffic if you were to somehow skip the collect, as we need a full copy of the smaller table on each worker anyway. The join(broadcast(right), ...) call gives Spark a hint to do a broadcast join, so it overrides spark.sql.autoBroadcastJoinThreshold, which is 10mb by default. Don't try to broadcast anything larger than 2gb, as this is the limit for a single block in Spark and you will get an OOM or overflow exception; the data structure of the blocks is capped at 2gb. The autoBroadcastJoinThreshold currently only applies to Hive tables that have had statistics previously run on them, so the broadcast hint is what you use for DataFrames not in Hive, or ones where statistics haven't been run. The general Spark Core broadcast function will still work; in fact, under the hood, the DataFrame path calls the same collect and broadcast that you would with the general API. The concept of partitions is still there, so after you do a broadcast join you're free to run mapPartitions on the result.
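For reference, a sketch of the hint, assuming two existing DataFrames largeDf and smallDf joined on a column named "id":
import org.apache.spark.sql.functions.broadcast

// hint Spark SQL to broadcast the smaller side regardless of the threshold
val joined = largeDf.join(broadcast(smallDf), "id")
joined.explain()  // the physical plan should show a broadcast join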
11-01-2016
02:58 AM
I'm not sure of the exact problem, but I have a couple of ideas. When it works in the spark-shell, how are you starting up the session?
10-31-2016
04:14 AM
1 Kudo
Make sure you add the driver jar to your classpath and include it when you run the application.
sc.addJar("yourDriver.jar")
val jdbcDF = sqlContext.load("jdbc", Map(
"url" -> "jdbc:teradata://<server_name>, TMODE=TERA, user=my_user, password=*****",
"dbtable" -> "schema.table_name",
"driver" -> "com.teradata.jdbc.TeraDriver"))
10-27-2016
06:48 PM
1 Kudo
Sounds like a JDBC connection is in order. There is an API for creating a DataFrame from a JDBC connection:
jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
The issue with JDBC is that reading data from Teradata will be much slower than reading from HDFS. Is it possible to run a Sqoop job to move the data to HDFS prior to starting your Spark application?
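A hedged sketch of what that looks like in practice; the connection details, table name, and predicate column are placeholders:
import java.util.Properties

val props = new Properties()
props.setProperty("user", "my_user")
props.setProperty("password", "*****")
props.setProperty("driver", "com.teradata.jdbc.TeraDriver")

// each predicate becomes one partition of the resulting DataFrame
val predicates = Array("id < 1000000", "id >= 1000000")
val df = sqlContext.read.jdbc("jdbc:teradata://<server_name>", "schema.table_name", predicates, props)
df.show(5)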
10-24-2016
10:40 PM
1 Kudo
@Steevan Rodrigues saveAsTextFile takes a parameter for setting the codec to compress with:
rdd.saveAsTextFile(filename, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
09-13-2016
02:12 PM
Your auto broadcast join threshold is set to 90mb. Run the code below and then check in the Spark UI Environment tab that it's getting set correctly.
conf.set("spark.sql.autoBroadcastJoinThreshold", (1024 * 1024 * 200).toString)
09-11-2016
03:55 AM
1 Kudo
Broadcast joins are done automatically in Spark. There is a parameter, "spark.sql.autoBroadcastJoinThreshold", which is set to 10mb by default. You should be able to do the join as you normally would and increase the parameter to the size of the smaller dataframe.
09-05-2016
08:10 PM
The Databricks CSV library skips core Spark. The map function in PySpark is run through a Python subprocess on each executor, while with Spark SQL and the Databricks CSV library everything goes through the Catalyst optimizer and the output is Java bytecode. Scala/Java is about 40% faster than Python when using core Spark, and I would guess that is the reason the second implementation is much faster. The CSV library is also probably much more efficient at breaking up the records, likely applying the split partition by partition as opposed to record by record.
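For comparison, a sketch of the Spark SQL approach using the spark-csv package (the package coordinates and input path are indicative, not prescriptive):
// launch with: spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")       // first line is a header
  .option("inferSchema", "true")  // infer column types
  .load("/data/input.csv")
df.show(5)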
08-01-2016
02:15 AM
1 Kudo
If I had to guess, you're using Spark 1.5.2 or earlier. What is happening is you're running out of memory; I think you're running out of executor memory, so you're probably doing a map-side aggregate. How many keys do you have? I think we can fix this pretty simply. Are you caching data? If not, set spark.shuffle.memoryFraction to a number higher than 0.4.
08-01-2016
02:08 AM
If you call groupByKey on a DataFrame, it implicitly converts the DataFrame to an RDD, and you lose all the benefits of the optimizer for that step.
07-31-2016
09:29 PM
Agreed 100%. If you can accomplish the same task using reduceByKey, it implements a combiner, so it basically does the aggregation locally and then shuffles the results from each partition. Just keep an eye on GC when doing this.
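A tiny sketch of the contrast, using a word count:
val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
val pairs = words.map(w => (w, 1))

// groupByKey ships every value across the network before aggregating
val counted1 = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines locally per partition first, then shuffles partial sums
val counted2 = pairs.reduceByKey(_ + _)
counted2.collect().foreach(println)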
07-31-2016
09:27 PM
Use the explain API; you should see something like BroadcastHashJoin in the physical plan if a broadcast join is happening. Also, make sure you've enabled code generation for Spark SQL ("spark.sql.codegen=true"). Older versions of Spark (1.4 and earlier) have it set to false.
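Checking is quick; a sketch assuming two existing DataFrames df1 and df2 joined on "id":
sqlContext.setConf("spark.sql.codegen", "true")  // enable code generation on older versions
val joined = df1.join(df2, "id")
joined.explain()  // look for a broadcast join operator in the physical plan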
07-31-2016
09:19 PM
3 Kudos
When creating a Kafka receiver, it's one receiver per topic partition. You can definitely repartition the data after receiving it from Kafka. This should distribute the data to all of your workers, as opposed to only having 5 executors do the work.
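A sketch of that repartition; ssc, zkQuorum, groupId, and topicMap are assumed to exist, and the partition count is just an example:
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
// spread the received records across more partitions so every executor gets work
val repartitioned = kafkaStream.repartition(32)
repartitioned.foreachRDD(rdd => println(s"partitions: ${rdd.partitions.length}"))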
07-31-2016
09:17 PM
2 Kudos
Try setting your join to a broadcast join. By default, Spark SQL does a broadcast join for tables smaller than 10mb. I think in this case it would make a lot of sense to change the setting "spark.sql.autoBroadcastJoinThreshold" to 250mb. This will do a map-side join (in MapReduce terms) and should be much quicker than what you're experiencing. Also, don't worry about having a large number of tasks; it's perfectly fine to have that many. I've found that beyond 4 cores per executor you get diminishing returns on performance (i.e. 4 cores = 400% throughput, while 5 cores is ~430%). Another setting you might want to investigate is spark.sql.shuffle.partitions, the number of partitions to use when shuffling data for joins; it defaults to 200, and I think you might want to raise that number quite a bit.
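A sketch of those two settings; the 250mb value mirrors the suggestion above, and the shuffle partition count is just an example to tune:
// allow broadcast joins for tables up to ~250mb
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (250 * 1024 * 1024).toString)
// increase shuffle parallelism for large joins
sqlContext.setConf("spark.sql.shuffle.partitions", "800")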
07-23-2016
11:05 PM
1 Kudo
Use a broadcast variable for the smaller table to join it to the larger table. This implements a broadcast join, the same idea as a map-side join, and will save you quite a bit of network IO and time.
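A minimal sketch with RDDs; it assumes smallRdd and largeRdd are existing pair RDDs keyed the same way:
// collect the small table to the driver and broadcast it to every executor
val broadcastSmall = sc.broadcast(smallRdd.collectAsMap())

val joined = largeRdd.mapPartitions { iter =>
  val lookup = broadcastSmall.value
  // keep only rows whose key exists in the small table (inner-join semantics)
  iter.flatMap { case (k, v) => lookup.get(k).map(small => (k, (v, small))) }
}
joined.take(5).foreach(println)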
07-01-2016
02:19 PM
11 Kudos
Spark is notoriously knobby when it comes to tuning applications and requesting resources. While this added freedom gives us an incredible amount of control and flexibility when running our applications, it also gives us plenty of opportunity to make mistakes. This guide is for Spark 1.5 and earlier; Spark 1.6 introduces a new setting that allows for a less rigid mapping of the memory areas. There are a lot of knobs in Spark to tune, and knowing which ones do what will allow you to maximize your memory usage and understand why you're doing what you're doing.

Most people think that when requesting 10g for a worker with Spark, the application has 10g of memory for caching, but it's actually quite a bit more complex than that. A Spark executor has several memory areas allocated, and knowing what these areas are for helps in understanding the best allocation of resources. An executor has two regions: room for caching and room for pre-shuffle aggregations. Depending on the workload, we can optimize these to use a minimal amount of resources when running our application.

When we request a 10gb executor, YARN allocates a JVM on a worker node laid out in the regions described below. (The original article included a diagram of this executor memory layout.) The first thing that sticks out is that although we have a 10gb JVM, Spark carves out a "safety" region to help prevent OOMs. This region is 90% of total executor memory; it is controlled internally (the configuration is named spark.storage.safetyFraction) and is not exposed to developers. So our 10gb executor has 9gb of usable space.

The next thing we see is two distinct boxes: shuffle and storage. These are percentages of the total safety memory, controlled by two configs, spark.storage.memoryFraction and spark.shuffle.memoryFraction, which default to 60% and 20% respectively. So with a 10gb executor we have 90% * 60%, or 5.4gb, for "storage". That means each 10gb executor has 5.4gb set aside for caching data. If the data doesn't fit entirely into memory then, depending on the storage level, it may spill to disk or only cache what it can; this is discussed later on. There is another internal config, spark.shuffle.safetyFraction, which is 80% by default and also helps avoid OOMs. The area labeled "shuffle" is used for pre-reduce aggregations. For example, when a reduceByKey happens, the reduce happens locally first and the shuffle memory is used for storing the results. If the results of the local reduceByKey don't fit into this area, Spark starts spilling to disk. In this example we have 80% * 20%, or 1.6gb per executor, for pre-reduce aggregations.

Now that we know what our memory is allocated for, before we can talk about sizing executors we need to talk a little bit about caching data. In Spark, when the developer calls .persist or .cache on an RDD, the next action causes the data to be persisted or cached in memory. rdd.cache() is the equivalent of rdd.persist(StorageLevel.MEMORY_ONLY). There are several storage levels to choose from, and the choice decides what happens if the RDD doesn't fit into memory entirely. The different storage levels have different implications:

MEMORY_ONLY -> Stores the data as Java objects. It is not very space efficient, since the Java object for an int or a string carries a lot of extra packing, but it is the fastest. If your data can be completely cached with MEMORY_ONLY, choose that and move on.

MEMORY_ONLY_SER -> Stores the data in a serialized format. This allows us to fit substantially more data into memory, at the small expense that data must be serialized/deserialized when used, so it is a bit more CPU intensive. This is typically a very good choice for caching data.

MEMORY_AND_DISK_SER -> Stores the data in a serialized format, with the same advantages as the previous level, but anything that doesn't fit in memory is spilled to local disk in serialized form. This is a good choice if there has been heavy processing, or any joins/reduces, prior to caching.
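To make the storage levels concrete, here is a minimal sketch (it assumes an existing SparkContext sc and a hypothetical input path):
import org.apache.spark.storage.StorageLevel

val events = sc.textFile("/data/events")        // hypothetical input
events.persist(StorageLevel.MEMORY_ONLY_SER)    // serialized, memory-only cache
println(events.count())                         // first action materializes the cache

// shorthand equivalent to persist(StorageLevel.MEMORY_ONLY)
val other = sc.textFile("/data/other").cache()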
**NOTE: the default serializer in Spark is the Java serializer. It's not great. I would highly suggest using the Kryo serializer for all Spark applications.** There are several more storage levels, but the three above fit most use cases. OFF_HEAP storage is becoming popular and looks like it will be the default in Spark 2.0, but as of 1.5 it is still experimental. The only way to tell how much memory your cached data is taking up is the Storage tab in the Spark UI, and the Spark UI will be very important to us when trying to accurately size executors.

Now on to the fun part of the article: sizing executors. Unfortunately there is no blanket statement that covers all workloads, so let's start with some general rules of thumb.

The JVM heap should be no larger than 64gb; 40gb seems to be the sweet spot for maximum size. This is due to garbage collection: if the executor gets too big, GC will kill your application's performance. Newer GC methods, like G1GC, are available, but this guide assumes the legacy collector.

The executor should use no more than 4 cores. Through trial and error, it seems we get full return on investment with 4 cores, so 4 cores is the equivalent of 400% throughput if 1 core is 100%. At 5 cores, the return seems to be about 430-450%, so it doesn't make sense to add that extra core.

Fewer large executors are generally better than many small executors; the more executors we have, the more we need to pass data around the network. There is one caveat: it is sometimes hard to get 4 cores and 40gb of resources from YARN on a single node, so if the cluster is highly utilized, it may take longer for the job to start.
The rules of thumb are good for executor sizing, but the actual workload should drive the resource requests. There are several types of workloads; let's break them down into ETL, data analysis with caching, and machine learning.

ETL. In ETL there are typically limited reduces and many jobs are map-only. Because of this, it is actually much better to request many smaller executors to help with data locality. In addition, ETL rarely caches data, so we can remove the storage aspect of the executors entirely. A good starting point for ETL is many 4gb executors with 2 cores each, with spark.storage.memoryFraction set to 0 and spark.shuffle.memoryFraction set to 1. This lets the executor provide only what we need: space for joins and reduces. We also care about the total size of the dataset involved; a good starting point is 1gb of RAM per gb of data on disk. A spark-submit string for a 40gb dataset may look something like:

spark-submit --class MyClass --executor-memory 4g --executor-cores 2 --num-executors 10 --conf spark.storage.memoryFraction=0 --conf spark.shuffle.memoryFraction=1 /path/to/app.jar

Data analysis with caching. In data analysis, the same datasets are often queried several times. In this case, we want to take advantage of caching and limit HDFS reads. Data analysis with caching leans more on our rules of thumb; a good starting point is 2gb of RAM per gb of dataset. A sample spark-submit string for a 40gb dataset may look like:

spark-submit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 /path/to/app.jar

Machine learning can leverage the same idea as data analysis with caching, with one small exception. When building models, the application driver is under more pressure, as it does some of the work of building the model. Adding more memory/cores to the driver will help the application run more smoothly; the extra driver memory is really only required if you're hitting an OOM error. A sample spark-submit string for a 40gb dataset may look like:

spark-submit --class MyClass --executor-memory 40g --executor-cores 4 --num-executors 2 --conf spark.driver.memory=2g --conf spark.driver.cores=2 /path/to/app.jar

All of these are general starting points. The most important part is, after we start the job, to go to the Spark UI and monitor memory usage. The Storage tab will be instrumental when trying to determine the amount of resources your application is actually using. To show an example, I'm going to start a spark-shell using the following command:

spark-shell --master yarn-client --executor-memory 1g --num-executors 2

The first thing we notice is that each executor has a Storage Memory of 530mb, even though I requested 1gb. If we do the math, 1gb * 0.9 (safety) * 0.6 (storage) gives 540mb, which is pretty close to 530mb. Storage memory and shuffle write are the two things to monitor most closely when first tuning an application. Shuffle write corresponds to the amount of data that was spilled to disk prior to a shuffle operation, and storage memory is the amount of memory being used/available on each executor for caching. These two columns should help us decide whether the executors are too big or too small. If we have large amounts of shuffle write, we should probably increase spark.shuffle.memoryFraction to reduce the intermediate writes to disk. If we're using all of our cache, we might want to increase the size of the executor. This should give you a good glimpse into memory management with Spark, how to initially size executors, and how to calibrate further. One last thing to note: spark.storage.memoryFraction defaults to 0.6, which corresponds to the Old Gen region; never make this number larger unless you also increase the size of Old Gen.
Tags: Data Science & Advanced Analytics, FAQ, How-To/Tutorial, memory, Spark, YARN
06-30-2016
09:36 PM
2 Kudos
Spark uses an optimizer under the hood, so there will be no performance difference.
06-05-2016
11:55 PM
The attributes are stored in the Hive metastore, and Spark queries the metastore to get table/file information. Pinging the Hive metastore isn't really an expensive operation. In fact, when you write an application and create a DataFrame from a Hive table, the metastore is pinged as soon as you create the DataFrame, not when an action is run on it. We only need to ping the metastore the first time we create the DataFrame, not for every subsequent query on it.

The second part of your question is about predicate pushdown and partition pruning. What that means is that data can be partitioned in Hive by a field. Let's say we have a file full of information about customers and we create a Hive table partitioned by the state the customer is located in. Hive will put all customers from a particular state in a single directory, so the structure looks something like this:
.../database_name/cust
.../database_name/cust/STATE=AK
.../database_name/cust/STATE=AL
....
.../database_name/cust/STATE=WY
Hive puts all customers with a state of Alaska in the AK directory. Now that we know how the data is divided up, Spark will leverage that. If you write a query like df = hc.sql("SELECT * FROM cust WHERE state = 'AK'"), Spark will only read from the directory database_name/cust/STATE=AK. This saves a significant amount of time, as we only have to read a smaller subset of the data. Catalyst takes the semantics of the query (i.e. the SELECT statement), combined with the information in the metastore (the underlying data is partitioned by state), and tells the application to only read data in the database_name/cust/STATE=AK directory.
05-23-2016
06:33 PM
2 Kudos
Long story short, you don't have enough RAM. The minimum recommendation is 8gb of RAM; try running it with at least 4gb. I've gotten older versions of HDP to start up with 4gb of RAM, though it was incredibly slow. If you can get it started with 4gb, you can navigate to Ambari and turn off a lot of the tools you don't want to use, and that should make the user experience a little better. One other possible solution: if you are using a Windows machine, ensure that you have enabled virtualization in your BIOS settings. To do so, restart your computer and bring up the BIOS settings before the OS finishes loading (I think you need to hold F10 or F12 on startup). In the BIOS, look for device configuration; there should be a setting named something like "enable virtualization".
05-17-2016
06:36 PM
1 Kudo
The input path you put corresponds to a location in HDFS ("/tmp/rawpanda.json"). If the file is actually sitting on your local filesystem, you should use "file:///tmp/rawpanda.json" instead. Also, one gotcha with reading JSON files in Spark is that each record needs to be on a single line (instead of the pretty exploded view) for the JSON reader to parse it successfully. You can test whether the JSON records are being read correctly by running the following bit of code:
df2.show(1)
If there is something like _corrupt_record in the first column, then the records are most likely not formed correctly.
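Putting it together, a sketch of reading the file from the local filesystem and sanity-checking the result (df2 and the path mirror the question):
val df2 = sqlContext.read.json("file:///tmp/rawpanda.json")
df2.printSchema()   // a lone _corrupt_record column usually means malformed (multi-line) JSON
df2.show(1)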
05-09-2016
04:19 PM
Good questions. The .sql call before the query is the transformation in this case; "Select * from customers" tells the .sql transformation what to do. You can definitely include more advanced queries in the .sql transformation. Spark will parse the SQL query and convert it into Spark code, so all data will still be processed by Spark.
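For instance, a sketch assuming a registered table named customers (the column names are made up):
val customers = sqlContext.sql("SELECT * FROM customers")   // transformation: nothing runs yet
val byState = sqlContext.sql(
  "SELECT state, COUNT(*) AS cnt FROM customers GROUP BY state")
byState.show()                                              // action: triggers execution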