Created on 02-08-2020 10:55 PM - edited 02-08-2020 10:56 PM
I have an HDFS directory in which files are created periodically. I need to be notified when a file is created in one of its subdirectories so that I can read the file from that HDFS location and send its contents to a Kafka topic. I found that this can be done with HDFS iNotify, but it has a few disadvantages: instead of watching a single HDFS directory, it reports file creation anywhere in HDFS, and it captures every event type happening in HDFS.
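import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class HdfsINotifyExample {
    // args[0]: HDFS URI; args[1] (optional): transaction ID to resume from
    public static void main(String[] args) throws IOException, InterruptedException, MissingEventsException {
        long lastReadTxid = 0;
        if (args.length > 1) {
            lastReadTxid = Long.parseLong(args[1]);
        }
        System.out.println("lastReadTxid = " + lastReadTxid);

        HdfsAdmin admin = new HdfsAdmin(URI.create(args[0]), new Configuration());
        DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream(lastReadTxid);

        while (true) {
            // Blocks until the next batch of events is available
            EventBatch batch = eventStream.take();
            System.out.println("TxId = " + batch.getTxid());
            for (Event event : batch.getEvents()) {
                System.out.println("event type = " + event.getEventType());
                switch (event.getEventType()) {
                    case CREATE:
                        CreateEvent createEvent = (CreateEvent) event;
                        System.out.println(" path = " + createEvent.getPath());
                        System.out.println(" owner = " + createEvent.getOwnerName());
                        System.out.println(" ctime = " + createEvent.getCtime());
                        break;
                    default:
                        // All other event types are ignored
                        break;
                }
            }
        }
    }
}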
Instead of monitoring all event types, is there a better way to watch only for file-creation events in a particular HDFS directory? Could this be done with a shell script or some other mechanism?
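Since the event stream covers the whole of HDFS, one possible approach is to filter on the client side: compare the path reported by each CREATE event against the directory of interest and ignore everything else. The sketch below shows only that path check. The class name WatchedDirFilter, its methods, and the /data/incoming paths are hypothetical and used for illustration; they are not part of the Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Hadoop API: decides whether a path
// reported by an inotify CREATE event lies inside a watched directory tree.
final class WatchedDirFilter {
    private final String prefix; // watched directory, always ending in "/"

    WatchedDirFilter(String watchedDir) {
        // Normalise so that "/data/incoming" and "/data/incoming/" behave the same.
        this.prefix = watchedDir.endsWith("/") ? watchedDir : watchedDir + "/";
    }

    // True for paths in the watched directory or any of its subdirectories.
    boolean accepts(String eventPath) {
        return eventPath.startsWith(prefix);
    }

    // Keeps only the accepted paths, preserving their order.
    List<String> filter(List<String> eventPaths) {
        List<String> kept = new ArrayList<>();
        for (String p : eventPaths) {
            if (accepts(p)) {
                kept.add(p);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        WatchedDirFilter f = new WatchedDirFilter("/data/incoming");
        System.out.println(f.accepts("/data/incoming/2020/02/part-0.avro")); // true
        System.out.println(f.accepts("/data/incoming-old/part-0.avro"));     // false
        System.out.println(f.accepts("/tmp/part-0.avro"));                   // false
    }
}
```

In the CREATE case of the loop above, the call would be something like f.accepts(createEvent.getPath()) before doing any further work. The trailing slash added in the constructor is what keeps a sibling directory such as /data/incoming-old from matching.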
Created 02-10-2020 05:57 AM
@nhemamalini
Within NiFi you could construct a dataflow that uses the ListHDFS processor to monitor your directory tree for newly created files. This processor retains state, so it will not list the same files over and over each time it checks the HDFS directory tree. An empty (no content) NiFi FlowFile is created for each HDFS file listed, which you can route to a FetchHDFS processor that retrieves the content and adds it to the FlowFile. You can then handle notifications, transformations, etc. via the available processors in NiFi and use the PublishKafka_<version> processor to send the file to your desired Kafka topic.
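To illustrate what "retains state" means in practice, here is a simplified sketch of a listing step that remembers what it has already reported, so that each poll returns only new files. It is an illustration of the idea only and does not reproduce NiFi's actual logic; the class name NewFileTracker and the /data/in paths are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of "listing with state"; not NiFi's implementation.
final class NewFileTracker {
    private final Set<String> seen = new HashSet<>();

    // Returns the paths in this listing that no earlier listing contained.
    List<String> newFiles(List<String> currentListing) {
        List<String> fresh = new ArrayList<>();
        for (String path : currentListing) {
            if (seen.add(path)) { // add() returns false for paths already seen
                fresh.add(path);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        NewFileTracker tracker = new NewFileTracker();
        // First poll: both files are new.
        System.out.println(tracker.newFiles(Arrays.asList("/data/in/a.avro", "/data/in/b.avro")));
        // Second poll: only c.avro has not been reported before.
        System.out.println(tracker.newFiles(Arrays.asList("/data/in/a.avro", "/data/in/b.avro", "/data/in/c.avro")));
    }
}
```

Each path returned by a poll corresponds to one empty FlowFile in the description above, which the fetch step then fills with the file's content.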
Hope this helps,
Matt
Created 02-10-2020 07:14 AM
Thanks, Matt. Where will the FlowFile be created so that it can be passed to the fetch processor? In my scenario, Avro files will be created in subdirectories. I saw that ListHDFS supports HDFS, but I am not able to connect the ListHDFS processor to the fetch processor.