[Apache NiFi] Split a flowfile based on a JSON attribute of each record

I have a very simple use case, but I am unable to come up with the right combination of processors. Consider a flowfile read from a JSON file in HDFS with the following content:

[{"type":"A","id":"001","content":"abc"},
{"type":"A","id":"001","content":"xyz"},
{"type":"A","id":"002","content":"sdf"},
{"type":"B","id":"004","content":"df"},
{"type":"B","id":"002","content":"dsg"},
{"type":"B","id":"002","content":"sfg"},
{"type":"B","id":"004","content":"sfg"}]


I ultimately want to store these back in HDFS in the following directory structure:

/A/001/data.txt (data.txt-> [{"type":"A","id":"001","content":"abc"},{"type":"A","id":"001","content":"xyz"}])
/A/002/data.txt (data.txt-> [{"type":"A","id":"002","content":"sdf"}])
/B/002/data.txt (data.txt-> [{"type":"B","id":"002","content":"dsg"},{"type":"B","id":"002","content":"sfg"}])
/B/004/data.txt (data.txt-> [{"type":"B","id":"004","content":"df"},{"type":"B","id":"004","content":"sfg"}])
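To pin down the target layout, here is a small Python sketch (plain Python, not NiFi) that groups the sample records by (type, id) and builds the expected path and content of each data.txt. The function name expected_layout is hypothetical and only for illustration.

```python
import json
from collections import defaultdict

# Sample input from the question: one JSON array in a single flowfile.
raw = '''[{"type":"A","id":"001","content":"abc"},
{"type":"A","id":"001","content":"xyz"},
{"type":"A","id":"002","content":"sdf"},
{"type":"B","id":"004","content":"df"},
{"type":"B","id":"002","content":"dsg"},
{"type":"B","id":"002","content":"sfg"},
{"type":"B","id":"004","content":"sfg"}]'''

def expected_layout(records):
    """Group records by (type, id) and map each target path to its JSON array content."""
    groups = defaultdict(list)
    for rec in records:
        # Records keep their input order within each group.
        groups[(rec["type"], rec["id"])].append(rec)
    return {
        f"/{t}/{i}/data.txt": json.dumps(recs, separators=(",", ":"))
        for (t, i), recs in sorted(groups.items())
    }

layout = expected_layout(json.loads(raw))
for path, content in layout.items():
    print(path, content)
```

Each key corresponds to one line of the desired structure above, and each value is the array that should end up in that data.txt.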

Any ideas?

Accepted solution

Master Guru

It appears you want the destination path to be the value of type, followed by the value of id, followed by data.txt, and the content of that file to be a JSON array of all the records that share those type and id values. If that is the case:

As of NiFi 1.3.0, the PartitionRecord processor will do most of what you want. You can create a JsonTreeReader using the following example schema:

{"type":"record","name":"test","namespace":"nifi",
  "fields": [
    {"name":"type","type":"string"},
    {"name":"id","type":"string"},
    {"name":"content","type":"string"}
  ]
}
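Before wiring the schema into the reader and writer, it can help to check the sample records against it. Below is a hypothetical, stdlib-only helper (check_record is an illustrative name) that only understands the flat, string-typed fields used here. It is not how NiFi's record readers validate or coerce data.

```python
import json

# Schema from the answer above, as it would be pasted into a reader or writer.
SCHEMA = json.loads('''{"type":"record","name":"test","namespace":"nifi",
  "fields": [
    {"name":"type","type":"string"},
    {"name":"id","type":"string"},
    {"name":"content","type":"string"}
  ]
}''')

def check_record(record, schema):
    """Return a list of problems for one record (hypothetical helper, string fields only)."""
    problems = []
    declared = {f["name"]: f["type"] for f in schema["fields"]}
    for name, ftype in declared.items():
        if name not in record:
            problems.append(f"missing field {name!r}")
        elif ftype == "string" and not isinstance(record[name], str):
            problems.append(f"field {name!r} is not a string")
    # Flag fields the schema does not declare.
    for extra in sorted(set(record) - set(declared)):
        problems.append(f"undeclared field {extra!r}")
    return problems

sample = {"type": "A", "id": "001", "content": "abc"}
print(check_record(sample, SCHEMA))  # []
```

Declaring id as a string keeps values such as 001 as text, which matters here because they become directory names.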

You can also create a JsonRecordSetWriter that either inherits the record schema (NiFi 1.4.0 and later) or uses the same schema (before NiFi 1.4.0). Then, in PartitionRecord, create two user-defined properties, say record.type and record.id, configured as follows:

[Screenshot: PartitionRecord configuration with the record.type and record.id properties]

Given your example data, you will get four flow files, one for each of the four groups above. Each of those flow files also carries record.type and record.id attributes, named after the PartitionRecord properties. Route them to UpdateAttribute, where you set filename to data.txt and absolute.path to /${record.type}/${record.id}. Then send them to PutHDFS with Directory set to ${absolute.path}.
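To make the attribute flow concrete, here is a rough Python model of the PartitionRecord → UpdateAttribute → PutHDFS chain. It is not NiFi code: the helper names are hypothetical, and it assumes the two user-defined properties hold the simple top-level RecordPaths /type and /id. It shows why the UpdateAttribute expression references record.type and record.id: those property names are what PartitionRecord writes back as attribute names.

```python
import json

def eval_simple_record_path(record, path):
    """Evaluate a top-level RecordPath like '/type' (assumption: no nesting, predicates or functions)."""
    return record[path.lstrip("/")]

def partition_record(records, properties):
    """Rough model of PartitionRecord: one output flowfile per distinct combination of values.

    Each user-defined property name becomes an attribute on the output flowfile,
    holding the value its RecordPath produced for that partition.
    """
    partitions = {}  # dicts keep insertion order (Python 3.7+)
    for rec in records:
        key = tuple(eval_simple_record_path(rec, p) for p in properties.values())
        partitions.setdefault(key, []).append(rec)
    return [
        {"attributes": dict(zip(properties, key)), "content": json.dumps(recs)}
        for key, recs in partitions.items()
    ]

def update_attribute(flowfile):
    """Mimic UpdateAttribute: filename=data.txt, absolute.path=/${record.type}/${record.id}."""
    attrs = flowfile["attributes"]
    attrs["filename"] = "data.txt"
    attrs["absolute.path"] = f"/{attrs['record.type']}/{attrs['record.id']}"
    return flowfile

records = [
    {"type": "A", "id": "001", "content": "abc"},
    {"type": "A", "id": "001", "content": "xyz"},
    {"type": "A", "id": "002", "content": "sdf"},
    {"type": "B", "id": "004", "content": "df"},
    {"type": "B", "id": "002", "content": "dsg"},
    {"type": "B", "id": "002", "content": "sfg"},
    {"type": "B", "id": "004", "content": "sfg"},
]
props = {"record.type": "/type", "record.id": "/id"}  # assumed RecordPath values

flowfiles = [update_attribute(ff) for ff in partition_record(records, props)]
# PutHDFS with Directory=${absolute.path} writes each flowfile to <Directory>/<filename>.
paths = [ff["attributes"]["absolute.path"] + "/" + ff["attributes"]["filename"] for ff in flowfiles]
for path, ff in zip(paths, flowfiles):
    print(path, ff["content"])
```

If you name the properties differently (for example type and id), the UpdateAttribute expression has to use those names instead.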

Super Collaborator

@Matt Burgess,

Do we need a schema registry (SR) to use schemas, or can we do this without one?

Regards,

Sai

Super Collaborator

@Matt Burgess

Never mind, I was able to do this using AvroSchemaRegistry. Thank you.

Regards,

Sai

Master Guru

You can do it without a schema registry if you set the Schema Access Strategy on your readers and writers to "Use 'Schema Text' Property" and paste the schema into the Schema Text property. Since you're using the same schema for both the reader and the writer, it's easier to maintain in a registry, but if you'd rather not use one, it's just a simple copy-paste.


Precisely what I needed. Thanks!