How to read fixed-length files in Spark

Expert Contributor

I have a fixed-length file (a sample is shown below) and I want to read it using the DataFrames API in Spark (1.6.0).

56 apple     TRUE 0.56
45 pear      FALSE1.34
34 raspberry TRUE 2.43
34 plum      TRUE 1.31
53 cherry    TRUE 1.4 
23 orange    FALSE2.34
56 persimmon FALSE23.2

The fixed widths of the columns are 3, 10, 5, and 4.

Please share your suggestions.

1 ACCEPTED SOLUTION

Super Collaborator

Assuming the file is text and each line represents one record, you could read the file line by line and map each line to a Row. Then you can create a DataFrame from the RDD[Row].

Something like:

sqlContext.createDataFrame(sc.textFile("<file path>").map { x => getRow(x) }, schema)

Below is a basic definition that builds the Row from a line using substring, but you can use your own implementation.

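def getRow(x: String): Row = {
  // One slot per column; the offsets follow the widths 3, 10, 5, 4.
  val columnArray = new Array[String](4)
  columnArray(0) = x.substring(0, 3)   // width 3
  columnArray(1) = x.substring(3, 13)  // width 10
  columnArray(2) = x.substring(13, 18) // width 5
  columnArray(3) = x.substring(18, 22) // width 4
  Row.fromSeq(columnArray)
}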
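
The hard-coded offsets above can also be derived from the column widths (3, 10, 5, 4) given in the question. The following is only a sketch in plain Scala, not part of the original answer: `sliceFixedWidth` is a hypothetical helper name, and trimming the padding and clamping short lines (instead of letting substring throw) are assumptions about the desired behavior.

```scala
// Hypothetical helper: split one fixed-width line into trimmed fields.
def sliceFixedWidth(line: String, widths: Seq[Int]): Seq[String] = {
  // Running totals of the widths give the boundaries: 0, 3, 13, 18, 22.
  val offsets = widths.scanLeft(0)(_ + _)
  offsets.zip(offsets.tail).map { case (start, end) =>
    // Clamp both indexes so a line shorter than expected yields a short or
    // empty field rather than a StringIndexOutOfBoundsException.
    val from = math.min(start, line.length)
    val to = math.min(end, line.length)
    line.substring(from, to).trim
  }
}

// Usage: yields the four fields 56, apple, TRUE, 0.56
val fields = sliceFixedWidth("56 apple     TRUE 0.56", Seq(3, 10, 5, 4))
```

Inside getRow, the result could then be passed to Row.fromSeq in place of the manually filled array.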

If the records are not delimited by a newline, you may need to use FixedLengthInputFormat, read one record at a time, and apply logic similar to the above. In that case, fixedlengthinputformat.record.length will be the total record length, 22 in this example. Instead of textFile, you may need to read the file with sc.newAPIHadoopRDD.
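
A rough sketch of that second approach follows. It was not part of the original answer and rests on several assumptions: a spark-shell session where `sc` already exists, a Hadoop version that ships the new-API FixedLengthInputFormat, a single-byte encoding (so 22 bytes equal 22 characters), and a file whose size is an exact multiple of the record length. It uses sc.newAPIHadoopFile, the path-based sibling of sc.newAPIHadoopRDD.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat

// Copy the shell's Hadoop configuration and set
// fixedlengthinputformat.record.length to the total record length.
val conf = new Configuration(sc.hadoopConfiguration)
FixedLengthInputFormat.setRecordLength(conf, 22)

val records = sc.newAPIHadoopFile(
    "<file path>",
    classOf[FixedLengthInputFormat],
    classOf[LongWritable],
    classOf[BytesWritable],
    conf)
  // Hadoop reuses the BytesWritable instance, so copy each record into an
  // immutable String right away (assumption: the data is plain ASCII).
  .map { case (_, value) => new String(value.getBytes, 0, value.getLength, "US-ASCII") }
```

Each element of `records` is then one 22-character string, which can be passed to getRow exactly like a line from textFile.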

Expert Contributor

Thanks, Arun. However, I have a problem creating the getRow function, and I am not sure what exactly it refers to.

Here is the error:

<console>:26: error: not found: type Row
         def getRow(x : String) : Row={
                                  ^
<console>:32: error: not found: value Row
       Row.fromSeq(columnArray)

Super Collaborator

Hi @Alex Raj, Row is org.apache.spark.sql.Row. You need to add the import statement (import org.apache.spark.sql.Row) before defining getRow.

avatar

Hi All,

In a Scala DataFrame, I want to read each record only up to a maximum of 1060 bytes, because the SQL table also has a maximum record length of 1060. Is there a function that can be applied to a Scala DataFrame so that each row-level record is read only up to 1060 characters and the extra part is skipped? Please suggest.

avatar

Sorry, the maximum is 8060 characters.

Expert Contributor

Great, that fixes the problem but another arises.

scala> sqlContext.createDataFrame(sc.textFile("/user/cloudera/data/fruit_fixedwidth.txt").map { x => getRow(x) }, schema)
<console>:31: error: package schema is not a value
              sqlContext.createDataFrame(sc.textFile("/user/cloudera/data/fruit_fixedwidth.txt").map { x => getRow(x) }, schema)
                                                                                                                         ^

I am really getting excited now. What is the schema all about in this context?

Super Collaborator

Well, the schema is somewhat like the header: id, fruitName, isAvailable, unitPrice in your case. You can specify the schema programmatically. Have a quick reference here

Super Collaborator

You can do something like:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schemaString = "id,fruitName,isAvailable,unitPrice"
val fields = schemaString.split(",")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
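
Putting the pieces of this thread together, an end-to-end sketch might look like the following. It assumes a Spark 1.6 spark-shell where `sc` and `sqlContext` already exist, and it reuses the file path and column names mentioned above. Trimming the padded fields and converting the columns afterwards are additions for illustration, not something the earlier replies prescribe.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Every column starts out as a nullable string.
val schema = StructType(
  Seq("id", "fruitName", "isAvailable", "unitPrice")
    .map(name => StructField(name, StringType, nullable = true)))

// Slice one 22-character line by the widths 3, 10, 5, 4.
// Assumption: trailing padding should be removed from each field.
def getRow(x: String): Row = Row(
  x.substring(0, 3).trim,
  x.substring(3, 13).trim,
  x.substring(13, 18).trim,
  x.substring(18, 22).trim)

val lines = sc.textFile("/user/cloudera/data/fruit_fixedwidth.txt")
val df = sqlContext.createDataFrame(lines.map(x => getRow(x)), schema)

// Optional: convert the string columns to more specific types.
val typed = df.select(
  df("id").cast("int").as("id"),
  df("fruitName"),
  (df("isAvailable") === "TRUE").as("isAvailable"),
  df("unitPrice").cast("double").as("unitPrice"))

typed.printSchema()
typed.show()
```

Keeping all columns as strings in the schema and casting afterwards keeps getRow simple; a line shorter than 22 characters would still make substring throw, so real input may need a length check first.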

Rising Star

Hi Alex, Can you clarify which version of Spark you are using?