- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Controller Service is disabled
- Labels:
-
Apache NiFi
Created on ‎10-26-2017 07:21 AM - edited ‎08-17-2019 05:16 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Dear Forum
I have the below Nifi flow to fetch data from mysql table and then load it to another mysql table (data enrichment)
I used executescript processor to connect to DB and manipulate the data as below (image 1.png)
this is my code
import org.apache.nifi.controller.ControllerService import groovy.sql.Sql def lookup = context.controllerServiceLookup def dbServiceName = databaseConnectionPoolName.value def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == dbServiceName } def conn = lookup.getControllerService(dbcpServiceId).getConnection() def sql = new Sql(conn) def sqlInsert = new Sql(conn) try { flowFile = session.get() if(!flowFile) return //log.error("ExecuteScript: Line records Exception: " + dbServiceName ) flowFile = session.write(flowFile, {inputStream, outputStream -> inputStream.eachLine { line -> def String[] lineCellArr = ["", "", "", "", "", "", ""] def sqlSelectArr = "" if (line != null){ //log.error("ExecuteScript: Line records Exception: " + dbServiceName ) lineArr = line.split("\,", -1) for(int i = 0; i < lineArr.length ; i++) { lineCellArr[i] = lineArr[i]; } sql.rows("SELECT * FROM Polygon.cells where cellId='" + lineCellArr[6] +"'").eachWithIndex { row, idx -> sqlSelectArr = row.values().join(';').split("\;", -1) outputStream.write((line.toString() + row.values().join(';') + ";" + sqlSelectArr[1]).getBytes()) def insertResult1 = sqlInsert.executeUpdate("INSERT IGNORE INTO Polygon.recharges (rechargeNumber, rechargeTime, serviceClass, rechargeValue, rechargeType, balanceAfter, rechargeLocation, towerName) VALUES ('" + lineCellArr[0] + "','" + lineCellArr[1] + "','"+ lineCellArr[2] + "','"+ lineCellArr[3] + "','"+ lineCellArr[4] + "','"+ lineCellArr[5] + "','"+ lineCellArr[6] + "','"+ sqlSelectArr[1] + "')") def insertResult2 = sqlInsert.executeUpdate("INSERT INTO Polygon.rechargesSummary (rechargeDate, serviceClass, rechargeValue, rechargeCount, rechargeType, rechargeLocation, towerName, latitude , longitude, region, subRegion) VALUES ('" + lineCellArr[1] + "','"+ lineCellArr[2] + "','"+ lineCellArr[3] + "','1','"+ lineCellArr[4] + "','"+ lineCellArr[6] + "','" + sqlSelectArr[1] + "','"+ sqlSelectArr[2] + "','"+ sqlSelectArr[3] + "','"+ sqlSelectArr[4] + "','"+ sqlSelectArr[5] + "') on DUPLICATE KEY UPDATE rechargeValue = rechargeValue + '"+ lineCellArr[3] +"', rechargeCount = rechargeCount + 1") //write results outputStream.write((";" + insertResult1.toString() +insertResult2.toString() + " \n").getBytes()) } attrMap = ['Type': "Vouchers", 'Company': "Zain"] } } } as StreamCallback) flowFile = session.putAllAttributes(flowFile, attrMap) session.transfer(flowFile, REL_SUCCESS) } catch(e) { log.error('Scripting error', e) session.transfer(flowFile, REL_FAILURE) } conn.close()
Everything works fine, but after couple of hours I start getting the below error
Caused by: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:159) at javax.script.AbstractScriptEngine.eval(AbstractScriptEngine.java:264) at org.apache.nifi.script.impl.GroovyScriptEngineConfigurator.eval(GroovyScriptEngineConfigurator.java:54) at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:220) ... 11 common frames omitted Caused by: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:355) at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:153) ... 14 common frames omitted Caused by: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:84) at com.sun.proxy.$Proxy77.getConnection(Unknown Source) at org.apache.nifi.dbcp.DBCPService$getConnection.call(Unknown Source) at Script4.run(Script4.groovy:19) at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:352) ... 15 common frames omitted 2017-10-26 09:55:16,873 ERROR [Timer-Driven Process Thread-12] o.a.nifi.processors.script.ExecuteScript ExecuteScript[id=7e50130c-1000-115f-dab6-0ef65aa69de2] Failed to process session due to org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled: {} org.apache.nifi.processor.exception.ProcessException: javax.script.ScriptException: javax.script.ScriptException: java.lang.IllegalStateException: Cannot invoke method public abstract java.sql.Connection org.apache.nifi.dbcp.DBCPService.getConnection() throws org.apache.nifi.processor.exception.ProcessException on Controller Service with identifier 7e50134b-1000-115f-d673-ccc9fbd344fe because the Controller Service is disabled at org.apache.nifi.processors.script.ExecuteScript.onTrigger(ExecuteScript.java:230) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119) at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147) at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
When I restart the connectionpool, everything works fine for couple of hours till I get this error again,
could you please advise
Thanks a lot
Created on ‎10-26-2017 12:14 PM - edited ‎08-17-2019 05:15 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Can you add validation query in connection pool, as this Validation query used to validate connections before returning them. When connection is invalid, it get's dropped and new valid connection will be returned.
Query:
select CURRENT_TIMESTAMP()
This validation query will take care of invalid connections and drops invalid connections and re enables connections,
Validation Query helps you to maintain connection pools never get disabled.
Note:- Using validation might have some performance penalty.
Created on ‎10-26-2017 12:14 PM - edited ‎08-17-2019 05:15 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Can you add validation query in connection pool, as this Validation query used to validate connections before returning them. When connection is invalid, it get's dropped and new valid connection will be returned.
Query:
select CURRENT_TIMESTAMP()
This validation query will take care of invalid connections and drops invalid connections and re enables connections,
Validation Query helps you to maintain connection pools never get disabled.
Note:- Using validation might have some performance penalty.
Created ‎10-26-2017 01:33 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Thanks @Shu, please check below
Created on ‎10-26-2017 01:32 PM - edited ‎08-17-2019 05:15 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Thanks @Shu
I tried your suggestion as below
. However, I still get my connection disconnected... and I can't see in the log that "select CURRENT_TIMESTAMP()" is being executed ? is this normal ?
Also I dunno why the ConnectionPool is not showing that ExecuteScript is refering to it ? see below
Thanks a lot...
Created ‎10-26-2017 05:04 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
"Referencing Components" is done by the framework when you explicitly refer to a Component from another, such as selecting that DBCPConnectionPool from a drop-down list in PutSQL for example. With a scripting processor, the framework does not know what the script is doing, including whether it references a particular component or not.
Also DBCPConnectionPool does not log that it executes the Validation Query, that is performed by Apache DBCP "under the hood" when a connection is requested from the pool.
What is very weird about your situation is that the Controller Service (DBCPConnectionPool) becomes disabled, that should not be related to whether the connection pool gives back good/bad connections. Is there an error bulletin or something in the logs from the DBCPConnectionPool itself or the framework (not the ExecuteScript)?
Created ‎10-26-2017 05:16 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Can you once make sure you are referring to the correct controller service id that having validation query in it.
I think you have added dynamic property in execute script processor with controller service id in it and you are using the same controller service when you did your fetch and loading.
In your question the controller service id is 7e50134b-1000-115f-d673-ccc9fbd344fe but in the above screenshot
controller service id(588fa4bo...etc) is different.
I think that is the reason why you are having facing issues still..!!
Created ‎10-27-2017 06:26 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Thanks @Shu and @Matt for your answers,
@Shu, the controller service Id is different because I recreated it couple of times before posting the questions and comments 🙂
What I did is that I moved this part ( flowFile = session.get() if(!flowFile) return ) before defining the connection, code is working since yesterday but Im not sure if this was the issue !
@Matt, for your question... I was not able to find any other logs, I got this behaviour only on Nifi 1.4, version 1.0 is working fine.
import org.apache.nifi.controller.ControllerService import groovy.sql.Sql flowFile = session.get() if(!flowFile) return def lookup = context.controllerServiceLookup def dbServiceName = databaseConnectionPoolName.value def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == dbServiceName } def conn = lookup.getControllerService(dbcpServiceId).getConnection() def sql = new Sql(conn) def sqlInsert = new Sql(conn)
Created ‎10-28-2017 07:01 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
One silly question plz.. where can I find the below import source files ? which folder if I need to check the content or add more imports to them ?
import org.apache.nifi.controller.ControllerService import groovy.sql.Sql
