Created 03-06-2018 08:12 AM
I am trying to parse a JSON file and write it out as a CSV file.
The structure is a bit complex, so I wrote a Spark program in Scala to accomplish this task. Since the document does not contain one JSON object per line, I decided to use the wholeTextFiles method, as suggested in several answers and posts I found.
val jsonRDD = spark.sparkContext.wholeTextFiles(fileInPath).map(x => x._2)
Then I read the JSON content into a DataFrame:
val dwdJson = spark.read.json(jsonRDD)
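For context, here is a minimal, self-contained version of that reading step (the input path and the app name are just placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("DwdJsonToCsv").getOrCreate()

// Placeholder path to the JSON file described below.
val fileInPath = "/tmp/data.json"

// wholeTextFiles returns one (path, content) pair per file, so a multi-line JSON document stays intact.
val jsonRDD = spark.sparkContext.wholeTextFiles(fileInPath).map(_._2)

// Let Spark infer the schema from the JSON strings.
val dwdJson = spark.read.json(jsonRDD)
dwdJson.printSchema()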
Then I would like to navigate the JSON and flatten out the data. This is the schema from dwdJson:
root
 |-- meta: struct (nullable = true)
 |    |-- dimensions: struct (nullable = true)
 |    |    |-- lat: long (nullable = true)
 |    |    |-- lon: long (nullable = true)
 |    |    |-- time: long (nullable = true)
 |    |-- directory: string (nullable = true)
 |    |-- filename: string (nullable = true)
 |-- records: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- grids: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- gPt: array (nullable = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- time: string (nullable = true)
This is my best approach:
val dwdJson_e1 = dwdJson.select($"meta.filename", explode($"records").as("records_flat"))
val dwdJson_e2 = dwdJson_e1.select($"filename", $"records_flat.time", explode($"records_flat.grids").as("gPt"))
val dwdJson_e3 = dwdJson_e2.select($"filename", $"time", $"gPt.gPt")
val dwdJson_flat = dwdJson_e3.select($"filename", $"time",
  $"gPt".getItem(0).as("lat1"),
  $"gPt".getItem(1).as("long1"),
  $"gPt".getItem(2).as("lat2"),
  $"gPt".getItem(3).as("long2"),
  $"gPt".getItem(4).as("value"))
I am a Scala rookie and I am wondering whether I can avoid creating the intermediate DataFrames (dwdJson_e1, dwdJson_e2, dwdJson_e3), which seem inefficient; the program runs very slowly (compared with a Java parser running on a laptop), for example by chaining the selects as in the sketch below.
On the other hand, I could not find a way to unwind these nested arrays.
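For reference, the same logic chained into one expression, without the named intermediates (just a sketch of the equivalent code; as far as I know Spark evaluates these transformations lazily either way, so the named intermediates should not add extra passes over the data):

import org.apache.spark.sql.functions.explode
import spark.implicits._  // for the $"column" syntax

// Chained equivalent of dwdJson_flat above; same columns, same logic.
val dwdJsonFlatChained = dwdJson
  .select($"meta.filename", explode($"records").as("records_flat"))
  .select($"filename", $"records_flat.time", explode($"records_flat.grids").as("gPt"))
  .select($"filename", $"time", $"gPt.gPt")
  .select(
    $"filename", $"time",
    $"gPt".getItem(0).as("lat1"),
    $"gPt".getItem(1).as("long1"),
    $"gPt".getItem(2).as("lat2"),
    $"gPt".getItem(3).as("long2"),
    $"gPt".getItem(4).as("value"))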
Sample JSON file I want to convert:
{ "meta" : { "directory" : "weather/cosmo/de/grib/12/aswdir_s", "filename" : "COSMODE_single_level_elements_ASWDIR_S_2018022312_000.grib2.bz2", "dimensions" : { "lon" : 589, "time" : 3, "lat" : 441 } }, "records" : [ { "grids" : [ { "gPt" : [ 45.175, 13.55, 45.2, 13.575, 3.366295E-7 ] }, { "gPt" : [ 45.175, 13.575, 45.2, 13.6, 3.366295E-7 ] }, { "gPt" : [ 45.175, 13.6, 45.2, 13.625, 3.366295E-7 ] } ], "time" : "2018-02-23T12:15:00Z" }, { "grids" : [ { "gPt" : [ 45.175, 13.55, 45.2, 13.575, 4.545918E-7 ] }, { "gPt" : [ 45.175, 13.575, 45.2, 13.6, 4.545918E-7 ] }, { "gPt" : [ 45.175, 13.6, 45.2, 13.625, 4.545918E-7 ] } ], "time" : "2018-02-23T12:30:00Z" } ] }
This is a sample of the output I want from the JSON above:
filename, time, lat1, long1, lat2, long2, value
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.55,45.2,13.575,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.575,45.2,13.6,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.6,45.2,13.625,3.366295E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.55,45.2,13.575,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.575,45.2,13.6,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.6,45.2,13.625,4.545918E-7
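For completeness, this is roughly how I plan to persist the flat DataFrame as CSV once the flattening works (a sketch only; outPath is a placeholder output directory):

// Write the flattened DataFrame as CSV with a header row.
val outPath = "/tmp/dwd_csv"  // placeholder output directory
dwdJson_flat
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv(outPath)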
Any help will be appreciated.
Kind regards,
Paul
Created 03-06-2018 07:19 PM
Hi, @Paul Hernandez. Do you want the following? Since you are using Spark 2.0 on HDP 2.5, I think you can install Apache Spark 2.3 there, too.
scala> spark.read.option("multiLine", "true").json("/tmp/data.json").select($"meta.filename", explode($"records")).select($"filename", $"col.time", explode($"col.grids")).select($"filename", $"time", $"col.gPt").select($"filename", $"time", $"gPt"(0), $"gPt"(1), $"gPt"(2), $"gPt"(3), $"gPt"(4)).show +--------------------+--------------------+------+------+------+------+-----------+ | filename| time|gPt[0]|gPt[1]|gPt[2]|gPt[3]| gPt[4]| +--------------------+--------------------+------+------+------+------+-----------+ |COSMODE_single_le...|2018-02-23T12:15:00Z|45.175| 13.55| 45.2|13.575|3.366295E-7| |COSMODE_single_le...|2018-02-23T12:15:00Z|45.175|13.575| 45.2| 13.6|3.366295E-7| |COSMODE_single_le...|2018-02-23T12:15:00Z|45.175| 13.6| 45.2|13.625|3.366295E-7| |COSMODE_single_le...|2018-02-23T12:30:00Z|45.175| 13.55| 45.2|13.575|4.545918E-7| |COSMODE_single_le...|2018-02-23T12:30:00Z|45.175|13.575| 45.2| 13.6|4.545918E-7| |COSMODE_single_le...|2018-02-23T12:30:00Z|45.175| 13.6| 45.2|13.625|4.545918E-7| +--------------------+--------------------+------+------+------+------+-----------+
Created 03-06-2018 07:22 PM
If you can upgrade your cluster, you can use the above in HDP 2.6.4 with Spark 2.2.1, too.
Created 03-06-2018 08:11 PM
Hi @Dongjoon Hyun, thanks for your answer. We are planning a Spark upgrade in the next sprints. I would like to improve the performance of my current script anyway. BR. Paul
Created 03-06-2018 07:20 PM
You can use the FlattenJson processor (1.5.0.3.1.0.0-564) in Apache NiFi 1.5.
Created 03-06-2018 08:09 PM
Hi @Timothy Spann, thanks for your answer.
We made an effort a couple of months ago to upgrade NiFi 1.0 to 1.4, and now the processor we need is in a newer version 😕
We will consider it. BTW: how is the performance of this processor in your experience?
BR. Paul
Created 03-06-2018 08:49 PM
The performance has been very good, especially considering it can run on a node or cluster of any size. Give it a try and upgrade to NiFi 1.5. It is certainly easier than recompiling Spark programs. If it doesn't meet your needs, go back to Spark.