Hi,
I am moving data between HDFS directories and need to pick up the most recently updated flow files. The flow should check the source HDFS directories, pick the merged JSON files that are more than 2 hours old, and move them to the target, creating the sub-folders there if the target directory does not already have them. Each file pushed should get a sequence number appended, incremented for every new file received for the same date, and should be deleted from the source directory once it has been processed. If new files arrive later, they should be processed again with a new sequence number.
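To make the 2-hour rule concrete, here is a rough sketch in Python (not NiFi code) of the selection I have in mind. It assumes "greater than 2 hrs" refers to the time since the file was last modified; the names `MIN_AGE` and `ready_files` and the `(path, last_modified)` listing format are made up for illustration only.

```python
from datetime import datetime, timedelta

# Assumption: a file is ready once it was last modified more than 2 hours ago.
MIN_AGE = timedelta(hours=2)

def ready_files(listing, now):
    """Return the paths whose last-modified time is more than MIN_AGE before `now`.

    `listing` is an iterable of (path, last_modified) pairs, where
    last_modified is a datetime. This stands in for a directory listing.
    """
    return [path for path, modified in listing if now - modified > MIN_AGE]
```

For example, with `now` at 12:00, a file modified at 09:00 would be selected and a file modified at 11:00 would be left for a later run.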
Source HDFS path:
- /data/json/incoming/year=2019/month=10/day=22/$flow-file
Target HDFS path:
- /data/json/final/$path/$flow-file
Filename (received): source_es_2019_10_21.jsonl
Filename (required after processing):
source_es_2019_10_21_1.jsonl
source_es_2019_10_21_2.jsonl
source_es_2019_10_21_3.jsonl
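The naming rule I am after can be sketched in Python as follows. This is only an illustration of the logic, not something that runs inside NiFi as written; the function name `next_sequenced_name` is hypothetical, and it assumes the names of the files already present in the target directory are available as a list.

```python
import os
import re

def next_sequenced_name(received_name, existing_names):
    """Return `received_name` with the next free sequence number appended.

    `existing_names` is assumed to be the list of file names already
    present in the target directory.
    """
    stem, ext = os.path.splitext(received_name)
    # Matches e.g. source_es_2019_10_21_3.jsonl and captures the 3.
    pattern = re.compile(re.escape(stem) + r"_(\d+)" + re.escape(ext))
    used = []
    for name in existing_names:
        match = pattern.fullmatch(name)
        if match:
            used.append(int(match.group(1)))
    # Start at 1 when no sequenced file for this date exists yet.
    next_number = max(used, default=0) + 1
    return f"{stem}_{next_number}{ext}"
```

With an empty target directory this gives source_es_2019_10_21_1.jsonl; if _1 and _2 are already there it gives source_es_2019_10_21_3.jsonl. Files for other dates in the same directory are ignored because the date is part of the matched stem.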
I am currently using the following NiFi flow:
ListHDFS -> UpdateAttribute -> PutHDFS -> DeleteHDFS
I have completed all the steps except generating the sequence number for each flow file received for the same date. Could you please check and help?