Community Articles

Find and share helpful community-sourced technical articles.
avatar

Introduction

Hortonworks Dataflow (HDF) powered by Apache NiFi, kafka and Storm, collects, curates, analyzes and delivers real-time data from the IoAT to data stores both on-premises and in the cloud. Apache NiFi automates and manages the flow of information between systems. NiFi data flows are made of series of processors each with specific task. NiFi provides hundreds of general purpose processors. NiFi can pull data from various sources. This document mainly discusses in detail the integration with AWS S3 data source.

Amazon S3 is cloud storage for the Internet. To upload your data (photos, videos, documents etc.), you first create a bucket in one of the AWS regions. You can then upload any number of objects to the bucket. S3 buckets and objects are resources, and Amazon S3 provides APIs for you to manage them.

With many existing AWS customers there is need to integrate with S3 to process data across multiple applications. NiFi provides many processors to manage and process S3 objects integrating with S3 buckets. This document outlines the detail setup and configuration to integrate S3 with Apache NiFi.

Business Use cases

Many customers are utilizing the Amazon S3 storage service to build applications.The application types can be backup & archiving, content storage, big data analytics, cloud native application data or DR.

S3 can be utilized as persistent or temporary storage. Many applications need to process the data as lands into a S3 bucket, retrieve the content and log the metadata regarding the S3 object. AWS supports following destinations where it can publish S3 related events.

  • Amazon SNS topic
  • Amazon SQS queue
  • AWS lambda

This document describes putting and extracting data object from amazon S3 using Apache NiFi leveraging the Amazon SQS notifications.

Solution Architecture with Apache NiFi

The main purpose of the document is to showcase the ease of integration with S3 using Apache NiFi.

In the sample demo scenario:

  • The cloud NiFi instance creates the data object to S3 bucket using PutS3Object processor.
  • As the new object is created at S3, it sends out notification in JSON object form to amazon SQS queue.
  • GetSQS processor subscribes to the SQS event and retrieves the newly created S3 object metadata.
  • The FetchS3Object extracts the content of newly created object and sends it downstream systems for further processing.

6334-dataflow-arch.gif

AWS Configurations

This section describes the setup and configuration on AWS side (SQS & S3 bucket) based on the scenario described in the previous section. Make sure to login to the AWS dashboard and select appropriate product section to configure the S3 buckets and SQS queues.

SQS

One of the ways to monitor S3 bucket is to use SQS notifications.

First create the SQS Queue as shown below e.g. NiFiEvent.

6335-sqs-create.gif

Configure the security and appropriate permissions to the SQS queue so that SQS queue can be utilized for the S3 bucket events.

6336-sqs-permission.gif

S3 Bucket

Once the SQS configuration is done, create the S3 bucket (e.g. mphdf).

6338-s3-bucket-create.gif

Adding a folder named "orderEvent" to the S3 bucket. Go to the properties section and make sure to configure Permissions, Event notification and policy to the S3 bucket.

For permissions, add the appropriate account to include list, upload, delete, view and Edit permissions to the bucket.

6339-s3-bucket-permission.gif

Creating an AWS bucket policy, so that specific accounts have permissions to manage the S3 bucket objects.

6340-s3-bucket-policy-create.gif

The policy output will be JSON object which you can update in the bucket configuration.

6341-s3-bucket-policy.gif

Next configure the event notification to publish all the relevant events to SQS queues which we created earlier. Make sure to select the Type as SQS.

6342-bucketevent-config.gif

The AWS side configuration is now complete. Next, build the NiFi dataflow using the NiFi processors for S3 buckets and SQS.

NiFi DataFlow Configuration

To demonstrate the S3 integration I modified the existing NiFi data flow. There are two NiFi dataflows, one to publish the object to S3 and second one is to extract the object content through SQS notification.

Flow 1 : Accept the ERP Events --> transform to JSON --> PutS3Object

Flow 2 : GetSQS notification -->Get Object metadata --> FetchS3Object

The complete flow is shown below.

6343-nifi-dataflow.gif

PutS3Object Processor

This processor creates the object in S3 bucket. The configuration is shown below. Make sure to use the correct bucket name, owner (aws account) , access key and secret key. To get the Access and secret keys go the AWS IAM console -> users ->security credentials->create access keys. You can download or select Show User Security Credentials options to get Access key and Secret access key. Make sure to select the correct Region.

6344-puts3object-processor.gif

GetSQS Processor

IT fetches messages from an AWS SQS queue. The Queue URL you can get from AWS SQS details section.

6345-getsqs-processor.gif

FetchS3Object Processor

Retrieves the contents of an S3 Object and writes it to the content of a FlowFile. The "object key" is nothing but the key attribute value from incoming SQS notification JSON message. The "objectname" property is populated by JsonPath expression $.Records[*].s3.object.key

6346-fetchs3object-processor.gif

Testing NiFi DataFlow

