Member since: 02-17-2017
Posts: 71
Kudos Received: 17
Solutions: 3
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 4499 | 03-02-2017 04:19 PM |
| | 32395 | 02-20-2017 10:44 PM |
| | 19066 | 01-10-2017 06:51 PM |
02-20-2017
10:44 PM
1 Kudo
Hi @Dinesh Das
The following code is tested on spark-shell with Scala and works with both PSV and CSV data. These are the datasets I used, all from the same directory /data/dev/spark:

file1.csv
1,2,3
x,y,z
a,b,c

file2.psv
q|w|e
1|2|3

To test, you can copy and paste my code into the spark-shell (copy only a few lines/functions at a time; do not paste all of the code at once).

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
// --EDIT YOUR SCHEMA HERE
case class refLineID(
attr1:String,
attr2:String,
attr3:String
)
import org.apache.hadoop.fs.{FileSystem,Path}
val files = FileSystem.get( sc.hadoopConfiguration ).listStatus(new Path("/data/dev/spark"))
// function to check delimiter of each file
def checkDelim(file:String): String ={
val x = sc.textFile(file);
val grab_x = x.take(1) // grab the first row to check delimiter
val str = grab_x.mkString("")
val pipe = "\\|"
val comma = "\\,"
var delim = ""
for (c <- str) {
if (c == ',') {
delim = comma
} else if (c == '|') {
delim = pipe
}
}
return delim
}
// -- Function to convert RDD to dataframe after checking delimiter
def convertToDF(file: String) = {
var delim = ""
delim = checkDelim(file) // grab the delimiter by calling function
val x = sc.textFile(file);
// pass the file and delimiter type to transform to dataframe
val x_df = x.map(_.split(delim))
.map(a => refLineID(
a(0).toString,
a(1).toString,
a(2).toString
)).toDF
x_df.show()
}
// -- Loop through each file and call the function 'convertToDF'
files.foreach(filename => {
val a = filename.getPath.toString()
convertToDF(a)
})
Note: I'm using Spark 1.6 and Scala. The "checkDelim" function checks the delimiter of the first row of each file under the directory, and the "convertToDF" function then knows how to split the rows and converts the data into a DataFrame. Pretty simple!
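For reference, here is a hypothetical usage sketch of the two functions above; the file paths are assumptions matching the sample datasets listed earlier.

// Hypothetical usage of checkDelim and convertToDF; paths assume the sample files above.
val delim = checkDelim("/data/dev/spark/file2.psv") // returns "\\|" for the pipe-delimited file
convertToDF("/data/dev/spark/file1.csv") // builds and prints a 3-column DataFrame from the CSV rows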
02-17-2017
06:48 PM
@Joe Widen @Timothy Spann Why did I get a downvote here? My code is working! Nothing against you, but I just want to know if you could figure out the reason. Thanks!
02-08-2017
05:15 PM
You can use a DataFrame. Convert the text file to a DataFrame like the code below and do a join to start comparing; a sketch of the join step follows the sample output.

sc.setLogLevel("WARN")
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
case class d1(
ckt_id:String,
location:String,
usage:String,
port:String,
machine:String
)
val f2 = sc.textFile("textfile location")
val f1_df = f2.map(_.split("\\|"))
.map(x => d1(
x(0).toString,
x(1).toString,
x(2).toString,
x(3).toString,
x(4).toString
)).toDF
// f1_df.show() will print a table like this
+----------+----------+----------+----------+----------+
| ckt_id| location| usage| port| machine|
+----------+----------+----------+----------+----------+
|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|
|ABZCSD21DF|ABZCSD21DF|ABZCSD21DF|ABZCSD21DF|ABZCSD21DF|
|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|AXZCSD21DF|
|BXZCSD21DF|BXZCSD21DF|BXZCSD21DF|BXZCSD21DF|BXZCSD21DF|
+----------+----------+----------+----------+----------+
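A rough sketch of the comparison step mentioned above, assuming the second text file has been loaded into a DataFrame the same way (here called f2_df, a hypothetical name):

// Hypothetical sketch: compare the two DataFrames by joining on the key column.
// f2_df is assumed to be built from the other text file exactly like f1_df above.
val joined = f1_df.join(f2_df, Seq("ckt_id"), "outer") // Spark 1.6 join on a using-column
joined.show()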
01-14-2017
11:12 PM
I have really been looking for a way to index/rank a grouped RDD. The RDD was grouped by a key as you can see below, and I want to index the arrays under each key, starting from the number 2.

What I have:

scala.collection.immutable.Map[String,Array[(String, String, String)]] =
Map(
394 -> Array((394,394,0), (394,362,7), (394,368,7)),
328 -> Array((328,328,0), (328,324,7), (328,325,7), (328,326,7), (328,327,7), (328,329,7)),
368 -> Array((368,368,0), (368,394,7), (368,396,7), (368,397,7), (368,479896,7)),
278 -> Array((278,278,0), (278,371,7), (278,372,7))
)

What I want (notice the new 4th element of each tuple, an index starting from 2):

394 -> Array((394,394,0,2), (394,362,7,3), (394,368,7,4)),
328 -> Array((328,328,0,2), (328,324,7,3), (328,325,7,4), (328,326,7,5), (328,327,7,6), (328,329,7,7)),
368 -> Array((368,368,0,2), (368,394,7,3), (368,396,7,4), (368,397,7,5), (368,479896,7,6)),
278 -> Array((278,278,0,2), (278,371,7,3), (278,372,7,4))
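For reference, a minimal sketch of the transformation being asked for; the variable name "grouped" is hypothetical and stands for the Map shown above.

// Hypothetical sketch: append a running index (starting at 2) to each tuple in every grouped array.
val indexed = grouped.map { case (key, arr) =>
  key -> arr.zipWithIndex.map { case ((a, b, c), i) => (a, b, c, i + 2) }
}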
Labels:
- Apache Spark
01-13-2017
11:43 PM
1 Kudo
Thanks! I will be testing it soon and will be accepting your answer if it works out.
01-11-2017
06:10 PM
I have rows like the ones below.

+--------------+-------------------+-------------------+
| id| line_start_date| line_end_date|
+--------------+-------------------+-------------------+
| ID1 | 8/15/2016 00:00:00| 8/21/2016 23:55:59|
| ID2 | 1/25/2016 00:00:00| 1/31/2016 23:55:59|
| ID3 | 2/29/2016 00:00:00| 2/29/2016 23:55:59|
| ID4 | 2/8/2016 00:00:00| 2/14/2016 23:55:59|
| ID5 | 7/25/2016 00:00:00| 7/31/2016 23:55:59|
| ID6 | 8/1/2016 00:00:00| 8/7/2016 23:55:59|
+--------------+-------------------+-------------------+
Note: the date format is "MM/dd/yyyy HH:mm:ss" in UTC.

I want to generate the individual dates between the start date and end date for each of the IDs. As a result I will have more rows and can easily do a groupBy for the aggregation I want.

Example output required:

+--------------+-------------------+
| id| dates |
+--------------+-------------------+
| ID1 | 8/15/2016 00:00:00|
| ID1 | 8/16/2016 00:00:00|
| ID1 | 8/17/2016 00:00:00|
| ... | ...... |
| ... | ...... |
+--------------+-------------------+

How can I do this with the DataFrame API? I have searched for hours and have no clue yet!
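One way this could be sketched (an assumption-laden sketch, not a tested solution): build the list of days between start and end with a UDF, then explode that list into rows. It assumes Java 8's java.time is available, the table above is held in a DataFrame named df (a hypothetical name), and sqlContext.implicits are imported.

// Hypothetical sketch: expand each (id, line_start_date, line_end_date) row into one row per day.
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.{col, explode, udf}
val datesBetween = udf { (start: String, end: String) =>
  val fmt = DateTimeFormatter.ofPattern("M/d/yyyy HH:mm:ss") // single-digit months/days, as in the sample rows
  val s = LocalDate.parse(start.trim, fmt)
  val e = LocalDate.parse(end.trim, fmt)
  Iterator.iterate(s)(_.plusDays(1))
    .takeWhile(!_.isAfter(e))
    .map(_.atStartOfDay.format(fmt))
    .toList
}
val expanded = df.select(
  col("id"),
  explode(datesBetween(col("line_start_date"), col("line_end_date"))).as("dates")
)
expanded.show()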
Labels:
- Apache Spark
01-10-2017
06:51 PM
Thanks for your help. It kinda helped. I was getting an "ArrayOutOfBound..." error while trying to iterate over it and couldn't fix it after debugging. Added my code below. :)
01-10-2017
06:51 PM
1 Kudo
Finally found the solution. Here is the full code below. Fire up a spark-shell, change the '/hadoopPath' below to your own HDFS path which contains several other directories with the same schema, and see for yourself: it will convert each dataset to a DataFrame and print the table.

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Test(
attr1:String,
attr2:String
)
sc.setLogLevel("WARN")
import org.apache.hadoop.fs.{FileSystem,Path}
val files = FileSystem.get( sc.hadoopConfiguration ).listStatus(new Path("/hadoopPath"))
def doSomething(file: String) = {
println (file);
// your logic of processing a single file comes here
val x = sc.textFile(file);
val classMapper = x.map(_.split("\\|"))
.map(x => Test( // the case class defined above is named 'Test'
x(0).toString,
x(1).toString
)).toDF
classMapper.show()
}
files.foreach( filename => {
// the following code makes sure "_SUCCESS" file name is not processed
val a = filename.getPath.toString()
val name = filename.getPath.getName // just the file name, e.g. "_SUCCESS"
println("\nFILENAME: " + name)
if (name == "_SUCCESS") {
println("Cannot Process '_SUCCSS' Filename")
} else {
doSomething(a)
}
})
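As a design note, the '_SUCCESS' check could also be done up front by filtering the file listing before the loop; a minimal sketch of that alternative:

// Alternative sketch: drop the "_SUCCESS" entry before processing instead of checking inside the loop.
files.filter(_.getPath.getName != "_SUCCESS")
     .foreach(f => doSomething(f.getPath.toString))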