NiFi MergeContent to concatenate lines in flat structure

Hello,

I am trying to merge a very flat XML structure in which each line of a note is stored as a separate record. These lines need to be concatenated into a single string for processing in the flow. I've read the XML and converted it into JSON with the following Avro schema:

{
"namespace": "nifi",
"name": "ROW",
"type": "record",
"fields": [
{ "name": "NOTE_ID", "type": "long" },
{ "name": "LINE", "type": "long" },
{ "name": "NOTE_TEXT", "type": "string" },
{ "name": "personid", "type": "string" },
{ "name": "note_dt", "type": "string" }
]
}

I can guarantee that, in the source file, all rows for a given NOTE_ID are contiguous and ordered by LINE, which increments as an integer. I use SplitXml to extract all of these rows from their enclosing <ROWSET>, and I am trying to get a single FlowFile for each NOTE_ID whose NOTE_TEXT is the concatenation of the NOTE_TEXT values from each line. The resulting schema is the same as the original, minus the LINE field. Thus:

{
[{"NOTE_ID":" 123456 ","LINE":" 1 ","NOTE_TEXT":"Contents of line ","personid":" 995600123 ","note_dt":" 2013-01-10 "}],
[{"NOTE_ID":" 123456 ","LINE":" 2 ","NOTE_TEXT":"are continued ","personid":" 995600123 ","note_dt":" 2013-01-10 "}],
[{"NOTE_ID":" 123456 ","LINE":" 3 ","NOTE_TEXT":"throughout.","personid":" 995600123 ","note_dt":" 2013-01-10 "}],
[{"NOTE_ID":" 456789 ","LINE":" 1 ","NOTE_TEXT":"This is another ","personid":" 995600123 ","note_dt":" 2013-01-11 "}],
[{"NOTE_ID":" 456789 ","LINE":" 2 ","NOTE_TEXT":"line example.","personid":" 995600123 ","note_dt":" 2013-01-11 "}]
}

should become:

{
[{"NOTE_ID":" 123456 ","NOTE_TEXT":"Contents of line are continued throughout.","personid":" 995600123 ","note_dt":" 2013-01-10 "}],
[{"NOTE_ID":" 456789 ","NOTE_TEXT":"This is another line example.","personid":" 995600123 ","note_dt":" 2013-01-11 "}]
}
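For reference, the transformation above can be sketched in plain Python. This is only an illustration of the intended grouping, not the existing SAX script and not a NiFi configuration. The function name `merge_note_lines` is hypothetical, and the sample values are typed per the Avro schema (NOTE_ID as a number, padding spaces dropped), which is an assumption about the cleaned data.

```python
from itertools import groupby
from operator import itemgetter


def merge_note_lines(rows):
    """Yield one merged record per contiguous run of rows sharing a NOTE_ID.

    Assumes rows arrive grouped by NOTE_ID and ordered by LINE, as the
    source file is said to guarantee. Only one note is buffered at a time.
    """
    for note_id, group in groupby(rows, key=itemgetter("NOTE_ID")):
        lines = list(group)
        first = lines[0]
        yield {
            "NOTE_ID": note_id,
            # Join with no separator: the sample lines carry their own spaces.
            "NOTE_TEXT": "".join(line["NOTE_TEXT"] for line in lines),
            "personid": first["personid"],
            "note_dt": first["note_dt"],
        }


# Sample rows mirroring the example above (values simplified, see lead-in).
sample_rows = [
    {"NOTE_ID": 123456, "LINE": 1, "NOTE_TEXT": "Contents of line ", "personid": "995600123", "note_dt": "2013-01-10"},
    {"NOTE_ID": 123456, "LINE": 2, "NOTE_TEXT": "are continued ", "personid": "995600123", "note_dt": "2013-01-10"},
    {"NOTE_ID": 123456, "LINE": 3, "NOTE_TEXT": "throughout.", "personid": "995600123", "note_dt": "2013-01-10"},
    {"NOTE_ID": 456789, "LINE": 1, "NOTE_TEXT": "This is another ", "personid": "995600123", "note_dt": "2013-01-11"},
    {"NOTE_ID": 456789, "LINE": 2, "NOTE_TEXT": "line example.", "personid": "995600123", "note_dt": "2013-01-11"},
]

merged = list(merge_note_lines(sample_rows))
for record in merged:
    print(record)
```

Because `groupby` only groups adjacent items, this relies entirely on the contiguity guarantee; if rows for one NOTE_ID were ever interleaved with another, they would come out as separate records.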

The initial source file contains millions of notes (each broken up into its constituent lines, of course), so it's fairly large, but the files will be much smaller in day-to-day production. The flow I'm trying to put together should handle both sizes of input.

I have a Python script that does this using SAX processing, and it works fine, but I'd like to do this entirely in NiFi.

In an ideal flow, this could be accomplished with a MergeContent processor using the Defragment merge strategy. However, the number of fragments (individual lines) changes from note to note. If I could find a way to attach the line count (fragment.count) to each FlowFile after the SplitXml, I think everything else would work. Being new to the NiFi paradigm, I was wondering how to do this. Alternatively, I suppose that I could execute the Python SAX script at the beginning of the process and write each concatenated note out as a FlowFile.
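To make the fragment.count idea concrete, here is a rough Python sketch of the bookkeeping involved: counting the lines in each contiguous NOTE_ID run and pairing every row with the attributes the Defragment strategy looks for (fragment.identifier, fragment.index, fragment.count). The function name `add_fragment_attributes` is hypothetical, the zero-based index is an assumption, and how this logic would be hosted inside a NiFi flow is left open.

```python
from itertools import groupby


def add_fragment_attributes(rows):
    """Pair each row with per-note fragment attributes.

    Assumes rows for one NOTE_ID are contiguous and already in LINE order.
    Attribute values are strings, since FlowFile attributes are strings.
    """
    for note_id, group in groupby(rows, key=lambda row: row["NOTE_ID"]):
        lines = list(group)  # buffer one note so its line count is known
        for index, row in enumerate(lines):
            attributes = {
                "fragment.identifier": str(note_id),
                "fragment.index": str(index),  # zero-based: an assumption
                "fragment.count": str(len(lines)),
            }
            yield row, attributes


# Sample rows mirroring the example in the question (values simplified).
sample_rows = [
    {"NOTE_ID": 123456, "LINE": 1, "NOTE_TEXT": "Contents of line "},
    {"NOTE_ID": 123456, "LINE": 2, "NOTE_TEXT": "are continued "},
    {"NOTE_ID": 123456, "LINE": 3, "NOTE_TEXT": "throughout."},
    {"NOTE_ID": 456789, "LINE": 1, "NOTE_TEXT": "This is another "},
    {"NOTE_ID": 456789, "LINE": 2, "NOTE_TEXT": "line example."},
]

annotated = list(add_fragment_attributes(sample_rows))
for row, attributes in annotated:
    print(row["NOTE_ID"], row["LINE"], attributes)
```

The count is only known once the last line of a note has been seen, so something has to buffer each note's rows before they are released; that is the part a plain split step cannot supply on its own.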

Any thoughts on the best approach to what feels like a common problem (though searching hasn't yielded any hits yet)?

Thanks---Lawrence
