Member since 10-12-2015
63 Posts
56 Kudos Received
13 Solutions

My Accepted Solutions

| Title | Views | Posted |
|---|---|---|
| | 25643 | 02-10-2017 12:35 AM |
| | 2358 | 02-09-2017 11:00 PM |
| | 1546 | 02-08-2017 04:48 PM |
| | 3581 | 01-23-2017 03:11 PM |
| | 5723 | 11-22-2016 07:33 PM |

02-10-2017 12:39 AM
Dinesh, I recreated your scenario (in spark-shell) and it's working fine for me. Are you running this in Zeppelin by chance? I've noticed that Zeppelin sometimes doesn't create the Hive context correctly, so to make sure you're working against the right one, run the following code:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
// your code here

What will happen is we'll create a new HiveContext, and it should fix your problem. I think we're losing the pointer to your sqlContext for some reason, so by recreating it we guarantee that the temp table is registered to a sqlContext and that the same sqlContext is the one being queried. Let me know if that fixes your problem, or if you have more issues.
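In case it helps, here is a minimal end-to-end sketch of the pattern, assuming a hypothetical JSON file and table name; the point is simply that the temp table is registered to and queried through the same HiveContext:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

// hypothetical data source and table name, only to illustrate the pattern
val df = sqlContext.read.json("/tmp/people.json")
df.registerTempTable("people")

// query through the SAME sqlContext the table was registered against
sqlContext.sql("SELECT * FROM people LIMIT 10").show()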
						
					
02-10-2017 12:35 AM
Definitely possible! Here is some sample code:

gsam.join(input_file, (gsam("CCKT_NO") === input_file("ckt_id")) && (gsam("SEV_LVL") === 3), "inner")

Notice the && combining the two conditions. You can chain in as many conditions as you'd like.
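For context, here's a minimal, self-contained version of the same pattern for spark-shell; the columns and sample rows are hypothetical stand-ins for your gsam and input_file DataFrames:

import sqlContext.implicits._

// hypothetical stand-ins for the real tables
val gsam = Seq(("A1", 3), ("B2", 1), ("C3", 3)).toDF("CCKT_NO", "SEV_LVL")
val input_file = Seq(("A1", "east"), ("C3", "west")).toDF("ckt_id", "region")

// join on circuit id AND keep only severity level 3, expressed as a single join condition
val joined = gsam.join(input_file,
  (gsam("CCKT_NO") === input_file("ckt_id")) && (gsam("SEV_LVL") === 3),
  "inner")

joined.show()  // only the A1 and C3 rows (SEV_LVL = 3) survive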
						
					
02-09-2017 11:00 PM (1 Kudo)
You need to enable checkpointing. It will record the last-read Kafka offsets and start reading from there. Here's the code you need to accomplish that:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def createContext(sc: SparkContext, checkpointDir: String, duration: Int): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(duration))
  ssc.checkpoint(checkpointDir)
  ssc
}

// getOrCreate expects a function that builds the context, so pass the call as a thunk
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(sc, checkpointDir, duration))

// kafkaParams and topics are the same ones from your existing job
KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
						
					
02-08-2017 04:48 PM (1 Kudo)
There is a JdbcRDD for exactly this. Its constructor signature (from the Spark API docs) is:

new JdbcRDD(sc: SparkContext, getConnection: () => Connection, sql: String, lowerBound: Long, upperBound: Long, numPartitions: Int, mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray)(implicit arg0: ClassTag[T])
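A rough usage sketch, assuming a hypothetical MySQL database with a people table that has a numeric id column; the URL, credentials, query, and bounds are all placeholders:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// the JDBC driver jar must be on the executor classpath
val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password"),
  // the query must contain two ? placeholders, filled from lowerBound and upperBound
  "SELECT id, name FROM people WHERE id >= ? AND id <= ?",
  1L, 100000L,  // lowerBound, upperBound
  4,            // numPartitions
  (rs: ResultSet) => (rs.getInt("id"), rs.getString("name")))

rows.take(5).foreach(println)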
						
					
01-23-2017 03:11 PM (1 Kudo)
Use the mapValues API. I made an example below of doing what you want. You'll have to update the ListBuffer to use the types you have, but it looks like it does what you're after.

import scala.collection.mutable.ListBuffer

val rdd1 = sc.parallelize(Array((1,2),(2,3),(1,3),(2,4)))
val gRdd = rdd1.groupByKey()
val indxRdd = gRdd.mapValues(a => {
  val b = a.toArray
  var indx = 2
  val lb = new ListBuffer[(Int, Int)]
  for (i <- 0 to b.size - 1) {
    lb.append((b(i), indx))
    indx += 1
  }
  lb.toArray
})

indxRdd.collectAsMap()
res8: scala.collection.Map[Int,Array[(Int, Int)]] =
Map(
	2 -> Array((3,2), (4,3)),
	1 -> Array((2,2), (3,3))
)
						
					
11-30-2016 11:25 PM
I've seen this issue quite often when folks are first setting up their cluster. Make sure you have NodeManagers running on all of your data nodes. In addition, check the YARN configuration for the maximum container size; if it is less than what you are requesting, resources will never get allocated. Finally, check the default number of executors and try specifying --num-executors 2. If you request more resources than your cluster has (more vcores or RAM), you will get this issue. A small, explicit request like the sketch below is a good way to rule this out.
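This is only a sketch of what I mean by starting small, assuming you build the context yourself; the same values can be passed to spark-submit via --num-executors, --executor-memory, and --executor-cores, and the numbers here are placeholders to adjust to what YARN can actually allocate:

import org.apache.spark.{SparkConf, SparkContext}

// deliberately small requests; raise them only once you know YARN's max container size
val conf = new SparkConf()
  .setAppName("resource-check")
  .set("spark.executor.instances", "2")  // same effect as --num-executors 2
  .set("spark.executor.memory", "1g")    // must fit under yarn.scheduler.maximum-allocation-mb
  .set("spark.executor.cores", "1")      // must fit under yarn.scheduler.maximum-allocation-vcores

val sc = new SparkContext(conf)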
						
					
11-22-2016 07:33 PM (2 Kudos)
To add to what @Scott Shaw said, the biggest thing we'd be looking for initially is data skew, and we can look at a couple of things to help determine this. The first is input size. With input size we can completely ignore the min and look at the 25th percentile, median, and 75th percentile. In your job they are fairly close together, and the max is never dramatically more than the median. If the max and 75th percentile were very large, we would definitely be seeing data skew. Another indicator of data skew is task duration. Again, ignore the minimum; we're inevitably going to get a small partition for one reason or another. Focus on the 25th percentile, median, 75th percentile, and max. In a perfect world the separation between the four would be tiny. So 6s, 10s, 11s, 17s may seem significantly different, but they're actually relatively close. The only cause for concern would be when the 75th percentile and max are quite a bit greater than the 25th percentile and median. When I say significant, I'm talking about most tasks taking ~30s and the max taking 10 minutes. That would be a clear indicator of data skew.
						
					
11-22-2016 07:24 PM (1 Kudo)
Sorry for the late response. I'm not sure of a way to broadcast the data without collecting it to the driver first. Because we assume the broadcasted table is small, the collect and broadcast to and from the driver should be fairly quick. You would have about the same network traffic if you could somehow skip the collect, since we need a full copy of the smaller table on each worker anyway. The join(broadcast(right), ...) is giving Spark a hint to do a broadcast join, so it overrides spark.sql.autoBroadcastJoinThreshold, which is 10 MB by default. Don't try to broadcast anything larger than 2 GB, as this is the limit for a single block in Spark and you will get an OOM or overflow exception; the data structure backing a block is capped at 2 GB. The autoBroadcastJoinThreshold currently only applies to Hive tables that have had statistics computed on them, so the broadcast hint is what you use for DataFrames that aren't in Hive or that haven't had statistics run. The general Spark Core broadcast function will still work; in fact, under the hood the DataFrame version calls the same collect and broadcast you would use with the general API. The concept of partitions is still there, so after you do a broadcast join you're free to run mapPartitions on the result.
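For illustration, here's a minimal sketch of the hint with made-up DataFrames; the names, columns, and sizes are placeholders, and the only requirement is that the broadcast side comfortably fits in memory:

import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._

// hypothetical DataFrames: a large fact-like table and a small lookup table
val large = sc.parallelize(1 to 1000000).map(i => (i % 100, i)).toDF("key", "value")
val small = Seq((1, "a"), (2, "b"), (42, "c")).toDF("key", "label")

// the broadcast() hint forces a broadcast join regardless of autoBroadcastJoinThreshold
val joined = large.join(broadcast(small), large("key") === small("key"))

// partitions still exist after the join, so mapPartitions works as usual
joined.rdd.mapPartitions(iter => Iterator(iter.size)).collect()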
						
					
11-01-2016 02:58 AM
I'm not sure of the exact problem, but I have a couple of ideas. When it works in the spark-shell, how are you starting up the session?
						
					
10-31-2016 04:14 AM (1 Kudo)
Make sure you add the driver jar to your classpath and include it when you run the application.

// make the Teradata JDBC driver available to the executors
sc.addJar("yourDriver.jar")

val jdbcDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:teradata://<server_name>, TMODE=TERA, user=my_user, password=*****",
  "dbtable" -> "schema.table_name",
  "driver" -> "com.teradata.jdbc.TeraDriver"))
						
					