How can I decompose an XML document based on one of its attributes and then create a buffered set of queues hashed by that attribute?

In the following XML snippet you will see an attribute called "sourceFacility". I will receive many such <fltdMessage> elements in one <dataOutput>. I want to create a buffered output stream that is keyed by the sourceFacility. Each of these buffers would then be published to a different Kafka topic. The buffering would be a buffer in time, since I'd like to build up messages per sourceFacility over the course of a roughly 100 ms moving window. In other words, I'd receive and process a series of these dataService messages that I'd want to demultiplex into a set of buffered queues.

<?xml version="1.0" encoding="UTF-8"?>
<dataService>
   <dataOutput>
      <fltdMessage acid="QXE2828" airline="QXE" arrArpt="KSJC" cdmPart="false" depArpt="KPAE" fdTrigger="HCS_TRACK_MSG" flightRef="97606500" msgType="trackInformation" sensitivity="A" sourceFacility="KZSE" sourceTimeStamp="2019-04-05T16:44:30Z">
         <trackInformation>
            <qualifiedAircraftId>
               <aircraftId>QXE2828</aircraftId>
               <computerId>
                  <facilityIdentifier>KZSE</facilityIdentifier>
                  <idNumber>657</idNumber>
               </computerId>
               <gufi>KS49063601</gufi>
               <igtd>2019-04-05T15:30:00Z</igtd>
               <departurePoint>
                  <airport>KPAE</airport>
               </departurePoint>
               <arrivalPoint>
                  <airport>KSJC</airport>
               </arrivalPoint>
            </qualifiedAircraftId>
            <speed>378</speed>
            <reportedAltitude>
               <assignedAltitude>
                  <simpleAltitude>330C</simpleAltitude>
               </assignedAltitude>
            </reportedAltitude>
            <position>
               <latitude>
                  <latitudeDMS degrees="41" direction="NORTH" minutes="14" seconds="14" />
               </latitude>
               <longitude>
                  <longitudeDMS degrees="122" direction="WEST" minutes="49" seconds="03" />
               </longitude>
            </position>
            <timeAtPosition>2019-04-05T16:44:30Z</timeAtPosition>
            <ncsmTrackData>
               <eta etaType="ESTIMATED" timeValue="2019-04-05T17:29:51Z" />
               <rvsmData currentCompliance="true" equipped="true" futureCompliance="true" />
               <arrivalFixAndTime arrTime="2019-04-05T17:13:34Z" fixName="ZINNN" />
               <nextEvent latitudeDecimal="39.125619878405274" longitudeDecimal="-122.59562140202789" />
            </ncsmTrackData>
         </trackInformation>
      </fltdMessage>
   </dataOutput>
</dataService>

Thank you in advance for any ideas on how to do this. I imagine that only part of the processing would be done in NiFi. It's not clear to me that NiFi would be appropriate for the buffering piece.
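To make the demultiplexing step concrete outside of NiFi, here is a minimal Python sketch that parses one dataService document and groups its fltdMessage elements by the sourceFacility attribute. The function name `group_by_source_facility` and the abbreviated `SAMPLE` document (most attributes and all child elements dropped, with a made-up third message) are assumptions for illustration only, not part of the real feed.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict


def group_by_source_facility(xml_text):
    """Return {sourceFacility: [serialized <fltdMessage> strings]}."""
    root = ET.fromstring(xml_text)
    groups = defaultdict(list)
    # iter() visits every <fltdMessage> at any depth below the root.
    for msg in root.iter("fltdMessage"):
        facility = msg.get("sourceFacility", "UNKNOWN")
        groups[facility].append(ET.tostring(msg, encoding="unicode"))
    return dict(groups)


# Hypothetical, heavily abbreviated input; the third message is invented.
SAMPLE = """<dataService><dataOutput>
<fltdMessage acid="QXE2828" sourceFacility="KZSE"/>
<fltdMessage acid="JBU975" sourceFacility="KZDC"/>
<fltdMessage acid="ASA100" sourceFacility="KZSE"/>
</dataOutput></dataService>"""

if __name__ == "__main__":
    for facility, messages in group_by_source_facility(SAMPLE).items():
        print(facility, len(messages))
```

Each value in the returned dictionary corresponds to what would become one per-facility queue.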

Update on April 11, 2019

I'd like to update this question to better articulate my need. I think that I can take a message of the above type and decompose it into several flowfiles, each flowfile carrying an attribute set from the "facilityIdentifier" found in the content (<facilityIdentifier>KZSE</facilityIdentifier>). NiFi then naturally sends each flowfile into a downstream queue. I will use RouteOnAttribute to create a queue per source facility.

Is there then a processor that will take multiple flow-files from a queue and create a new flow file that contains a list of the input flowfiles?

The key is that, for network performance reasons, I do NOT want one network send per flow file. I want to send multiple flow files in a single network send to Kafka.

So in the above example, I'd have one queue for KZSE, a separate queue for each of the other distinct facilityIdentifiers, and I'd like to find a NiFi processor that will combine a list of single-record KZSE flow files into a multi-record KZSE list flowfile. I'd then schedule the processor to run every 100 ms. If there are 5 messages in the queue when the processor wakes, it would place them all into a single flowfile containing a list of those 5 records.

Further investigation suggests that MergeRecord or MergeContent may be exactly what I need. However, I'd like to make sure that they do not wait for a "queue full" condition before performing the merge. I want to be able to specify a "max time in input queue" (max time to buffer), to ensure that I don't introduce too much latency.
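The behavior being asked for, a bin that is released once it reaches a maximum age whether or not it is full, can be sketched in plain Python. This is not NiFi code; the class `TimedBinBuffer` and its parameters are hypothetical names used only to illustrate the semantics. As far as I know, MergeRecord and MergeContent expose a "Max Bin Age" property that plays the role of `max_age_s` here.

```python
import time


class TimedBinBuffer:
    """Buffers records per key; a bin is released once it is max_age_s old."""

    def __init__(self, max_age_s=0.1, clock=time.monotonic):
        self.max_age_s = max_age_s
        self.clock = clock  # injectable so the logic can be tested without sleeping
        self._bins = {}     # key -> (arrival time of first record, [records])

    def add(self, key, record):
        # The age of a bin is measured from its first record.
        if key not in self._bins:
            self._bins[key] = (self.clock(), [])
        self._bins[key][1].append(record)

    def flush_expired(self):
        """Remove and return {key: records} for bins at least max_age_s old."""
        now = self.clock()
        expired = {key: records
                   for key, (started, records) in self._bins.items()
                   if now - started >= self.max_age_s}
        for key in expired:
            del self._bins[key]
        return expired


if __name__ == "__main__":
    fake_now = [0.0]  # fake clock, in seconds
    buf = TimedBinBuffer(max_age_s=0.1, clock=lambda: fake_now[0])
    buf.add("KZSE", "track-1")
    buf.add("KZSE", "track-2")
    fake_now[0] = 0.05
    buf.add("KZDC", "track-3")
    fake_now[0] = 0.1
    print(buf.flush_expired())  # only the KZSE bin has reached 100 ms
```

A caller would invoke `flush_expired()` on a short timer and send each returned list as one batch, so the added latency is bounded by roughly `max_age_s` plus the timer interval.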

The flow that I have so far is as follows. See the comment below for further detail.



Wow.. I figured it out. The essence of the solution was to reduce my data to the form seen in the following, and then to create an attribute on each flow file that corresponded to the facilityIdentifier. Then I was able to use MergeRecord with an appropriate binning strategy.

I've gotten the flow to a point where I've reduced my flow-file record to the following:

{
  "aircraftId" : "JBU975",
  "facilityIdentifier" : "KZDC",
  "departure_airport" : "KPHL",
  "arrival_airport" : "KFLL",
  "gufi" : "KN53659300",
  "igtd" : "2019-04-09T18:00:00Z",
  "speed" : 428,
  "assigned_altitude" : 245,
  "latitude" : "MapRecord[{seconds=41, minutes=47, degrees=38, direction=NORTH}]",
  "longitude" : "MapRecord[{seconds=35, minutes=26, degrees=075, direction=WEST}]",
  "timeAtPosition" : "2019-04-09T18:28:53Z"
}

I have a queue of these that I now need to process. My goal is to effectively route on the facilityIdentifier value.

Each flow-file has an attribute that is the facilityIdentifier. In the following you see that FACILITY_ID is KZDC.


Now the MergeRecord configuration (notice the Correlation Attribute Name):


The end result is that over 90 flow-files were merged into just 9.

MergeRecord preserved the attribute FACILITY_ID, so now I can use it to push data to the appropriate Kafka topic.
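Why the attribute survives the merge can be illustrated with a small Python sketch. It assumes MergeRecord is using an attribute strategy that keeps only the attributes shared by every flow file in a bin (I believe this is its default, "Keep Only Common Attributes"); since every member of a bin has the same FACILITY_ID by construction, that attribute is always kept. The dictionary layout and the function `merge_by_attribute` are hypothetical stand-ins, not NiFi's API.

```python
from collections import defaultdict


def merge_by_attribute(flow_files, correlation_attr):
    """Bin flow files by one attribute; keep attributes shared by the whole bin."""
    bins = defaultdict(list)
    for ff in flow_files:
        bins[ff["attributes"][correlation_attr]].append(ff)

    merged = []
    for members in bins.values():
        # Start from the first member and drop any attribute whose value
        # differs (or is missing) in a later member.
        common = dict(members[0]["attributes"])
        for ff in members[1:]:
            common = {k: v for k, v in common.items()
                      if ff["attributes"].get(k) == v}
        merged.append({
            "attributes": common,
            "records": [r for ff in members for r in ff["records"]],
        })
    return merged


if __name__ == "__main__":
    # Illustrative flow files; the "uuid" values are invented.
    flow_files = [
        {"attributes": {"FACILITY_ID": "KZDC", "uuid": "a"},
         "records": [{"aircraftId": "JBU975"}]},
        {"attributes": {"FACILITY_ID": "KZSE", "uuid": "b"},
         "records": [{"aircraftId": "QXE2828"}]},
        {"attributes": {"FACILITY_ID": "KZDC", "uuid": "c"},
         "records": [{"aircraftId": "JBU975"}]},
    ]
    for out in merge_by_attribute(flow_files, "FACILITY_ID"):
        print(out["attributes"], len(out["records"]))
```

With the merged flow file still carrying FACILITY_ID, the downstream publishing step can derive the topic name from that attribute.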
