Created on 10-26-2017 07:21 AM - edited 08-17-2019 05:16 PM
Dear Forum
I have the NiFi flow below, which fetches data from a MySQL table and then loads it into another MySQL table (data enrichment).
I used an ExecuteScript processor to connect to the DB and manipulate the data, as shown below (image 1.png).
This is my code:
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql

def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId).getConnection()
def sql = new Sql(conn)
def sqlInsert = new Sql(conn)
try {
    flowFile = session.get()
    if(!flowFile) return
    //log.error("ExecuteScript: Line records Exception: " + dbServiceName )
    flowFile = session.write(flowFile, {inputStream, outputStream ->
        inputStream.eachLine { line ->
            def String[] lineCellArr = ["", "", "", "", "", "", ""]
            def sqlSelectArr = ""
            if (line != null){
                //log.error("ExecuteScript: Line records Exception: " + dbServiceName )
                lineArr = line.split("\,", -1)
                for(int i = 0; i < lineArr.length ; i++) {
                    lineCellArr[i] = lineArr[i];
                }
                sql.rows("SELECT * FROM Polygon.cells where cellId='" + lineCellArr[6] +"'").eachWithIndex { row, idx ->
                    sqlSelectArr = row.values().join(';').split("\;", -1)
                    outputStream.write((line.toString() + row.values().join(';') + ";" + sqlSelectArr[1]).getBytes())
                    def insertResult1 = sqlInsert.executeUpdate("INSERT IGNORE INTO Polygon.recharges (rechargeNumber, rechargeTime, serviceClass, rechargeValue, rechargeType, balanceAfter, rechargeLocation, towerName) VALUES ('" + lineCellArr[0] + "','" + lineCellArr[1] + "','"+ lineCellArr[2] + "','"+ lineCellArr[3] + "','"+ lineCellArr[4] + "','"+ lineCellArr[5] + "','"+ lineCellArr[6] + "','"+ sqlSelectArr[1] + "')")
                    def insertResult2 = sqlInsert.executeUpdate("INSERT INTO Polygon.rechargesSummary (rechargeDate, serviceClass, rechargeValue, rechargeCount, rechargeType, rechargeLocation, towerName, latitude , longitude, region, subRegion) VALUES ('" + lineCellArr[1] + "','"+ lineCellArr[2] + "','"+ lineCellArr[3] + "','1','"+ lineCellArr[4] + "','"+ lineCellArr[6] + "','" + sqlSelectArr[1] + "','"+ sqlSelectArr[2] + "','"+ sqlSelectArr[3] + "','"+ sqlSelectArr[4] + "','"+ sqlSelectArr[5] + "') on DUPLICATE KEY UPDATE rechargeValue = rechargeValue + '"+ lineCellArr[3] +"', rechargeCount = rechargeCount + 1")
                    //write results
                    outputStream.write((";" + insertResult1.toString() +insertResult2.toString() + " \n").getBytes())
                }
                attrMap = ['Type': "Vouchers", 'Company': "Zain"]
            }
        }
    } as StreamCallback)
    flowFile = session.putAllAttributes(flowFile, attrMap)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Scripting error', e)
    session.transfer(flowFile, REL_FAILURE)
}
conn.close()
Everything works fine, but after a couple of hours I start getting the error below:
Caused by: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:159)
    at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264)
    at org.apache.nifi.script.impl.GroovyScriptEngineConfigurator.eval(GroovyScriptEngineConfigurator.java:54)
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:220)
    ... 11 common frames omitted
Caused by: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:355)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:153)
    ... 14 common frames omitted
Caused by: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
    at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:84)
    at com.sun.proxy.$Proxy77.getConnection(Unknown Source)
    at org.apache.nifi.dbcp.DBCPService$getConnection.call(Unknown Source)
    at Script4.run(Script4.groovy:19)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:352)
    ... 15 common frames omitted
2017-10-26 09:55:16,873 ERROR [Timer-Driven Process Thread-12] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=7e50130c-1000-115f-dab6-0ef65aa69de2] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled: {}
org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled
    at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:230)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
When I restart the connection pool, everything works fine for a couple of hours until I get this error again.
Could you please advise?
Thanks a lot
Created on 10-26-2017 12:14 PM - edited 08-17-2019 05:15 PM
Can you add a validation query to the connection pool? The validation query is used to validate connections before they are returned. When a connection is invalid, it gets dropped and a new, valid connection is returned.
Query:
select CURRENT_TIMESTAMP()
This validation query takes care of invalid connections: it drops them and re-establishes valid ones.
A validation query should help keep the connection pool from getting disabled.
Note: using a validation query might have some performance penalty.
Created 10-26-2017 01:33 PM
Thanks @Shu, please check below
Created on 10-26-2017 01:32 PM - edited 08-17-2019 05:15 PM
Thanks @Shu
I tried your suggestion as below
However, I still get my connection disconnected, and I can't see in the log that "select CURRENT_TIMESTAMP()" is being executed. Is this normal?
Also, I don't know why the connection pool is not showing that ExecuteScript is referring to it; see below.
Thanks a lot...
Created 10-26-2017 05:04 PM
"Referencing Components" is done by the framework when you explicitly refer to a Component from another, such as selecting that DBCPConnectionPool from a drop-down list in PutSQL for example. With a scripting processor, the framework does not know what the script is doing, including whether it references a particular component or not.
Also, DBCPConnectionPool does not log that it executes the Validation Query; that is performed by Apache DBCP "under the hood" when a connection is requested from the pool.
What is very weird about your situation is that the Controller Service (DBCPConnectionPool) becomes disabled. That should not be related to whether the connection pool gives back good or bad connections. Is there an error bulletin or something in the logs from the DBCPConnectionPool itself or the framework (not the ExecuteScript)?
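One way to gather more information from inside ExecuteScript is to log what the lookup reports for every controller service carrying the configured name. The following is only a diagnostic sketch: it assumes the same `databaseConnectionPoolName` dynamic property used in the original script and relies on the `session`-independent `context` and `log` bindings that ExecuteScript provides. It does not touch any flow files.

```groovy
import org.apache.nifi.controller.ControllerService

def lookup = context.controllerServiceLookup
// Dynamic property from the original script (assumed to hold the service name)
def dbServiceName = databaseConnectionPoolName.value

// List every service whose name matches, together with its current state
def matches = lookup.getControllerServiceIdentifiers(ControllerService).findAll { id ->
    lookup.getControllerServiceName(id) == dbServiceName
}
matches.each { id ->
    def msg = 'Service ' + dbServiceName + ' id=' + id +
              ' enabled=' + lookup.isControllerServiceEnabled(id) +
              ' enabling=' + lookup.isControllerServiceEnabling(id)
    log.info(msg)
}
if (matches.size() != 1) {
    log.warn('Expected exactly one service named ' + dbServiceName + ' but found ' + matches.size())
}
```

If more than one identifier is logged, note that `find` in the original script returns only the first match, which is not necessarily the enabled service.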
Created 10-26-2017 05:16 PM
Can you make sure you are referring to the correct controller service ID, the one that has the validation query in it?
I think you have added a dynamic property in the ExecuteScript processor with the controller service ID in it, and you are using the same controller service for your fetch and load.
In your question the controller service ID is 7e50134b-1000-115f-d673-ccc9fbd344fe, but in the above screenshot
the controller service ID (588fa4bo...etc) is different.
I think that is the reason why you are still facing issues.
Created 10-27-2017 06:26 PM
Thanks @Shu and @Matt for your answers.
@Shu, the controller service ID is different because I recreated it a couple of times before posting the question and comments 🙂
What I did was move this part ( flowFile = session.get() if(!flowFile) return ) before defining the connection. The code has been working since yesterday, but I'm not sure if this was the issue!
@Matt, regarding your question: I was not able to find any other logs. I got this behaviour only on NiFi 1.4; version 1.0 is working fine.
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql

flowFile = session.get()
if(!flowFile) return

def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId).getConnection()
def sql = new Sql(conn)
def sqlInsert = new Sql(conn)
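A possible explanation for why the reordering helps: in the first version, the `return` taken when there is no flow file exits the script before `conn.close()` runs, so every empty trigger borrows a connection and never gives it back. Whether that is what ends with the service reported as disabled is not confirmed in this thread. A minimal sketch of the same idea, with the connection released in a `finally` block and the lookup value bound as a parameter instead of concatenated into the SQL text, might look like this. The table and property names are taken from the original script, the output format is simplified, and the inserts are left out.

```groovy
import groovy.sql.Sql
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback

// Return before touching the controller service when there is nothing to process
def flowFile = session.get()
if (!flowFile) return

def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { id ->
    lookup.getControllerServiceName(id) == dbServiceName
}

def conn = null
try {
    conn = lookup.getControllerService(dbcpServiceId).getConnection()
    def sql = new Sql(conn)
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        inputStream.eachLine { line ->
            def cells = line.split(',', -1)
            // Assumption: the cell id is the 7th field; use an empty string for short lines
            def cellId = cells.length > 6 ? cells[6] : ''
            // The value is bound as a parameter rather than spliced into the query
            sql.rows('SELECT * FROM Polygon.cells WHERE cellId = ?', [cellId]).each { row ->
                outputStream.write((line + ';' + row.values().join(';') + '\n').getBytes())
            }
        }
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Scripting error', e)
    session.transfer(flowFile, REL_FAILURE)
} finally {
    // Runs on success and on failure, so the connection always goes back to the pool
    conn?.close()
}
```

The `INSERT` statements from the original script can be parameterized the same way with `sql.executeUpdate(String, List)`.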
Created 10-28-2017 07:01 PM
One silly question, please: where can I find the source files for the imports below? Which folder should I look in if I need to check their content or add more imports?
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql