Member since 04-06-2018 | Posts: 3 | Kudos Received: 0 | Solutions: 0
05-30-2018 10:45 PM
Hello. How can I make an ExecuteScript processor (Groovy) transfer a flow file to a destination that I choose? I want something similar to RouteOnAttribute, but done in code.
Sorry for the inconvenience.
Labels: Apache NiFi
04-07-2018 12:07 PM
Thanks @Matt Burgess, I made the modification, but the flow file stays there and nothing else happens.
What I want to do is run a select against several tables and pass each result along as its own flow file. I do not want to wait until everything has finished and then pass it all at once; I want the results queued as they are produced. To queue them that way, do I have to create a session for each flow file that I want to pass, that is, for each result? Image attached.
I am sorry, my English is not very good; my native language is Spanish.
04-06-2018 09:53 PM
Hello, could you help me with this error? I do not understand it. Image attached. Regards.
My code:
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.OutputStreamCallback
import groovy.sql.Sql

// Nothing to do when the incoming queue is empty
def flowFileIn = session.get()
if (!flowFileIn) return

// Look up the DBCP controller service by the name given in the databaseConnectionPoolName property
def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
def flowFile = null // child flow file currently being written, if any
try {
    def tables = flowFileIn.getAttribute('Tablas')
    def rutas = flowFileIn.getAttribute('Ruta')
    def schemaName = schema.evaluateAttributeExpressions(flowFileIn).value
    log.info("schemaName property value: "+schemaName)
    def sql = new Sql(conn)
    def tablesArray = tables.trim().split(',')
    def rutasArray = rutas.trim().split(',')
    log.info("Tablas attribute value: "+tablesArray)
    log.info("Ruta attribute value: "+rutasArray)
    // Register each redo log file with LogMiner, then start the LogMiner session
    for ( file in rutasArray ) {
        sql.execute("begin dbms_logmnr.add_logfile( logfilename =>'"+file+"'); end;")
    }
    sql.execute(' begin dbms_logmnr.start_logmnr(options => dbms_logmnr.dict_from_online_catalog); end;')
    for ( table in tablesArray ) {
        def countRow = 0
        def query = "select sql_redo from v\$logmnr_contents where OPERATION = 'INSERT' and SEG_OWNER = '"+schemaName+"' and TABLE_NAME = '"+table+"' "
        log.info("query to run: "+query)
        // Create one child flow file per table; a flow file cannot be written again after it has been transferred
        flowFile = session.create(flowFileIn)
        flowFile = session.write(flowFile, {out ->
            sql.rows(query).eachWithIndex { row, idx ->
                // Header line with the column names, then one CSV line per row
                if(idx == 0) { out.write(((row.keySet() as List).join(',') + "\n").getBytes()) }
                out.write((row.values().join(',') + "\n").getBytes())
                countRow = countRow + 1
            }
        } as OutputStreamCallback)
        log.info("row count "+countRow+" for table: "+table)
        if (countRow >= 1){
            //flowFile = session.putAttribute(flowFile, 'table', table)
            session.transfer(flowFile, REL_SUCCESS)
        } else {
            // Keep the connection open here; the remaining tables still need it
            log.info('No data for table: '+table)
            session.transfer(flowFile, REL_FAILURE)
        }
        flowFile = null
    }
    sql.execute(' begin dbms_logmnr.end_logmnr(); end;')
    // Every flow file taken from the session must be transferred or removed before the session commits
    session.remove(flowFileIn)
} catch(e) {
    log.error('Scripting error', e)
    // Drop the child that was not transferred yet and route the original input to failure
    if (flowFile) session.remove(flowFile)
    session.transfer(flowFileIn, REL_FAILURE)
} finally {
    conn?.close()
}
Labels: Apache NiFi