Created on 03-31-2017 09:39 PM - edited 08-17-2019 01:24 PM
FlowFile Continuation
Sometimes you need to back up a running flow, either to let it run at a later date or to keep a copy of what is in process right now. You want it in permanent storage so you can reconstitute it later, like orange juice, and then add it back into the flow or restart it.
This could be due to failures, for integration testing, for testing new versions of components, as a checkpoint, or for many other purposes. You don't always want to reprocess the original source or files (they may be gone).
Option 1:
You can save the raw data that originally came in to local files or HDFS, then read it back from there later.
Option 2 (preferred): MergeContent to FlowFileV3, then reload with Get* to IdentifyMimeType to UnpackContent
Use MergeContent with the FlowFileV3 option. After that step you can use PutFile, PutS3Object, PutHDFS, or another file-saving processor. You could also send the result to an FTP or SFTP server for storage elsewhere.
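For integration testing it can be handy to build a FlowFileV3 package outside NiFi. The following is a minimal sketch, not NiFi's own code: `package_flowfile` is a hypothetical helper, and the byte layout is inferred from the `NiFiFF3` dump in this post (magic header, attribute count, length-prefixed key/value strings, then an 8-byte content length and the content). The `0xFFFF` escape for fields longer than two bytes can express is an assumption worth checking against NiFi's FlowFilePackagerV3 before relying on it.

```python
import io
import struct
from typing import BinaryIO, Mapping

MAGIC = b"NiFiFF3"  # header visible at the start of a FlowFileV3 package


def _write_length(out: BinaryIO, n: int) -> None:
    # Assumed encoding: 2-byte big-endian length; larger values are written
    # as the marker 0xFFFF followed by a 4-byte big-endian length.
    if n < 0xFFFF:
        out.write(struct.pack(">H", n))
    else:
        out.write(b"\xff\xff" + struct.pack(">I", n))


def _write_string(out: BinaryIO, text: str) -> None:
    raw = text.encode("utf-8")
    _write_length(out, len(raw))
    out.write(raw)


def package_flowfile(out: BinaryIO, attributes: Mapping[str, str], content: bytes) -> None:
    """Append one FlowFile (attributes plus content) to `out`."""
    out.write(MAGIC)
    _write_length(out, len(attributes))          # number of attributes
    for key, value in attributes.items():
        _write_string(out, key)
        _write_string(out, value)
    out.write(struct.pack(">Q", len(content)))   # 8-byte content length
    out.write(content)
```

Calling `package_flowfile` several times on the same stream concatenates several FlowFiles into one package, which is roughly what a merged bin looks like.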
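After the Put* step you have a .pkg file on disk. Dumping one with cat shows the NiFiFF3 header, the FlowFile attributes, and then the original content: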
cat /opt/demo/flow/904381478117605.pkg NiFiFF3+tempf73.02sql.args.2.value29.7sql.args.11.type3roll353.9306742667328 mqtt.brokertcp://m13.cloudmqtt.com:14162sql.args.4.type3uuid$9f2f8b6f-2870-40a3-a460-49427cddf9a8 mqtt.topicsensorsql.args.7.type3sql.args.7.value353.9306742667328path./sql.args.4.value33.9sql.args.9.value-0.0sql.args.1.type1humidity29.7pitch14.015266431562901 nf.file.path.mqtt.qos0sql.args.8.type3temp33.9sql.args.1.value34sql.args.2.type3sql.args.10.type3sql.args.8.value128.4983979122009sql.args.5.type3sql.args.6.value14.015266431562901sql.args.3.value1011.1sql.args.10.value-0.0mqtt.isDuplicatefalspressure1011.1mqtt.isRetainedfalseyaw128.4983979122009cputemp3filename904381478117605sql.args.11.value1.0sql.args.9.type3x-0.0y-0.0z1.0sql.args.6.type3 nf.file.name904381478117605sql.args.5.value73.02sql.args.3.type3�[{"tempf": 73.02, "pressure": 1011.1, "pitch": 14.015266431562901, "temp": 33.9, "yaw": 128.4983979122009, "humidity": 29.7, "cputemp": "34", "y": -0.0, "x": -0.0, "z": 1.0, "roll": 353.9306742667328}]%
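The dump above is hard to read because the length fields are binary. The "+" right after `NiFiFF3` is byte 0x2B (43), which matches the 43 attributes that follow, and the "$" before the uuid value is 0x24 (36), the length of a UUID string. Based on that, here is a rough sketch of a reader for inspecting a package outside NiFi. `unpack_flowfiles` is a hypothetical name, and the `0xFFFF` long-length escape and the 8-byte content length are assumptions about the format rather than something this post confirms.

```python
import io
import struct
from typing import BinaryIO, Dict, Iterator, Tuple

MAGIC = b"NiFiFF3"  # header visible at the start of the dump


def _read_exact(stream: BinaryIO, n: int) -> bytes:
    data = stream.read(n)
    if len(data) != n:
        raise EOFError(f"expected {n} bytes, got {len(data)}")
    return data


def _read_length(stream: BinaryIO) -> int:
    # Assumed encoding: 2-byte big-endian length; the marker 0xFFFF means
    # the real length follows as a 4-byte big-endian integer.
    (short,) = struct.unpack(">H", _read_exact(stream, 2))
    if short != 0xFFFF:
        return short
    (full,) = struct.unpack(">I", _read_exact(stream, 4))
    return full


def _read_string(stream: BinaryIO) -> str:
    return _read_exact(stream, _read_length(stream)).decode("utf-8")


def unpack_flowfiles(stream: BinaryIO) -> Iterator[Tuple[Dict[str, str], bytes]]:
    """Yield (attributes, content) for each FlowFile in the package."""
    while True:
        header = stream.read(len(MAGIC))
        if not header:
            return  # clean end of the package
        if header != MAGIC:
            raise ValueError("not a FlowFileV3 package")
        attributes: Dict[str, str] = {}
        for _ in range(_read_length(stream)):
            key = _read_string(stream)
            attributes[key] = _read_string(stream)
        (size,) = struct.unpack(">Q", _read_exact(stream, 8))
        yield attributes, _read_exact(stream, size)


# Example use (path taken from the cat command above):
# with open("/opt/demo/flow/904381478117605.pkg", "rb") as f:
#     for attrs, content in unpack_flowfiles(f):
#         print(attrs.get("filename"), len(content))
```

This is only for inspection and debugging; inside a flow, IdentifyMimeType and UnpackContent do the real work.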
You can now reload that FlowFileV3 at any time: send it to IdentifyMimeType (so NiFi knows it's a FlowFileV3), then use UnpackContent to reconstitute the original flow file. You can then use it as if it had never stopped and been sent to disk. This gives you an effectively unlimited queue, bounded only by your storage, for preprocessed or partially processed files. It also saves time: you could run a really expensive process once, save the preprocessed items, files, or models, and reuse them everywhere.
In MergeContent, set Merge Format to: FlowFile Stream, v3
Thanks to Joe Witt for the explanation of the process.
Reference:
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.UnpackContent/
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.MergeContent/
Created on 03-31-2017 10:40 PM
Can this be saved by one NiFi cluster and consumed by another?
Created on 04-28-2017 02:58 AM
I have not tried, but there's no reason you couldn't. You could also send it to a remote cluster via Site-to-Site.