For testing purpose, the existing dataflow is used which:

  • pulls the ERP events from JMS queue. The event xml is mapped to JSON object and the object is pushed to AWS S3 bucket.
  • The successful creation of S3 object triggers a notification to SQS queue.
  • The second NiFi dataflow reads the SQS event through GetSQS processor. It parses the JSON metadata information and extracts the object name from SQS message.
  • Next it uses the fetchS3Object NiFi processor to extract the S3 object content and then writes it to the local file system.

Following are some data provenance screenshots from NiFi which proves the successful creation of S3 object and then processing the SQS notification to extract and process the S3 object through NiFi in near real time fashion.

PutS3Object data provenance

6348-puts3object-data-provenance.gif

S3 Bucket browser view (shows the newly created object).

6350-s3-object-browser.gif

GetSQS data provenance

Showed the SQS event getting triggered for newly created S3 object.

6351-getsqs-provenance.gif

Sample SQS Notification message

The highlighted value shows the object name which will be extracted and used to retrieve the object from S3 bucket.

{"Records":[{"eventVersion":"2.0","eventSource":"aws:s3","awsRegion":"us-east-1","eventTime":"2016-08-03T14:25:13.147Z","eventName":"ObjectCreated:Put","userIdentity":{"principalId":"AWS:AIDAJL3JQI6HZAG3MB6JM"},"requestParameters":{"sourceIPAddress":"71.127.248.137"},"responseElements":{"x-amz-request-id":"9A673206F1BFDE85","x-amz-id-2":"UmcqEKQJyXfH+UlgDTWIMfvQDOhuOWREe/lwUSJdMx9CbgCu7wzPWJL+wCeRzL6dgqsnYTopWrM="},"s3":{"s3SchemaVersion":"1.0","configurationId":"NiFiEvents","bucket":{"name":"mphdf","ownerIdentity":{"principalId":"A1O76DMOXCPR44"},"arn":"arn:aws:s3:::mphdf"},"object":{"key":"orderEvent/651155932749796","size":804,"eTag":"53ca61e19b3223763f1b36ed0e9383fa","sequencer":"0057A1FEC9137FBF3A"}}}]}

FetchS3Bucket data provenance

The content from the newly created S3 object extracted using the FetchS3Bucket processor.

6352-fetchs3object-data-provenance.gif

Apache NiFi Benefits

NiFi is data source and destination-agnostic. it can move data from any data platform to any data platform. The S3 example demonstrates it can integrate well with non Hadoop echo system as well.

The NiFi project provides connection processors for many standard data sources like S3. With standard interfaces new processors can be developed with minimal effort.

You can create NiFi dataflow templates to accelerate development.

Apache NiFi is ideal for data sources sitting out on the edge in the cloud or on-prem.

Document References

NiFi System Admin Guide:

http://dev.hortonworks.com.s3.amazonaws.com/HDPDocuments/HDF1/HDF-1-trunk/bk_AdminGuide/content/ch_a...

https://aws.amazon.com/s3/?nc2=h_l3_sc

https://aws.amazon.com/sqs/?nc2=h_m1

https://nifi.apache.org/docs/nifi-docs/


puts3object-data-provenance.gif
38,908 Views
Comments
avatar
Contributor

followed this procedure. getting error as

2017-02-06 05:28:17,735 WARN [Timer-Driven Process Thread-3] org.apache.nifi.processors.aws.s3.ListS3 ListS3[id=fdfba74e-0159-1000-c383-ceeac30ec710] Processor Administratively Yielded for 1 sec due to processing failure 2017-02-06 05:28:17,735 WARN [Timer-Driven Process Thread-3] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding ListS3[id=fdfba74e-0159-1000-c383-ceeac30ec710] due to uncaught Exception: com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 4C962ABA0628B5B7), S3 Extended Request ID: K/tm3wLtBIYyPDGqzbDC90IDmqPiqHMYa16GzgRDUcUSLdva20DMdBsgTwi60JNqJ62hA3HggqE= 2017-02-06 05:28:17,737 WARN [Timer-Driven Process Thread-3] o.a.n.c.t.ContinuallyRunProcessorTask com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 4C962ABA0628B5B7) at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1370) ~[na:na] at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:917) ~[na:na] at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:695) ~[na:na] at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:447) ~[na:na] at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:409) ~[na:na] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:358) ~[na:na] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3787) ~[na:na] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3729) ~[na:na] at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:606) ~[na:na] at org.apache.nifi.processors.aws.s3.ListS3$S3ObjectBucketLister.listVersions(ListS3.java:314) ~[na:na] at org.apache.nifi.processors.aws.s3.ListS3.onTrigger(ListS3.java:208) ~[na:na] at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.1.1.jar:1.1.1] at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1099) ~[nifi-framework-core-1.1.1.jar:1.1.1] at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.1.1.jar:1.1.1] at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.1.1.jar:1.1.1] at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.1.1.jar:1.1.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_121] at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_121] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_121] at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_121] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

thanks in advance for any help. @Matt @milind pandit

avatar

@S Mahapatra AWS S3 bucket needs to be created, it seems you are missing some config on AWS side.

avatar
Contributor

the bucket ofcourse created and I could access them s3 browser as well as s3 command line.