Created 10-07-2016 07:06 PM
I am creating a DataFrame in Spark by loading tab-separated files from S3. I need the input file name for each record in the DataFrame for further processing. I tried
dataframe.select(inputFileName())
but I am getting a null value for input_file_name.
Created 10-11-2016 04:31 PM
@Amal Babu this is my take; I'm sure there are better ways:
%spark
import sqlContext.implicits._
val data = sc.wholeTextFiles("hdfs://sandbox.hortonworks.com:8020/user/guest/")
val dataDF = data.toDF()
dataDF.select("_1").show()
Result:
import sqlContext.implicits._
data: org.apache.spark.rdd.RDD[(String, String)] = hdfs://sandbox.hortonworks.com:8020/user/guest/ MapPartitionsRDD[64] at wholeTextFiles at <console>:68
dataDF: org.apache.spark.sql.DataFrame = [_1: string, _2: string]
+--------------------+
|                  _1|
+--------------------+
|hdfs://sandbox.ho...|
|hdfs://sandbox.ho...|
|hdfs://sandbox.ho...|
+--------------------+
As long as you use wholeTextFiles you should be able to keep the filenames. From the documentation: SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
Created 10-18-2016 04:09 PM
@Amal Babu See this Stack Overflow question; I would follow that approach. Create a case class like they show:
case class Person(inputPath: String, name: String, age: Int)

val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map { l =>
  val tokens = l.split(",")
  Person(inputPath, tokens(0), tokens(1).trim().toInt)
}
rdd.collect().foreach(println)
// and then convert the RDD to a DataFrame
import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")
http://stackoverflow.com/questions/33293362/how-to-add-source-file-name-to-each-row-in-spark
Created 02-14-2018 10:48 AM
input_file_name() returns a column containing the source file path of each record. It is only populated when the DataFrame was read directly from a file source; for a DataFrame built another way (e.g. from an in-memory collection), selecting it just gives empty values.
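A minimal sketch of where the function does populate, assuming a hypothetical tab-separated dataset on S3 read with spark.read (the bucket path is made up for illustration):

```scala
// input_file_name() fills in only when the DataFrame comes straight
// from a file source. The S3 path below is hypothetical.
import org.apache.spark.sql.functions.input_file_name

val df = spark.read
  .option("sep", "\t")
  .csv("s3a://my-bucket/data/") // hypothetical location of the TSV files
  .withColumn("file_name", input_file_name())

df.select("file_name").show(truncate = false)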
Created 02-14-2018 12:33 PM
Try the inputFiles function; it returns an array of the files the DataFrame was read from. For a DataFrame df:
1-
var locationInfo = df.inputFiles(0) // throws ArrayIndexOutOfBoundsException if there are no input files
// locationInfo holds the complete path, like "/FileStore/tables/xyz.csv"; split it to get the name of the file
2-
To add a column to DataFrame df with the file name:

var df2 = df.withColumn("file_name", input_file_name()) // adds a column with the complete path of the file

// create a UDF if you need just the file name
import org.apache.spark.sql.functions.{input_file_name, udf}
def funFileName: (String => String) = { s => s.split("/").last }
val myFileName = udf(funFileName)
var df3 = df.withColumn("file_name", myFileName(input_file_name()))
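The path-splitting step itself is plain Scala, independent of Spark. A minimal sketch of that logic (the helper name fileNameFromPath is mine; taking the last "/"-separated segment is an assumption that works for any directory depth, unlike hard-coding an index):

```scala
// Hypothetical helper: return the last "/"-separated segment of a path,
// i.e. the bare file name, regardless of how deep the file sits.
def fileNameFromPath(path: String): String =
  path.split("/").last

println(fileNameFromPath("/FileStore/tables/xyz.csv")) // prints "xyz.csv"
```

The same expression is what the UDF above wraps, so you can test it locally before registering it with Spark.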