
NiFi MergeContent to concatenate lines in a flat structure


Hello,

I am trying to merge records from a very flat XML structure in which each line of a note is broken out into its own record. These lines need to be concatenated into a single string for processing in the flow. I've read the XML and converted it to JSON with the following Avro schema:

 

{
"namespace": "nifi",
"name": "ROW",
"type": "record",
"fields": [
{ "name": "NOTE_ID", "type": "long" },
{ "name": "LINE", "type": "long" },
{ "name": "NOTE_TEXT", "type": "string" },
{ "name": "personid", "type": "string" },
{ "name": "note_dt", "type": "string" }
]
}

I can guarantee that, in the source file, all rows for a given NOTE_ID are contiguous and ordered by LINE, which increments as an integer. I use SplitXML to extract these rows from their enclosing <ROWSET>, and I'm trying to produce a single FlowFile for each NOTE_ID whose NOTE_TEXT is the concatenation of the NOTE_TEXT from each of its lines. The resulting schema is the same as the input schema minus the LINE field. For example, these rows:

[{"NOTE_ID":" 123456 ","LINE":" 1 ","NOTE_TEXT":"Contents of line ","personid":" 995600123 ","note_dt":" 2013-01-10 "}]
[{"NOTE_ID":" 123456 ","LINE":" 2 ","NOTE_TEXT":"are continued ","personid":" 995600123 ","note_dt":" 2013-01-10 "}]
[{"NOTE_ID":" 123456 ","LINE":" 3 ","NOTE_TEXT":"throughout.","personid":" 995600123 ","note_dt":" 2013-01-10 "}]
[{"NOTE_ID":" 456789 ","LINE":" 1 ","NOTE_TEXT":"This is another ","personid":" 995600123 ","note_dt":" 2013-01-11 "}]
[{"NOTE_ID":" 456789 ","LINE":" 2 ","NOTE_TEXT":"line example.","personid":" 995600123 ","note_dt":" 2013-01-11 "}]

should become:

[{"NOTE_ID":" 123456 ","NOTE_TEXT":"Contents of line are continued throughout.","personid":" 995600123 ","note_dt":" 2013-01-10 "}]
[{"NOTE_ID":" 456789 ","NOTE_TEXT":"This is another line example.","personid":" 995600123 ","note_dt":" 2013-01-11 "}]
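The transformation itself is a group-and-concatenate over contiguous rows. Here is a minimal Python sketch, assuming each row has already been parsed into a dict keyed by the schema's field names; `merge_notes` and `OUTPUT_SCHEMA` are illustrative names, not NiFi components. `itertools.groupby` only groups adjacent items, so it relies on the stated guarantee that a note's rows are contiguous; re-sorting by LINE inside each group is just a cheap safeguard.

```python
from itertools import groupby
from operator import itemgetter

# The input schema from the question, expressed as a Python dict.
INPUT_SCHEMA = {
    "namespace": "nifi",
    "name": "ROW",
    "type": "record",
    "fields": [
        {"name": "NOTE_ID", "type": "long"},
        {"name": "LINE", "type": "long"},
        {"name": "NOTE_TEXT", "type": "string"},
        {"name": "personid", "type": "string"},
        {"name": "note_dt", "type": "string"},
    ],
}

# Hypothetical output schema: identical to the input schema, minus LINE.
OUTPUT_SCHEMA = {
    **INPUT_SCHEMA,
    "fields": [f for f in INPUT_SCHEMA["fields"] if f["name"] != "LINE"],
}


def merge_notes(rows):
    """Yield one merged record per NOTE_ID.

    Assumes rows for the same NOTE_ID are contiguous (groupby only groups
    adjacent items); lines within a note are sorted by LINE as a safeguard.
    """
    for _, group in groupby(rows, key=itemgetter("NOTE_ID")):
        lines = sorted(group, key=lambda r: int(r["LINE"]))  # int() ignores padding
        first = lines[0]
        yield {
            "NOTE_ID": first["NOTE_ID"],
            # Each fragment keeps its own trailing space, so join with "".
            "NOTE_TEXT": "".join(r["NOTE_TEXT"] for r in lines),
            "personid": first["personid"],
            "note_dt": first["note_dt"],
        }


# Sample rows from the question, values kept exactly as shown there.
rows = [
    {"NOTE_ID": " 123456 ", "LINE": " 1 ", "NOTE_TEXT": "Contents of line ",
     "personid": " 995600123 ", "note_dt": " 2013-01-10 "},
    {"NOTE_ID": " 123456 ", "LINE": " 2 ", "NOTE_TEXT": "are continued ",
     "personid": " 995600123 ", "note_dt": " 2013-01-10 "},
    {"NOTE_ID": " 123456 ", "LINE": " 3 ", "NOTE_TEXT": "throughout.",
     "personid": " 995600123 ", "note_dt": " 2013-01-10 "},
    {"NOTE_ID": " 456789 ", "LINE": " 1 ", "NOTE_TEXT": "This is another ",
     "personid": " 995600123 ", "note_dt": " 2013-01-11 "},
    {"NOTE_ID": " 456789 ", "LINE": " 2 ", "NOTE_TEXT": "line example.",
     "personid": " 995600123 ", "note_dt": " 2013-01-11 "},
]

for note in merge_notes(rows):
    print(note)
```

Because `merge_notes` is a generator and `groupby` is lazy, only one note's lines are materialized at a time, which matters for the multi-million-note initial load.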

The initial source file contains millions of notes (each broken up into its constituent lines, of course), so it's fairly large, but in day-to-day production the files will be much smaller. The flow I'm putting together should handle both sizes of input.

 

I have a Python script that does this using SAX parsing and it works fine, but I'd like to do this entirely in NiFi.
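For comparison with the SAX approach, here is a hedged sketch of a streaming merge using Python's standard `xml.sax` module. It is not the script mentioned above, and it assumes a layout the post does not show: a <ROWSET> containing <ROW> elements, each with one child element per schema field (NOTE_ID, LINE, NOTE_TEXT, personid, note_dt). `NoteMerger` and `SAMPLE` are hypothetical names. Since only the current note is held in memory, the same approach should cope with both the large initial file and the smaller daily ones.

```python
import xml.sax

# Assumed child elements of each <ROW>; the raw XML is not shown in the post.
FIELDS = {"NOTE_ID", "LINE", "NOTE_TEXT", "personid", "note_dt"}


class NoteMerger(xml.sax.ContentHandler):
    """Stream <ROW> elements and emit one merged note per contiguous NOTE_ID."""

    def __init__(self, emit):
        super().__init__()
        self.emit = emit    # callback that receives each merged note (a dict)
        self.row = None     # fields of the <ROW> currently being parsed
        self.field = None   # field element we are currently inside, if any
        self.buf = []       # text chunks for that field
        self.note = None    # merged note being accumulated

    def startElement(self, name, attrs):
        if name == "ROW":
            self.row = {}
        elif self.row is not None and name in FIELDS:
            self.field, self.buf = name, []

    def characters(self, content):
        if self.field is not None:
            self.buf.append(content)  # SAX may split text into several chunks

    def endElement(self, name):
        if name == self.field:
            self.row[name] = "".join(self.buf)
            self.field = None
        elif name == "ROW":
            self._add_row(self.row)
            self.row = None

    def endDocument(self):
        self._flush()  # emit the last note in the file

    def _add_row(self, row):
        # A new NOTE_ID means the previous note is complete.
        if self.note is not None and self.note["NOTE_ID"] != row["NOTE_ID"]:
            self._flush()
        if self.note is None:
            self.note = {k: v for k, v in row.items() if k != "LINE"}
        else:
            self.note["NOTE_TEXT"] += row["NOTE_TEXT"]

    def _flush(self):
        if self.note is not None:
            self.emit(self.note)
            self.note = None


SAMPLE = b"""<ROWSET>
<ROW><NOTE_ID>123456</NOTE_ID><LINE>1</LINE><NOTE_TEXT>Contents of line </NOTE_TEXT><personid>995600123</personid><note_dt>2013-01-10</note_dt></ROW>
<ROW><NOTE_ID>123456</NOTE_ID><LINE>2</LINE><NOTE_TEXT>are continued </NOTE_TEXT><personid>995600123</personid><note_dt>2013-01-10</note_dt></ROW>
<ROW><NOTE_ID>123456</NOTE_ID><LINE>3</LINE><NOTE_TEXT>throughout.</NOTE_TEXT><personid>995600123</personid><note_dt>2013-01-10</note_dt></ROW>
<ROW><NOTE_ID>456789</NOTE_ID><LINE>1</LINE><NOTE_TEXT>This is another </NOTE_TEXT><personid>995600123</personid><note_dt>2013-01-11</note_dt></ROW>
<ROW><NOTE_ID>456789</NOTE_ID><LINE>2</LINE><NOTE_TEXT>line example.</NOTE_TEXT><personid>995600123</personid><note_dt>2013-01-11</note_dt></ROW>
</ROWSET>"""

notes = []
xml.sax.parseString(SAMPLE, NoteMerger(notes.append))
for note in notes:
    print(note)
```

For a real file, `xml.sax.parse(source, handler)` accepts a filename or file object instead of a byte string; which file to pass is up to the flow.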

 

Ideally, this could be done with a MergeContent processor using the Defragment merge strategy. However, the number of fragments (individual lines) changes from note to note. Defragment expects every FlowFile to carry fragment.identifier, fragment.index and fragment.count attributes; NOTE_ID and LINE could supply the first two, but fragment.count (the note's line count) isn't known until the note's last line has been read. If I could find a way to attach that line count to each FlowFile after SplitXML, I think everything else would work. Since I'm new to NiFi, I'm not sure how to do this. Alternatively, I suppose I could run the Python SAX script at the start of the flow and write each concatenated note out as its own FlowFile.
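To make the Defragment idea concrete, here is a minimal Python sketch of the attribute values each line's FlowFile would need, assuming the rows arrive contiguous per NOTE_ID and already in LINE order. It is plain Python, not NiFi's scripting API, and `with_fragment_attributes` is a hypothetical helper. What it shows is that fragment.count can only be assigned after a whole note has been buffered, so whatever does this inside a flow (for example a scripted processor) has to hold one note's lines at a time. Here fragment.identifier is NOTE_ID with its padding stripped, and fragment.index numbers the lines from 0.

```python
from itertools import groupby
from operator import itemgetter


def with_fragment_attributes(rows):
    """Yield (attributes, row) pairs for MergeContent's Defragment strategy.

    Hypothetical helper. Assumes rows are contiguous per NOTE_ID and already in
    LINE order. Only one note's rows are buffered at a time, because
    fragment.count is known only after the note's last line has been seen.
    """
    for note_id, group in groupby(rows, key=itemgetter("NOTE_ID")):
        lines = list(group)  # buffer just this note
        for index, row in enumerate(lines):
            # FlowFile attribute values are strings.
            attributes = {
                "fragment.identifier": str(note_id).strip(),
                "fragment.index": str(index),
                "fragment.count": str(len(lines)),
            }
            yield attributes, row


rows = [
    {"NOTE_ID": " 123456 ", "LINE": " 1 ", "NOTE_TEXT": "Contents of line "},
    {"NOTE_ID": " 123456 ", "LINE": " 2 ", "NOTE_TEXT": "are continued "},
    {"NOTE_ID": " 123456 ", "LINE": " 3 ", "NOTE_TEXT": "throughout."},
    {"NOTE_ID": " 456789 ", "LINE": " 1 ", "NOTE_TEXT": "This is another "},
    {"NOTE_ID": " 456789 ", "LINE": " 2 ", "NOTE_TEXT": "line example."},
]

for attributes, row in with_fragment_attributes(rows):
    print(attributes, repr(row["NOTE_TEXT"]))
```

With these attributes in place, MergeContent in Defragment mode should bin each note's FlowFiles by fragment.identifier and merge a bin once fragment.count of them have arrived.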

Any thoughts on the best approach to what feels like a common problem? Searching hasn't turned up anything yet.

 

Thanks,
Lawrence
