How to read fixed-length files in Spark
Labels: Apache Spark
Created ‎08-04-2016 04:51 PM
I have a fixed-length file (a sample is shown below) and I want to read this file using the DataFrames API in Spark (1.6.0).
56 apple     TRUE 0.56
45 pear      FALSE1.34
34 raspberry TRUE 2.43
34 plum      TRUE 1.31
53 cherry    TRUE 1.4
23 orange    FALSE2.34
56 persimmon FALSE23.2
The fixed widths of the columns are 3, 10, 5, and 4.
Please suggest an approach.
Created ‎08-04-2016 06:54 PM
Under the assumption that the file is text and each line represents one record, you could read the file line by line and map each line to a Row. Then you can create a DataFrame from the RDD[Row],
something like:
sqlContext.createDataFrame(sc.textFile("<file path>").map { x => getRow(x) }, schema)
I have a basic definition below for creating the Row from your line using substring, but you can use your own implementation.
def getRow(x: String): Row = {
  // slice the line at the fixed column boundaries (widths 3, 10, 5, 4)
  val columnArray = new Array[String](4)
  columnArray(0) = x.substring(0, 3)
  columnArray(1) = x.substring(3, 13)
  columnArray(2) = x.substring(13, 18)
  columnArray(3) = x.substring(18, 22)
  Row.fromSeq(columnArray)
}
If the records are not delimited by a newline, you may need to use a FixedLengthInputFormat, read one record at a time, and apply similar logic as above. In that case fixedlengthinputformat.record.length will be your total record length, 22 in this example. Instead of textFile, you may need to read with sc.newAPIHadoopRDD, as in the sketch below.
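A rough sketch of that approach (using sc.newAPIHadoopFile, the file-path convenience form of newAPIHadoopRDD), assuming the Hadoop FixedLengthInputFormat from org.apache.hadoop.mapreduce.lib.input, UTF-8 encoded records, and the getRow/schema definitions above; the file path is a placeholder:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat

// each record is exactly 22 bytes, with no newline separating records
val hadoopConf = new Configuration(sc.hadoopConfiguration)
hadoopConf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 22)

val records = sc.newAPIHadoopFile(
    "<file path>",
    classOf[FixedLengthInputFormat],
    classOf[LongWritable],
    classOf[BytesWritable],
    hadoopConf)
  .map { case (_, bytes) => new String(bytes.getBytes, 0, bytes.getLength, "UTF-8") }

// then map each fixed-length record to a Row, as with the line-based approach
val df = sqlContext.createDataFrame(records.map(x => getRow(x)), schema)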
Created ‎08-05-2016 05:33 PM
Thanks Arun, however I have a problem while creating the getRow function. I'm not sure what exactly Row refers to.
Here is the error:
<console>:26: error: not found: type Row
       def getRow(x : String) : Row={
                                ^
<console>:32: error: not found: value Row
         Row.fromSeq(columnArray)
Created ‎08-05-2016 06:22 PM
Hi @Alex Raj, Row is org.apache.spark.sql.Row. You need to add the import statement.
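For example, at the top of your spark-shell session or script:

import org.apache.spark.sql.Row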
Created ‎04-21-2021 03:31 AM
Hi All,
In a Scala DataFrame, I want to read each row-level record only up to a maximum of 1060 bytes, since the SQL table also has a maximum record length of 1060. Do we have a function we can apply on a Scala DataFrame to read each row-level record only up to 1060 characters so that the extra can be skipped? Please suggest.
Created ‎04-21-2021 03:45 AM
Sorry, it's max 8060 characters.
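For what it's worth, a minimal sketch of the kind of truncation being described, assuming the file is read as plain text with one record per line; the path and variable names here are placeholders, not from the original thread:

import spark.implicits._                         // needed for Dataset transformations in an application

val lines = spark.read.textFile("<file path>")   // Dataset[String], one record per line
val truncated = lines.map(_.take(8060))          // keep at most 8060 characters of each record
val skipped = lines.filter(_.length <= 8060)     // or drop records that exceed 8060 characters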
Created ‎08-05-2016 06:49 PM
Great, that fixes the problem, but another arises.
scala> sqlContext.createDataFrame(sc.textFile("/user/cloudera/data/fruit_fixedwidth.txt").map { x => getRow(x) }, schema)
<console>:31: error: package schema is not a value
       sqlContext.createDataFrame(sc.textFile("/user/cloudera/data/fruit_fixedwidth.txt").map { x => getRow(x) }, schema)
                                                                                                                   ^
I am really getting excited now. What is the schema all about in this context?
Created ‎08-08-2016 12:26 PM
Well, the schema is somewhat like the header, say id, fruitName, isAvailable, unitPrice in your case. You can specify the schema programmatically. Have a quick reference here.
Created ‎08-08-2016 03:44 PM
You can do something like:
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schemaString = "id,fruitName,isAvailable,unitPrice"
val fields = schemaString.split(",")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
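With that schema and the earlier getRow definition in place, the DataFrame can then be built and inspected, for example with the path from the earlier error message:

val df = sqlContext.createDataFrame(
  sc.textFile("/user/cloudera/data/fruit_fixedwidth.txt").map(x => getRow(x)),
  schema)
df.printSchema()
df.show()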
Created ‎08-08-2016 09:10 AM
Hi Alex, can you clarify which version of Spark you are using?
