[Apache Nifi] Split a flowfile based on json-attribute of each record
- Labels: Apache NiFi
Created ‎11-16-2017 05:07 AM
I have a very simple use case but am unable to come up with the right combination of processors. Consider a flowfile coming from an HDFS JSON file with the following content:

[{"type":"A","id":"001","content":"abc"},
 {"type":"A","id":"001","content":"xyz"},
 {"type":"A","id":"002","content":"sdf"},
 {"type":"B","id":"004","content":"df"},
 {"type":"B","id":"002","content":"dsg"},
 {"type":"B","id":"002","content":"sfg"},
 {"type":"B","id":"004","content":"sfg"}]
I want to finally store these in the following directory structure back in HDFS.
/A/001/data.txt -> [{"type":"A","id":"001","content":"abc"},{"type":"A","id":"001","content":"xyz"}]
/A/002/data.txt -> [{"type":"A","id":"002","content":"sdf"}]
/B/002/data.txt -> [{"type":"B","id":"002","content":"dsg"},{"type":"B","id":"002","content":"sfg"}]
/B/004/data.txt -> [{"type":"B","id":"004","content":"df"},{"type":"B","id":"004","content":"sfg"}]
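To make the expected grouping concrete, here is what it looks like outside NiFi, sketched in plain Python (just an illustration of the desired result, not part of the flow):

```python
import json
from collections import defaultdict

# The records from the example flowfile above.
records = [
    {"type": "A", "id": "001", "content": "abc"},
    {"type": "A", "id": "001", "content": "xyz"},
    {"type": "A", "id": "002", "content": "sdf"},
    {"type": "B", "id": "004", "content": "df"},
    {"type": "B", "id": "002", "content": "dsg"},
    {"type": "B", "id": "002", "content": "sfg"},
    {"type": "B", "id": "004", "content": "sfg"},
]

# Group records by (type, id); each group becomes one /type/id/data.txt file.
groups = defaultdict(list)
for rec in records:
    groups[(rec["type"], rec["id"])].append(rec)

for (rtype, rid), recs in sorted(groups.items()):
    print(f"/{rtype}/{rid}/data.txt -> {json.dumps(recs)}")
```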
Any ideas?
Created on ‎11-16-2017 02:49 PM - edited ‎08-17-2019 11:11 PM
It appears you want to set the destination path to the value of type, followed by the value of id, followed by data.txt, and in the content of that file you want the single-element JSON array containing the object that provided the values. If that is the case:
As of NiFi 1.3.0, there is a PartitionRecord processor which will do most of what you want. You can create a JsonReader using the following example schema:
{"type":"record","name":"test","namespace":"nifi", "fields": [ {"name":"type","type":"string"}, {"name":"id","type":"string"}, {"name":"content","type":"string"} ] }
You can also create a JsonRecordSetWriter that inherits the schema (as of NiFi 1.4.0) or uses the same one (prior to NiFi 1.4.0). Then in PartitionRecord you would create two user-defined properties, say record.type and record.id, configured as follows:
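The screenshot showing the property values did not survive, but in PartitionRecord each user-defined property name becomes a flow file attribute and its value is a RecordPath evaluated against each record, so the configuration presumably looks like this (my reconstruction):

```
PartitionRecord user-defined properties:
  record.type = /type
  record.id   = /id
```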
Given your example data, you will get 4 flow files, each containing the data from one of the 4 groups you mention above. Additionally, those flow files carry record.type and record.id attributes. You can route them to UpdateAttribute, where you set filename to data.txt and absolute.path to /${record.type}/${record.id}. Then you can send them to PutHDFS with Directory set to ${absolute.path}.
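Putting the last two steps into property form, the downstream processors would be configured roughly as follows (a sketch of the Expression Language values described above, using the record.type and record.id attributes PartitionRecord adds):

```
UpdateAttribute:
  filename      = data.txt
  absolute.path = /${record.type}/${record.id}

PutHDFS:
  Directory = ${absolute.path}
```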
Created ‎11-16-2017 03:16 PM
Do we need a Schema Registry (SR) to use schemas, or can we do this without one?
Regards,
Sai
Created ‎11-16-2017 04:31 PM
Created ‎11-16-2017 04:53 PM
You can do it without a schema registry if your readers and writers use the "Use 'Schema Text' Property" schema access strategy and you hardcode the schema into the Schema Text property. Since you're using the same schema for both reader and writer, it's easier to maintain in a registry, but it's only a simple copy-paste if you don't want to use one.
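In property form, the registry-free setup would look roughly like this on both the reader and writer controller services (a sketch; the Schema Text value is the same schema shown earlier in the thread):

```
JsonTreeReader / JsonRecordSetWriter:
  Schema Access Strategy = Use 'Schema Text' Property
  Schema Text            = {"type":"record","name":"test","namespace":"nifi",
                            "fields":[{"name":"type","type":"string"},
                                      {"name":"id","type":"string"},
                                      {"name":"content","type":"string"}]}
```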
Created ‎11-17-2017 02:55 AM
Precisely what I needed. Thanks!
