Created 04-18-2016 11:28 AM
Hi, I am new to NiFi and want to rename a file based on a SQL query. The incoming JPG file is named GUID.jpg. I extract the filename and use it to query a table, which returns the new filename. I then want to rename the file to this new filename. However, when I use the ExecuteSQL_main processor, the flowfile content is replaced by the Avro output and the original file is lost. The MERGE processor merges the content of the incoming image file with the Avro query output, which is not what I need.
Any help would be appreciated....
Thanks..
Created 04-19-2016 10:15 PM
I'm sure there is a better way to do this, but the following will work. I hope the way I describe it makes sense:
GetFile --> UpdateAttribute (copy "filename" to a new attribute called "origFilename")
# You will use the origFilename attribute to merge later.
# The UpdateAttribute success relationship should feed both of these processors:
    --> MergeContent (correlate on origFilename and have it TAR the files)
    --> ExecuteSQL
# After ExecuteSQL and before the MergeContent:
    a) add attribute "newFilename" = the name from the DB
    b) update attribute "filename" = "DELETE.ME" (hard-coded value)
# At this point, your MergeContent will receive two filenames, <original> and "DELETE.ME",
# and it will tar them so you have everything in one flowfile.
MergeContent --> UnpackContent --> RouteOnAttribute (add property DELETEME=${filename:equals("DELETE.ME")} and auto-terminate DELETEME) --> UpdateAttribute (set filename=${newFilename}) --> PutFile
According to the documentation, I think you need a single processor to feed the MergeContent processor, so you can put an empty UpdateAttribute processor in front of it.
Created 04-22-2016 08:11 AM
Thanks James, this sounds groovy 🙂 .... I'm still playing with the merge solution so will try that later today.
Created 04-25-2016 01:52 PM
Any luck? If you're having issues with either solution, let me know and I may be able to clarify. There were some details I left off in the first solution, particularly with the MergeContent (e.g. setting the maximum number of entries to 2, and I'm not sure what would happen if the maximum number of bins is too low).
Created 04-25-2016 02:58 PM
@james.jones @Zaheer N I tried that and it works really well. Definitely Groovy !
Created 04-25-2016 03:00 PM
Awesome. Can you accept the answer if that works out for you to close this out? Cheers.
Created 04-25-2016 04:49 PM
Apologies guys, I had to move on to another project and didn't get a chance to test the groovy solution. Seeing that it works as mentioned by Shishir I will accept the solution and try it first chance I get. Thanks once again for the effort. Cheers.
Created 04-25-2016 06:56 PM
Thanks, @Zaheer N. FYI, one final option I'll throw out there is to use ListFile (which was my original suggestion) and do a move/rename in Groovy. Here is a working Groovy script for an ExecuteScript processor:
import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.StandardCopyOption

/*
WARNING!!! This will OVERWRITE any existing file in the destination directory.

REQUIRED FLOWFILE ATTRIBUTES:
  outDir         - directory name where the files will go (e.g. /tmp/junk/outdir)
  newFilename    - what you want the file named (e.g. mynew.jpg)
  changeFilename - optional boolean, default = true.
                   1, true or null sets origFilename=${filename} and filename=${newFilename};
                   a value of false, 0 or n does not change the filename attribute.

Also requires filename and absolute.path. These are set by the processor that
picks up the file (ListFile).

Note - I wasn't able to get properties defined in the ExecuteScript processor to
work with NiFi Expression Language (e.g. referencing a property X=${filename}
resolved to '${filename}' rather than the value of filename). The expressions
were not evaluated for some reason.
*/

def flowFile = session.get()
if (!flowFile) return

log.info('*************** STARTING RENAME SCRIPT: ')

// Build the source and destination paths from the flowfile attributes
def origFilename = flowFile.getAttribute('filename')
def origPath = flowFile.getAttribute('absolute.path')
def sourceFilename = origPath + '/' + origFilename
def outDir = flowFile.getAttribute('outDir')
def newFilename = flowFile.getAttribute('newFilename')
def destFilename = outDir + '/' + newFilename

try {
    log.info('*************** MOVING FILE: ' + sourceFilename + ' TO ' + destFilename)
    def source = Paths.get(sourceFilename)
    def dest = Paths.get(destFilename)
    Files.move(source, dest, StandardCopyOption.REPLACE_EXISTING)

    // Rename the flowfile unless changeFilename is explicitly false, 0 or n.
    // This needs testing/verification.
    def changeFlag = flowFile.getAttribute('changeFilename')?.trim()?.toLowerCase()
    if (!(changeFlag in ['false', '0', 'n'])) {
        flowFile = session.putAttribute(flowFile, 'origFilename', origFilename)
        flowFile = session.putAttribute(flowFile, 'filename', newFilename)
    }

    log.info('SUCCESSFULLY MOVED FILE: ' + sourceFilename + ' TO ' + destFilename)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('FAILED moveFile.groovy: ' + sourceFilename + ' TO ' + destFilename, e)
    session.transfer(flowFile, REL_FAILURE)
}
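If you want to try the move/rename logic outside NiFi first, here is a minimal standalone sketch in plain Java, using the same java.nio.file calls as the script. It is only an illustration under a few assumptions: the class name RenameSketch, the helper names shouldChangeFilename and moveAndRename, and the guid.jpg / mynew.jpg file names are all made up for this example, and the flag handling is just my reading of the comment in the script (null, 1 or true means rename; false, 0 or n means leave the filename alone).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Locale;

class RenameSketch {

    // Hypothetical helper: interprets the optional flag described in the script comment.
    // A missing flag defaults to true; only "false", "0" or "n" switch the rename off.
    static boolean shouldChangeFilename(String flag) {
        if (flag == null) {
            return true;
        }
        String f = flag.trim().toLowerCase(Locale.ROOT);
        return !(f.equals("false") || f.equals("0") || f.equals("n"));
    }

    // Hypothetical helper: moves source into outDir under newFilename.
    // Like the script, this OVERWRITES an existing file with the same name.
    static Path moveAndRename(Path source, Path outDir, String newFilename) throws IOException {
        Path dest = outDir.resolve(newFilename);
        return Files.move(source, dest, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        // Temporary directories stand in for the real input and output directories.
        Path inDir = Files.createTempDirectory("in");
        Path outDir = Files.createTempDirectory("out");
        Path source = Files.write(inDir.resolve("guid.jpg"), new byte[] {1, 2, 3});

        Path dest = moveAndRename(source, outDir, "mynew.jpg");
        System.out.println("source still exists: " + Files.exists(source));
        System.out.println("moved to: " + dest.getFileName());
    }
}
```

The point of splitting out the flag check is that a missing attribute is treated as "yes, rename", which is what the script comment describes as the default.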
And, thanks @Shishir Saxena for testing the previous script out.