Support Questions

Find answers, ask questions, and share your expertise

NiFi: How to rename a getFile flowfile based on a ExecuteSQL_main query

Explorer

Hi, I am new to NiFi and want to rename a file based on a SQL query. The incoming jpg file has a GUID.jpg filename which I then extract the filename and query a table which returns the new filename. I then want to rename the file with this new filename. However when I use the ExecuteSQL_main processor the flowfile changes to the AVRO output and the original file is lost. The MERGE processor merges the content of the image incoming file with the avro query output which is not what I need.

Any help would be appreciated....

Thanks..

1 ACCEPTED SOLUTION

Super Collaborator

@Zaheer N

I'm sure there is a better way to do this, but the following will work. I hope the way I describe it makes sense:

GetFile
  --> UpdateAttribute (copy "filename" to a new attribute called "origFilename")
#You will use the origFilename attribute to merge later.
#Your UpdateAttribute success should feed both of these processors:
     --> MergeContent path (correlate on origFilename and have it TAR the files)
     --> ExecuteSQL.

#After ExecuteSQL and before the MergeContent:
  a) add attribute "newFilename" = the name from the DB.
  b) update attribute "filename" = "DELETE.ME" (hard coded value)

#At this point, your MergeContent will receive two filesnames: <orginal> and "DELETE.ME" 
#and it will tar them so you have everything in one flowFile.

MergeContent
 --> UnpackContent
   --> RouteOnAttribute (add property DELETEME=${filename:equals("DELETEME")} and auto-terminate DELETEME
     --> UpdateAttribute (set filename=${newFilename}
      --> PutFile()

According to the documentation, I think you need a single processor to feed the MergeContent processor so you can use an empty updateAttribute processor.

View solution in original post

15 REPLIES 15

You're probably wanting to also route the original file to an UpdateAttribute processor which replaces the filename attribute with the name extracted from the AVRO the sql query produces. Make sense?

Explorer

Will try that now...

any luck @Zaheer N ?

Explorer

I tried it, found that since 2 flow files arrive at the UpdateAttribute processor, the later file overwrites the former file. So since the image file arrives straight from getFile processor it is overwritten by the avro file. Will look setting an attribute for the avro and find some logic to prevent it from being sent forward.

Super Collaborator

Could you use listFile to feed the SQL query and deal with renaming the file after the executeSQL?

Super Collaborator

I posted a script for this above.

Super Collaborator

@Zaheer N

I'm sure there is a better way to do this, but the following will work. I hope the way I describe it makes sense:

GetFile
  --> UpdateAttribute (copy "filename" to a new attribute called "origFilename")
#You will use the origFilename attribute to merge later.
#Your UpdateAttribute success should feed both of these processors:
     --> MergeContent path (correlate on origFilename and have it TAR the files)
     --> ExecuteSQL.

#After ExecuteSQL and before the MergeContent:
  a) add attribute "newFilename" = the name from the DB.
  b) update attribute "filename" = "DELETE.ME" (hard coded value)

#At this point, your MergeContent will receive two filesnames: <orginal> and "DELETE.ME" 
#and it will tar them so you have everything in one flowFile.

