Created on 08-04-2016 11:03 AM - edited 08-17-2019 10:57 AM
Hortonworks DataFlow (HDF), powered by Apache NiFi, Kafka and Storm, collects, curates,
analyzes and delivers real-time data from the IoAT to data stores both
on-premises and in the cloud. Apache
NiFi automates and manages the flow of information between systems. NiFi data
flows are made of a series of processors, each with a specific task. NiFi provides
hundreds of general-purpose processors and can pull data from various sources.
This document discusses the integration with AWS S3 in detail.
S3 is cloud storage for the Internet. To upload your data (photos, videos,
documents etc.), you first create a bucket in one of the AWS regions. You can
then upload any number of objects to the bucket. S3 buckets and objects are
resources, and Amazon S3 provides APIs for you to manage them.
For many existing AWS customers there is a need to integrate with S3 to process data
across multiple applications. NiFi provides many processors to manage and
process S3 objects and to integrate with S3 buckets. This document outlines the
detailed setup and configuration to integrate S3 with Apache NiFi.
Business Use Cases
Many customers use the Amazon S3 storage service to build applications. The application types can be backup and archiving, content storage,
big data analytics, cloud native application data or DR.
S3 can be used as persistent or temporary
storage. Many applications need to process the data as it lands in an S3 bucket,
retrieve the content and log the metadata of the S3 object. AWS supports the
following destinations to which it can publish S3-related events:
Amazon SNS topic
Amazon SQS queue
This document describes putting
data objects into Amazon S3 and extracting them using Apache NiFi, leveraging
Amazon SQS notifications.
Architecture with Apache NiFi
The main purpose of the document is to showcase
the ease of integration with S3 using Apache NiFi.
In the sample demo scenario:
The NiFi instance creates the data object in the S3 bucket using PutS3Object.
As the new object is created in S3, S3 sends out a notification in JSON form to the Amazon SQS queue.
The GetSQS processor subscribes to the SQS event and retrieves the notification for the newly created S3 object.
FetchS3Object extracts the content of the newly created object and sends it to downstream systems for further processing.
This section describes the setup and configuration on the AWS side (SQS and S3 bucket) based
on the scenario described in the previous section. Make sure to log in to the AWS
dashboard and select the appropriate product section to configure the S3 buckets
and SQS queues.
One of the
ways to monitor an S3 bucket is to use SQS notifications.
Create the SQS queue as shown below, e.g. NiFiEvent.
Configure the security and appropriate permissions on the SQS queue so that the queue can
be used for the S3 bucket events.
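As an illustration of the permissions step, the queue needs an access policy that allows the S3 service to send messages to it. The following Python sketch builds such a policy as JSON. The region, the account ID and the helper name build_queue_policy are placeholders assumed for this example; the queue and bucket names are the ones used in this document.

```python
import json


def build_queue_policy(region, account_id, queue_name, bucket_name):
    """Return an SQS access policy (as a dict) that lets S3 send bucket events to the queue."""
    queue_arn = f"arn:aws:sqs:{region}:{account_id}:{queue_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowS3ToSendEvents",  # hypothetical statement id
                "Effect": "Allow",
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "sqs:SendMessage",
                "Resource": queue_arn,
                # Only accept messages that originate from this bucket.
                "Condition": {
                    "ArnLike": {"aws:SourceArn": f"arn:aws:s3:::{bucket_name}"}
                },
            }
        ],
    }


# Region and account ID are made-up placeholders.
policy = build_queue_policy("us-east-1", "123456789012", "NiFiEvent", "mphdf")
policy_json = json.dumps(policy, indent=2)
print(policy_json)
```

The printed JSON can be pasted into the access policy of the queue in the SQS console after replacing the placeholders.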
Once the SQS configuration is done, create the S3 bucket (e.g. mphdf).
Add a folder named "orderEvent" to the S3
bucket. Go to the properties section and make sure to configure the permissions,
event notification and policy for the S3 bucket.
For permissions, add the appropriate account and grant it list, upload, delete, view
and edit permissions on the bucket.
Create an AWS
bucket policy so that specific accounts have permissions to manage the S3 bucket.
The output will be a JSON object which you can update in the bucket configuration.
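To give an idea of what such a policy JSON can look like, here is a minimal Python sketch. The account ID, the statement IDs and the helper name build_bucket_policy are assumptions made for the example, and the set of actions is one possible mapping of the list, upload, view and delete permissions mentioned above, not necessarily the exact policy used in this setup.

```python
import json


def build_bucket_policy(account_id, bucket_name):
    """Return a bucket policy (as a dict) granting one AWS account access to the bucket."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    principal = {"AWS": f"arn:aws:iam::{account_id}:root"}
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing applies to the bucket itself.
                "Sid": "AllowListBucket",
                "Effect": "Allow",
                "Principal": principal,
                "Action": "s3:ListBucket",
                "Resource": bucket_arn,
            },
            {
                # View, upload and delete apply to the objects inside the bucket.
                "Sid": "AllowObjectAccess",
                "Effect": "Allow",
                "Principal": principal,
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": f"{bucket_arn}/*",
            },
        ],
    }


# The account ID is a made-up placeholder.
bucket_policy = build_bucket_policy("123456789012", "mphdf")
print(json.dumps(bucket_policy, indent=2))
```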
Configure the event notification to publish all the relevant events to the SQS
queue created earlier. Make sure to select the type as SQS.
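The same event notification can also be expressed as a JSON document, which is the form accepted by the S3 bucket notification API. The sketch below assumes that the relevant events are object creations and that only keys under the "orderEvent" folder are of interest. Both choices, the queue ARN and the helper name build_notification_config are assumptions for illustration.

```python
import json


def build_notification_config(queue_arn, prefix):
    """Return a bucket notification configuration that sends object-created events to SQS."""
    return {
        "QueueConfigurations": [
            {
                "Id": "NiFiObjectCreated",  # hypothetical identifier
                "QueueArn": queue_arn,
                "Events": ["s3:ObjectCreated:*"],
                # Restrict notifications to keys that start with the given prefix.
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}
                },
            }
        ]
    }


# The queue ARN uses a made-up region and account ID.
config = build_notification_config(
    "arn:aws:sqs:us-east-1:123456789012:NiFiEvent", "orderEvent/"
)
print(json.dumps(config, indent=2))
```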
The AWS side configuration is now complete. Next, build the NiFi dataflow using the NiFi processors
for S3 buckets and SQS.
NiFi DataFlow Configuration
To demonstrate the S3 integration, I modified an existing NiFi dataflow. There
are two NiFi dataflows: one publishes the object to S3 and the second
extracts the object content through the SQS notification.
Dataflow 1: Accept the ERP events --> transform to JSON --> PutS3Object
The PutS3Object processor creates the object in the S3 bucket. The configuration is shown
below. Make sure to use the correct
bucket name, owner (AWS account), access key and secret key. To get the access
and secret keys, go to the AWS IAM console -> users -> security
credentials -> create access keys. You can download the credentials or select the Show User
Security Credentials option to get the access key and secret access key. Make sure
to select the correct region.
The GetSQS processor fetches messages from an AWS SQS queue. You can get the queue URL from AWS SQS.
The FetchS3Object processor retrieves the contents of an S3
object and writes it to the content of a FlowFile. The "object key" is nothing
but the key attribute value from the incoming SQS notification JSON message. The "objectname" property is populated by the JsonPath expression $.Records[*].s3.object.key
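Outside of NiFi, the effect of that JsonPath expression can be sketched in a few lines of Python. The sample message below is a trimmed, made-up notification (the object name is hypothetical) that keeps only the fields needed here. S3 URL-encodes object keys in its notifications, so the sketch decodes them before use.

```python
import json
from urllib.parse import unquote_plus

# Trimmed, made-up example of an S3 event notification as found in an SQS message body.
SAMPLE_BODY = json.dumps({
    "Records": [
        {
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "mphdf"},
                "object": {"key": "orderEvent/order+1001.json", "size": 512},
            },
        }
    ]
})


def extract_object_keys(message_body):
    """Return (bucket, key) pairs, mirroring $.Records[*].s3.object.key."""
    event = json.loads(message_body)
    pairs = []
    # Messages without a Records array yield an empty list.
    for record in event.get("Records", []):
        s3_info = record["s3"]
        # Keys arrive URL-encoded (a space is sent as "+"), so decode them.
        key = unquote_plus(s3_info["object"]["key"])
        pairs.append((s3_info["bucket"]["name"], key))
    return pairs


print(extract_object_keys(SAMPLE_BODY))
# prints [('mphdf', 'orderEvent/order 1001.json')]
```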
Testing NiFi DataFlow
For testing purposes, the existing dataflow is used, which:
Pulls the ERP events from a JMS queue. The event XML is mapped to a JSON object and the object is pushed to the AWS S3 bucket.
The successful creation of the S3 object triggers a notification to the SQS queue.
The second NiFi dataflow reads the SQS event through the GetSQS processor. It parses the JSON metadata and extracts the object name from the SQS message.
Next it uses the FetchS3Object NiFi processor to extract the S3 object content and then writes it to the local file system.
Below are some data provenance screenshots from NiFi which prove the successful
creation of the S3 object and the processing of the SQS notification to extract and
process the S3 object through NiFi in near real time.
PutS3Object data provenance
S3 browser view (shows the newly created object).
GetSQS data provenance
Shows the SQS event triggered for the newly created S3 object.
Sample SQS Notification message
The highlighted value shows
the object name, which will be extracted and used to retrieve the object from S3.