How to get the input file name of a record in a Spark DataFrame?

New Contributor

I am creating a DataFrame in Spark by loading tab-separated files from S3. I need the input file name of each record in the DataFrame for further processing. I tried

dataframe.select(inputFileName())

But I am getting a null value for input_file_name.

4 REPLIES

Master Mentor

@Amal Babu this is my take; I'm sure there are better ways.

%spark
import sqlContext.implicits._
val data = sc.wholeTextFiles("hdfs://sandbox.hortonworks.com:8020/user/guest/")
val dataDF = data.toDF()
dataDF.select("_1").show()

Result:

import sqlContext.implicits._
data: org.apache.spark.rdd.RDD[(String, String)] = hdfs://sandbox.hortonworks.com:8020/user/guest/ MapPartitionsRDD[64] at wholeTextFiles at <console>:68
dataDF: org.apache.spark.sql.DataFrame = [_1: string, _2: string]
+--------------------+
|                  _1|
+--------------------+
|hdfs://sandbox.ho...|
|hdfs://sandbox.ho...|
|hdfs://sandbox.ho...|
+--------------------+

As long as you use wholeTextFiles, you should be able to keep the file names. From the documentation: SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.
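Since wholeTextFiles returns one record per file rather than per line, getting back to one record per line while keeping the file name takes one more step. Here is a minimal sketch of that step in plain Scala; `explodeLines` and the sample paths are hypothetical names made up for illustration, and the `sample` sequence only stands in for what wholeTextFiles would return.

```scala
// Hypothetical helper: turn one (path, content) pair into one
// (path, line) pair per non-empty line of the file.
def explodeLines(path: String, content: String): Seq[(String, String)] =
  content
    .split("\r?\n")           // split on Unix or Windows line endings
    .filter(_.nonEmpty)       // drop blank lines
    .map(line => (path, line))
    .toSeq

// Stand-in for the (filename, content) pairs of two small files.
val sample = Seq(
  ("hdfs://host/dir/a.tsv", "1\tx\n2\ty\n"),
  ("hdfs://host/dir/b.tsv", "3\tz")
)

// Same shape of call as flatMap on the RDD.
val rows = sample.flatMap { case (path, content) => explodeLines(path, content) }
rows.foreach(println)
```

With the data RDD from this reply, something like data.flatMap { case (p, c) => explodeLines(p, c) }.toDF("file", "line") should give one row per line with its source path. Keep in mind that wholeTextFiles reads each file into memory as a single string, so this suits small files best.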

@Amal Babu See the Stack Overflow question linked below. I would follow that approach and create a case class that carries the input path, as they show:

case class Person(inputPath: String, name: String, age: Int)
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map {
    l =>
      val tokens = l.split(",")
      Person(inputPath, tokens(0), tokens(1).trim().toInt)
  }
rdd.collect().foreach(println)

// and then convert the RDD to a DataFrame

import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")

http://stackoverflow.com/questions/33293362/how-to-add-source-file-name-to-each-row-in-spark
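The snippet above splits on commas and throws on a malformed line. For the tab-separated files in the question, a more defensive variant of the same idea might look like the sketch below. `parsePerson` is a hypothetical helper, not something from the linked answer, and the two-column name/age layout is simply carried over from the example.

```scala
import scala.util.Try

case class Person(inputPath: String, name: String, age: Int)

// Hypothetical helper: parse one tab-separated line and tag it with the
// path it came from. Returns None instead of throwing on a bad line.
def parsePerson(inputPath: String, line: String): Option[Person] =
  line.split("\t") match {
    // at least two columns; any extra columns are ignored
    case Array(name, age, _*) =>
      Try(age.trim.toInt).toOption.map(a => Person(inputPath, name.trim, a))
    case _ => None
  }

println(parsePerson("persons.txt", "Alice\t30"))
println(parsePerson("persons.txt", "no tab here"))
```

In the RDD code above, sc.textFile(inputPath).flatMap(l => parsePerson(inputPath, l)) should then keep only the rows that parse. Note that this approach still assumes one known path per textFile call.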

New Contributor

inputFileName() returns a column containing the file location of the current DataFrame.

Don't use it with select.

New Contributor

Try the inputFiles function; it returns an array of the files that make up the DataFrame.

For a DataFrame df:

1-

val locationInfo = df.inputFiles(0) // throws ArrayIndexOutOfBoundsException if the array is empty

// locationInfo holds the complete path, e.g. "/FileStore/tables/xyz.csv"; split it to get the file name

2-

To add a column with the file name to the DataFrame df:

import org.apache.spark.sql.functions.{input_file_name, udf}

val df2 = df.withColumn("file_name", input_file_name()) // adds a column with the complete path of the file

// create a UDF if you need only the file name
// index 3 fits a path shaped like "/FileStore/tables/xyz.csv"; adjust it for other path depths

def funFileName: (String => String) = { s => s.split("/")(3) }

val myFileName = udf(funFileName)

val df3 = df.withColumn("file_name", myFileName(input_file_name()))
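The UDF above picks a fixed segment of the path, which only works when every path has the same depth. A sketch of a depth-independent alternative is to take whatever follows the last slash. `baseName` is a hypothetical helper name, and the S3 path is made up for illustration.

```scala
// Hypothetical helper: return everything after the last '/' in a path.
// A path with no '/' is returned unchanged.
def baseName(path: String): String =
  path.substring(path.lastIndexOf('/') + 1)

println(baseName("/FileStore/tables/xyz.csv"))             // xyz.csv
println(baseName("s3://my-bucket/2016/01/part-00000.tsv")) // part-00000.tsv
println(baseName("xyz.csv"))                               // xyz.csv
```

It can be wrapped the same way as funFileName, for example val myFileName = udf(baseName _), and then used in withColumn exactly as in the reply.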