Created on 08-04-2016 11:03 AM - edited 08-17-2019 10:57 AM
Introduction
Hortonworks Dataflow (HDF) powered by Apache NiFi, kafka and Storm, collects, curates, analyzes and delivers real-time data from the IoAT to data stores both on-premises and in the cloud. Apache NiFi automates and manages the flow of information between systems. NiFi data flows are made of series of processors each with specific task. NiFi provides hundreds of general purpose processors. NiFi can pull data from various sources. This document mainly discusses in detail the integration with AWS S3 data source.
Amazon S3 is cloud storage for the Internet. To upload your data (photos, videos, documents etc.), you first create a bucket in one of the AWS regions. You can then upload any number of objects to the bucket. S3 buckets and objects are resources, and Amazon S3 provides APIs for you to manage them.
With many existing AWS customers there is need to integrate with S3 to process data across multiple applications. NiFi provides many processors to manage and process S3 objects integrating with S3 buckets. This document outlines the detail setup and configuration to integrate S3 with Apache NiFi.
Business Use cases
Many customers are utilizing the Amazon S3 storage service to build applications.The application types can be backup & archiving, content storage, big data analytics, cloud native application data or DR.
S3 can be utilized as persistent or temporary storage. Many applications need to process the data as lands into a S3 bucket, retrieve the content and log the metadata regarding the S3 object. AWS supports following destinations where it can publish S3 related events.
This document describes putting and extracting data object from amazon S3 using Apache NiFi leveraging the Amazon SQS notifications.
Solution Architecture with Apache NiFi
The main purpose of the document is to showcase the ease of integration with S3 using Apache NiFi.
In the sample demo scenario:
AWS Configurations
This section describes the setup and configuration on AWS side (SQS & S3 bucket) based on the scenario described in the previous section. Make sure to login to the AWS dashboard and select appropriate product section to configure the S3 buckets and SQS queues.
SQS
One of the ways to monitor S3 bucket is to use SQS notifications.
First create the SQS Queue as shown below e.g. NiFiEvent.
Configure the security and appropriate permissions to the SQS queue so that SQS queue can be utilized for the S3 bucket events.
S3 Bucket
Once the SQS configuration is done, create the S3 bucket (e.g. mphdf).
Adding a folder named "orderEvent" to the S3 bucket. Go to the properties section and make sure to configure Permissions, Event notification and policy to the S3 bucket.
For permissions, add the appropriate account to include list, upload, delete, view and Edit permissions to the bucket.
Creating an AWS bucket policy, so that specific accounts have permissions to manage the S3 bucket objects.
The policy output will be JSON object which you can update in the bucket configuration.
Next configure the event notification to publish all the relevant events to SQS queues which we created earlier. Make sure to select the Type as SQS.
The AWS side configuration is now complete. Next, build the NiFi dataflow using the NiFi processors for S3 buckets and SQS.
NiFi DataFlow Configuration
To demonstrate the S3 integration I modified the existing NiFi data flow. There are two NiFi dataflows, one to publish the object to S3 and second one is to extract the object content through SQS notification.
Flow 1 : Accept the ERP Events --> transform to JSON --> PutS3Object
Flow 2 : GetSQS notification -->Get Object metadata --> FetchS3Object
The complete flow is shown below.
PutS3Object Processor
This processor creates the object in S3 bucket. The configuration is shown below. Make sure to use the correct bucket name, owner (aws account) , access key and secret key. To get the Access and secret keys go the AWS IAM console -> users ->security credentials->create access keys. You can download or select Show User Security Credentials options to get Access key and Secret access key. Make sure to select the correct Region.
GetSQS Processor
IT fetches messages from an AWS SQS queue. The Queue URL you can get from AWS SQS details section.
FetchS3Object Processor
Retrieves the contents of an S3 Object and writes it to the content of a FlowFile. The "object key" is nothing but the key attribute value from incoming SQS notification JSON message. The "objectname" property is populated by JsonPath expression $.Records[*].s3.object.key
Testing NiFi DataFlow
For testing purpose, the existing dataflow is used which:
Following are some data provenance screenshots from NiFi which proves the successful creation of S3 object and then processing the SQS notification to extract and process the S3 object through NiFi in near real time fashion.
PutS3Object data provenance
S3 Bucket browser view (shows the newly created object).
GetSQS data provenance
Showed the SQS event getting triggered for newly created S3 object.
Sample SQS Notification message
The highlighted value shows the object name which will be extracted and used to retrieve the object from S3 bucket.
{"Records":[{"eventVersion":"2.0","eventSource":"aws:s3","awsRegion":"us-east-1","eventTime":"2016-08-03T14:25:13.147Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:AIDAJL3JQI6HZAG3MB6JM"},"requestParameters":{"sourceIPAddress":"71.127.248.137"},"responseElements":{"x-amz-request-id":"9A673206F1BFDE85","x-amz-id-2":"UmcqEKQJyXfH+UlgDTWIMfvQDOhuOWREe/lwUSJdMx9CbgCu7wzPWJL+wCeRzL6dgqsnYTopWrM="},"s3":{"s3SchemaVersion":"1.0","configurationId":"NiFiEvents","bucket":{"name":"mphdf","ownerIdentity":{"principalId":"A1O76DMOXCPR44"},"arn":"arn:aws:s3:::mphdf"},"object":{"key":"orderEvent/651155932749796","size":804,"eTag":"53ca61e19b3223763f1b36ed0e9383fa","sequencer":"0057A1FEC9137FBF3A"}}}]}
FetchS3Bucket data provenance
The content from the newly created S3 object extracted using the FetchS3Bucket processor.
Apache NiFi Benefits
NiFi is data source and destination-agnostic. it can move data from any data platform to any data platform. The S3 example demonstrates it can integrate well with non Hadoop echo system as well.
The NiFi project provides connection processors for many standard data sources like S3. With standard interfaces new processors can be developed with minimal effort.
You can create NiFi dataflow templates to accelerate development.
Apache NiFi is ideal for data sources sitting out on the edge in the cloud or on-prem.
Document References
NiFi System Admin Guide:
https://aws.amazon.com/s3/?nc2=h_l3_sc
https://aws.amazon.com/sqs/?nc2=h_m1
https://nifi.apache.org/docs/nifi-docs/
Created on 02-06-2017 12:55 PM
followed this procedure. getting error as
2017-02-06 05:28:17,735 WARN [Timer-Driven Process Thread-3] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=fdfba74e-0159-1000-c383-ceeac30ec710] Processor Administratively Yielded for 1 sec due to processing failure 2017-02-06 05:28:17,735 WARN [Timer-Driven Process Thread-3] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding ListS3[id=fdfba74e-0159-1000-c383-ceeac30ec710] due to uncaught Exception: com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 4C962ABA0628B5B7), S3 Extended Request ID: K/tm3wLtBIYyPDGqzbDC90IDmqPiqHMYa16GzgRDUcUSLdva20DMdBsgTwi60JNqJ62hA3HggqE= 2017-02-06 05:28:17,737 WARN [Timer-Driven Process Thread-3] o.a.n.c.t.ContinuallyRunProcessorTask com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 4C962ABA0628B5B7) at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1370) ~[na:na] at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:917) ~[na:na] at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:695) ~[na:na] at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:447) ~[na:na] at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:409) ~[na:na] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:358) ~[na:na] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3787) ~[na:na] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3729) ~[na:na] at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:606) ~[na:na] at org.apache.nifi.processors.aws.s3.ListS3$S3ObjectBucketLister.listVersions(ListS3.java:314) ~[na:na] at org.apache.nifi.processors.aws.s3.ListS3.onTrigger(ListS3.java:208) ~[na:na] at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.1.1.jar:1.1.1] at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.1.jar:1.1.1] at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1] at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1] at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_121] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_121] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_121] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_121] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
thanks in advance for any help. @Matt @milind pandit
Created on 02-06-2017 10:47 PM
@S Mahapatra AWS S3 bucket needs to be created, it seems you are missing some config on AWS side.
Created on 02-07-2017 02:13 AM
the bucket ofcourse created and I could access them s3 browser as well as s3 command line.