Created on 04-06-2018 09:53 PM - edited 08-17-2019 09:03 PM
Hello
if you could help me with this error. Since I do not understand the error.
Attached image.
Regards
My code:
import org.apache.nifi.controller.ControllerService import groovy.sql.Sql def lookup = context.controllerServiceLookup def dbServiceName = databaseConnectionPoolName.value def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == dbServiceName } def conn = lookup.getControllerService(dbcpServiceId)?.getConnection() try { def flowFileIn = session.get() log.info("flow in : "+flowFileIn.getAttribute('Tablas')) def tables = flowFileIn.getAttribute('Tablas') def rutas = flowFileIn.getAttribute('Ruta') flowFile = session.create() def schemaName = schema.evaluateAttributeExpressions(flowFile).value //def tables = ruta.evaluateAttributeExpressions(flowFile).value //def rutas = tablas.evaluateAttributeExpressions(flowFile).value log.info("schemaName property value: "+schemaName) log.info("tablas in : "+tables) log.info("rutas in : "+rutas) def sql = new Sql(conn) def tablesArray = tables.trim().split(','); def rutasArray = rutas.trim().split(','); log.info("Table atrribute value: "+tablesArray) log.info("rutas atrribute value: "+rutasArray) for ( file in rutasArray ) { sql.execute("begin dbms_logmnr.add_logfile( logfilename =>'"+file+"'); end;") //log.info("archivo a buscar: "+file) } sql.execute(' begin dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog); end;') for ( table in tablesArray ) { def countRow = 0 flowFile = session.write(flowFile, {out -> query = "select sql_redo from v\$logmnr_contents where OPERATION = 'INSERT' and SEG_OWNER = '"+schemaName+"' and TABLE_NAME = '"+table+"' " log.info("query a lanzar: "+query) sql.rows("select sql_redo from v\$logmnr_contents where OPERATION = 'INSERT' and SEG_OWNER = '"+schemaName+"' and TABLE_NAME = '"+table+"' ").eachWithIndex { row, idx -> if(idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) } out.write((row.values().join(',') + "\n").getBytes()) countRow = countRow + 1 } log.info("cantidad"+countRow+" para la tabla: "+table) } as OutputStreamCallback) if (countRow >= 1){ //flowFile = session.putAttribute(flowFile, 'table', table) session.transfer(flowFile, REL_SUCCESS) } else { conn?.close() log.info('No hay datos para la tabla: '+table) session.transfer(flowFile, REL_FAILURE) } } sql.execute(' begin dbms_logmnr.end_logmnr(); end;') } catch(e) { conn?.close() log.error('Scripting error', e) session.transfer(flowFile, REL_FAILURE) } conn?.close()
Created 04-06-2018 11:00 PM
You use flowFileIn to get some attributes but don't use it later and don't transfer it either. The former means you'll break the provenance chain and the latter means you'll get an error since flowFileIn exists in the session. For the former, I recommend passing it into session.create(flowFileIn), such that the child flow file will inherit from the parent (keeping the provenance chain). For the latter, since you are transferring the child flow file, you can remove the incoming flow file with session.remove(flowFileIn) at the end of the script.
I am just eyeballing the code so if that doesn't fix the errors please let me know and I'll edit my answer after running with your script and looking more at it.
Created on 04-07-2018 12:07 PM - edited 08-17-2019 09:03 PM
thanks @Matt Burgess, I made the modification, but it stays there and does nothing else .. What I want to do from several tables, execute select and pass the flow. but not wait to have everything and pass it, I want queued.
i am sorry my english is not very good. my lenguaje native is spanish
And does nothing.
to perform the queue, do I have to create session for each flow that I want to pass? that is, for each result?
Attached image.
Created 04-26-2018 02:58 PM
You only need one session per execution of the script. Using that session, you can get, create, remove, and transfer as many flow files as you want. If you get or create a flow file from the session, then you must transfer or remove it before the end of the script, or else you will get a "Transfer relationship not specified" error. Also you can only transfer each flow file once, if you attempt to transfer the same flow file more than once, you will get the error you describe above.