Created 03-24-2017 04:06 PM
Hi,
We are trying to build a Proof of Concept (POC) to upload a file from an S3 bucket to an RDS instance (in our case, Postgres). I have loaded a file from my local desktop using the ExecuteScript processor and the psql copy command provided by PostgreSQL. The Groovy script that builds the psql command looks like this:
// local CSV path, plus the COPY options appended to the command
def file = "C:/Users/kdasari/Desktop/NIFI/files/kiran.csv DELIMITER ',' CSV HEADER"
def copyCommand = /psql --host=<AWS RDS> --port=5432 --dbname=<AWS DB name> --username=<AWS DB user> --no-password --command="copy kiran_test.poc_test from $file"/
println copyCommand
// run the psql tool as an external process
def process = copyCommand.execute()
I am trying to retrieve the flow file attributes (the S3 file name) and substitute them into the "file" variable in the Groovy code above, but with no success.
I used the FetchS3Object processor followed by the ExecuteScript processor. After reading the NiFi documentation, I understand that ExecuteScript has access to the process session and process context, but I am unable to figure out how to populate the "file" variable in the Groovy code.
I am trying to understand the following:
- FetchS3Object: it reads the content of the S3 file and updates the flowfile attribute "filename".
Please let me know.
Thanks,
Kiran
Created 03-24-2017 09:02 PM
It is Option #2 above: the file contents are put in the content repo, and the filename is added as an attribute. If you want the file to reside on the local file system, you can use PutFile before your ExecuteScript processor. You can get at the filename by getting the flow file from the session:
def flowFile = session.get()
and getting the "filename" attribute from it:
def filename = flowFile.getAttribute(CoreAttributes.FILENAME.key())
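Putting those two pieces together, a minimal ExecuteScript (Groovy) body might look like this (just a sketch; the log message is only illustrative):
import org.apache.nifi.flowfile.attributes.CoreAttributes

def flowFile = session.get()
if (!flowFile) return // no flow file queued yet, try again later

// "filename" is a core attribute; FetchS3Object sets it to the S3 object key
def filename = flowFile.getAttribute(CoreAttributes.FILENAME.key())
log.info("Processing ${filename}")

session.transfer(flowFile, REL_SUCCESS)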
In your case, it doesn't seem like you really need a file per se; rather you need to get its contents to the "psql" command. For that I'd recommend using ProcessBuilder instead of just copyCommand.execute(). You might be able to do something like this (NOTE: not a complete script, just a snippet):
import java.io.*
import org.apache.nifi.processor.io.InputStreamCallback

// ProcessBuilder expects each argument as its own string; it will not tokenize a single command line
def pb = new ProcessBuilder('psql', '--host=<AWS RDS>', '--port=5432',
        '--dbname=<AWS DB name>', '--username=<AWS DB user>', '--no-password',
        '--command=copy kiran_test.poc_test from STDIN')
def p = pb.start()
def psqlStdin = p.getOutputStream()
// Something like:
// def bw = new BufferedWriter(new OutputStreamWriter(psqlStdin))
// if you are using the Writer paradigm, or use PipedInputStream and PipedOutputStream
session.read(flowFile, { inputStream ->
    // write from inputStream (i.e. flow file contents) to psqlStdin (aka "pipe to psql")
} as InputStreamCallback)
This should alleviate the need to write the file(s) out to the file system, although it makes for a more complex script. A complete alternative would be something like a PostgreSQL bulk-loader processor, which could do all of this with the driver 🙂
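For completeness, here is one way the whole thing might be wired together (a sketch only, with the placeholder connection settings carried over from above and minimal error handling):
import org.apache.nifi.processor.io.InputStreamCallback

def flowFile = session.get()
if (!flowFile) return

def pb = new ProcessBuilder('psql', '--host=<AWS RDS>', '--port=5432',
        '--dbname=<AWS DB name>', '--username=<AWS DB user>', '--no-password',
        '--command=copy kiran_test.poc_test from STDIN')
def p = pb.start()
def psqlStdin = p.getOutputStream()

session.read(flowFile, { inputStream ->
    // Groovy's << copies the stream; closing STDIN tells psql the COPY is complete
    psqlStdin << inputStream
    psqlStdin.close()
} as InputStreamCallback)

def exitCode = p.waitFor()
session.transfer(flowFile, exitCode == 0 ? REL_SUCCESS : REL_FAILURE)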