Apache NiFi: FetchFTP - not.found connection not working

Explorer

We are trying to build a flow that downloads a CSV file from an FTP server and loads it into a database. As part of that, we have to check whether the file exists on the FTP server. To do so, we use ListFTP -> FetchFTP and route on the connections of FetchFTP. If the file is present on the server, the flow works correctly through the success connection. If the file is not found, however, no other components run: FetchFTP only reports a "file not found" exception at the component and log file level. We want to notify the user by email, or with a separate log file entry, that the system was not able to locate the file. Please help us achieve this. I am using NiFi 1.5.

Master Mentor
@Ramkrishna Utpat

If I am following your flow description correctly, it sounds like you just need to route the "not.found" relationship from the FetchFTP processor to a PutEmail processor.

Thank you,

Matt

Explorer

@Matt Clarke,

Yes, that is correct. But it is not limited to PutEmail; I should also be able to write a custom message to a log file (not necessarily a separate log file; the standard log file will work too).

Thanks

Master Mentor
@Ramkrishna Utpat

For writing a custom log message to the nifi-app.log, you can use the "LogMessage" processor.

Explorer

@Matt Clarke,

I used the same, but FetchFTP is not routing control to LogMessage. Below is part of the log file.

2018-03-15 08:38:58,951 ERROR [Timer-Driven Process Thread-3] o.a.nifi.processors.standard.FetchFTP FetchFTP[id=28050d7f-0162-1000-ffff-ffff8f62fee5] Failed to fetch content for StandardFlowFileRecord[uuid=f138abe6-732d-471e-b931-0c15a64f9cb7,claim=,offset=0,name=6.csv,size=0] from filename data/ram.csv on remote host 172.27.15.28:21 due to java.io.IOException: 550 File not found ; routing to comms.failure: java.io.IOException: 550 File not found
java.io.IOException: 550 File not found
    at org.apache.nifi.processors.standard.util.FTPTransfer.getInputStream(FTPTransfer.java:302)
    at org.apache.nifi.processors.standard.FetchFileTransfer.onTrigger(FetchFileTransfer.java:238)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1122)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2018-03-15 08:38:59,253 INFO [Process Cluster Protocol Request-7] o.a.n.c.p.impl.SocketProtocolListener Finished processing request 6966e9f9-55da-4ec3-90d4-8a45c3bafacc (type=HEARTBEAT, length=2949 bytes) from 172.27.56.233:8070 in 0 millis
2018-03-15 08:38:59,253 INFO [Clustering Tasks Thread-1] o.a.n.c.c.ClusterProtocolHeartbeater Heartbeat created at 2018-03-15 08:38:59,252 and sent to 172.27.56.233:8020 at 2018-03-15 08:38:59,253; send took 1 millis
2018-03-15 08:38:59,364 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@6b484770 // Another save pending = false
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.c.s.StandardProcessScheduler Stopping PutFile[id=28063848-0162-1000-0000-00005863f5de]
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.standard.PutFile
2018-03-15 08:39:01,662 INFO [StandardProcessScheduler Thread-6] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling PutFile[id=28063848-0162-1000-0000-00005863f5de] to run
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.c.s.StandardProcessScheduler Stopping LogMessage[id=0dfa30aa-65d8-1034-8f3b-fc27cc88c4bb]
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.standard.LogMessage
2018-03-15 08:39:01,662 INFO [StandardProcessScheduler Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling LogMessage[id=0dfa30aa-65d8-1034-8f3b-fc27cc88c4bb] to run
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.c.s.StandardProcessScheduler Stopping ListFTP[id=280441e3-0162-1000-ffff-ffffec729c2d]
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.standard.ListFTP
2018-03-15 08:39:01,662 INFO [StandardProcessScheduler Thread-7] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling ListFTP[id=280441e3-0162-1000-ffff-ffffec729c2d] to run
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.c.s.StandardProcessScheduler Stopping FetchFTP[id=28050d7f-0162-1000-ffff-ffff8f62fee5]
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.standard.FetchFTP
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.c.s.StandardProcessScheduler Stopping LogMessage[id=08f9300f-a3fd-1126-a12b-b501583020d7]
2018-03-15 08:39:01,662 INFO [NiFi Web Server-1921] o.a.n.controller.StandardProcessorNode Stopping processor: class org.apache.nifi.processors.standard.LogMessage
2018-03-15 08:39:01,662 INFO [StandardProcessScheduler Thread-2] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling FetchFTP[id=28050d7f-0162-1000-ffff-ffff8f62fee5] to run
2018-03-15 08:39:01,663 INFO [StandardProcessScheduler Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent Stopped scheduling LogMessage[id=08f9300f-a3fd-1126-a12b-b501583020d7] to run
2018-03-15 08:39:02,078 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@6b484770 // Another save pending = false

Master Mentor

@Ramkrishna Utpat

Interesting that it is being logged as a comms.failure.
How is the data being written to the FTP server?
Is there maybe a lock file preventing the FetchFTP user from being able to access/delete this file?
Can you confirm the file is really missing from the FTP server?
Are there multiple systems/nodes trying to pull this data?
If the file still exists, is it successful if you try to fetch the files routed to comms.failure again?
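
One way to answer the questions above from outside NiFi is to ask the FTP server directly what it replies for the file. The Python sketch below is only an illustration and not part of the flow discussed here: `probe_remote_file` is a hypothetical helper name, and the sketch assumes the server supports the SIZE command. It uses the standard `ftplib` module, which raises `ftplib.error_perm` for permanent 5xx replies such as "550 File not found".

```python
import ftplib


def probe_remote_file(ftp, remote_path):
    """Ask the server for the size of remote_path and report what came back.

    Returns (True, size) when the server answers, or (False, reply_text)
    when it refuses with a permanent 5xx reply such as "550 File not found".
    """
    try:
        # Switch to binary mode first; some servers refuse SIZE in ASCII mode.
        ftp.voidcmd("TYPE I")
        return True, ftp.size(remote_path)
    except ftplib.error_perm as exc:
        # The exception text is the raw reply line sent by the server.
        return False, str(exc)
```

To run it against a real server, open a connection with `ftplib.FTP(host)`, call `login(user, password)` with real credentials, and pass the connection to `probe_remote_file` together with the path from the log (data/ram.csv). If the reply text starts with 550, the server itself is reporting the file as unavailable.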

Master Mentor

@Ramkrishna Utpat

Tip: try to avoid starting a new "Answer" when responding to an existing "Answer" thread in HCC.

In a NiFi cluster, the ListFTP processor should be running on the "Primary node" only. FTP is not a cluster-friendly protocol. You get yourself into a race condition by having all nodes run the list-based processor components.

To distribute the workload across your entire cluster, you should be feeding the listed files to a Remote Process Group (RPG). The RPG should be pointing at the same cluster. The listed FlowFiles will be load-balanced to all nodes in your cluster by sending them to a remote "input port". That input port should feed your FetchFTP processor. That way each node is ingesting unique data from the FTP server and you don't have issues where multiple nodes are trying to retrieve the same data.
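
The difference between the two setups can be pictured with a small Python simulation. This is a sketch under stated assumptions, not NiFi code: the node names and file names are hypothetical, and plain round-robin stands in for whatever distribution the RPG actually performs.

```python
from collections import Counter
from itertools import cycle

NODES = ["node-1", "node-2", "node-3"]          # hypothetical three-node cluster
LISTING = ["a.csv", "b.csv", "c.csv", "d.csv"]  # hypothetical files on the FTP server


def list_on_every_node(nodes, listing):
    """Every node lists and fetches on its own, so each file is requested once per node."""
    return [(node, name) for node in nodes for name in listing]


def list_on_primary_then_distribute(nodes, listing):
    """Only the primary node lists; each listed file is handed to exactly one node."""
    return list(zip(cycle(nodes), listing))


def fetches_per_file(assignments):
    """Count how many fetch attempts each file receives."""
    return Counter(name for _node, name in assignments)
```

With the first function every file is fetched three times, which is where the nodes start competing for the same remote file. With the second, every file is fetched exactly once.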

Helpful links on RPGs:

https://community.hortonworks.com/articles/16461/nifi-understanding-how-to-use-process-groups-and-r....

https://community.hortonworks.com/content/kbentry/109629/how-to-achieve-better-load-balancing-using-...

Thank you,

Matt

Master Mentor

@Ramkrishna Utpat

My recommendation at this time would be to raise an Apache Jira against NiFi.

The error is being returned by the FTP client library to NiFi. NiFi takes that client response, which is the FTP server's response to the client library, and makes a routing decision. I am not sure why NiFi treats this specific error as a comms.failure instead of a not.found condition.

It may be something in the response, or something may be missing in the NiFi code itself.

There is nothing else we can configure from the NiFi processor side here.
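
To make the routing question concrete: per the FTP specification (RFC 959), reply code 550 means "requested action not taken, file unavailable" and covers both a missing file and a permission problem, so the code alone does not settle which relationship applies. The Python sketch below is a purely hypothetical heuristic and does not reflect how NiFi is implemented; `relationship_for_reply` is an invented name, and only the relationship names are taken from FetchFTP.

```python
import re


def relationship_for_reply(reply_text):
    """Map a raw FTP reply line to a FetchFTP-style relationship name (heuristic only)."""
    match = re.match(r"\s*(\d{3})", reply_text)
    if match is None or match.group(1) != "550":
        # Anything that is not a 550 reply is treated as a communications problem.
        return "comms.failure"
    text = reply_text.lower()
    if "permission" in text or "denied" in text:
        return "permission.denied"
    if "not found" in text or "no such file" in text:
        return "not.found"
    # A 550 reply without a recognizable reason stays a generic failure.
    return "comms.failure"
```

Under this heuristic, the reply from the log above ("550 File not found") would map to not.found. The wording after the code differs between FTP servers, which is one reason such a mapping is fragile.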

Thank you,

Matt

Explorer
@Matt Clarke

Thanks for the reply. I did what you suggested, but I was still not able to route the flow to not.found; it is still being routed to comms.failure. My requirement is to download a specific file from FTP, and if that file is not found, the FlowFile should be routed to not.found. Flow screenshot attached for reference: flow.png

Explorer

@Matt Clarke

Thanks for the quick response.

Below are the answers to your questions.

How is the data being written to the FTP server?

  • We created a local FTP server using FileZilla on a Windows machine and access it from a CentOS machine. Data is copied by the end user into a shared folder.

Is there maybe a lock file preventing the FetchFTP user from being able to access/delete this file?

If the file still exists, is it successful if you try to fetch the files routed to comms.failure again?

  • There is no lock on the file; it can be accessed by anyone from anywhere. If the file is present on the FTP server, the entire flow works perfectly fine, including the LogMessage and PutEmail components.

Can you confirm the file is really missing from the FTP server?

  • The file is not missing, but a wrongly named file has been placed in the folder. If there is no file at all in the folder, nothing happens.

Are there multiple systems/nodes trying to pull this data?

  • Not multiple systems, but multiple nodes are possible, because our NiFi system is a cluster of three nodes.