NiFi: How to rename a GetFile flowfile based on an ExecuteSQL_main query

Contributor

Hi, I am new to NiFi and want to rename a file based on a SQL query. The incoming JPG file is named GUID.jpg. I extract the filename and use it to query a table, which returns the new filename, and I then want to rename the file to that new name. However, when I use the ExecuteSQL_main processor, the flowfile content is replaced by the Avro output and the original file is lost. The MergeContent processor merges the content of the incoming image file with the Avro query output, which is not what I need.

Any help would be appreciated....

Thanks..

1 ACCEPTED SOLUTION

Super Collaborator

@Zaheer N

I'm sure there is a better way to do this, but the following will work. I hope the way I describe it makes sense:

GetFile
  --> UpdateAttribute (copy "filename" to a new attribute called "origFilename")
# You will use the origFilename attribute to merge later.
# The UpdateAttribute success relationship should feed both of these:
     --> MergeContent path (correlate on origFilename and have it TAR the files)
     --> ExecuteSQL

# After ExecuteSQL and before MergeContent:
  a) add attribute "newFilename" = the name from the DB
  b) update attribute "filename" = "DELETE.ME" (hard-coded value)

# At this point, MergeContent will receive two filenames: <original> and "DELETE.ME",
# and it will TAR them so you have everything in one flowfile.

MergeContent
 --> UnpackContent
   --> RouteOnAttribute (add property DELETEME=${filename:equals("DELETE.ME")} and auto-terminate DELETEME)
     --> UpdateAttribute (set filename=${newFilename})
      --> PutFile

According to the documentation, I think a single processor needs to feed the MergeContent processor, so you can put an empty UpdateAttribute processor in front of it.

15 REPLIES

You probably also want to route the original file to an UpdateAttribute processor that replaces the filename attribute with the name extracted from the Avro output of the SQL query. Does that make sense?

Contributor

Will try that now...

Any luck, @Zaheer N?

Contributor

I tried it and found that, since two flowfiles arrive at the UpdateAttribute processor, the later one overwrites the earlier one. Because the image file arrives straight from the GetFile processor, it is overwritten by the Avro file. I will look at setting an attribute on the Avro flowfile and find some logic to stop it from being sent forward.

Super Collaborator

Could you use ListFile to feed the SQL query and deal with renaming the file after ExecuteSQL?

Super Collaborator

I posted a script for this above.

Contributor

Thanks, James, for taking the time to explain. I think I'm getting the picture, so let me try this.

It would have been awesome if the ExecuteSQL processor had a setting that let the query destination be either ATTRIBUTES or CONTENT. I could have chosen ATTRIBUTES, and the original file content would have remained in the flowfile. I guess that's why it's open source, but I'm still too much of a novice to attempt to change this behaviour.

Super Collaborator

@Zaheer N

Below is a simpler way of doing what you want without the MergeContent processor, which makes the flow a lot easier. Create an ExecuteScript processor and add a property "databaseConnectionPoolName" whose value is the name of your database connection pool (e.g. "MySQL-DBCPConnectionPool"). Then set the Script Engine to "Groovy" and add the script below, either in a file or in "Script Body".

Note that this probably needs more work to be solid code, but it does work. I know very little about Groovy; I based this on a post on Matt Burgess's site, http://funnifi.blogspot.com/. @mburgess

Let me know how it goes with either method or if you figure out something better. There's always a better way.

import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql

// Take the incoming flowfile first so it is still in scope in the catch block.
// Use session.create() instead only if you want a NEW flowfile (losing content, etc).
def flowFile = session.get()
if (!flowFile) return

// Find the DBCP controller service whose name matches the
// "databaseConnectionPoolName" property set on the processor.
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}

def conn = null
try {
    if (!dbcpServiceId) throw new IllegalStateException('No controller service named ' + dbServiceName)
    conn = lookup.getControllerService(dbcpServiceId).getConnection()

    def sql = new Sql(conn)
    // 'filenameBase' is expected to have been set on the flowfile upstream.
    def filenameBase = flowFile.getAttribute('filenameBase')

    // Groovy's Sql turns the GString into a parameterized query.
    def rows = sql.rows("select val as myval from junk where k = $filenameBase")
    if (!rows) throw new IllegalStateException('No row found for ' + filenameBase)

    // Only the filename attribute changes; the original content stays in the flowfile.
    flowFile = session.putAttribute(flowFile, 'filename', rows[0].myval as String)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Rename lookup failed', e)
    session.transfer(flowFile, REL_FAILURE)
} finally {
    conn?.close()
}
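
The script above reads a 'filenameBase' attribute and writes the database value straight into 'filename'. Here is a minimal sketch of helpers that could fill those two gaps. It assumes the database returns the new name without an extension, which the thread does not confirm. baseName, extension and renamed are hypothetical names, not part of the script above or of NiFi:

```groovy
// Hypothetical helpers; plain Groovy, no NiFi classes needed.

// Name without its final extension, e.g. "abc.jpg" -> "abc".
def baseName(String filename) {
    int dot = filename.lastIndexOf('.')
    dot > 0 ? filename.substring(0, dot) : filename
}

// Final extension including the dot, or "" when there is none.
def extension(String filename) {
    int dot = filename.lastIndexOf('.')
    dot > 0 ? filename.substring(dot) : ''
}

// New filename built from the first query row, keeping the original
// extension. Falls back to the original name when no row came back.
// ASSUMPTION: the row exposes the new base name as "myval", as in the query above.
def renamed(String original, List rows) {
    def newBase = rows ? rows[0].myval : null
    newBase ? "${newBase}${extension(original)}".toString() : original
}

// Usage with made-up values:
assert baseName('3f2b.jpg') == '3f2b'
assert renamed('3f2b.jpg', [[myval: 'front-door']]) == 'front-door.jpg'
assert renamed('3f2b.jpg', []) == '3f2b.jpg'
```

Inside the script, renamed(flowFile.getAttribute('filename'), rows) could replace the direct use of rows[0].myval. If you would rather set filenameBase upstream, an UpdateAttribute property with the value ${filename:substringBeforeLast('.')} should give the same result as baseName.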