How to get notified when a file is created in an HDFS directory

Explorer

I have an HDFS directory where files are created periodically. I need to be notified when a file is created in one of its subdirectories so that I can read the file from that HDFS location and send its contents to a Kafka topic. I found that this can be done using HDFS iNotify, but it has a few disadvantages: instead of watching a single HDFS directory, it notifies whenever a file is created anywhere in HDFS, and it captures every event happening in HDFS.

 

 

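import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.Event.CreateEvent;
import org.apache.hadoop.hdfs.inotify.EventBatch;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class HdfsINotifyExample {

	public static void main(String[] args) throws IOException, InterruptedException, MissingEventsException {

		// args[0] = NameNode URI, args[1] = optional transaction id to resume from
		if (args.length < 1) {
			System.err.println("Usage: HdfsINotifyExample <hdfs-uri> [lastReadTxid]");
			System.exit(1);
		}

		long lastReadTxid = 0;

		if (args.length > 1) {
			lastReadTxid = Long.parseLong(args[1]);
		}

		System.out.println("lastReadTxid = " + lastReadTxid);

		HdfsAdmin admin = new HdfsAdmin(URI.create(args[0]), new Configuration());

		// The stream delivers every namespace event after lastReadTxid, for the whole filesystem
		DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream(lastReadTxid);

		while (true) {
			// take() blocks until the next batch of events is available
			EventBatch batch = eventStream.take();
			System.out.println("TxId = " + batch.getTxid());

			for (Event event : batch.getEvents()) {
				System.out.println("event type = " + event.getEventType());
				switch (event.getEventType()) {
				case CREATE:
					CreateEvent createEvent = (CreateEvent) event;
					System.out.println("  path = " + createEvent.getPath());
					System.out.println("  owner = " + createEvent.getOwnerName());
					System.out.println("  ctime = " + createEvent.getCtime());
					break;

				default:
					// All other event types are ignored
					break;
				}
			}
		}
	}
}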

 

 

Instead of monitoring all event types, is there a better way to watch only for file-creation events in a particular HDFS directory? A shell script, or some other mechanism?
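
Editor's note: one way to narrow the event stream to a single directory is to filter on the path reported by each CREATE event, for example the value returned by createEvent.getPath() in the listing above. The snippet below is only a minimal sketch of that idea using plain strings. The class name WatchedDirFilter, its methods, and the /data/in paths are hypothetical and not part of any Hadoop API. The filtering still happens on the client, so the stream itself continues to deliver events for the whole filesystem.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: keeps only paths that sit below one watched directory. */
final class WatchedDirFilter {

    // Watched directory, normalised so that it always ends with exactly one "/"
    private final String prefix;

    WatchedDirFilter(String watchedDir) {
        // Strip trailing slashes, then append one, so "/data/in" does not match "/data/input"
        this.prefix = watchedDir.replaceAll("/+$", "") + "/";
    }

    /** True when the path lies somewhere below the watched directory (any depth). */
    boolean accepts(String path) {
        return path != null && path.startsWith(prefix);
    }

    /** Returns the accepted paths, preserving their original order. */
    List<String> filter(List<String> paths) {
        return paths.stream().filter(this::accepts).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Example paths are made up for illustration
        WatchedDirFilter filter = new WatchedDirFilter("/data/in");
        List<String> seen = List.of("/data/in/2020/a.avro", "/data/input/b.avro", "/tmp/c.avro");
        System.out.println(filter.filter(seen));
    }
}
```

In the event loop, the CREATE branch would call accepts(createEvent.getPath()) and skip the event when it returns false. Note that a CREATE event is emitted when the file is opened for writing, so a reader that needs the complete file may want to react to the corresponding CLOSE event instead.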

2 REPLIES

Master Mentor

@nhemamalini 

Within NiFi you could construct a dataflow that uses the ListHDFS processor to monitor your directory tree for newly created files. This processor retains state, so it will not list the same files over and over each time it checks the HDFS directory tree. An empty (no content) NiFi FlowFile will be created for each HDFS file listed, which you can route to a FetchHDFS processor that retrieves the content and adds it to the FlowFile. You can then do whatever notifications, transformations, etc. you need via the available processors in NiFi, and use the PublishKafka_<version> processor to send the file to your desired Kafka topic.

Hope this helps,

Matt
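
Editor's note: the "retains state" behaviour Matt describes can be pictured as a listing that remembers the newest modification time it has already reported. The snippet below is a rough sketch of that idea only. It is not NiFi's implementation, and the class StatefulLister, its method listNew, and the example paths are hypothetical. A directory snapshot is modelled as a map from path to modification time in milliseconds, so that the example runs without a cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: reports each file once by remembering the newest timestamp seen. */
final class StatefulLister {

    // Retained state: newest modification time already reported
    private long lastSeenMillis = Long.MIN_VALUE;

    /** Returns, in sorted order, the paths modified after the retained timestamp. */
    List<String> listNew(Map<String, Long> snapshot) {
        List<String> fresh = new ArrayList<>();
        long newest = lastSeenMillis;
        for (Map.Entry<String, Long> entry : snapshot.entrySet()) {
            if (entry.getValue() > lastSeenMillis) {
                fresh.add(entry.getKey());
                newest = Math.max(newest, entry.getValue());
            }
        }
        lastSeenMillis = newest; // advance the watermark for the next poll
        Collections.sort(fresh); // map iteration order is unspecified, so sort for stable output
        return fresh;
    }

    public static void main(String[] args) {
        StatefulLister lister = new StatefulLister();
        Map<String, Long> firstPoll = Map.of("/data/in/a.avro", 100L);
        Map<String, Long> secondPoll = Map.of("/data/in/a.avro", 100L, "/data/in/b.avro", 200L);
        System.out.println(lister.listNew(firstPoll));
        System.out.println(lister.listNew(secondPoll));
    }
}
```

A single timestamp watermark is the simplest possible state. It would miss a file that appears later with a modification time equal to or older than the watermark, so a real implementation needs more bookkeeping than this sketch shows.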

Explorer

Thanks, Matt. Where will the FlowFile be created so that it can be passed to the fetch processor? In my scenario, Avro files will be created in subdirectories. I saw that ListHDFS supports HDFS, but I am not able to connect the ListHDFS processor to the fetch processor.