NiFi PutParquet writing files to S3

I have a NiFi workflow that reads Avro messages from Kafka and writes them as Snappy-compressed Parquet files to S3. I'm noticing some behavior I did not expect:

  1. In PutParquet I'm specifying the directory as s3://mybucketname/events/${kafka.topic}/${event}/${event_timestamp:format("yyyy/MM/dd/HH")}, but it seems to create a directory with a blank name at the top level of my bucket before moving on to events/topic_name/etc.
  2. For every subsequent subdirectory, it seems to create a file with the same name in addition to the directory.
  3. It creates block files at the top level of the bucket, e.g. "block_1903392314985430358".
  4. The files it creates don't have a .snappy or .parquet extension.

Can anyone shed any light on what is happening here and how I can fix it? I'm using Apache NiFi 1.4.0.

My core-site.xml has:

<property>
    <name>fs.defaultFS</name>
    <value>s3://mybucketname</value>
</property>
<property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>




Hi @Frank Maritato

I am working on a similar use case: trying to write the content of FlowFiles coming into PutParquet directly to an S3 bucket.

I have set the directory to the bucket's path (including the folder subdirectory) and have a core-site.xml similar to yours (fs.defaultFS has only the bucket name). I am also using the s3a URI scheme instead of s3 (all paths use the appropriate URI).

I have also added the jars to the 'Additional Resources' property as described in this link: http://apache-nifi-users-list.2361937.n4.nabble.com/PutParquet-with-S3-td3632.html

But on execution I am getting this error: 'ERROR PutParquet[Id=...] failed to invoke @OnScheduled method due to java.lang.RuntimeException: Failed while executing one of processor's OnScheduled task.; processor will not be scheduled to run for 30 seconds: java.lang.RuntimeException: Failed while executing one of processor's OnScheduled task'

On inspecting the logs, I can see the error: 'AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: , AWS Error Code: null, AWS Error Message: Bad Request'

I have also set the s3a endpoint to s3.ap-south-1.amazonaws.com because my NiFi server and bucket are in the same region, Mumbai.
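
For reference, the setup described above corresponds roughly to the core-site.xml sketch below. It assumes the standard Hadoop S3A property names (fs.s3a.impl, fs.s3a.endpoint); the bucket name is the placeholder from the original question, and credential properties are left out, so this is an illustration and not an exact copy of the file in use.

```xml
<!-- Sketch only: mybucketname is a placeholder; credential properties are omitted. -->
<property>
    <name>fs.defaultFS</name>
    <value>s3a://mybucketname</value>
</property>
<property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Regional endpoint for Mumbai (ap-south-1), as mentioned above. -->
<property>
    <name>fs.s3a.endpoint</name>
    <value>s3.ap-south-1.amazonaws.com</value>
</property>
```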

Can you help me out with your configuration or any additional jars?
