Member since: 02-17-2017

71 Posts | 17 Kudos Received | 3 Solutions

My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 5622 | 03-02-2017 04:19 PM |
| | 34027 | 02-20-2017 10:44 PM |
| | 20670 | 01-10-2017 06:51 PM |
Posted 02-20-2017 10:44 PM (1 Kudo)

Hi @Dinesh Das,

The following code is tested on spark-shell with Scala and works with both PSV and CSV data. These are the datasets I used, both in the same directory /data/dev/spark:

file1.csv
1,2,3
x,y,z
a,b,c

file2.psv
q|w|e
1|2|3

To test, you can copy-paste my code into spark-shell (copy only a few lines/functions at a time; do not paste all of the code at once into the Spark shell).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// -- EDIT YOUR SCHEMA HERE
case class refLineID(
  attr1: String,
  attr2: String,
  attr3: String
)

import org.apache.hadoop.fs.{FileSystem, Path}
val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/data/dev/spark"))

// check the delimiter of a file by inspecting its first row
def checkDelim(file: String): String = {
  val x = sc.textFile(file)
  val str = x.take(1).mkString("")   // grab the first row to check the delimiter
  val pipe = "\\|"
  val comma = "\\,"
  var delim = ""
  for (c <- str) {
    if (c == ',') {
      delim = comma
    } else if (c == '|') {
      delim = pipe
    }
  }
  delim
}

// convert a file to a DataFrame after checking its delimiter
def convertToDF(file: String) = {
  val delim = checkDelim(file)       // grab the delimiter by calling the function above
  val x = sc.textFile(file)
  // split each row on the detected delimiter and map it onto the case class
  val x_df = x.map(_.split(delim))
              .map(a => refLineID(
                a(0).toString,
                a(1).toString,
                a(2).toString
              )).toDF
  x_df.show()
}

// loop through each file and call 'convertToDF'
files.foreach(filename => {
  val a = filename.getPath.toString()
  convertToDF(a)
})

Note: I'm using Spark 1.6 and Scala. The "checkDelim" function checks the delimiter of the first row of each file under the directory; "convertToDF" then knows how to split the rows and converts the data into a DataFrame. Pretty simple!
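As a side note, on Spark 2.x the same idea can be sketched with the built-in CSV reader. The snippet below is only a sketch, assuming a SparkSession named `spark` and the same /data/dev/spark directory; it is not part of the original answer.

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val paths = fs.listStatus(new Path("/data/dev/spark")).map(_.getPath.toString)

paths.foreach { file =>
  // peek at the first line to decide between pipe and comma
  val firstLine = spark.sparkContext.textFile(file).first()
  val delim = if (firstLine.contains("|")) "|" else ","
  // the DataFrameReader splits on the chosen delimiter for us
  spark.read.option("delimiter", delim).csv(file).show()
}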
						
					
Posted 02-17-2017 06:48 PM

@Joe Widen @Timothy Spann Why did I get a down vote here? My code is working! Nothing against you, but I just want to know if you could figure out the reason. Thanks!
						
					
Posted 02-08-2017 05:15 PM

You can use a DataFrame. Convert the text file to a DataFrame like the code below, then do a join to start comparing.

sc.setLogLevel("WARN")
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

case class d1(
  ckt_id: String,
  location: String,
  usage: String,
  port: String,
  machine: String
)

val f2 = sc.textFile("textfile location")
// split each line on "|" and map the five fields onto the case class
val f1_df = f2.map(_.split("\\|"))
              .map(x => d1(
                x(0).toString,
                x(1).toString,
                x(2).toString,
                x(3).toString,
                x(4).toString
              )).toDF

// this will give you a table like this
+----------+----------+----------+----------+----------+
|    ckt_id|  location|     usage|      port|   machine|
+----------+----------+----------+----------+----------+
|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|
|ABZCSD21DF|ABZCSD21DF|ABZCSD21DF|ABZCSD21DF|ABZCSD21DF|
|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|
|BXZCSD21DF|BXZCSD21DF|BXZCSD21DF|BXZCSD21DF|BXZCSD21DF|
+----------+----------+----------+----------+----------+
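The comparison step could then look something like the rough sketch below, assuming a second DataFrame f2_df built the same way from the other file; the names here are illustrative only and not from the original post.

val joined = f1_df.as("a").join(f2_df.as("b"), $"a.ckt_id" === $"b.ckt_id")
// rows where the two sources disagree on location
joined.where(not($"a.location" === $"b.location")).show()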
						
					
Posted 01-14-2017 11:12 PM

I have been trying to index/rank a grouped RDD. The RDD was grouped by a key, as you can see below, and I want to index the elements of each key's array starting from the number 2.

What I have:

scala.collection.immutable.Map[String,Array[(String, String, String)]] =
Map(
        394 -> Array((394,394,0), (394,362,7), (394,368,7)),
        328 -> Array((328,328,0), (328,324,7), (328,325,7), (328,326,7), (328,327,7), (328,329,7)),
        368 -> Array((368,368,0), (368,394,7), (368,396,7), (368,397,7), (368,479896,7)),
        278 -> Array((278,278,0), (278,371,7), (278,372,7))
)

What I want (notice the new 4th element of each tuple; it's an index starting from 2):

        394 -> Array((394,394,0,2), (394,362,7,3), (394,368,7,4)),
        328 -> Array((328,328,0,2), (328,324,7,3), (328,325,7,4), (328,326,7,5), (328,327,7,6), (328,329,7,7)),
        368 -> Array((368,368,0,2), (368,394,7,3), (368,396,7,4), (368,397,7,5), (368,479896,7,6)),
        278 -> Array((278,278,0,2), (278,371,7,3), (278,372,7,4))
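One possible approach, shown only as a sketch and assuming the Map above is bound to a value named `grouped`: zip each group's elements with their position and offset the index by 2. The same mapValues pattern applies to a pair RDD produced by groupByKey.

val indexed = grouped.mapValues { arr =>
  // i starts at 0, so i + 2 gives an index beginning at 2
  arr.zipWithIndex.map { case ((a, b, c), i) => (a, b, c, i + 2) }
}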
   
						
					
Labels: Apache Spark

Posted 01-13-2017 11:43 PM (1 Kudo)

Thanks! I will test it soon and will accept your answer if it works out.
						
					
Posted 01-11-2017 06:10 PM

I have rows like the ones below.

+--------------+-------------------+-------------------+
|            id|    line_start_date|      line_end_date|
+--------------+-------------------+-------------------+
|        ID1   | 8/15/2016 00:00:00| 8/21/2016 23:55:59|
|        ID2   | 1/25/2016 00:00:00| 1/31/2016 23:55:59|
|        ID3   | 2/29/2016 00:00:00| 2/29/2016 23:55:59|
|        ID4   |  2/8/2016 00:00:00| 2/14/2016 23:55:59|
|        ID5   | 7/25/2016 00:00:00| 7/31/2016 23:55:59|
|        ID6   |  8/1/2016 00:00:00|  8/7/2016 23:55:59|
+--------------+-------------------+-------------------+

Note: the date format is "MM/dd/yyyy HH:mm:ss" in UTC.

I want to generate the individual dates between the start date and end date for each of the IDs. As a result I will have more rows and can easily do a groupBy for the aggregation I want.

Example output required:

+--------------+-------------------+
|            id|    dates          |
+--------------+-------------------+
|        ID1   | 8/15/2016 00:00:00|
|        ID1   | 8/16/2016 00:00:00|
|        ID1   | 8/17/2016 00:00:00|
|        ...   | ......            |
|        ...   | ......            |
+--------------+-------------------+

How can I do this with the DataFrame API? I've searched for hours with no clue yet!
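One way to sketch this with the DataFrame API on Spark 1.6, only as a sketch and under a few assumptions (the table above is a DataFrame named `df`, Java 8 is available for java.time, the spark-shell's sqlContext is in scope, and timezone handling is simplified): parse the bounds into dates, build the list of days with a UDF, and explode it into one row per day.

import java.sql.Date
import org.apache.spark.sql.functions._
import sqlContext.implicits._

// build every calendar day from start to end (inclusive)
val toDays = udf { (start: Date, end: Date) =>
  Iterator.iterate(start.toLocalDate)(_.plusDays(1))
    .takeWhile(!_.isAfter(end.toLocalDate))
    .map(Date.valueOf)
    .toSeq
}

val withDates = df
  .withColumn("start", to_date(unix_timestamp($"line_start_date", "MM/dd/yyyy HH:mm:ss").cast("timestamp")))
  .withColumn("end",   to_date(unix_timestamp($"line_end_date",   "MM/dd/yyyy HH:mm:ss").cast("timestamp")))
  .select($"id", explode(toDays($"start", $"end")).as("dates"))

withDates.show()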
						
					
Labels: Apache Spark

Posted 01-10-2017 06:51 PM

Thanks for your help. It kind of helped. I was getting an "ArrayOutOfBound..." error while trying to iterate over it and couldn't fix it after debugging. I've added my code below. :)
						
					
Posted 01-10-2017 06:51 PM (1 Kudo)

Finally found the solution. Here is the full code below. Fire up a spark-shell, change '/hadoopPath' below to your own HDFS path (one that contains several other directories with the same schema) and see for yourself. It will convert each dataset to a DataFrame and print the table.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class Test(
  attr1: String,
  attr2: String
)

sc.setLogLevel("WARN")
import org.apache.hadoop.fs.{FileSystem, Path}
val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path("/hadoopPath"))

def doSomething(file: String) = {
  println(file)
  // your logic for processing a single file goes here
  val x = sc.textFile(file)
  // split each line on "|" and map it onto the case class above
  val classMapper = x.map(_.split("\\|"))
                     .map(x => Test(
                       x(0).toString,
                       x(1).toString
                     )).toDF
  classMapper.show()
}

files.foreach(filename => {
  // the following code makes sure the "_SUCCESS" marker file is not processed
  val a = filename.getPath.toString()
  val m = a.split("/")
  val name = m(m.length - 1)
  println("\nFILENAME: " + name)
  if (name == "_SUCCESS") {
    println("Cannot process '_SUCCESS' filename")
  } else {
    doSomething(a)
  }
})
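A slightly different way to skip the marker file, sketched only as a variation on the code above (it assumes the same FileSystem/Path imports and the doSomething function): filter the listing before the loop instead of checking inside it.

// keep only entries whose name does not start with "_" (e.g. _SUCCESS)
val dataFiles = FileSystem.get(sc.hadoopConfiguration)
  .listStatus(new Path("/hadoopPath"))
  .map(_.getPath)
  .filterNot(_.getName.startsWith("_"))

dataFiles.foreach(p => doSomething(p.toString))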
						
					