Created 10-07-2016 07:06 PM
I am creating a dataframe in Spark by loading tab-separated files from S3. I need to get the input file name for each record in the dataframe for further processing. I tried
dataframe.select(inputFileName())
But I am getting a null value for input_file_name.
Created 10-11-2016 04:31 PM
@Amal Babu this is my take; I'm sure there are better ways:
%spark
import sqlContext.implicits._
// wholeTextFiles reads each file as a (filename, content) pair
val data = sc.wholeTextFiles("hdfs://sandbox.hortonworks.com:8020/user/guest/")
val dataDF = data.toDF()
// _1 is the file name, _2 is the file content
dataDF.select("_1").show()
Result:
import sqlContext.implicits._
data: org.apache.spark.rdd.RDD[(String, String)] = hdfs://sandbox.hortonworks.com:8020/user/guest/ MapPartitionsRDD[64] at wholeTextFiles at <console>:68
dataDF: org.apache.spark.sql.DataFrame = [_1: string, _2: string]
+--------------------+
|                  _1|
+--------------------+
|hdfs://sandbox.ho...|
|hdfs://sandbox.ho...|
|hdfs://sandbox.ho...|
+--------------------+
As long as you use wholeTextFiles you should be able to keep the filenames. From the documentation: SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
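Since the question was about tab-separated files, here is a minimal sketch (same sc/sqlContext session as above, illustrative HDFS path and column names) of turning the (filename, content) pairs into one row per line while keeping the source path:

import sqlContext.implicits._
import org.apache.spark.sql.functions.{explode, split}
// name the pair columns, then explode the content into one row per line
val byFile = sc.wholeTextFiles("hdfs://sandbox.hortonworks.com:8020/user/guest/").toDF("file_name", "content")
val perLine = byFile.select($"file_name", explode(split($"content", "\n")).as("line"))
perLine.show()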
Created 10-18-2016 04:09 PM
@Amal Babu See this StackOverflow question: http://stackoverflow.com/questions/33293362/how-to-add-source-file-name-to-each-row-in-spark. I would follow that approach and create a case class like they show:
// carry the input path in each record via a case class
case class Person(inputPath: String, name: String, age: Int)

val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map { l =>
  val tokens = l.split(",")
  Person(inputPath, tokens(0), tokens(1).trim().toInt)
}
rdd.collect().foreach(println)

// and then convert the RDD to a DataFrame
import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")
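Once the temp table is registered, the input path can be queried like any other column; a quick check (table and column names as registered above):

// the input path is now an ordinary queryable column
sqlContext.sql("SELECT inputPath, name, age FROM x").show()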
Created 02-14-2018 10:48 AM
inputFileName() will return a column containing the file location info of the current dataframe. Don't use it with select.
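For what it's worth, a minimal sketch of the column-based usage instead (assuming a file-based dataframe df; in Spark 2.x the function is named input_file_name):

import org.apache.spark.sql.functions.input_file_name
// attach the source path as a regular column rather than selecting it on its own
val withPath = df.withColumn("source_file", input_file_name())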
Created 02-14-2018 12:33 PM
Try the inputFiles function; it returns an array. For a dataframe df:
1-
var locationInfo = df.inputFiles(0) // might throw an ArrayIndexOutOfBoundsException if there are no input files
// locationInfo has the complete path, like "/FileStore/tables/xyz.csv"; split it to get the name of the file
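A safer sketch of the same lookup using headOption, so an empty dataframe doesn't throw (the fallback string is just illustrative):

// guard the empty case and keep only the last path segment
val fileName = df.inputFiles.headOption.map(_.split("/").last).getOrElse("<no input files>")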
2-
To add a column to dataframe df with the file name:
import org.apache.spark.sql.functions.input_file_name
var df2 = df.withColumn("file_name", input_file_name()) // adds a column with the complete path of the file
// create a UDF if you need to transform the file name
def funFileName: (String => String) = { s => s.split("/")(3) } // index 3 picks the file name out of a path like "/FileStore/tables/xyz.csv"
import org.apache.spark.sql.functions.udf
val myFileName = udf(funFileName)
var df3 = df.withColumn("file_name", myFileName(input_file_name()))
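If the files can sit at any nesting depth, a sketch of the same UDF taking the last path segment instead of a fixed index (df and the imports as above):

// take the last path segment so the directory depth doesn't matter
val baseName = udf((s: String) => s.split("/").last)
var df4 = df.withColumn("file_name", baseName(input_file_name()))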