Member since: 10-12-2015
Posts: 63
Kudos Received: 56
Solutions: 13
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 24599 | 02-10-2017 12:35 AM
 | 1805 | 02-09-2017 11:00 PM
 | 1177 | 02-08-2017 04:48 PM
 | 2884 | 01-23-2017 03:11 PM
 | 4713 | 11-22-2016 07:33 PM
02-10-2017 12:39 AM
Dinesh, I recreated your scenario (in spark-shell) and it's working fine for me. Are you running this in Zeppelin by chance? I've noticed that Zeppelin sometimes doesn't create the Hive context correctly, so to make sure it's set up properly, run the following code first:
import org.apache.spark.sql.hive.HiveContext
val sqlContext = new HiveContext(sc)
//your code here
This creates a new HiveContext, which should fix your problem. I think the reference to your original sqlContext is being lost somewhere, so by recreating it we guarantee that the temp table is registered against the same sqlContext that is later queried. Let me know if that fixes your problem, or if you have more issues.
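For example, here is a minimal sketch of registering and querying a temp table through the recreated HiveContext (the file path, table name, and query below are hypothetical placeholders, not from your code):
import org.apache.spark.sql.hive.HiveContext
// Recreate the HiveContext so registration and querying share the same context
val sqlContext = new HiveContext(sc)
// Hypothetical DataFrame; replace with however you actually load your data
val df = sqlContext.read.json("/tmp/people.json")
// Register and query through the SAME sqlContext
df.registerTempTable("people")
sqlContext.sql("SELECT * FROM people LIMIT 10").show()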
02-10-2017 12:35 AM
Definitely possible! Here is some sample code:
gsam.join(input_file, (gsam("CCKT_NO") === input_file("ckt_id")) && (gsam("SEV_LVL") === 3), "inner")
Notice the && operator between the two conditions. You can chain as many conditions as you'd like.
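As a rough, self-contained sketch you can paste into spark-shell (the sample data and the extra "region" column are made up for illustration):
import sqlContext.implicits._
// Hypothetical stand-ins for your gsam and input_file DataFrames
val gsam = Seq(("A1", 3), ("B2", 1)).toDF("CCKT_NO", "SEV_LVL")
val input_file = Seq(("A1", "east"), ("B2", "west")).toDF("ckt_id", "region")
// Inner join on the circuit id, keeping only severity level 3
val joined = gsam.join(input_file, (gsam("CCKT_NO") === input_file("ckt_id")) && (gsam("SEV_LVL") === 3), "inner")
joined.show()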
02-09-2017 11:00 PM
1 Kudo
You need to enable checkpointing. It records the last-read Kafka offsets and resumes from there on restart. Here's the code you need to accomplish that:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
def createContext(sc: SparkContext, checkpointDir: String, duration: Int): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(duration))
  ssc.checkpoint(checkpointDir)
  ssc
}
// getOrCreate expects a factory function, so wrap createContext in () => ...
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(sc, checkpointDir, duration))
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
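To recover cleanly from the checkpoint on restart, the whole pipeline (including createDirectStream and any output operation) should be built inside the factory passed to getOrCreate. A rough, self-contained sketch of that pattern; the checkpoint path, broker list, topic name, and print() output are hypothetical placeholders:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
val checkpointDir = "/tmp/stream-checkpoint"   // hypothetical checkpoint location
def createFullContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-checkpoint-example")   // hypothetical app name
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")     // hypothetical broker
  val topics = Set("events")                                          // hypothetical topic
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  stream.map(_._2).print()   // stand-in for your real output operation
  ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createFullContext _)
ssc.start()
ssc.awaitTermination()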
02-08-2017 04:48 PM
1 Kudo
There is a JdbcRDD class for this. Its constructor signature is:
new JdbcRDD(sc: SparkContext, getConnection: () ⇒ Connection, sql: String, lowerBound: Long, upperBound: Long, numPartitions: Int, mapRow: (ResultSet) ⇒ T = JdbcRDD.resultSetToObjectArray)(implicit arg0: ClassTag[T])
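A usage sketch; the MySQL URL, credentials, table, and id bounds below are hypothetical placeholders. Note the query must contain two '?' placeholders, which JdbcRDD fills with the lower/upper bound for each partition:
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD
val rdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "pass"),   // hypothetical connection
  "SELECT id, name FROM people WHERE id >= ? AND id <= ?",
  1L, 1000000L, 10,
  (rs: ResultSet) => (rs.getLong("id"), rs.getString("name"))
)
rdd.take(5).foreach(println)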
01-23-2017 03:11 PM
1 Kudo
Use the mapValues API. I made an example of doing what you wanted below. You'll have to update the ListBuffer to use the types you have, but it looks like it's doing what you want.
import scala.collection.mutable.ListBuffer
val rdd1 = sc.parallelize(Array((1,2),(2,3),(1,3),(2,4)))
val gRdd = rdd1.groupByKey()
val indxRdd = gRdd.mapValues(a => {
  val b = a.toArray
  var indx = 2
  val lb = new ListBuffer[(Int, Int)]
  for (i <- 0 to b.size - 1) {
    lb.append((b(i), indx))
    indx += 1
  }
  lb.toArray
})
indxRdd.collectAsMap()
res8: scala.collection.Map[Int,Array[(Int, Int)]] =
Map(
2 -> Array((3,2), (4,3)),
1 -> Array((2,2), (3,3))
)
11-30-2016 11:25 PM
I've seen this issue quite often when folks are first setting up their cluster. Make sure you have NodeManagers running on all of your data nodes. In addition, check the YARN configuration for the maximum container size; if it is less than what you are requesting, resources will never get allocated. Finally, check the default number of executors and try specifying --num-executors 2. If you request more resources than your cluster has (more vcores or RAM), you will hit this issue.
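As a minimal sketch of pinning the request down explicitly from code rather than the command line (the values are just examples; they correspond to --num-executors, --executor-memory, and --executor-cores):
import org.apache.spark.{SparkConf, SparkContext}
// Ask for a deliberately small footprint that should fit on almost any cluster;
// if this still never gets containers, the YARN-side limits are the likely culprit.
val conf = new SparkConf()
  .setAppName("resource-check")                 // hypothetical app name
  .set("spark.executor.instances", "2")         // same as --num-executors 2
  .set("spark.executor.memory", "1g")           // same as --executor-memory 1g
  .set("spark.executor.cores", "1")             // same as --executor-cores 1
val sc = new SparkContext(conf)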
11-22-2016 07:33 PM
2 Kudos
To add to what @Scott Shaw said, the biggest thing we'd be looking for initially is data skew, and a couple of metrics help determine this. The first is input size. With input size we can ignore the min entirely and look at the 25th percentile, median, and 75th percentile. In your job they are fairly close together, and the max is never dramatically more than the median. If the max and 75th percentile were much larger than the median, that would be a clear sign of data skew.

Another indicator is task duration. Again, ignore the minimum; we're inevitably going to get a small partition for one reason or another. Focus on the 25th percentile, median, 75th percentile, and max. In a perfect world the separation between the four would be tiny. Seeing 6s, 10s, 11s, and 17s may look significantly different, but those values are actually relatively close. The only cause for concern is when the 75th percentile and max are quite a bit greater than the 25th percentile and median. By significant, I mean most tasks taking ~30s and the max taking 10 minutes. That would be a clear indicator of data skew.
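If you ever want to check skew directly from code rather than eyeballing the UI, one rough approach (sketched against a hypothetical pair RDD named rdd) is to count records per partition and look at the spread:
// Count how many records land in each partition; a few partitions with far more
// records than the rest is the same skew the UI percentiles would reveal.
val partitionCounts = rdd
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
partitionCounts.sortBy(_._2).foreach { case (idx, count) => println(s"partition $idx: $count records") }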
11-22-2016 07:24 PM
1 Kudo
Sorry for the late response. I'm not sure of a way to broadcast the data without collecting it to the driver first. Because we assume the broadcasted table is small, the collect and broadcast to and from the driver should be fairly quick. You would have about the same network traffic even if you could somehow skip the collect, since we need a full copy of the smaller table on each worker anyway.

The join(broadcast(right), ...) call gives Spark a hint to do a broadcast join, so it overrides spark.sql.autoBroadcastJoinThreshold, which is 10 MB by default. Don't try to broadcast anything larger than 2 GB; that is the limit for a single block in Spark, and you will get an OOM or overflow exception because the block data structure is capped at 2 GB. The autoBroadcastJoinThreshold currently only applies to Hive tables that have had statistics run on them, so the broadcast hint is what you use for DataFrames that aren't in Hive, or where statistics haven't been run. The general Spark Core broadcast function still works too; in fact, under the hood the DataFrame version calls the same collect and broadcast you would use with the general API. The concept of partitions is still there, so after a broadcast join you're free to run mapPartitions on the result.
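For reference, a minimal sketch of the hint; the table and column names here are hypothetical stand-ins for your large and small tables:
import org.apache.spark.sql.functions.broadcast
// Hypothetical tables: 'large' is big, 'small' comfortably fits in memory
val large = sqlContext.table("warehouse.events")        // hypothetical fact table
val small = sqlContext.table("warehouse.dim_country")   // hypothetical lookup table
// Hint Spark to ship 'small' to every executor instead of shuffling both sides
val joined = large.join(broadcast(small), large("country_id") === small("id"))
joined.explain()   // the physical plan should show a broadcast hash join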
11-01-2016 02:58 AM
I'm not sure of the exact problem, but I have a couple of ideas. When it works in the spark-shell, how are you starting up the session?
10-31-2016 04:14 AM
1 Kudo
Make sure you add the JAR to your classpath and include it when you run the application.
sc.addJar("yourDriver.jar")
val jdbcDF = sqlContext.load("jdbc", Map(
"url" -> "jdbc:teradata://<server_name>, TMODE=TERA, user=my_user, password=*****",
"dbtable" -> "schema.table_name",
"driver" -> "com.teradata.jdbc.TeraDriver"))