Created 11-16-2017 05:07 AM
I have a very simple use case but am unable to come up with the right combination of processors. Consider a flowfile coming from an HDFS JSON file with the following content:
[{"type":"A","id":"001","content":"abc"}, {"type":"A","id":"001","content":"xyz"}, {"type":"A","id":"002","content":"sdf"}, {"type":"B","id":"004","content":"df"}, {"type":"B","id":"002","content":"dsg"}, {"type":"B","id":"002","content":"sfg"}, {"type":"B","id":"004","content":"sfg"}]
I want to finally store these in the following directory structure back in HDFS.
/A/001/data.txt -> [{"type":"A","id":"001","content":"abc"},{"type":"A","id":"001","content":"xyz"}]
/A/002/data.txt -> [{"type":"A","id":"002","content":"sdf"}]
/B/002/data.txt -> [{"type":"B","id":"002","content":"dsg"},{"type":"B","id":"002","content":"sfg"}]
/B/004/data.txt -> [{"type":"B","id":"004","content":"df"},{"type":"B","id":"004","content":"sfg"}]
Any ideas?
Created on 11-16-2017 02:49 PM - edited 08-17-2019 11:11 PM
It appears you want to set the destination path to the value of type, followed by the value of id, followed by data.txt, and in the content of that file you want a JSON array containing the objects that provided those values. If that is the case:
As of NiFi 1.3.0, there is a PartitionRecord processor which will do most of what you want. You can create a JsonReader using the following example schema:
{"type":"record","name":"test","namespace":"nifi", "fields": [ {"name":"type","type":"string"}, {"name":"id","type":"string"}, {"name":"content","type":"string"} ] }
You can also create a JsonRecordSetWriter that inherits the schema (as of NiFi 1.4.0) or uses the same one (prior to NiFi 1.4.0). Then in PartitionRecord you would create two user-defined properties, say record.type set to the RecordPath /type, and record.id set to the RecordPath /id.
Given your example data, you will get 4 flow files, each containing the data from one of the 4 groups you mention above. Additionally, you will have record.type and record.id attributes on those flow files. You can route them to UpdateAttribute, where you set filename to data.txt and absolute.path to /${record.type}/${record.id}. Then you can send them to PutHDFS, where you set the Directory to ${absolute.path}.
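To make the grouping concrete, here is a small Python sketch (not NiFi itself, just an illustration of what PartitionRecord does with the RecordPaths /type and /id): it groups the example records by (type, id) and derives the destination path the way the Expression Language /${record.type}/${record.id}/data.txt would.

```python
import json
from collections import defaultdict

# The example records from the question.
records = [
    {"type": "A", "id": "001", "content": "abc"},
    {"type": "A", "id": "001", "content": "xyz"},
    {"type": "A", "id": "002", "content": "sdf"},
    {"type": "B", "id": "004", "content": "df"},
    {"type": "B", "id": "002", "content": "dsg"},
    {"type": "B", "id": "002", "content": "sfg"},
    {"type": "B", "id": "004", "content": "sfg"},
]

# Partition on (type, id), as PartitionRecord would with /type and /id.
partitions = defaultdict(list)
for rec in records:
    partitions[(rec["type"], rec["id"])].append(rec)

# Each partition becomes one flow file; its path comes from the two attributes.
for (rtype, rid), group in sorted(partitions.items()):
    path = f"/{rtype}/{rid}/data.txt"  # mirrors /${record.type}/${record.id}/data.txt
    print(path, json.dumps(group))
```

This yields the four groups and paths listed in the question (/A/001, /A/002, /B/002, /B/004), each holding only the matching records.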
Created 11-16-2017 03:16 PM
Do we need a Schema Registry (SR) to use schemas, or can we do this without one?
Regards,
Sai
Created 11-16-2017 04:53 PM
You can do it without a schema registry if your readers and writers are set to "Use 'Schema Text' Property" and you hardcode the schema into the Schema Text property. Since you're using the same schema for both reader and writer, it's easier to maintain in a registry, but it's only a simple copy-paste if you don't want to use one.
Created 11-17-2017 02:55 AM
Precisely what I needed. Thanks!