03-13-2020 06:55 PM
Hi,
I have an interesting puzzle for Cloudera DataFlow/Apache NiFi. I am reassembling physician notes across a number of different content types, including radiology notes, and I want to do some section detection (e.g., identifying “Findings”, “Summary”, “Lab Results”, etc.).

I have a working flow that reassembles the notes from a data structure that stores each line as a separate row (not my fault), gets them out of XML format, normalizes the whitespace, determines what type of note each is, attaches metadata, and creates a single flow file per note. I then split each note into separate paragraphs and, using a set of regexes against the first line of each paragraph, identify whether it begins a particular type of section; if there is a match, I set a custom attribute to the section name. At this point each flow file is a paragraph, which might be the beginning of a section. Subsequent paragraphs should be considered part of the most recently identified section until a new section is detected.
I use EnforceOrder to guarantee that the paragraphs are delivered sequentially.
What I now need to do is:
1. Create a header section for the first paragraph, and append any subsequent paragraphs to it to create a single flow file until the first real section is identified.
2. Create a single flow file for each detected section together with all of its subsequent paragraphs.
I’ve been trying to do this without writing a script or a custom processor. I feel like this might be possible with a Wait/Release paradigm, but I don’t know how to approach it, since the attribute name will differ from section to section (I’m thinking I could give each attribute a common naming scheme, like “section_findings” or “section_summary”, and watch for when the name changes). I also don’t know how to handle the initial section, which will have no section attribute at all; I would want to apply an attribute to just the first flow file in the sequence.
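To make this concrete, here’s a rough Python sketch of the grouping I’m trying to express in NiFi (the section names and regexes here are made up for illustration, not my real pattern set):

import re

# Placeholder section-header patterns; the real flow has its own regex set.
SECTION_PATTERNS = {
    "section_findings": re.compile(r"^\s*FINDINGS\s*:", re.IGNORECASE),
    "section_summary": re.compile(r"^\s*(SUMMARY|IMPRESSION)\s*:", re.IGNORECASE),
    "section_lab_results": re.compile(r"^\s*LAB RESULTS\s*:", re.IGNORECASE),
}

def detect_section(paragraph):
    """Return a section name if the paragraph's first line matches, else None."""
    first_line = paragraph.splitlines()[0] if paragraph else ""
    for name, pattern in SECTION_PATTERNS.items():
        if pattern.match(first_line):
            return name
    return None

def group_sections(paragraphs):
    """Yield (section_name, text) pairs. Paragraphs seen before the first
    match are collected under a synthetic 'header' section."""
    current_name, buffer = "header", []
    for paragraph in paragraphs:
        name = detect_section(paragraph)
        if name is not None:
            if buffer:
                yield current_name, "\n\n".join(buffer)
            current_name, buffer = name, []
        buffer.append(paragraph)
    if buffer:
        yield current_name, "\n\n".join(buffer)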
Any insight as to the best way to approach this?
Tags: NiFi
01-20-2020 06:06 PM
Hello,
I am trying to merge a very flat XML structure in which each line of a note is broken out into an individual record. These lines need to be concatenated into a single string for processing in the flow. I've read the XML and converted it to JSON using the following Avro schema:
{ "namespace": "nifi", "name": "ROW", "type": "record", "fields": [ { "name": "NOTE_ID", "type": "long" }, { "name": "LINE", "type": "long" }, { "name": "NOTE_TEXT", "type": "string" }, { "name": "personid", "type": "string" }, { "name": "note_dt", "type": "string" } ] }
I can guarantee that in the source file all rows for a given NOTE_ID are contiguous, ordered by LINE (an incrementing integer). I use SplitXML to extract these rows from their enclosing <ROWSET>, and I am trying to produce a single FlowFile per NOTE_ID whose NOTE_TEXT is the concatenation of the NOTE_TEXT of each line, so the resulting schema is the same as the parent minus the LINE attribute. Thus:
{ [{"NOTE_ID":" 123456 ","LINE":" 1 ","NOTE_TEXT":"Contents of line ","personid":" 995600123 ","note_dt":" 2013-01-10 "}], [{"NOTE_ID":" 123456 ","LINE":" 2 ","NOTE_TEXT":"are continued ","personid":" 995600123 ","note_dt":" 2013-01-10 "}], [{"NOTE_ID":" 123456 ","LINE":" 3 ","NOTE_TEXT":"throughout.","personid":" 995600123 ","note_dt":" 2013-01-10 "}], [{"NOTE_ID":" 456789 ","LINE":" 1 ","NOTE_TEXT":"This is another ","personid":" 995600123 ","note_dt":" 2013-01-11 "}], [{"NOTE_ID":" 456789 ","LINE":" 2 ","NOTE_TEXT":"line example.","personid":" 995600123 ","note_dt":" 2013-01-11 "}] }
should become:
{ [{"NOTE_ID":" 123456 ","NOTE_TEXT":"Contents of line are continued throughout.","personid":" 995600123 ","note_dt":" 2013-01-10 "}], [{"NOTE_ID":" 456789 ","NOTE_TEXT":"This is another line example.","personid":" 995600123 ","note_dt":" 2013-01-11 "}] }
The source file initially consists of millions of notes (broken up into their constituent lines, of course), so it's a fairly large file, but the day-to-day production input will be much smaller. The flow I'm putting together should handle both sizes of input.
I have a Python script that does this using SAX processing and it works fine, but I'd like this to be fully in NiFi.
In an ideal flow, this could be accomplished with a MergeContent processor using the defragment merge strategy. However, the number of fragments (individual lines) changes from note to note. If I could find a way to attach the line count (fragment.count) to each FlowFile after the SplitXML, I think everything else would work. Being new to the NiFi paradigm, I'm not sure how to do that. Alternatively, I suppose I could run the Python SAX script at the beginning of the process and write each concatenated note out as a FlowFile.
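For example, I imagine a cheap SAX pre-pass could compute the per-note line counts that the defragment strategy needs as fragment.count (a sketch assuming the <ROWSET>/<ROW> element names above; "notes.xml" is a placeholder):

import xml.sax
from collections import Counter

class LineCounter(xml.sax.ContentHandler):
    """Counts how many LINE rows each NOTE_ID has, so the totals could be
    attached as fragment.count (alongside fragment.identifier and
    fragment.index) for MergeContent's defragment strategy."""
    def __init__(self):
        self.counts = Counter()
        self._tag = None
        self._note_id = ""

    def startElement(self, name, attrs):
        self._tag = name
        if name == "NOTE_ID":
            self._note_id = ""

    def characters(self, content):
        if self._tag == "NOTE_ID":
            self._note_id += content

    def endElement(self, name):
        if name == "NOTE_ID":
            self.counts[self._note_id.strip()] += 1
        self._tag = None

handler = LineCounter()
xml.sax.parse("notes.xml", handler)  # placeholder input file
print(handler.counts)  # e.g. Counter({"123456": 3, "456789": 2})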
Any thoughts on the best approach to what feels like a common problem (though searching hasn't yielded any hits yet)?
Thanks---Lawrence