ExecuteScript (Groovy) error: "is not known in this session" and "is already marked for transfer"

New Contributor

Hello,

Could you help me with this error? I do not understand it.

An image is attached (65046-nifi.png).

Regards

My code:

import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.sql.Sql

def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value

def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
try {
    def flowFileIn = session.get()

    log.info("flow in: " + flowFileIn.getAttribute('Tablas'))

    def tables = flowFileIn.getAttribute('Tablas')
    def rutas = flowFileIn.getAttribute('Ruta')

    flowFile = session.create()

    def schemaName = schema.evaluateAttributeExpressions(flowFile).value
    //def tables = ruta.evaluateAttributeExpressions(flowFile).value
    //def rutas = tablas.evaluateAttributeExpressions(flowFile).value

    log.info("schemaName property value: " + schemaName)
    log.info("tables in: " + tables)
    log.info("paths in: " + rutas)

    def sql = new Sql(conn)

    def tablesArray = tables.trim().split(',')
    def rutasArray = rutas.trim().split(',')

    log.info("Table attribute value: " + tablesArray)
    log.info("Path attribute value: " + rutasArray)

    for (file in rutasArray) {
        sql.execute("begin dbms_logmnr.add_logfile(logfilename => '" + file + "'); end;")
        //log.info("file to look up: " + file)
    }

    sql.execute('begin dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog); end;')

    for (table in tablesArray) {
        def countRow = 0

        flowFile = session.write(flowFile, { out ->
            query = "select sql_redo from v\$logmnr_contents where OPERATION = 'INSERT' and SEG_OWNER = '" + schemaName + "' and TABLE_NAME = '" + table + "' "
            log.info("query to run: " + query)

            sql.rows(query).eachWithIndex { row, idx ->
                if (idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
                out.write((row.values().join(',') + "\n").getBytes())
                countRow = countRow + 1
            }

            log.info("row count " + countRow + " for table: " + table)
        } as OutputStreamCallback)

        if (countRow >= 1) {
            //flowFile = session.putAttribute(flowFile, 'table', table)
            session.transfer(flowFile, REL_SUCCESS)
        } else {
            conn?.close()
            log.info('No data for table: ' + table)
            session.transfer(flowFile, REL_FAILURE)
        }
    }

    sql.execute('begin dbms_logmnr.end_logmnr(); end;')
} catch (e) {
    conn?.close()
    log.error('Scripting error', e)
    session.transfer(flowFile, REL_FAILURE)
}
conn?.close()


Master Guru

You use flowFileIn to get some attributes, but you don't use it afterwards and you don't transfer it either. The former breaks the provenance chain; the latter causes an error, because flowFileIn still belongs to the session but is never transferred or removed. For the former, I recommend passing it to session.create(flowFileIn), so that the child flow file inherits from the parent (keeping the provenance chain). For the latter, since you are transferring the child flow file, you can remove the incoming flow file with session.remove(flowFileIn) at the end of the script.

I am just eyeballing the code, so if that doesn't fix the errors, please let me know and I'll edit my answer after running your script and looking at it more closely.
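A minimal sketch of the pattern described in this reply, assuming the standard ExecuteScript bindings (session, log, REL_SUCCESS, REL_FAILURE). It is wrapped in a function so it can be exercised outside NiFi; the function name forkAndReplace and the table.count attribute are hypothetical, for illustration only. The incoming flow file is null-checked, the child is created with session.create(flowFileIn) so it inherits the parent's attributes and lineage, and on every path each flow file is either transferred exactly once or removed.

```groovy
// Sketch of the parent/child pattern for an ExecuteScript (Groovy) body.
// In ExecuteScript, call it with the script bindings:
//     forkAndReplace(session, log, REL_SUCCESS, REL_FAILURE)
// 'Tablas' is the attribute from the question; 'table.count' is a hypothetical
// attribute, set only to show the child being modified.
def forkAndReplace(session, log, success, failure) {
    def flowFileIn = session.get()
    if (flowFileIn == null) {
        return // nothing was queued, so there is nothing to transfer or remove
    }

    def child = null
    try {
        def tables = (flowFileIn.getAttribute('Tablas') ?: '').split(',')*.trim().findAll { it }
        // create(parent): the child inherits the parent's attributes and provenance lineage
        child = session.create(flowFileIn)
        child = session.putAttribute(child, 'table.count', String.valueOf(tables.size()))
    } catch (e) {
        log.error('Scripting error', e)
        if (child != null) {
            session.remove(child)             // the half-built child was never transferred, so drop it
        }
        session.transfer(flowFileIn, failure) // the parent is routed to failure exactly once
        return
    }

    session.transfer(child, success) // the child is transferred exactly once
    session.remove(flowFileIn)       // the parent is accounted for by removing it
}
```

Keeping the transfers outside the try block means the catch branch never has to deal with a flow file that has already been transferred.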

New Contributor

Thanks @Matt Burgess. I made the change, but now the processor just sits there and does nothing else. What I want is to run a SELECT against each of several tables and pass each result on, not wait until everything is done and then send it all; I want each result queued as soon as it is ready.

Sorry, my English is not very good; my native language is Spanish.

To queue them, do I have to create a session for each flow file I want to pass on, that is, for each result?

Attached images: 65055-captura-de-pantalla-de-2018-04-07-08-50-14.png and 65056-captura-de-pantalla-de-2018-04-07-08-50-01.png

Master Guru

You only need one session per execution of the script. With that session you can get, create, remove, and transfer as many flow files as you want. Every flow file you get or create from the session must be transferred or removed before the script ends, or you will get a "Transfer relationship not specified" error. Also, each flow file can be transferred only once; if you try to transfer the same flow file more than once, you will get the "already marked for transfer" error you describe above.
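In the question's script, a single flowFile is created before the loop and then transferred once per table, which appears to be what triggers the "already marked for transfer" error; the else branch also closes conn inside the loop, which would break the queries for any later table. Below is a minimal sketch of the rule above, assuming the ExecuteScript bindings: one child per table, each created from the incoming flow file, written, and transferred exactly once, with the parent removed at the end. The names emitPerTable and fetchRedo are hypothetical. The content is written with session.importFrom(InputStream, FlowFile) instead of session.write with an OutputStreamCallback only so the sketch needs no NiFi imports; either call works.

```groovy
// Sketch: one child flow file per table, each transferred exactly once, then the parent removed.
// 'fetchRedo' is a hypothetical closure supplied by the caller that returns the SQL_REDO strings
// for one table. In the question's script it could wrap the LogMiner query with bound parameters:
//     { t -> sql.rows("select sql_redo from v\$logmnr_contents where OPERATION = 'INSERT' and SEG_OWNER = ${schemaName} and TABLE_NAME = ${t}").collect { it.SQL_REDO } }
// and be called from ExecuteScript as:
//     emitPerTable(session, flowFileIn, tablesList, fetchRedo, REL_SUCCESS, REL_FAILURE)
def emitPerTable(session, parent, List<String> tables, Closure<List<String>> fetchRedo, success, failure) {
    tables.each { table ->
        def rows = fetchRedo(table)
        def csv = (['SQL_REDO'] + rows).join('\n') + '\n'   // header line plus one line per redo statement
        def child = session.create(parent)                   // a NEW flow file per table, linked to the parent
        child = session.importFrom(new ByteArrayInputStream(csv.getBytes('UTF-8')), child)
        child = session.putAttribute(child, 'table', table)
        session.transfer(child, rows ? success : failure)    // as in the question: no rows -> failure
    }
    session.remove(parent)                                    // the incoming flow file must also be accounted for
}
```

ExecuteScript commits the session when the script returns, so the children created in one execution typically reach the success queue together rather than one by one; one session per execution is still enough.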