Created 04-18-2016 11:28 AM
Hi, I am new to NiFi and want to rename a file based on a SQL query. The incoming jpg file has a GUID.jpg filename which I then extract the filename and query a table which returns the new filename. I then want to rename the file with this new filename. However when I use the ExecuteSQL_main processor the flowfile changes to the AVRO output and the original file is lost. The MERGE processor merges the content of the image incoming file with the avro query output which is not what I need.
Any help would be appreciated....
Thanks..
Created 04-19-2016 10:15 PM
I'm sure there is a better way to do this, but the following will work. I hope the way I describe it makes sense:
GetFile --> UpdateAttribute (copy "filename" to a new attribute called "origFilename") #You will use the origFilename attribute to merge later. #Your UpdateAttribute success should feed both of these processors: --> MergeContent path (correlate on origFilename and have it TAR the files) --> ExecuteSQL. #After ExecuteSQL and before the MergeContent: a) add attribute "newFilename" = the name from the DB. b) update attribute "filename" = "DELETE.ME" (hard coded value) #At this point, your MergeContent will receive two filesnames: <orginal> and "DELETE.ME" #and it will tar them so you have everything in one flowFile. MergeContent --> UnpackContent --> RouteOnAttribute (add property DELETEME=${filename:equals("DELETEME")} and auto-terminate DELETEME --> UpdateAttribute (set filename=${newFilename} --> PutFile()
According to the documentation, I think you need a single processor to feed the MergeContent processor so you can use an empty updateAttribute processor.
Created 04-18-2016 01:05 PM
You're probably wanting to also route the original file to an UpdateAttribute processor which replaces the filename attribute with the name extracted from the AVRO the sql query produces. Make sense?
Created 04-18-2016 01:19 PM
Will try that now...
Created 04-19-2016 03:15 PM
any luck @Zaheer N ?
Created 04-19-2016 05:54 PM
I tried it, found that since 2 flow files arrive at the UpdateAttribute processor, the later file overwrites the former file. So since the image file arrives straight from getFile processor it is overwritten by the avro file. Will look setting an attribute for the avro and find some logic to prevent it from being sent forward.
Created 04-18-2016 01:41 PM
Could you use listFile to feed the SQL query and deal with renaming the file after the executeSQL?
Created 04-25-2016 06:58 PM
I posted a script for this above.
Created 04-19-2016 10:15 PM
I'm sure there is a better way to do this, but the following will work. I hope the way I describe it makes sense:
GetFile --> UpdateAttribute (copy "filename" to a new attribute called "origFilename") #You will use the origFilename attribute to merge later. #Your UpdateAttribute success should feed both of these processors: --> MergeContent path (correlate on origFilename and have it TAR the files) --> ExecuteSQL. #After ExecuteSQL and before the MergeContent: a) add attribute "newFilename" = the name from the DB. b) update attribute "filename" = "DELETE.ME" (hard coded value) #At this point, your MergeContent will receive two filesnames: <orginal> and "DELETE.ME" #and it will tar them so you have everything in one flowFile. MergeContent --> UnpackContent --> RouteOnAttribute (add property DELETEME=${filename:equals("DELETEME")} and auto-terminate DELETEME --> UpdateAttribute (set filename=${newFilename} --> PutFile()
According to the documentation, I think you need a single processor to feed the MergeContent processor so you can use an empty updateAttribute processor.
Created 04-20-2016 03:29 PM
Thanks James for taking the time in the explanation. I think I'm getting the picture so let me try this.
Would have been awesome if there was a setting in the ExecuteSQL processor that offered the query destination to be either ATTRIBUTES or CONTENT. I could have chosen ATTRIBUTES and the original file content would have remained in the flowfile. I guess that's why it's open source , but I'm still too much of a novice to attempt to change this behaviour.
Created 04-21-2016 07:29 PM
Below is a simpler way of doing what you want without the MergeContent processor, which makes the flow a lot easier. So, create an ExecuteScript processor, add a property "databaseConnectionPoolName" with the value of your database connection pool name (e.g. "MySQL-DBCPConnectionPool"). Now set the Script Engine to "Groovy" and add the script below either in a file or in "Script Body".
Note that there is probably some more to do to make this solid code, but it does work. I know very little about Groovy. I based this on something on Matt Burgess's site http://funnifi.blogspot.com/. @mburgess
Let me know how it goes with either method or if you figure out something better. There's always a better way.
import org.apache.nifi.controller.ControllerService import groovy.sql.Sql def lookup = context.controllerServiceLookup def dbServiceName = databaseConnectionPoolName.value def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == dbServiceName } if (!dbcpServiceId) return def conn = lookup.getControllerService(dbcpServiceId)?.getConnection() try { //flowFile = session.create() DO this if you want to create a NEW flowfile (losing content, etc) def flowFile = session.get() if (!flowFile) return def sql = new Sql(conn) def filenameBase = flowFile.getAttribute('filenameBase') def rows = sql.rows("select val as myval from junk where k = $filenameBase") //You may want to do something if the row is not found. def newval = rows[0].myval flowFile = session.putAttribute(flowFile, 'filename', newval) session.transfer(flowFile, REL_SUCCESS) } catch(e) { log.error('MY ERROR!!! *******************', e) session.transfer(flowFile, REL_FAILURE) } finally { conn?.close() }