NiFi PutParquet writing files to S3

I have a NiFi workflow that reads Avro messages from Kafka and writes them as Snappy-compressed Parquet files to S3. I'm seeing some behavior I did not expect:

  1. In PutParquet I'm specifying the directory as s3://mybucketname/events/${kafka.topic}/${event}/${event_timestamp:format("yyyy/MM/dd/HH")} but it seems to be creating a directory with a blank name at the top level of my bucket before moving on to events/topic_name/etc.
  2. For every subsequent subdirectory, it seems to create a file with the same name in addition to the directory.
  3. It is creating block files at the top level of the bucket, i.e. "block_1903392314985430358"
  4. The files it creates don't have a .snappy or a .parquet extension.

Can anyone shed any light on what is happening here and how I can fix it? I'm using Apache NiFi 1.4.0.

My core-site.xml has:

<property>
    <name>fs.defaultFS</name>
    <value>s3://mybucketname</value>
</property>
<property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
</property>



1 Reply

Hi @Frank Maritato

I am working on a similar use case: I am trying to write the content of FlowFiles arriving at PutParquet directly to an S3 bucket.

I have set the directory to the bucket's path (including the folder subdirectory), and my core-site.xml is similar to yours (fs.defaultFS contains only the bucket name). I am using the s3a URI scheme instead of s3, and all paths use the matching URI.

I have also added jars in the 'Additional Resources' attribute, following this link: http://apache-nifi-users-list.2361937.n4.nabble.com/PutParquet-with-S3-td3632.html

But on execution I get this error: 'ERROR PutParquet[Id=...] failed to invoke @OnScheduled method due to java.lang.RunTimeException: Failed while executing one of processor's onScheduled task.; processor will not be scheduled to run for 30 seconds: java.lang.RunTimeException: Failed while executing one of processor's OnScheduled task'

On inspecting the logs, I can see this error: 'AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: , AWS Error Code: null, AWS Error Message: Bad Request'

I have also set the s3a endpoint to s3.ap-south-1.amazonaws.com, because my NiFi server and the bucket are in the same region, Mumbai.
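
For reference, the s3a setup described above would look roughly like the fragment below in core-site.xml. This is only a sketch: the bucket name is the placeholder from the original question, fs.s3a.endpoint is the standard Hadoop S3A property for the endpoint, and credential settings are left out because they depend on how access is provided in a given environment.

```xml
<!-- Sketch only: "mybucketname" is the placeholder bucket from the question. -->
<property>
    <name>fs.defaultFS</name>
    <value>s3a://mybucketname</value>
</property>
<!-- Regional endpoint for ap-south-1 (Mumbai), as described in the post. -->
<property>
    <name>fs.s3a.endpoint</name>
    <value>s3.ap-south-1.amazonaws.com</value>
</property>
```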

Can you help me out with your configuration or any additional jars?