
NiFi - convert everything in JSON to attributes, not one by one (i.e. JsonToAttributes)

Explorer

Is there a way to convert everything in the json message to FlowFile attributes with corresponding values?

Example:

{"ts":1491598800154,"id.orig_h":"172.17.25.52","id.orig_p":59648,"id.resp_h":"82.148.98.187","id.resp_p":80} would automatically create attributes named "ts", "id.orig_h", etc., with the corresponding values.

I know how to do it 'manually', one field at a time, using EvaluateJsonPath.
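For comparison, the manual one-by-one approach with EvaluateJsonPath looks roughly like this (the user-defined property names and JsonPath expressions below are illustrative, based on the example record above; keys containing dots need bracket notation):

```
Destination   : flowfile-attribute
Return Type   : auto-detect
ts            : $.ts
id.orig_h     : $['id.orig_h']
id.resp_p     : $['id.resp_p']
```

Each user-defined property produces exactly one attribute, which is the per-field effort this question is trying to avoid.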

1 ACCEPTED SOLUTION

Master Guru

I second @Wynner's comment about being cautious. If you determine that you still want all the JSON fields as attributes, you can do it all at once with ExecuteScript. Here is a Groovy script that expects a "flat" JSON file and turns all of its fields into attributes:

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if (flowFile == null) {
    return
}
def slurper = new groovy.json.JsonSlurper()
def attrs = [:] as Map<String,String>
// Read the flow file content and collect each top-level JSON field
session.read(flowFile,
    { inputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        obj.each { k, v ->
            // Attribute values must be strings
            attrs[k] = v.toString()
        }
    } as InputStreamCallback)
// Copy the collected attributes onto the flow file and route it to success
flowFile = session.putAllAttributes(flowFile, attrs)
session.transfer(flowFile, REL_SUCCESS)

View solution in original post

17 REPLIES


@Sherif Eldeeb

I am not aware of any automatic way for NiFi to convert all of the JSON content into one-for-one attributes.

I will mention that you should be cautious when doing this, depending on the size of your JSON files and the volume of data in your flow. Having a large number of flow files with very large attributes will require a large JVM heap for NiFi to run smoothly. NiFi keeps all of the attributes of the flow files on the graph in JVM memory, which is configured in the bootstrap.conf file.
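For reference, the JVM heap NiFi runs with is set in conf/bootstrap.conf; the values below are the stock defaults, shown for orientation rather than as a recommendation:

```
# JVM memory settings
java.arg.2=-Xms512m
java.arg.3=-Xmx512m
```

With many flow files carrying large attribute maps, these are typically the first values to raise.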

Explorer

Thanks @Wynner for your reply, and for the valuable tips.

It's for log files (BroIDS, to be specific https://www.bro.org/).

Each log entry is one flow file, and the whole content is around 500 bytes ... so size per flow file shouldn't be an issue (the rate, however, might be, since it hits around 10,000 entries per second).

... I'll keep looking for a solution 🙂


@Sherif Eldeeb

What are you trying to do with your flow?

Explorer

@Wynner

Read, Enrich and Ship logs from remote sensors.

We have lots of logs in JSON format (http, dns, conn ... etc.), so I want to add a few fields to the JSON document (e.g. sensor location), enrich it (e.g. GeoIP), then send it over to Elasticsearch.

I couldn't find a straightforward way to manipulate the log entry content (like removing/adding fields) without either: a) manipulating the JSON log entry as 'text', which is far from optimal, or b) converting the JSON content to attributes, benefiting from the flexibility of attributes, then converting back to JSON again (which still has its own issues, like everything being converted to strings :/).
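For the attributes-back-to-JSON leg of option (b), the AttributesToJSON processor can rebuild a JSON document from selected attributes; a rough configuration (attribute names taken from the example record earlier, purely for illustration):

```
Attributes List   : ts, id.orig_h, id.orig_p, id.resp_h, id.resp_p
Destination       : flowfile-content
```

As noted above, every value comes back as a JSON string, which is exactly the type-loss drawback mentioned.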

Your follow up is appreciated 🙂


@Sherif Eldeeb

Have you looked at using the JoltTransformJSON processor? You could use it with a Jolt specification and add the additional properties at the same time. Also, a Jolt specification can preserve the original types of the fields.

Explorer

@Wynner

JoltTransformJSON: I couldn't find a way to include flow file attributes in the Jolt spec (so they could be included in the output JSON via a 'default' operation).

If I'm missing something, pointing me to the right direction would be highly appreciated.

Master Guru

The JoltTransformJSON processor accepts NiFi Expression Language in the spec, so you can do something like:

[
  {
    "operation": "default",
    "spec": {
      "newfield": "${my.attr}"
    }
  }
]

And it will add "newfield" to the top-level object, with a string value of whatever is in my.attr. Note that it (currently) has to be enclosed in quotes (and thus must be a string field); otherwise the spec validator will try to parse the EL itself and the processor will be marked invalid. This is a bug that will hopefully be fixed in an upcoming release.
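To make the effect concrete: with my.attr set to, say, sensor-01 (an assumed value for illustration), a spec like the one above turns the input into the output below. The "default" operation only adds "newfield" when it is not already present in the input:

```
Input:  {"ts":1491598800154,"id.resp_p":80}
Output: {"ts":1491598800154,"id.resp_p":80,"newfield":"sensor-01"}
```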

Master Guru

I second @Wynner's comment about being cautious. If you determine that you still want all the JSON fields as attributes, you can do it all at once with ExecuteScript. Here is a Groovy script that expects a "flat" JSON file and turns all of its fields into attributes:

import org.apache.commons.io.IOUtils
import java.nio.charset.*

def flowFile = session.get()
if (flowFile == null) {
    return
}
def slurper = new groovy.json.JsonSlurper()
def attrs = [:] as Map<String,String>
// Read the flow file content and collect each top-level JSON field
session.read(flowFile,
    { inputStream ->
        def text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        def obj = slurper.parseText(text)
        obj.each { k, v ->
            // Attribute values must be strings
            attrs[k] = v.toString()
        }
    } as InputStreamCallback)
// Copy the collected attributes onto the flow file and route it to success
flowFile = session.putAllAttributes(flowFile, attrs)
session.transfer(flowFile, REL_SUCCESS)
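For anyone who wants to sanity-check the mapping outside NiFi, the core of what the script does (flatten a one-level JSON object into string-valued key/value pairs, since NiFi attributes are always strings) can be sketched in plain Python:

```python
import json

def json_to_attributes(content: str) -> dict:
    """Mirror the Groovy ExecuteScript body: parse a flat JSON object
    and return every top-level field as a string-valued attribute."""
    obj = json.loads(content)
    # NiFi attribute values are strings, hence str() on every value
    return {k: str(v) for k, v in obj.items()}

entry = '{"ts":1491598800154,"id.orig_h":"172.17.25.52","id.resp_p":80}'
attrs = json_to_attributes(entry)
```

Note this sketch, like the Groovy script, assumes a flat object; nested objects or arrays would need a flattening strategy of their own.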

Explorer

Whoa! ... thanks, that's a lot to go through (I come from the Elastic/Logstash universe; I just installed NiFi).

I'll try it and report back (especially on performance).

Thanks a lot.