Ensure RunMongoAggregation runs after PutMongo

In my NiFi data flow, I take CSV files, convert them to JSON, insert the JSON documents into a database (PutMongo), and then run an aggregation (RunMongoAggregation). The attached images show that data flow. My question: how can I construct the flow so that RunMongoAggregation is guaranteed to run after PutMongo? I looked into implementing a Wait/Notify pattern but got lost in the implementation. Ideally, RunMongoAggregation would be triggered whenever new data passes through the PutMongo processor.

If anyone would like to test the flow, I have attached the template file. It was tested with AIS ship data, which can be found online.


putmongo.jpg
runmongoaggregation.jpg

1 ACCEPTED SOLUTION

Master Guru

@Adnan Chowdhury

Instead of the PutMongo processor, you can use the PutMongoRecord processor, so you don't need to split the JSON objects.

With PutMongoRecord, your flow looks something like this:

85741-flow.png

Configure the PutMongoRecord processor's Record Reader controller service as a CSVReader. The processor then reads the CSV records and inserts them into MongoDB as documents.
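To make the record-based idea concrete, here is a minimal Python sketch of what "read CSV records and turn each one into a document" means conceptually. It is not NiFi code and does not talk to MongoDB. The column names (`mmsi`, `lat`, `lon`) and the sample values are assumptions made up for illustration, and all values stay strings here, whereas in NiFi the field types depend on the schema the reader is configured with.

```python
import csv
import io

# Hypothetical AIS-style sample; the column names and values are assumptions.
SAMPLE_CSV = """mmsi,lat,lon
123456789,59.91,10.75
987654321,60.39,5.32
"""


def csv_to_documents(text):
    """Parse CSV text into one dict per row, using the header row for field names."""
    return list(csv.DictReader(io.StringIO(text)))


documents = csv_to_documents(SAMPLE_CSV)
for doc in documents:
    # Each dict corresponds to one document that would be inserted.
    print(doc)
```

The point of the sketch is that the whole file is handled as one unit containing many records, so there are no per-record fragments to split and wait for.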

After that, you can use the RunMongoAggregation processor to run the aggregation.
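As a rough illustration of the aggregation itself, the sketch below builds a small pipeline and prints it as JSON. It assumes the processor's Query property accepts a JSON array of aggregation stages, and the field name `mmsi` is a hypothetical column from the AIS data, so adjust both to your own setup.

```python
import json

# Hypothetical pipeline: count position reports per vessel, most active first.
# "$group" and "$sort" are standard MongoDB aggregation stages; "mmsi" is an assumed field.
pipeline = [
    {"$group": {"_id": "$mmsi", "reports": {"$sum": 1}}},
    {"$sort": {"reports": -1}},
]

# Serialize the stages as a JSON array that could be pasted into the query field.
query_property = json.dumps(pipeline, indent=2)
print(query_property)
```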

(or)
With your existing flow:
Add a MergeContent processor after the PutMongo processor and set its Merge Strategy to Defragment. The processor then merges all the split JSON objects back into one file.

Then use the merged relationship from the MergeContent processor to trigger RunMongoAggregation.

This way, RunMongoAggregation is triggered only after all the fragments have been merged into one file.
Flow:

85743-flow.png

Refer to this link for MergeContent configurations.
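For anyone unsure why Defragment gives the "wait until everything has arrived" behaviour, here is a simplified Python model of the idea. It is only a sketch, not the MergeContent implementation: the `Defragmenter` class and its `offer` method are hypothetical names, and the model only relies on the `fragment.identifier`, `fragment.index`, and `fragment.count` attributes that the split processors write.

```python
from collections import defaultdict


class Defragmenter:
    """Hold fragments until every piece of a group has arrived (simplified model)."""

    def __init__(self):
        # Maps fragment.identifier -> {fragment.index: content}
        self._bins = defaultdict(dict)

    def offer(self, attributes, content):
        """Add one fragment; return the merged content once the group is complete, else None."""
        group = attributes["fragment.identifier"]
        index = int(attributes["fragment.index"])
        count = int(attributes["fragment.count"])
        fragments = self._bins[group]
        fragments[index] = content
        if len(fragments) < count:
            return None  # still waiting for the remaining fragments
        del self._bins[group]
        # Reassemble the pieces in their original order.
        return "\n".join(fragments[i] for i in sorted(fragments))


merger = Defragmenter()
results = []
for i, body in enumerate(['{"a":1}', '{"a":2}', '{"a":3}']):
    attrs = {
        "fragment.identifier": "file-1",
        "fragment.index": str(i),
        "fragment.count": "3",
    }
    results.append(merger.offer(attrs, body))
print(results)
```

Only the last fragment produces an output, which is why the merged relationship fires once per original file, after every split has already passed through PutMongo.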

If this answer helped resolve your issue, click the Accept button below to accept it. That helps other community users find solutions to similar issues quickly.

2 REPLIES

Thank you Shu for your quick response. I followed your instructions regarding the PutMongoRecord processor. That helped simplify my flow a bit.

If RunMongoAggregation works the way I think it does, a FlowFile passing through the processor "triggers" it and is then routed to "original", while the aggregation runs against the database and its output goes to "results" (see image below). Is that how you see it as well? Also, I probably have to clear out the database I'm aggregating on between data sets to prevent duplicates, don't I? @Shu


aggregation-trigger.png