How to read an email using Apache Nifi and extract attachment

Expert Contributor

Hi, I have a requirement whereby I need to read a specific email in my inbox, extract the attachment, and push the attachment onto HDFS. At a high level, I think I need to use the ConsumeIMAP processor, followed by the ExtractEmailHeaders and ExtractEmailAttachments processors, but I need some ideas on how to implement this.

@Shu have you worked on this kind of request before? If so, I would love to hear from you. Thanks.

1 ACCEPTED SOLUTION

Master Guru

@Abhinav Joshi

To filter for a specific email, use the RouteOnAttribute processor.
Flow:-

ConsumeIMAP/ConsumePOP3 --> ExtractEmailHeaders --> RouteOnAttribute (filter the required email) --> ExtractEmailAttachments (feed from the attachments relationship) --> PutHDFS
Example:-
ConsumePOP3 processor:-

[Screenshot: 72511-consumepop3.png]
Configure the username and password, change the required settings in Gmail, and add the following properties to the processor:

mail.pop3.socketFactory.class = javax.net.ssl.SSLSocketFactory
mail.pop3.socketFactory.fallback = false

Use this link for more details about configurations.

ConsumeIMAP processor:-

[Screenshot: 72512-consumeimap.png]

Configure the processor as shown in the screenshot above.
Use this link for more details about configurations.

ExtractEmailHeaders:-
Change the processor's configuration if you want to capture additional headers.

Once a flowfile has been processed by this processor, the following attributes are added to it:

| Name | Description |
|---|---|
| email.headers.bcc.* | Each individual BCC recipient (if available) |
| email.headers.cc.* | Each individual CC recipient (if available) |
| email.headers.from.* | Each individual mailbox contained in the From of the Email (array as per RFC-2822) |
| email.headers.message-id | The value of the Message-ID header (if available) |
| email.headers.received_date | The Received-Date of the message (if available) |
| email.headers.sent_date | Date the message was sent |
| email.headers.subject | Subject of the message (if available) |
| email.headers.to.* | Each individual TO recipient (if available) |
| email.attachment_count | Number of attachments of the message |
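
To make the attribute naming more concrete, here is a minimal Python sketch that derives a few similarly named attributes from a raw message using only the standard library. It is an emulation for illustration, not NiFi's implementation: the function name `header_attributes` and the sample message `RAW` are hypothetical, and the exact value format NiFi writes for each address is an assumption.

```python
from email import message_from_string
from email.utils import getaddresses

# Hypothetical sample message, used only for illustration.
RAW = """\
From: Test User <test@gmail.com>
To: a@example.com, b@example.com
Subject: Daily report
Message-ID: <123@example.com>
MIME-Version: 1.0
Content-Type: text/plain

body
"""


def header_attributes(raw: str) -> dict:
    """Build a dict of attributes named like the ones in the table above."""
    msg = message_from_string(raw)
    attrs = {}
    # Address headers become one indexed attribute per mailbox (from.0, from.1, ...).
    for header in ("from", "to", "cc", "bcc"):
        values = msg.get_all(header, [])
        for i, (name, addr) in enumerate(getaddresses(values)):
            attrs[f"email.headers.{header}.{i}"] = f"{name} <{addr}>" if name else addr
    # Single-valued headers are copied only when present.
    if msg["Subject"] is not None:
        attrs["email.headers.subject"] = msg["Subject"]
    if msg["Message-ID"] is not None:
        attrs["email.headers.message-id"] = msg["Message-ID"]
    # Count the MIME parts that are marked as attachments.
    count = sum(1 for part in msg.walk() if part.get_content_disposition() == "attachment")
    attrs["email.attachment_count"] = str(count)
    return attrs
```

Calling `header_attributes(RAW)` returns a plain dictionary, which is a reasonable mental model for how flowfile attributes look to downstream processors such as RouteOnAttribute.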

RouteOnAttribute:-
Add a new property:

Required mail

${anyMatchingAttribute("email.headers.from.*"):contains("test"):and(${email.attachment_count:gt(0)})}

This matches if any of the email.headers.from.* attributes contains the substring test and email.attachment_count is greater than 0.

(or)

${anyMatchingAttribute("email.headers.from.*"):equals("test <test@gmail.com>"):and(${email.attachment_count:gt(0)})}

Here from.* is compared with equals, so this checks for the exact From value and an attachment count greater than 0.

What is anyMatchingAttribute("email.headers.from.*")? It checks whether any attribute whose name matches the given pattern (email.headers.from.0, email.headers.from.1, etc.) satisfies the condition.
Add conditions as your requirement dictates to filter for the specific email.

Refer to this link for more details about the NiFi Expression Language.
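
For readers who find the expression hard to parse, the routing condition can be sketched in plain Python. This is only an approximation of the logic, not how NiFi evaluates it: the helper names are hypothetical, and it assumes the pattern has to match the whole attribute name.

```python
import re


def any_matching_attribute_contains(attrs: dict, pattern: str, needle: str) -> bool:
    """True if any attribute whose name matches `pattern` contains `needle`."""
    regex = re.compile(pattern)
    # Assumption: the pattern must match the full attribute name.
    return any(
        regex.fullmatch(name) is not None and needle in value
        for name, value in attrs.items()
    )


def required_mail(attrs: dict, sender_fragment: str = "test") -> bool:
    """Mirror of the 'Required mail' property: sender matches AND attachments exist."""
    has_sender = any_matching_attribute_contains(
        attrs, "email.headers.from.*", sender_fragment
    )
    has_attachment = int(attrs.get("email.attachment_count", "0")) > 0
    return has_sender and has_attachment
```

A flowfile whose attributes make `required_mail` return `True` corresponds to one that would be routed to the Required mail relationship; everything else would go to unmatched.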

ExtractEmailAttachments:-

Use this processor to extract the email attachments; each attachment is split into an individual flowfile.

Then use the PutHDFS processor to store the attachments in an HDFS directory.

In addition

If you also want to filter for only the required filenames, use any of the attributes below in a RouteOnAttribute processor.

The ExtractEmailAttachments processor adds the following attributes:

| Name | Description |
|---|---|
| filename | The filename of the attachment |
| email.attachment.parent.filename | The filename of the parent FlowFile |
| email.attachment.parent.uuid | The UUID of the original FlowFile. |
| mime.type | The mime type of the attachment. |
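
As a rough illustration of what "one flowfile per attachment" means, the following Python sketch splits a message into its attachments with the standard library and records the `filename` and `mime.type` for each. It is not NiFi code: `build_sample` and `extract_attachments` are hypothetical names, and the sample message is made up.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage


def build_sample() -> bytes:
    """Create a small hypothetical email with one CSV attachment."""
    msg = EmailMessage()
    msg["From"] = "test <test@gmail.com>"
    msg["To"] = "me@example.com"
    msg["Subject"] = "report"
    msg.set_content("See attached.")
    msg.add_attachment(
        b"a,b\n1,2\n", maintype="text", subtype="csv", filename="report.csv"
    )
    return msg.as_bytes()


def extract_attachments(raw: bytes) -> list:
    """Return one record per attachment, loosely analogous to one flowfile each."""
    msg = message_from_bytes(raw, policy=policy.default)
    records = []
    for part in msg.iter_attachments():
        records.append(
            {
                "filename": part.get_filename(),
                "mime.type": part.get_content_type(),
                # decode=True undoes the transfer encoding and returns raw bytes.
                "content": part.get_payload(decode=True),
            }
        )
    return records
```

Filtering by filename then becomes a simple check on each record, for example keeping only those whose `filename` ends with `.csv`, which is the same idea as adding a second RouteOnAttribute after the attachment extraction.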

New Contributor

@Shu_ashu @abhinav_joshi Could you please help me with which processor to use to extract only a limited set of items from the flowfile after the ExtractEmailHeaders processor has run? My goal is to get only a few items, such as from, to, subject, attachment count, and body (for a word cloud).

Could you please provide the flow file if possible?

I did not understand the steps below. Should I create a new processor to fetch the details from each flowfile?

ExtractEmailHeaders:-
Change the processor's configuration if you want to capture additional headers.

Once a flowfile has been processed by this processor, the following attributes are added to it:

| Name | Description |
|---|---|
| email.headers.bcc.* | Each individual BCC recipient (if available) |
| email.headers.cc.* | Each individual CC recipient (if available) |
| email.headers.from.* | Each individual mailbox contained in the From of the Email (array as per RFC-2822) |
| email.headers.message-id | The value of the Message-ID header (if available) |
| email.headers.received_date | The Received-Date of the message (if available) |
| email.headers.sent_date | Date the message was sent |
| email.headers.subject | Subject of the message (if available) |
| email.headers.to.* | Each individual TO recipient (if available) |
| email.attachment_count | Number of attachments of the message |

Expert Contributor

Hi @Shu_ashu

We have an issue with our ConsumeIMAP processor. It has been running well for the last 2 years, but we now receive an email with a 5 MB attachment, and the ConsumeIMAP processor takes almost 3 hours to read it. We have observed that reading is quick when the attachment is smaller than 5 MB, but for 5 MB and larger it seems to get stuck.

We have increased the JVM memory and added Java tuning parameters to the bootstrap.conf file, but nothing is helping. Can you suggest a tweak to improve this, please?

New Contributor

Hi.
From an outlook.live.com account, I want to read a folder other than INBOX, specifically one called VEGA that is located inside INBOX. I have tried INBOX/VEGA, INBOX\VEGA, and other variations, but each returns an error:
2022-08-12 18:03:18,309 ERROR [Timer-Driven Process Thread-8] o.a.nifi.processors.email.ConsumePOP3 ConsumePOP3[id=92a5a620-0182-1000-f4ca-a827c5fd2f4f] Processing halted: yielding [1 sec]
java.lang.IllegalStateException: no such folder [INBOX\VEGA]

I need help.
Thanks.

Expert Contributor

Hey, thanks a lot @Shu. As always, the answer is detailed and still to the point. One quick question before I start using an Exchange server or, for that matter, Gmail: I will have to open the firewalls so that my NiFi Linux server can reach Gmail or any other Exchange server, won't I?

Master Guru
@Abhinav Joshi

Yes, if the ports are blocked, we need to open them from the NiFi servers. I tried the above example on my local NiFi instance, where no ports are blocked.

Expert Contributor

Thanks @Shu. I will definitely get this done and then share the template and all the details so that others can benefit, just as others are benefiting from your detailed answers.

New Contributor
@Abhinav Joshi

Did you manage to get it working?

@Shu I followed your flow example with the configurations and didn't encounter any error or warning. However, the first processor (ConsumeIMAP/POP3) seems to run indefinitely, and the flow is stuck there. I double-checked that the fetch size is 10. Do you have any idea what could have gone wrong?