
Flatten out a nested JSON document in Spark 2 with Scala

I am trying to parse a JSON file and convert it to a CSV file.

The structure is a bit complex, so I wrote a Spark program in Scala to accomplish this task. Since the document does not contain one JSON object per line, I decided to use the wholeTextFiles method, as suggested in some answers and posts I found.

val jsonRDD  = spark.sparkContext.wholeTextFiles(fileInPath).map(x => x._2)

Then I read the JSON content into a DataFrame:

val dwdJson = spark.read.json(jsonRDD)

Then I would like to navigate the JSON and flatten out the data. This is the schema of dwdJson:

root
 |-- meta: struct (nullable = true)
 |    |-- dimensions: struct (nullable = true)
 |    |    |-- lat: long (nullable = true)
 |    |    |-- lon: long (nullable = true)
 |    |    |-- time: long (nullable = true)
 |    |-- directory: string (nullable = true)
 |    |-- filename: string (nullable = true)
 |-- records: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- grids: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- gPt: array (nullable = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- time: string (nullable = true)

This is my best approach:

val dwdJson_e1 = dwdJson.select($"meta.filename", explode($"records").as("records_flat"))
val dwdJson_e2 = dwdJson_e1.select($"filename", $"records_flat.time",explode($"records_flat.grids").as("gPt")) 
val dwdJson_e3 = dwdJson_e2.select($"filename", $"time", $"gPt.gPt") 
val dwdJson_flat = dwdJson_e3.select($"filename", $"time",
  $"gPt".getItem(0).as("lat1"), $"gPt".getItem(1).as("long1"),
  $"gPt".getItem(2).as("lat2"), $"gPt".getItem(3).as("long2"),
  $"gPt".getItem(4).as("value"))

I am a Scala rookie and I am wondering if I can avoid creating the intermediate DataFrames (dwdJson_e1, dwdJson_e2, dwdJson_e3), which seem inefficient, and the program runs very slowly (compared with a Java parser running on a laptop).

On the other hand, I could not find a way to unwind these nested arrays.
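For reference, a minimal sketch of how the same transformations could be chained into a single select pipeline, assuming the dwdJson DataFrame above (the output path in the last line is only a placeholder):

import org.apache.spark.sql.functions.explode
import spark.implicits._

// Sketch: chain the explodes instead of naming each intermediate DataFrame.
// The intermediates are lazy in Spark anyway, so this mainly shortens the code.
val flat = dwdJson
  .select($"meta.filename", explode($"records").as("rec"))
  .select($"filename", $"rec.time", explode($"rec.grids").as("grid"))
  .select($"filename", $"time",
    $"grid.gPt".getItem(0).as("lat1"), $"grid.gPt".getItem(1).as("long1"),
    $"grid.gPt".getItem(2).as("lat2"), $"grid.gPt".getItem(3).as("long2"),
    $"grid.gPt".getItem(4).as("value"))

// Write the flattened result out as CSV (path is a hypothetical example).
flat.write.option("header", "true").csv("/tmp/dwd_flat_csv")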

  • Spark version: 2.0.0
  • Scala: 2.11.8
  • Java: 1.8
  • HDP: 2.5

Sample JSON file I want to convert:

{
  "meta" : {
    "directory" : "weather/cosmo/de/grib/12/aswdir_s",
    "filename" : "COSMODE_single_level_elements_ASWDIR_S_2018022312_000.grib2.bz2",
    "dimensions" : {
      "lon" : 589,
      "time" : 3,
      "lat" : 441
    }
   },
  "records" : [ {
    "grids" : [ {
      "gPt" : [ 45.175, 13.55, 45.2, 13.575, 3.366295E-7 ]
    }, {
      "gPt" : [ 45.175, 13.575, 45.2, 13.6, 3.366295E-7 ]
    }, {
      "gPt" : [ 45.175, 13.6, 45.2, 13.625, 3.366295E-7 ]
    } ],
    "time" : "2018-02-23T12:15:00Z"
  }, {
    "grids" : [ {
      "gPt" : [ 45.175, 13.55, 45.2, 13.575, 4.545918E-7 ]
    }, {
      "gPt" : [ 45.175, 13.575, 45.2, 13.6, 4.545918E-7 ]
    }, {
      "gPt" : [ 45.175, 13.6, 45.2, 13.625, 4.545918E-7 ]
    }
    ],
    "time" : "2018-02-23T12:30:00Z"
  }
  ]
}

This is a sample output for the JSON above:

filename, time, lat1, long1, lat2, long2, value
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.55,45.2,13.575,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.575,45.2,13.6,3.366295E-7
ASWDIR_S_...,2018-02-23T12:15:00Z,45.175,13.6,45.2,13.625,3.366295E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.55,45.2,13.575,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.575,45.2,13.6,4.545918E-7
ASWDIR_S_...,2018-02-23T12:30:00Z,45.175,13.6,45.2,13.625,4.545918E-7

Any help will be appreciated.

Kind regards,

Paul

6 Replies

Expert Contributor

Hi, @Paul Hernandez. Do you want the following? Since you are using Spark 2.0 on HDP 2.5, I think you can install Apache Spark 2.3 there, too.

spark.read.option("multiLine", "true").json("/tmp/data.json")
  .select($"meta.filename", explode($"records"))
  .select($"filename", $"col.time", explode($"col.grids"))
  .select($"filename", $"time", $"col.gPt")
  .select($"filename", $"time", $"gPt"(0), $"gPt"(1), $"gPt"(2), $"gPt"(3), $"gPt"(4))
  .show
+--------------------+--------------------+------+------+------+------+-----------+
|            filename|                time|gPt[0]|gPt[1]|gPt[2]|gPt[3]|     gPt[4]|
+--------------------+--------------------+------+------+------+------+-----------+
|COSMODE_single_le...|2018-02-23T12:15:00Z|45.175| 13.55|  45.2|13.575|3.366295E-7|
|COSMODE_single_le...|2018-02-23T12:15:00Z|45.175|13.575|  45.2|  13.6|3.366295E-7|
|COSMODE_single_le...|2018-02-23T12:15:00Z|45.175|  13.6|  45.2|13.625|3.366295E-7|
|COSMODE_single_le...|2018-02-23T12:30:00Z|45.175| 13.55|  45.2|13.575|4.545918E-7|
|COSMODE_single_le...|2018-02-23T12:30:00Z|45.175|13.575|  45.2|  13.6|4.545918E-7|
|COSMODE_single_le...|2018-02-23T12:30:00Z|45.175|  13.6|  45.2|13.625|4.545918E-7|
+--------------------+--------------------+------+------+------+------+-----------+

Expert Contributor

If you can upgrade your cluster, you can use the above in HDP 2.6.4 with Spark 2.2.1, too.

Hi @Dongjoon Hyun, thanks for your answer. We are planning a Spark upgrade in the next sprints. I would like to improve the performance of my current script anyway. BR. Paul

Super Guru

You can use the FlattenJson 1.5.0.3.1.0.0-564 processor in Apache NiFi 1.5.

Hi @Timothy Spann, thanks for your answer.

We made an effort a couple of months ago to upgrade NiFi 1.0 to 1.4, and now the processor we need is only in a newer version 😕

We will consider it. BTW: how is the performance of this processor according to your experience?

BR. Paul

Super Guru

The performance has been very good, especially considering it can run on any size node or cluster. Give it a try and upgrade to NiFi 1.5. It is certainly easier than recompiling Spark programs. If it doesn't meet your needs, go back to Spark.