I'm trying to create a flow that can receive multiple separate messages that are each part of a "complete" record. Once all the parts of a given record are received, I then want to process that record. The parts of a record can be received out of order and intermixed with parts of other records. Example data (2 parts per record in this example; some records have more than 2 parts):
Each part is received on a different input, and the order in which they arrive is not guaranteed. Multiple location or file records may be received before the other part of a given id arrives.
My thought right now is to have a custom service that maintains a map of the form Map&lt;id, List&lt;Pair&lt;part_label, part_value&gt;&gt;&gt;, plus two custom processors:
1) Put - when a flow file is received on an input, add the given attribute (measurement_file, measurement_location, etc.) into the map, using the id as the key. The attribute to store and the attribute to use as the key would be processor properties. Access to the map is provided via the service API.
2) Get - loop over the map at a given interval and look for any id that has the required number of parts (2 in this example). For each id that has all its parts, create a flow file and transfer it for downstream processing.
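To make the idea concrete, here is a minimal sketch of what the service's in-memory state could look like. This is purely illustrative (the class and method names are my own, not a NiFi API): a thread-safe map from id to the list of (part_label, part_value) pairs, with a Put operation and a Get-style sweep that removes and returns any complete records.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the correlation store the custom service would wrap.
public class PartCorrelationStore {
    // id -> list of (part_label, part_value) pairs received so far
    private final Map<String, List<SimpleEntry<String, String>>> parts =
            new ConcurrentHashMap<>();

    // "Put" processor: record one part of a record under its id.
    public void put(String id, String partLabel, String partValue) {
        parts.computeIfAbsent(id, k -> Collections.synchronizedList(new ArrayList<>()))
             .add(new SimpleEntry<>(partLabel, partValue));
    }

    // "Get" processor: remove and return the parts for every id that has
    // all requiredParts pieces, so a flow file can be emitted for each.
    public Map<String, List<SimpleEntry<String, String>>> takeComplete(int requiredParts) {
        Map<String, List<SimpleEntry<String, String>>> complete = new HashMap<>();
        for (String id : parts.keySet()) {
            List<SimpleEntry<String, String>> pieces = parts.get(id);
            if (pieces != null && pieces.size() >= requiredParts) {
                complete.put(id, parts.remove(id));
            }
        }
        return complete;
    }
}
```

The Get sweep removes complete ids as it returns them, so each record is emitted exactly once; incomplete ids simply stay in the map until their remaining parts arrive.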
I'd like the storage to be something beyond just memory so the state can persist (e.g., across restarts).
Is there a way to do this with existing processors/services/cache maps?
Any thoughts, comments or suggestions for better approaches are greatly appreciated.