FlowFile Continuation

Sometimes you need to back up a running flow, either to let it run at a later date or to keep a copy of what is in process right now. You want to put that data in permanent storage and reconstitute it later, like orange juice, then add it back into the flow or restart from it.

This could be due to failures, for integration testing, for testing new versions of components, as a checkpoint, or for many other purposes. You don't always want to reprocess the original source or files, and they may be gone.

Option 1:

You can save the raw data as it originally came in, to local files or HDFS, and read it back from there later.

Option 2 (preferred): MergeContent to FlowFileV3, then reload with Get* to IdentifyMimeType to UnpackContent

Use MergeContent with the FlowFileV3 option. After that step you can save the result with PutFile, PutS3Object, PutHDFS, or another file-saving processor, or send it to an FTP or SFTP server for storage elsewhere.

Now you have a pkg file.
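
To make the contents of that pkg file less mysterious, here is a minimal Python sketch of how a FlowFile v3 package appears to be laid out, based on my understanding of NiFi's packager: a `NiFiFF3` magic header, an attribute count, length-prefixed key/value strings, an 8-byte content length, and then the content itself. The function name `pack_flowfile_v3` is hypothetical and this is not NiFi's own code; treat the byte layout as an assumption to verify against your NiFi version.

```python
import struct

MAGIC = b"NiFiFF3"  # magic header assumed for FlowFile v3 packages


def _field_length(n: int) -> bytes:
    # Assumed encoding: lengths below 65535 take 2 big-endian bytes;
    # larger ones are written as 0xFFFF followed by a 4-byte length.
    if n < 0xFFFF:
        return struct.pack(">H", n)
    return b"\xff\xff" + struct.pack(">I", n)


def _string(s: str) -> bytes:
    # Each attribute key or value is a length prefix plus UTF-8 bytes.
    data = s.encode("utf-8")
    return _field_length(len(data)) + data


def pack_flowfile_v3(attributes: dict, content: bytes) -> bytes:
    """Serialize one flow file (attributes + content) into a v3-style package."""
    out = bytearray(MAGIC)
    out += _field_length(len(attributes))  # number of attributes
    for key, value in attributes.items():
        out += _string(key) + _string(value)
    out += struct.pack(">Q", len(content))  # 8-byte big-endian content length
    out += content
    return bytes(out)
```

The point of the format is that attributes travel with the content, which is what lets a later flow pick up where this one stopped.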

cat /opt/demo/flow/904381478117605.pkg
NiFiFF3+tempf73.02sql.args.2.value29.7sql.args.11.type3roll353.9306742667328
                                                                            mqtt.brokertcp://m13.cloudmqtt.com:14162sql.args.4.type3uuid$9f2f8b6f-2870-40a3-a460-49427cddf9a8
mqtt.topicsensorsql.args.7.type3sql.args.7.value353.9306742667328path./sql.args.4.value33.9sql.args.9.value-0.0sql.args.1.type1humidity29.7pitch14.015266431562901
                                                                                                                                                                  nf.file.path.mqtt.qos0sql.args.8.type3temp33.9sql.args.1.value34sql.args.2.type3sql.args.10.type3sql.args.8.value128.4983979122009sql.args.5.type3sql.args.6.value14.015266431562901sql.args.3.value1011.1sql.args.10.value-0.0mqtt.isDuplicatefalspressure1011.1mqtt.isRetainedfalseyaw128.4983979122009cputemp3filename904381478117605sql.args.11.value1.0sql.args.9.type3x-0.0y-0.0z1.0sql.args.6.type3
                                                                                                                                                                                                                          nf.file.name904381478117605sql.args.5.value73.02sql.args.3.type3�[{"tempf": 73.02, "pressure": 1011.1, "pitch": 14.015266431562901, "temp": 33.9, "yaw": 128.4983979122009, "humidity": 29.7, "cputemp": "34", "y": -0.0, "x": -0.0, "z": 1.0, "roll": 353.9306742667328}]%
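
The dump above is hard to read because the lengths are stored as raw bytes between the strings. As a rough illustration, the following Python sketch reads such a file back into attributes and content. It assumes the layout described by NiFi's v3 packager as I understand it (magic header `NiFiFF3`, 2-byte big-endian lengths with a 0xFFFF escape to 4 bytes, 8-byte content length), and that one package may hold several flow files back to back. The function name `unpack_flowfiles_v3` is hypothetical; inside NiFi, UnpackContent does this job.

```python
import struct

MAGIC = b"NiFiFF3"  # assumed magic header, visible at the start of the dump


def _read_exact(stream, n):
    data = stream.read(n)
    if len(data) != n:
        raise EOFError("truncated FlowFile v3 package")
    return data


def _read_field_length(stream):
    # Assumed encoding: 2 bytes, or 0xFFFF followed by a 4-byte length.
    (n,) = struct.unpack(">H", _read_exact(stream, 2))
    if n == 0xFFFF:
        (n,) = struct.unpack(">I", _read_exact(stream, 4))
    return n


def _read_string(stream):
    return _read_exact(stream, _read_field_length(stream)).decode("utf-8")


def unpack_flowfiles_v3(stream):
    """Yield (attributes, content) for each flow file found in the stream."""
    while True:
        header = stream.read(len(MAGIC))
        if not header:
            return  # clean end of stream
        if header != MAGIC:
            raise ValueError("not a FlowFile v3 package")
        attributes = {}
        for _ in range(_read_field_length(stream)):
            key = _read_string(stream)
            attributes[key] = _read_string(stream)
        (size,) = struct.unpack(">Q", _read_exact(stream, 8))
        yield attributes, _read_exact(stream, size)


# Possible usage against the file shown above:
# with open("/opt/demo/flow/904381478117605.pkg", "rb") as f:
#     for attrs, content in unpack_flowfiles_v3(f):
#         print(attrs.get("filename"), len(content))
```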

14276-reconstitueflow.png

You can now reload that FlowFileV3 at any time, send it to IdentifyMimeType (so NiFi knows it's a FlowFileV3), and then use UnpackContent to reconstitute the original flow file. You can then use it as if it had never stopped and been sent to disk. This gives you an effectively unlimited queue for storing pre-processed or partially processed files, which saves time: you could run really expensive processes once, save the preprocessed items, files, or models, and reuse them everywhere.

In MergeContent, for the Merge Format property choose: FlowFile Stream, v3
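
The IdentifyMimeType step matters because a reloaded file is just bytes until something labels it. A simplified Python sketch of the idea follows: check the leading bytes for the `NiFiFF3` magic header and report a MIME type that an unpacking step can act on. The function name is hypothetical, and the MIME type strings `application/flowfile-v3` and `application/octet-stream` are what I believe NiFi uses; verify them against the `mime.type` attribute in your own flow.

```python
MAGIC = b"NiFiFF3"  # assumed FlowFile v3 magic header


def guess_mime_type(first_bytes: bytes) -> str:
    """Return a MIME type based only on the FlowFile v3 magic header."""
    if first_bytes.startswith(MAGIC):
        return "application/flowfile-v3"  # assumed value of mime.type
    return "application/octet-stream"  # assumed fallback for unknown data
```

Real detection in NiFi covers many more formats; this only shows why the package can be recognized without relying on the file name or extension.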

14277-flowfilereconstituded.png

14278-flowfile3recon2.png

14279-reloaded.png

Thanks to Joe Witt for explaining the process.

Reference:

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.UnpackContent/

https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.MergeContent/

Comments

Can this be saved by one NiFi cluster and consumed by another?

I have not tried, but there's no reason you couldn't. You could also send it to a remote cluster via Site-to-Site.