Explorer
Posts: 21
Registered: ‎03-28-2017

How to split the dataframe of multiple files into multiple smaller dataframes in Spark?

In a directory, I have subdirectories that are created every day. My requirement is to work on the files that were created yesterday. To do that, I came up with logic that gets the latest directories (in my case, yesterday's). I was able to do it using the code below.

 

import java.io.File
import java.time.{Duration, Instant}

// Note: t(...) (presumably epoch millis to a "yyyy-MM-dd" string) and getFileTree(...) are helpers not shown in this post.
val simpDate = new java.text.SimpleDateFormat("yyyy-MM-dd")
val currDate = simpDate.format(new java.util.Date())
val now = Instant.now                                                                   // Current instant, e.g. 2017-12-13T09:40:29.920Z
val today = now.toEpochMilli
val yesterday = now.minus(Duration.ofDays(1))
val yesterdayMilliSec = yesterday.toEpochMilli
val todaySimpDate = t(today)
val yesterdaySimpDate = t(yesterdayMilliSec)
val local: String = "file://"
val folders = getFileTree(new File("dailylogs")).filterNot(_.getName.endsWith(".log"))  // Entries under dailylogs, excluding .log files
val folderCrtDateDesc = folders.toList.map(y => (y, y.lastModified)).sortBy(-_._2)       // Newest first by last-modified time
val latestFolder = folderCrtDateDesc.map(y => (y._1, t(y._2)))
val folderToday = latestFolder.filter(y => y._2 == todaySimpDate)

 

Now I have the latest dir in folderToday, which looks like: "dailylogs/auditlogsdec27". Using the above code I can load the whole directory into Spark, which in turn loads all the files into a single dataframe. Each file starts with the record "JobID" and ends with the record:

"[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper..."

There are three kinds of status among the files in that directory: error, success, and failure.

The 'error' status can be identified from the third line of a file. For 'success' and 'failure', the status is found on the sixth line of the file.

 

file1: status: error
JobID: 454
[Wed Dec 27 05:38:47 UTC 2017] INFO: Starting Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] SEVERE: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:261 Invalid table alias or column
[Wed Dec 27 05:38:49 UTC 2017] INFO: Completed Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...

file2: status: success
JobID: 455
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable2
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 16
[Wed Dec 27 05:38:20 UTC 2017] INFO: Success
Completed Auditing for : baseTable2
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...

file3: status: failure
JobID: 547
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable3
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 5
[Wed Dec 27 05:38:20 UTC 2017] INFO: Failed. Invalid data found.
Completed Auditing for : baseTable3
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...

 

I know how to load a single file into Spark and work on that dataframe. Since there is a huge number of files in the directory every day, I want to load the whole directory into a single dataframe and then work on the data inside it, rather than open and read every small file. I want to split the dataframe using the last record as the delimiter (in this case, each file ends with ... ) and create three separate dataframes, one each for error, success, and failure. Can anyone tell me how I can implement that?

Cloudera Employee
Posts: 481
Registered: ‎08-11-2014

Re: How to split the dataframe of multiple files into multiple smaller dataframes in Spark?

You can make a DataFrame over all the files and then filter out the lines you don't want.

You can make a DataFrame for just the files you want, then union them together.

Both are viable.
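As a rough sketch of the first option (not a definitive implementation), each line can be tagged with the file it came from, and the files can then be selected by a marker line. The object name TagLinesByFile, the helper linesOfFilesWith, and the marker strings "SEVERE: Error", "INFO: Success" and "INFO: Failed" are assumptions taken from the sample logs in the question. The path is the example directory from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, input_file_name}

object TagLinesByFile {
  // Keep every line of each file that has at least one line containing `marker`.
  // Expects the columns "file" (source path) and "value" (line text).
  def linesOfFilesWith(lines: DataFrame, marker: String): DataFrame = {
    val files = lines.filter(col("value").contains(marker)).select("file").distinct()
    lines.join(files, "file")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TagLinesByFile").getOrCreate()

    // One row per log line, tagged with the path of the file it was read from.
    val lines = spark.read.text("dailylogs/auditlogsdec27")
      .withColumn("file", input_file_name())

    // Marker strings are assumptions based on the sample logs.
    val errorDf   = linesOfFilesWith(lines, "SEVERE: Error")
    val successDf = linesOfFilesWith(lines, "INFO: Success")
    val failureDf = linesOfFilesWith(lines, "INFO: Failed")

    Seq("error" -> errorDf, "success" -> successDf, "failure" -> failureDf).foreach {
      case (name, df) => println(s"$name: ${df.select("file").distinct().count()} files")
    }
    spark.stop()
  }
}
```

Note that the rows of a DataFrame carry no guaranteed line order, so this works for "does the file contain this marker" checks but not for "what is on the third line" checks.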

 

If you're saying different data types are mixed into sections of each file, that's harder, as you need to use something like mapPartitions to carefully process each file 3 times.
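If the position of a line within its file matters, one possible alternative to mapPartitions (a swapped-in technique, not the approach described above) is SparkContext.wholeTextFiles, which returns one (path, content) pair per file and is intended for many small files. The sketch below assumes the rule from the question: 'error' shows on the third line, 'success' and 'failure' on the sixth. The object name SplitAuditLogs, the classify helper, the "unknown" label, and the marker strings are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

object SplitAuditLogs {
  // Classify one file from its full text, using 1-based line positions.
  def classify(content: String): String = {
    val lines = content.split("\n").map(_.trim).filter(_.nonEmpty).toVector
    def lineHas(n: Int, marker: String): Boolean =
      lines.lift(n - 1).exists(_.contains(marker))

    if (lineHas(3, "SEVERE: Error")) "error"
    else if (lineHas(6, "INFO: Success")) "success"
    else if (lineHas(6, "INFO: Failed")) "failure"
    else "unknown" // assumption: anything else is kept aside for inspection
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SplitAuditLogs").getOrCreate()
    import spark.implicits._

    // Example directory from the question; pass another path as the first argument.
    val dir = args.headOption.getOrElse("dailylogs/auditlogsdec27")

    // One row per file: its path, its derived status, and its full text.
    val labelled = spark.sparkContext
      .wholeTextFiles(dir)
      .map { case (path, content) => (path, classify(content), content) }
      .toDF("path", "status", "content")
      .cache()

    val errorDf   = labelled.filter($"status" === "error")
    val successDf = labelled.filter($"status" === "success")
    val failureDf = labelled.filter($"status" === "failure")

    println(s"error=${errorDf.count()} success=${successDf.count()} failure=${failureDf.count()}")
    spark.stop()
  }
}
```

Because each file arrives as a single record, no end-of-file delimiter is needed to separate files, and the order in which files are read does not matter.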

 

 

Explorer
Posts: 21
Registered: ‎03-28-2017

Re: How to split the dataframe of multiple files into multiple smaller dataframes in Spark?

The files will not be in a specific order. Would this be a solution: load all the files into Spark, create a dataframe out of them, and then split this main dataframe into smaller ones using the delimiter ("...") that is present at the end of each file? Once this is done, classify the smaller dataframes by checking whether the third line of each file contains the words "SEVERE: Error", and group/merge the matching ones together. Then follow the same approach for the other cases, and finally have three separate dataframes, one exclusively for each case.

Is this approach viable, or is there a better way I can follow?