Nifi Execute Groovy Script

Contributor

Hi All, 

I am having a problem with the ExecuteScript processor. It runs a Groovy script that queries an Oracle database, but the database connection is never closed.

Issue: the connections stay open and show up as inactive sessions in the database.

The script opens and closes the connection explicitly. Could you help me fix this and point out what I am doing wrong?

 

The DB connection is configured like this:

Capture.JPG

The ExecuteScript processor is configured as follows:

ExecuteScriptProcessor.JPG

The Groovy script is as follows:

import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import groovy.sql.OutParameter
import groovy.sql.Sql
import oracle.jdbc.OracleTypes
import java.sql.ResultSet


def lookup = context.controllerServiceLookup
def dbServiceName = ConnectionPoolScriptExecution.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == dbServiceName
}

def conn = lookup.getControllerService(dbcpServiceId)?.getConnection();
def sql = new Sql(conn);

def flowFile = session.get()
if(!flowFile) return
//Session Values
strApplication = flowFile.getAttribute('application')
strUsername = flowFile.getAttribute('username')
strPassword = flowFile.getAttribute('password')


// SQL Query Creation and execution

def ValidOrNot

String sqlString = "select FN_HP_IS_VALID_APP('${strApplication}','${strUsername}','${strPassword}') as ValidOrNot from dual";
try {
rowNum = 0
sql.eachRow(sqlString, { dataResult ->
//println(dataResult.ValidOrNot)
ValidOrNot = dataResult.ValidOrNot
})

}
catch(Exception ex){

conn?.close()
}
finally {

conn?.close()
}


conn?.close()
// Assign values to session object
def attrMap = ['VALIDORNOT':ValidOrNot,'sqlString':sqlString]
flowFile = session.putAllAttributes(flowFile, attrMap)
session.transfer(flowFile, REL_SUCCESS)

 

4 REPLIES

Contributor

 

@mburgess I hope you are doing well. Could you help with my issue? The script runs fine, but it leaves the Oracle connection open instead of closing it. When I check the database, I see inactive sessions created by the NiFi user.

New Contributor

++

We are facing the same problem.

Is there anyone who can fix this?

New Contributor

I am getting the same issue. Could you help, please? 😞

Explorer

Hello everyone!

At the company where I work we had the same problem. This time I had some time to review the script and build a proof of concept to try to solve it.

In my test, the processor started raising Oracle database connection errors after a few flowfiles.

Searching the Groovy documentation, I found that the example floating around the internet (the script in this thread, for instance) does not close the connection correctly.

The solution is to close the Sql instance along with the connection. The original script only tries to close the connection declared at the beginning of the script and ignores the Sql instance. In my tests, with the Sql instance closed, the processor handled 30,000 flowfiles without any problems. With the original script, the processor started raising connection errors after 14,000 flowfiles.

You can find more info in the Groovy documentation:
Connecting to HSQLDB 
1.jpg
So, as expected, the solution is as follows:
You need to add a finally block and close sql in it.

//Imports
import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import groovy.sql.OutParameter
import groovy.sql.Sql
import java.sql.ResultSet


// Look up the NiFi connection pool controller service
def lookup = context.controllerServiceLookup
// This value is a property created on the processor; it holds the name of the connection pool controller service
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == dbServiceName
}

def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
// Wrap the pooled connection; Sql.newInstance expects a JDBC URL or a settings map, not a Connection
sql = new Sql(conn)

// Get the flowfile and its attribute values from the NiFi session
try {
    flowFile = session.get()
    if(!flowFile) return

    PRIMER_PARAMETRO = flowFile.getAttribute('PRIMER_PARAMETRO')
    SEGUNDO_PARAMETRO = flowFile.getAttribute('SEGUNDO_PARAMETRO')

    // Procedure with 2 input parameters and 2 output parameters
    String sqlString ="""{call PRUEBA_CONCEPTO.PROC_DUMMY_DUAL(?, ?, ?, ?)}""";

    // Output parameters
    def  CODIGO
    def  MENSAJE

    // Procedure parameter list: two inputs, then the output types (CODIGO is NUMERIC, MENSAJE is VARCHAR)
    def parametersList = [PRIMER_PARAMETRO, SEGUNDO_PARAMETRO, Sql.NUMERIC, Sql.VARCHAR];

    // Execute the procedure; the closure receives the output parameters in declaration order
    sql.call(sqlString, parametersList) {out_status_code, out_status_desc ->
        CODIGO = out_status_code
        MENSAJE = out_status_desc
    };

    // Copy the output values into flowfile attributes
    def attrMap = ['status_code':String.valueOf(CODIGO), 'status_desc':MENSAJE]
    flowFile = session.putAllAttributes(flowFile, attrMap)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e){
    log.error('Scripting error', e)
    flowFile = session.putAttribute(flowFile, "error", e.getMessage())
    session.transfer(flowFile, REL_FAILURE)
} finally {
    if (conn != null) conn.close();
    if (sql != null) sql.close();
}

 
In my case the Sql instance wraps a connection taken from a DBCPConnectionPool, but the same approach should work for the original script.
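
For reference, here is a minimal sketch of how the same idea might be applied to the original function-call script from the question. It is only a sketch under some assumptions: the dynamic property name ConnectionPoolScriptExecution and the function FN_HP_IS_VALID_APP are taken from the original post, session, context, log, REL_SUCCESS and REL_FAILURE are the usual ExecuteScript bindings, and routing errors to REL_FAILURE is my own choice. Two things differ from the original: the flowfile is fetched before a connection is borrowed, so a run with no flowfile does not leave a connection open, and the attribute values are passed as bind parameters instead of being interpolated into the SQL string.

```groovy
import groovy.sql.Sql
import org.apache.nifi.controller.ControllerService

// Fetch the flowfile first: if there is nothing to process we return
// before any connection has been borrowed from the pool.
def flowFile = session.get()
if (!flowFile) return

// Resolve the connection pool by the name stored in the processor property
// (property name taken from the original post).
def lookup = context.controllerServiceLookup
def dbServiceName = ConnectionPoolScriptExecution.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == dbServiceName
}

def sql = null
try {
    // new Sql(Connection) wraps the pooled connection; sql.close() closes it again.
    sql = new Sql(lookup.getControllerService(dbcpServiceId).getConnection())

    // Bind parameters instead of building the statement from attribute values.
    def row = sql.firstRow(
        'select FN_HP_IS_VALID_APP(?, ?, ?) as ValidOrNot from dual',
        [flowFile.getAttribute('application'),
         flowFile.getAttribute('username'),
         flowFile.getAttribute('password')])

    // Read the single result column by position and store it as a string attribute.
    flowFile = session.putAttribute(flowFile, 'VALIDORNOT', String.valueOf(row[0]))
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Validation query failed', e)
    // Assumption: failed flowfiles are routed to REL_FAILURE.
    session.transfer(flowFile, REL_FAILURE)
} finally {
    // Runs on success and on error, so the connection always goes back to the pool.
    sql?.close()
}
```

Keeping the close in a single finally block means there is exactly one place where the connection is released, regardless of how the script exits.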


2.jpg

Solution
3.png

Issue
4.png

Dummy procedure:

CREATE OR REPLACE 
PACKAGE PRUEBA_CONCEPTO AS 

  /* TODO enter package declarations (types, exceptions, methods etc) here */ 
    PROCEDURE PROC_DUMMY_DUAL ( PRIMER_PARAMETRO IN VARCHAR2 
                              , SEGUNDO_PARAMETRO IN NUMBER 
                              , CODIGO OUT NUMBER 
                              , MENSAJE OUT VARCHAR2 
                            );
END PRUEBA_CONCEPTO;
/
CREATE OR REPLACE
PACKAGE BODY PRUEBA_CONCEPTO AS

  PROCEDURE PROC_DUMMY_DUAL ( PRIMER_PARAMETRO IN VARCHAR2 
                              , SEGUNDO_PARAMETRO IN NUMBER 
                              , CODIGO OUT NUMBER 
                              , MENSAJE OUT VARCHAR2 
                            ) AS
  BEGIN
    -- TODO: Implementation required for PROCEDURE PRUEBA_CONCEPTO.PROC_DUMMY_DUAL
    CODIGO := SEGUNDO_PARAMETRO;
    MENSAJE := PRIMER_PARAMETRO;
  END PROC_DUMMY_DUAL;

END PRUEBA_CONCEPTO;


Test Procedure

SET SERVEROUTPUT ON;
DECLARE
PRIMER_PARAMETRO VARCHAR2(100);
SEGUNDO_PARAMETRO NUMBER;
CODIGO NUMBER;
MENSAJE VARCHAR2(100);
BEGIN

PRIMER_PARAMETRO := 'eSTO ES UN MENSAJE';
SEGUNDO_PARAMETRO := 1;

PRUEBA_CONCEPTO.PROC_DUMMY_DUAL(PRIMER_PARAMETRO, SEGUNDO_PARAMETRO, CODIGO, MENSAJE);

DBMS_OUTPUT.PUT_LINE('CODIGO : ' || CODIGO);
DBMS_OUTPUT.PUT_LINE('MENSAJE : ' || MENSAJE);

END;