Member since: 02-17-2017
Posts: 71
Kudos Received: 17
Solutions: 3

My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 1099 | 03-02-2017 04:19 PM |
 | 20291 | 02-20-2017 10:44 PM |
 | 11157 | 01-10-2017 06:51 PM |
04-20-2018
08:30 PM
This could be a data skew issue. Check whether any partition holds a huge chunk of the data compared to the rest. https://github.com/adnanalvee/spark-assist/blob/master/spark-assist.scala From the link above, copy the function "partitionStats" and pass in your data as a DataFrame. It will show the maximum, minimum, and average number of records across your partitions, like below.

+------+-----+------------------+
|MAX   |MIN  |AVERAGE           |
+------+-----+------------------+
|135695|87694|100338.61149653122|
+------+-----+------------------+
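If you prefer not to pull in the whole helper, here is a minimal sketch of the same idea; the function body below is my own approximation, not necessarily what the linked file does:

import org.apache.spark.sql.DataFrame

// Count records per partition, then report max/min/average.
// A rough approximation of what a partition-stats helper could do.
def partitionStats(df: DataFrame): Unit = {
  val counts = df.rdd
    .mapPartitions(rows => Iterator(rows.size.toLong)) // one count per partition
    .collect()
  if (counts.nonEmpty) {
    val avg = counts.sum.toDouble / counts.length
    println(s"MAX=${counts.max} MIN=${counts.min} AVERAGE=$avg")
  }
}

// Usage: partitionStats(yourDataFrame)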
04-20-2018
08:26 PM
There is a working Scala version that I have tested and used: https://community.hortonworks.com/questions/77130/how-to-iterate-multiple-hdfs-files-in-spark-scala.html
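As a rough illustration of the approach discussed in that thread, you can list the files with the Hadoop FileSystem API and process them one by one. A minimal sketch, assuming an existing SparkSession named spark; the directory path is a placeholder:

import org.apache.hadoop.fs.{FileSystem, Path}

// List every file under an HDFS directory (placeholder path) ...
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val files = fs.listStatus(new Path("/data/input"))
  .filter(_.isFile)
  .map(_.getPath.toString)

// ... and iterate over them, reading each one as a text DataFrame.
files.foreach { file =>
  val df = spark.read.text(file)
  println(s"$file -> ${df.count()} lines")
}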
04-13-2018
02:39 PM
1 Kudo
@Simran Kaur How about using Hive queries inside Spark? It has a built-in Catalyst optimizer, so give it a shot 🙂 My suggestion: load the data in Spark, store it in Parquet format, and then do the aggregations on it.
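A minimal sketch of that suggestion, assuming an existing SparkSession named spark; the input path, Parquet path, and column names are placeholders, not from the original thread:

// Load the source data through Spark (or spark.table("your_hive_table")).
val raw = spark.read.option("header", "true").csv("/data/input.csv")

// Persist it as Parquet once...
raw.write.mode("overwrite").parquet("/data/input_parquet")

// ...then aggregate against the Parquet copy with Spark SQL (Catalyst-optimized).
val events = spark.read.parquet("/data/input_parquet")
events.createOrReplaceTempView("events")
spark.sql("SELECT some_column, COUNT(*) AS cnt FROM events GROUP BY some_column").show()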
04-11-2018
04:48 PM
Try this (HCC has buggy formatting issues; ignore that and note the import and the spark val):

import org.apache.spark.sql.SparkSession

object TestCode {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SilverTailParser")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    /*
     * Sample values below; replace them with your business logic and push the
     * result into the table. The for loop stands in for that business logic.
     */
    for (i <- 0 to 100 - 1) {
      var fstring = "fstring" + i
      var cmd = "cmd" + i
      var idpath = "idpath" + i
      import spark.implicits._ // NOTE
      val sDF = Seq((fstring, cmd, idpath)).toDF("t_als_s_path", "t_als_s_cmd", "t_als_s_pd")
      sDF.write.insertInto("l_sequence")
      println("write data ==> " + i)
    }
  }
}
04-11-2018
04:36 PM
1. Import SparkSession (the older HiveContext/SQLContext entry points are deprecated):

import org.apache.spark.sql.SparkSession

2. Check that you have hive-site.xml in the "/usr/lib/spark/conf" directory. You can also try passing it to spark-submit:

--files /usr/hdp/current/spark-client/conf/hive-site.xml
04-11-2018
02:58 PM
// Count the total number of tab-separated fields ("words") with an accumulator.
val data = sc.textFile("/sample.txt")
val word_count = sc.accumulator(0L, "Total Words")

data.foreach { line =>
  val fields = line.split("\t")
  println(fields(0)) // prints the first field on the executors, not the driver
  fields.foreach { _ => word_count += 1 }
}

println(word_count.value)
01-17-2018
04:07 PM
Why are you using 10g of driver memory? What is the size of your dataset and how many partitions does it have? I would suggest using the config below:

--executor-memory 32G \
--num-executors 20 \
--driver-memory 4g \
--executor-cores 3 \
--conf spark.driver.maxResultSize=3g
10-03-2017
05:38 PM
@Marcos Da Silva This should solve the problem, as it did for mine:

select column1, column2 from table
where partition_column in (select max(distinct partition_column) from table)
07-14-2017
03:46 PM
NOTES: I tried different numbers of executors, from 10 to 60, but performance doesn't improve. Saving in Parquet format saves 1 minute, but I don't want Parquet.
07-13-2017
10:51 PM
I was planning to avoid a broadcast; that's why I asked. Thanks!
07-13-2017
10:49 PM
I am looping over a dataset of 1000 partitions and running operations as I go. I'm using Spark 2.0 and doing an expensive join for each of the partitions. The join takes less than a second when I call .show, but when I try to save the data, which is around 59 million rows, it takes 5 minutes (I tried repartitioning too). 5 minutes * 1000 partitions is 5000 minutes; I cannot wait that long. Any ideas on optimizing the saveAsTextFile performance?
07-10-2017
05:43 AM
I have two dataframes, one with 131 rows and another with 54 million rows. I need to join the first one with the second, thereby generating 6 billion rows.
It's taking forever, even doing a broadcast hash join along with executor/memory tuning trials.
I need help with the syntax.
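For the syntax, a broadcast hash join of a small DataFrame onto a large one typically looks like the sketch below; the frame names, columns, and sample rows are placeholders for your actual data, and spark is assumed to be an existing SparkSession (as in spark-shell):

import org.apache.spark.sql.functions.broadcast
import spark.implicits._

// Stand-ins for your 131-row and 54-million-row frames.
val small_df = Seq((1, "x"), (2, "y")).toDF("id", "small_col")
val large_df = Seq((1, "a"), (1, "b"), (2, "c")).toDF("id", "large_col")

// Broadcast the small frame so each executor joins it locally
// against its partitions of the large frame.
val joined = large_df.join(broadcast(small_df), Seq("id"))
joined.show()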
04-25-2017
02:56 PM
Thanks a lot!
04-14-2017
09:08 PM
@rahul gulati This is how I did mine:

val outer_join = df1.join(df2, df1("id") === df2("id"), "left_outer")
04-11-2017
07:58 PM
2 Kudos
Does Hortonworks have plans to introduce a Big Data Architect certification similar to IBM's?
04-04-2017
03:49 PM
1 Kudo
If you are running in cluster mode, you need to set the number of executors while submitting the JAR, or you can set it manually in the code. The former way is better:

spark-submit \
--master yarn-cluster \
--class com.yourCompany.code \
--executor-memory 32G \
--num-executors 5 \
--driver-memory 4g \
--executor-cores 3 \
--queue parsons \
YourJARfile.jar

If running locally:

spark-shell --master yarn --num-executors 6 --driver-memory 5g --executor-memory 7g
04-04-2017
03:05 PM
I think you are comparing apples and oranges. An RDD is a container of instructions for materializing big (arrays of) distributed data and for splitting it into partitions so that Spark, using its executors, can hold some of them; an executor, on the other hand, is merely a container on YARN. When running Spark on YARN, each Spark executor runs as a YARN container. Where MapReduce schedules a container and fires up a JVM for each task, Spark hosts multiple tasks within the same container.
03-29-2017
02:32 PM
Is there a way to run a Pig UDF in parallel across the cluster? So far in YARN I'm seeing only one container being used. I'm running Pig on Tez with a Java UDF doing some heavy lifting. The tuple I'm passing to the UDF is a grouped bag.
03-27-2017
05:11 PM
1 Kudo
@Dinesh Das Coursera has a popular one. https://www.coursera.org/specializations/scala
03-13-2017
05:35 PM
This is just a suggestion, but have you tried running Hive on Tez? It's a much faster and more efficient execution engine. Try this before you execute your code:

set hive.execution.engine=tez;
03-10-2017
05:41 PM
I don't know of any other way of comparing two dataframes other than joining them first. Here is your action item:

1. Join tab1 and tab2 using a broadcast hash join on the key column "location".
2. Filter the result by pole = N.

Here is my code based on your sample data. Paste it into a spark shell and see.
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
case class tab1x(id:Int,phno:Long,location:String,address:String,name:String,temp:String)
case class tab2x(id:Int,phno:Long,location:String,pole:String)
val tab1 = Seq(
  tab1x(1, 656, "IND", "Street no1", "X", "30F"),
  tab1x(2, 657, "USA", "RHGD no 23", "Y", "23F"),
  tab1x(3, 658, "RUS", "YWKY 58", "Z", "20F")
).toDF
val tab2 = Seq(
  tab2x(1, 656, "IND", "S"),
  tab2x(2, 657, "USA", "N"),
  tab2x(3, 658, "RUS", "N")
).toDF
val joined_df = tab1.join(broadcast(tab2), "location")
val z = joined_df.filter($"pole" === "N")
z.show()
03-09-2017
05:26 PM
Hi @Evan Willett The official Spark documentation says this: "The only reason Kryo is not the default is because of the custom registration requirement, but we recommend trying it in any network-intensive application. Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type." Link: http://spark.apache.org/docs/latest/tuning.html#data-serialization
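To actually switch to Kryo, the configuration usually looks roughly like the sketch below; MyCaseClass is a hypothetical application class used only to illustrate the registration step:

import org.apache.spark.SparkConf

case class MyCaseClass(id: Int, name: String) // hypothetical class to register

val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes avoids storing full class names with each serialized object.
  .registerKryoClasses(Array(classOf[MyCaseClass]))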
03-09-2017
04:28 PM
Wow. ORC took me from 3 TB (PigStorage) down to 60 GB. This is insane. I didn't notice any performance improvement, though, but I am happy with the savings in storage. Thanks! 🙂
03-08-2017
06:11 PM
What is the error you are getting when you try to use it, then? This is what I used in Spark 1.6.1:

import org.apache.spark.sql.functions.broadcast
val joined_df = df1.join(broadcast(df2), "key")
03-08-2017
05:42 PM
2 Kudos
Hi @X Long The official documentation does cover it: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables Here is one tutorial using Spark 2: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html
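For a quick feel of the API, here is a minimal broadcast-variable sketch; the lookup map and sample values are made up for illustration, and sc is assumed to be an existing SparkContext (as in spark-shell):

// Ship a small read-only lookup table to every executor once,
// instead of serializing it with each task.
val countryNames = Map("IND" -> "India", "USA" -> "United States") // illustrative data
val bc = sc.broadcast(countryNames)

val codes = sc.parallelize(Seq("IND", "USA", "IND"))
val named = codes.map(code => bc.value.getOrElse(code, "unknown"))
named.collect().foreach(println)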
03-08-2017
04:00 PM
1 Kudo
Hi @PJ I have faced these vertex failure issues while using Tez. They usually happen when you have a huge workload, for example doing expensive joins one after another and running out of memory.

One quick fix is to save intermediate data to disk, then load it again and continue processing. For example, if you have 4 joins that you can't optimize any further, save the dataset after 2 joins and then load it for further wrangling (see the sketch below). It really depends on the size of the data and what you are trying to do, so test it out and play with this method; it works perfectly for me.

The second way is to increase the memory, but on my side the system admin has locked me out of doing that. You can try it out. Here is more info on that: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.3.6/bk_installing_manually_book/content/ref-ffec9e6b-41f4-47de-b5cd-1403b4c4a7c8.1.html
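The same break-the-pipeline-in-two idea, sketched here in Spark/Scala rather than Hive on Tez; the DataFrames and paths are tiny placeholders, and spark is assumed to be an existing SparkSession. The point is only that the first half is materialized to disk before the second half runs:

import spark.implicits._

// Tiny placeholder inputs; in a real job these are the large join inputs.
val a = Seq((1, "a")).toDF("id", "colA")
val b = Seq((1, "b")).toDF("id", "colB")
val c = Seq((1, "c")).toDF("id", "colC")
val d = Seq((1, "d")).toDF("id", "colD")
val e = Seq((1, "e")).toDF("id", "colE")

// First half of the pipeline: two joins, materialized to disk.
a.join(b, "id").join(c, "id").write.mode("overwrite").parquet("/tmp/stage1")

// Second half: reload the intermediate result and continue, so the engine
// never has to evaluate the whole four-join plan in one go.
val stage1 = spark.read.parquet("/tmp/stage1")
stage1.join(d, "id").join(e, "id").write.mode("overwrite").parquet("/tmp/final")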
03-07-2017
06:25 PM
Oh, that worked! Thanks a lot!
03-07-2017
04:51 PM
I am trying to run some Spark Streaming examples I found online, but even before I start, I'm getting this error:

Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:82)

I tried the following, but it doesn't help:

conf.set("spark.driver.allowMultipleContexts", "true");

Sample code I was trying to run in HDP 2.5:

import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
03-07-2017
04:27 PM
Hi @soumyabrata kole Here is one from my own blog; I wrote it a while back. Check this example out.