Support Questions

Find answers, ask questions, and share your expertise
Announcements
Celebrating as our community reaches 100,000 members! Thank you!

Loading a file from S3 to RDS postgres

avatar
Explorer

Hi,

We are trying to work on a Proof Of Concept ( POC) to upload a file from an S3 bucket to RDS instance (in our case , it is Postgres) . I have uploaded the file on my local desktop using the execute script processor and using the psql copy command provided by postgresql. The psql command looks like this:

import java.sql.DriverManager;
import java.sql.Connection;
import java.sql.SQLException;
import groovy.sql.Sql;
 def file ="C:/Users/kdasari/Desktop/NIFI/files/kiran.csv DELIMITER ',' CSV HEADER"
 def copyCommand = /psql --host=<AWS RDS> --port=5432 --dbname=<AWS DB name> --username=<AWS DB user> --no-password --command="copy kiran_test.poc_test from  $file"/
 println copyCommand 
        //run psql tool as an external process
        def process = copyCommand.execute()

I am trying to retrieve the flow file attributes (S3 file name ) and substitute in the groovy code "file" variable (see the code snippet above) , but with no success.

I used fetchS3object processor followed by the executescript processor.After reading the NIFI documentation, I understand the execute processor has access to the process session and process context , I am unable to figure out how to instantiate the"file " variable in the groovy code.

I am trying to understand the following:

.FetchS3Object : It reads the content of the S3 file and updates the flowfile attribute filename .

  1. Is NIFI still pointing to the physical file in the S3 instance and just updates the flow file attributes. (or)
  2. Is NIFI, physically copying the file from S3 and placing it in the NIFI content repository . If so, will it retain the filename ? (In my example, it would be kiran.csv)
  3. How do I refer to the file in the subsequent processors?
  4. In my example above , how do I include NIFI file reference in the groovy code.

Please let me know.

Thanks,

Kiran

1 REPLY 1

avatar
Master Guru

It is Option #2 above, the file contents are put in the content repo, and the filename is added as an attribute. If you want the file to reside on the local file system, you can use PutFile before your ExecuteScript processor, and you can get at the filename by getting the flow file from the session:

 def flowFile = session.get() 

and getting the "filename" attribute from it:

def filename = flowFile.getAttribute(CoreAttributes.FILENAME.key())

In your case, it doesn't seem like you really need a file per se; rather you need to get its contents to the "psql" command. For that I'd recommend using ProcessBuilder instead of just copyCommand.execute(). You might be able to do something like this (NOTE: not a complete script, just a snippet):

import java.io.*

def pb = new ProcessBuilder('psql --host=<AWS RDS> --port=5432 --dbname=<AWS DB name> --username=<AWS DB user> --no-password --command="copy kiran_test.poc_test from STDIN"')
def p = pb.start()
def psqlStdin = p.getOutputStream()
// Something like:
//   def bw = new BufferedWriter(new OutputStreamWriter(psqlStdIn))
// if you are using the Writer paradigm, or use PipedInputStream and PipedOutputStream
session.read(flowFile, {inputStream ->
  // write from inputStream (i.e. flow file contents) to psqlStdIn (aka "pipe to psql")
} as InputStreamCallback)

This should alleviate the need to write out the file(s) to the file system, although it becomes a more complex script. A complete alternative is a PostgreSQL Bulk Loader processor or something, that can do all this with the driver 🙂