
Imagine we have a use case of exchanging data with an external party via AWS S3 buckets: we want to push the incoming messages into our internal Kafka cluster in an event-driven fashion, after enriching each message with additional metadata. In AWS, this is supported by associating event notifications with an S3 bucket.

These notifications support a few different destination types, namely SQS, SNS, and Lambda. We'll focus on the SQS approach and make use of NiFi's GetSQS processor.

To configure this in AWS, navigate to the S3 bucket, open the Properties tab, and scroll down to Advanced settings > Events. You'll need to create an SQS queue for this purpose.

[Screenshots: configuring S3 Events and the target SQS queue in the AWS console]
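
If you'd rather script this step than click through the console, the equivalent event configuration can be supplied to the aws s3api put-bucket-notification-configuration command. A minimal sketch, reusing the placeholders from the IAM policies below (the Id value here is arbitrary); note that the queue's access policy must also permit the s3.amazonaws.com service principal to send messages to the queue:

{
    "QueueConfigurations": [
        {
            "Id": "new-object-notification",
            "QueueArn": "arn:aws:sqs:$REGION:$ACCOUNT_ID:$QUEUE_NAME",
            "Events": ["s3:ObjectCreated:*"]
        }
    ]
}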

With this configured, a new SQS message will appear any time an object is created within our S3 bucket.
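
Each message body is an S3 event notification in JSON form. Below is an abbreviated sketch showing only the fields relevant to this flow (real notifications carry many more fields); the object key is what we'll extract later in the flow:

{
    "Records": [
        {
            "eventVersion": "2.0",
            "eventSource": "aws:s3",
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {
                    "name": "nifi"
                },
                "object": {
                    "key": "example-file.txt",
                    "size": 1024
                }
            }
        }
    ]
}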

We need to configure some IAM policies so that our NiFi data flow is authorized to read from both the S3 bucket and the SQS queue. We will authenticate from NiFi using the Access Key and Secret Key associated with a particular IAM user.

First, the IAM policy for reading from the S3 bucket called nifi:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads"
            ],
            "Resource": [
                "arn:aws:s3:::nifi"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::nifi/*"
            ]
        }
    ]
}

Second, the IAM policy for reading from the SQS queue, and for deleting from it, since we'll configure GetSQS to auto-delete received messages. We'll need the ARN and URL associated with our SQS queue; both can be retrieved in the SQS Management Console by navigating to the queue we created above.

Note: we could harden this by restricting the permitted SQS actions further; a tightened example follows the policy below.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sqs:*",
            "Resource": "arn:aws:sqs:$REGION:$UUID:$QUEUE_NAME"
        }
    ]
}
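
For reference, a tightened version might grant only the actions GetSQS plausibly needs: receiving messages, deleting them (for the auto-delete behavior), and reading queue attributes. This is a sketch; verify the exact minimal set against your environment:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:$REGION:$ACCOUNT_ID:$QUEUE_NAME"
        }
    ]
}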

You will then need to attach these policies to your IAM user via the IAM Management Console.

We are now ready to build the NiFi data flow:

[Screenshot: the complete NiFi data flow]

For the GetSQS processor, use the SQS queue URL that we retrieved above from the SQS Management Console, along with the Region and the Access Key and Secret Key associated with the IAM user.

We'll use SplitJson to extract the name of the file associated with each SQS notification; we'll need this to fetch the object from S3. A JsonPath sketch follows the screenshot below.

[Screenshot: SplitJson processor configuration]
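
The exact JsonPath depends on the notification format; based on the sample message shown earlier, an expression along these lines leaves each resulting FlowFile's content as a single object key (an assumption; adjust to match your actual messages):

$.Records[*].s3.object.key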

ExtractText is used to associate the result of the JsonPath expression with a new custom attribute, filename, which we can reference later as ${filename}:

[Screenshot: ExtractText processor configuration]
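
In ExtractText this takes the form of a dynamic property: the property name becomes the attribute name, and the value is a regular expression applied to the FlowFile content. A minimal sketch, assuming each FlowFile's content is just the object key at this point:

filename : (.+)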

We'll then pass this attribute into the FetchS3Object processor:

[Screenshot: FetchS3Object processor configuration]
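
The essential FetchS3Object settings, sketched with this article's example values (Region and credentials configured the same way as for GetSQS):

Bucket     : nifi
Object Key : ${filename}
Region     : $REGION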

Finally, we can enrich the message with UpdateAttribute using Advanced > Rules and push to our Kafka topic using the PublishKafka processor.
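
A sketch of the essential PublishKafka properties; the broker and topic values below are placeholders for your own cluster:

Kafka Brokers : $BROKER_HOST:9092
Topic Name    : $TOPIC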

I'd like to credit this blog post: https://adamlamar.github.io/2016-01-30-monitoring-an-s3-bucket-in-apache-nifi/