MergeContent
 --> UnpackContent
   --> RouteOnAttribute (add property DELETEME=${filename:equals("DELETEME")} and auto-terminate DELETEME
     --> UpdateAttribute (set filename=${newFilename}
      --> PutFile()

According to the documentation, I think you need a single processor to feed the MergeContent processor so you can use an empty updateAttribute processor.

Explorer

Thanks James for taking the time in the explanation. I think I'm getting the picture so let me try this.

Would have been awesome if there was a setting in the ExecuteSQL processor that offered the query destination to be either ATTRIBUTES or CONTENT. I could have chosen ATTRIBUTES and the original file content would have remained in the flowfile. I guess that's why it's open source , but I'm still too much of a novice to attempt to change this behaviour.

Super Collaborator

@Zaheer N

Below is a simpler way of doing what you want without the MergeContent processor, which makes the flow a lot easier. So, create an ExecuteScript processor, add a property "databaseConnectionPoolName" with the value of your database connection pool name (e.g. "MySQL-DBCPConnectionPool"). Now set the Script Engine to "Groovy" and add the script below either in a file or in "Script Body".

Note that there is probably some more to do to make this solid code, but it does work. I know very little about Groovy. I based this on something on Matt Burgess's site http://funnifi.blogspot.com/. @mburgess

Let me know how it goes with either method or if you figure out something better. There's always a better way.

import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql

def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}

if (!dbcpServiceId) return


def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
try {
    //flowFile = session.create() DO this if you want to create a NEW flowfile (losing content, etc)
    def flowFile = session.get()
    if (!flowFile) return

    def sql = new Sql(conn)
    def filenameBase = flowFile.getAttribute('filenameBase')

    def rows = sql.rows("select val as myval from junk where k = $filenameBase")
    //You may want to do something if the row is not found.
    def newval = rows[0].myval

    flowFile = session.putAttribute(flowFile, 'filename', newval)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('MY ERROR!!! *******************', e)
    session.transfer(flowFile, REL_FAILURE)
} finally {
   conn?.close()
}



Explorer

Thanks James, this sounds groovy 🙂 .... I'm still playing with the merge solution so will try that later today.

Super Collaborator

@Zaheer N

Any luck? If you're having issues with either solution, let me know and I may be able to clarify. There were some details I left off in the first solution, particularly with the mergeContent (e.g. NumberOfMaxEntries=2, and I'm not sure what would happen if you have NumberOfMaxBins too low.).

@james.jones @Zaheer N I tried that and it works really well. Definitely Groovy !

Super Collaborator

Awesome. Can you accept the answer if that works out for you to close this out? Cheers.

Explorer

Apologies guys, I had to move on to another project and didn't get a chance to test the groovy solution. Seeing that it works as mentioned by Shishir I will accept the solution and try it first chance I get. Thanks once again for the effort. Cheers.

Super Collaborator

Thanks, @Zaheer N. FYI, one final option I'll throw out there is to use listFile (which was my original suggestion) and do a move/rename in Groovy. Here is a working groovy script in an ExecuteScript processor:

import org.apache.nifi.controller.ControllerService
import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.StandardCopyOption

/*

WARNING!!! This will OVERWRITE any existing file in the destination directory

REQUIRED FLOWFILE PROPERTIES:
     outDir             - directory name where the files will go (e.g. /tmp/junk/outdir)
     newFilename        - what you want the file named (e.g. mynew.jpg)
     changeFilenameAttr - optional boolean - default= 1, true or null sets origFilanem=${filename}
                          and filename=${newFilename}
                          value of false, 0 or n does not change the filename property.

     Also requires filename and orig.filename. These come from the getFile processor.

     Note - I wasn't able to getting properties defined in the ExecuteScript processor to work
            with Nifi Expression Language (e.g. referencing a property X=${filename} resolved to '${filename}'
            rather than the value of filename). The expressions were not evaluated for some reason.
*/


def flowFile = session.get()
if (!flowFile) return

log.info('*************** STARTING RENAME SCRIPT: ')


//*********
def origFilename = flowFile.getAttribute('filename')
def origPath = flowFile.getAttribute('absolute.path')

def sourceFilename = origPath + '/' + origFilename


def outDir = flowFile.getAttribute('outDir')
def newFilename = flowFile.getAttribute('newFilename')

def destFilename = outDir + '/' + newFilename
//*********

try {

   log.info('*************** MOVING FILE: ' + sourceFilename + ' TO ' + destFilename)

   def source = Paths.get(sourceFilename)
   def dest = Paths.get(destFilename)
   Files.move(source, dest, StandardCopyOption.REPLACE_EXISTING)

    if (flowFile.getAttribute('changeFilename')?.value?.toBoolean) {
       //This needs testing/verification
       flowFile = session.putAttribute(flowFile, 'origFilename', origFilename)
       flowFile = session.putAttribute(flowFile, 'filename', newFilename)
    }

    log.info('SUCCESSFULLY MOVED FILE: ' + sourceFilename + ' TO ' + destFilename)

    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('FAILED moveFile.groovy: ' + sourceFilename + ' TO ' + destFilename, e)
    session.transfer(flowFile, REL_FAILURE)
}

And, thanks @Shishir Saxena for testing the previous script out.