Extract values from JSON and convert into a new schema with generated values in NiFi

I have JSON data like that in the attached file, and I am trying to extract some values and convert them to a new schema with some generated values:

sampledata.txt

{
  "id": 2920236,
  "sampling_rate": null,
  "timestamp": "2017-12-22 15:05:31",
  "location": {
    "id": 944,
    "latitude": "51.575",
    "longitude": "6.981",
    "country": "DE"
  },
  "sensor": {
    "id": 1887,
    "pin": "1",
    "sensor_type": {
      "id": 14,
      "name": "SDS011",
      "manufacturer": "Nova Fitness"
    }
  },
  "sensordatavalues": [
    {
      "id": 1312395485,
      "value": "116.03",
      "value_type": "P1"
    },
    {
      "id": 1312395486,
      "value": "38.10",
      "value_type": "P2"
    }
  ],
  "coord": {
    "lon": 6.98,
    "lat": 51.58
  },
  "weather": [
    {
      "id": 741,
      "main": "Fog",
      "description": "fog",
      "icon": "50n"
    },
    {
      "id": 701,
      "main": "Mist",
      "description": "mist",
      "icon": "50n"
    }
  ],
  "base": "stations",
  "main": {
    "temp": 281.9,
    "pressure": 1034,
    "humidity": 100,
    "temp_min": 281.15,
    "temp_max": 282.15
  },
  "visibility": 9000,
  "wind": {
    "speed": 2.1,
    "deg": 340
  },
  "clouds": {
    "all": 75
  },
  "dt": 1513957800,
  "sys": {
    "type": 1,
    "id": 4886,
    "message": 0.0043,
    "country": "DE",
    "sunrise": 1513928193,
    "sunset": 1513956338
  },
  "name": "Gladbeck",
  "cod": 200
}
|-- PM10 (which is the value of sensordatavalues where value_type=P1 )
|-- timestamp (which is timestamp)
|-- sensor_id (which is the id of sensordatavalues where value_type=P1 )
|-- is_rain (encoded as either 1 or 0 depending on the value of weather.cod)
|-- wind_deg (which is weather.wind.deg)
|-- wind_speed (which is weather.wind.speed)
|-- humidity (which is weather.main.humidity)
|-- pressure (which is weather.main.pressure)
|-- temperature (which is weather.main.temp)
|-- lon (which is location.longitude)
|-- lat (which is location.latitude)
|-- season (which is a value between 1 and 4 depending on the month)
|-- is_rushHour (which is a value 1 or 0 depending on the time)

Any ideas how to transform my data?

1 ACCEPTED SOLUTION

You have a few different kinds of transformations going on there:

1) Value -> Category, such as is_rain, season, and is_rushHour

2) Hoisting values from nested fields (possibly renaming the field), such as wind_speed

3) Fetch on match, such as PM10 and sensor_id (using nested values when value_type is P1)

In NiFi, some processors are better at different transformations than others. For the first kind, I was able to achieve this with EvaluateJsonPath followed by two UpdateAttribute processors:

- EvaluateJsonPath extracts the "timestamp" field into an attribute called "dt". I couldn't use the actual "dt" field directly because it appears to be in epoch seconds rather than the milliseconds that the date functions expect, so the date evaluates to 1/18/1970:

45784-cjt-evaluatejsonpath.png
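
As a quick check of why the raw "dt" value lands in January 1970, here is a small sketch in Python (not NiFi). It assumes the value is a Unix epoch time in seconds; reading the same number as milliseconds gives the 1970 date.

```python
from datetime import datetime, timezone

dt = 1513957800  # the "dt" value from the sample record

# Read as milliseconds since the epoch, the value falls in January 1970.
as_millis = datetime.fromtimestamp(dt / 1000, tz=timezone.utc)

# Read as seconds since the epoch, it falls on the same day as "timestamp".
as_seconds = datetime.fromtimestamp(dt, tz=timezone.utc)

print(as_millis.date())  # 1970-01-18
print(as_seconds)        # 2017-12-22 15:50:00+00:00
```

The seconds reading is close to, but not identical with, the record's "timestamp" field, so the two fields are not interchangeable.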

- UpdateAttribute 1 extracts the hour and month values from the timestamp. Note the use of the GMT time zone in the functions; you may need to set that to something different (or exclude it altogether):

45785-cjt-updateattribute1.png
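
Since the processor configuration is only shown as a screenshot, here is a rough Python sketch of the same step: parse the "timestamp" string and pull out the hour and month. The function name `hour_and_month` is made up for illustration, and treating the timestamp as GMT/UTC is an assumption carried over from the note above.

```python
from datetime import datetime, timezone
from typing import Tuple


def hour_and_month(ts: str) -> Tuple[int, int]:
    """Return (hour, month) from a 'yyyy-MM-dd HH:mm:ss' style timestamp."""
    # Assumption: the timestamp is in GMT/UTC, as in the answer's setup.
    parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return parsed.hour, parsed.month


print(hour_and_month("2017-12-22 15:05:31"))  # (15, 12)
```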

- UpdateAttribute 2 performs the categorization logic, by checking whether the month is December-February (1 = winter), March-May (2 = spring), June-August (3 = summer), or September-November (4 = fall), and whether the hour of day is 8 or 9 (morning) or 17 or 18 (evening) for is_rushHour:

45786-cjt-updateattribute2.png

The Expression for rush.hour is as follows:

${hour:ge(8):and(${hour:le(9)}):ifElse(1,${hour:ge(17):and(${hour:lt(19)}):ifElse(1,0)})}
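
To make the nesting easier to follow, the same logic can be written out in Python. This is only an illustration of what the expression computes, not something NiFi runs; `is_rush_hour` is a hypothetical name.

```python
def is_rush_hour(hour: int) -> int:
    """Mirror of the rush.hour expression: 1 for hours 8-9 and 17-18, else 0."""
    # Outer ifElse: hour >= 8 and hour <= 9
    if 8 <= hour <= 9:
        return 1
    # Inner ifElse: hour >= 17 and hour < 19
    return 1 if 17 <= hour < 19 else 0


print([is_rush_hour(h) for h in (7, 8, 9, 10, 17, 18, 19)])
```

Note that the morning window includes the whole of hour 9 (up to 9:59) and the evening window runs through 18:59.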

The Expression for season is as follows:

${month:gt(11):or(${month:lt(3)}):ifElse(1,
${month:ge(3):and(${month:lt(6)}):ifElse(2,
${month:ge(6):and(${month:lt(9)}):ifElse(3,4)})})}
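
The season expression follows the same nested ifElse pattern. A Python rendering of it, again purely as an illustration with a hypothetical function name, looks like this:

```python
def season(month: int) -> int:
    """Mirror of the season expression: 1 = winter ... 4 = fall."""
    if month > 11 or month < 3:   # December, January, February
        return 1
    if 3 <= month < 6:            # March to May
        return 2
    if 6 <= month < 9:            # June to August
        return 3
    return 4                      # September to November


print([season(m) for m in range(1, 13)])
```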

- JoltTransformJSON performs the other two types of transformation (with a "shift" spec), along with injecting the categorical fields using Expression Language (in a "default" spec), combined as a "Chain" spec:

[
  {
    "operation": "shift",
    "spec": {
      "timestamp": "timestamp",
      "wind": {
        "deg": "wind_deg",
        "speed": "wind_speed"
      },
      "main": {
        "humidity": "humidity",
        "pressure": "pressure",
        "temp": "temperature"
      },
      "location": {
        "longitude": "lon",
        "latitude": "lat"
      },
      "sensordatavalues": {
        "*": {
          "value_type": {
            "P1": {
              "@(2,value)": "PM10",
              "@(2,id)": "sensor_id"
            }
          }
        }
      },
      "cod": {
        "200": {
          "#1": "is_rain"
        },
        "*": {
          "#0": "is_rain"
        }
      }
    }
  },
  {
    "operation": "default",
    "spec": {
      "season": "${season}",
      "is_rushHour": "${rush.hour}"
    }
  }
]
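
To show what the chain is meant to produce, here is a plain-Python sketch of the same mapping applied to a trimmed copy of the sample record. It does not use Jolt, and `transform` is a hypothetical helper. The output key names follow the schema in the question (lon, is_rain), the 200-means-rain rule is the same assumption as in the spec, and the loop assumes there is a single P1 entry.

```python
# Trimmed copy of the sample record: only the fields the spec reads.
record = {
    "timestamp": "2017-12-22 15:05:31",
    "location": {"latitude": "51.575", "longitude": "6.981"},
    "sensordatavalues": [
        {"id": 1312395485, "value": "116.03", "value_type": "P1"},
        {"id": 1312395486, "value": "38.10", "value_type": "P2"},
    ],
    "main": {"temp": 281.9, "pressure": 1034, "humidity": 100},
    "wind": {"speed": 2.1, "deg": 340},
    "cod": 200,
}


def transform(rec: dict, season: str, rush_hour: str) -> dict:
    # "Hoisting": copy nested values to top-level fields, renaming as needed.
    out = {
        "timestamp": rec["timestamp"],
        "wind_deg": rec["wind"]["deg"],
        "wind_speed": rec["wind"]["speed"],
        "humidity": rec["main"]["humidity"],
        "pressure": rec["main"]["pressure"],
        "temperature": rec["main"]["temp"],
        "lon": rec["location"]["longitude"],
        "lat": rec["location"]["latitude"],
    }
    # "Fetch on match": take value and id from the entry whose value_type is P1.
    for entry in rec["sensordatavalues"]:
        if entry["value_type"] == "P1":
            out["PM10"] = entry["value"]
            out["sensor_id"] = entry["id"]
    # "Value -> Category": assumed rule, cod 200 means rain, anything else not.
    out["is_rain"] = "1" if rec["cod"] == 200 else "0"
    # "default" step: add the attribute-derived fields only if not already set.
    out.setdefault("season", season)
    out.setdefault("is_rushHour", rush_hour)
    return out


# December at 15:05 gives season 1 and no rush hour; attributes are strings.
result = transform(record, season="1", rush_hour="0")
print(result)
```

The season and rush-hour values are passed in as strings because flow file attributes are strings, so the injected fields end up as strings in the output as well.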

Note that I am assuming a "cod" value of 200 is rain and everything else is not. If there are other codes, you can add other blocks to the "cod" section of the spec after the 200 block. The * block handles anything not matched (basically an "else" clause).

The flow is put together in the aforementioned order:

45787-complex-json-transforms.png
