
NiFi ExecuteScript - Table locked

Hi,

I have a NiFi flow with an ExecuteScript processor (Groovy). The script contains an INSERT statement that inserts rows into a SQL table. The flow works fine, but while it is running it locks the entire table and I cannot read from that table. Has anyone experienced the same issue? If so, please help me solve it.


Can you share your script (or at least the relevant parts)? Also, what database are you using and which version of NiFi and the JDBC driver for your database?

Thanks for your response. Please find the code snippet below.


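import groovy.sql.Sql
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.InputStreamCallback

def tquery=    " IF OBJECT_ID('tempdb..#tmpTable') IS NOT NULL" +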
        "  DROP TABLE #tmpTable "  +
        "     SELECT -1   [ID]     "+
        "     ,GETDATE() [AS_OF_DATETIME]       ,"+
        "     'test' [REDIS_KEY]      ,"+
        "     [SRC_SYSTEM_CODE]      ,"+
        "     [SRC_GLOBAL__ID]     "+
        "    into #tmpTable"+
        "     FROM OPENJSON( %msg% ) "+
        "     with ("+
        "       [SRC_SYSTEM_CODE] [varchar](60) '\$.payload.trade.contextual.srcSystemCode',"+
        "       [SRC_GLOBAL_TRADE_ID] [varchar](40) '\$.payload.trade.contextual.srcGlobalTradeId',"+
        "       [SRC_PARENT_TRADE_ID] [varchar](40) '\$.payload.trade.contextual.srcParentTradeId')"+
        "     INSERT into [dbo].[ttable]"+
        "     "+
        "     SELECT (SELECT ISNULL((SELECT skt.SK_KEY FROM [dbo].keytable skt WHERE skt.SRC_SYSTEM_CODE =tt.[SRC_SYSTEM_CODE] and "+
        "     skt.NATURAL_KEY=tt.[SRC_GLOBAL_TRADE_ID]) , -1))   [XVA_TRADE_ID]      "+
        "     ,[TRADE_UPDATE_DATETIME] [AS_OF_DATETIME]       ,"+
        "     'test' [REDIS_KEY]      ,"+
        "     [SRC_SYSTEM_CODE]      ,"+
        "     [SRC_GLOBAL_TRADE_ID]     "

def sqlconnection


def getConnection = { ->
    try{
        def lookup = context.controllerServiceLookup
        def dbServiceName = databaseConnectionPoolName.value
        def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
            cs -> lookup.getControllerServiceName(cs) == dbServiceName
        }
        lookup.getControllerService(dbcpServiceId)?.getConnection()
    }

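    catch(Exception ex){
        // Log the failure instead of silently swallowing it
        log.error("could not obtain a database connection", ex)
    }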
}


// Declared outside the try block so the catch block below can still see it
def flowFile = session.get()
if (!flowFile)
    return

try{
    session.read(flowFile, {inputStream ->
        try{
            log.info "opening up SQL connection"
            sqlconnection = getConnection()

            log.info "reading query from config"
            String querystring = tquery
            log.info querystring

            log.info "reading input stream"
            String content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
            log.info content
            querystring = querystring.replace("%msg%", "'" + content + "'")

            log.info querystring

            log.info "inserting trade values to reporting DB"
            def sql = new Sql(sqlconnection)
            // execute() returns a boolean; wrap it in a GString because log.info expects a String
            def xy = sql.execute(querystring)
            log.info "execute returned: ${xy}"
        }


        catch(Exception ex)
        {
            log.error("error inserting trade values", ex)
        }
    } as InputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception ex)
{
    log.error('error: ' + ex.toString())
    session.transfer(flowFile, REL_FAILURE)
}
finally{
    sqlconnection?.close()
}
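
A side note on the script above, unrelated to the locking itself: it splices the flow file content into the SQL text with replace("%msg%", ...), so a payload containing a single quote would break the statement. Below is a minimal sketch of the same idea with the JSON bound as a JDBC parameter instead. It assumes the database is SQL Server (the script uses OPENJSON and GETDATE), and the column list on [dbo].[ttable] as well as the names insertSql and insertTrade are hypothetical, trimmed down for illustration.

```groovy
import groovy.sql.Sql

// Hypothetical, shortened version of the insert. The JSON document is passed
// as a bound parameter (?) rather than concatenated into the SQL string.
// Triple single quotes are used so that '$' needs no escaping.
def insertSql = '''
    INSERT INTO [dbo].[ttable] ([SRC_SYSTEM_CODE], [SRC_GLOBAL_TRADE_ID])
    SELECT [SRC_SYSTEM_CODE], [SRC_GLOBAL_TRADE_ID]
    FROM OPENJSON(?)
    WITH (
        [SRC_SYSTEM_CODE]     varchar(60) '$.payload.trade.contextual.srcSystemCode',
        [SRC_GLOBAL_TRADE_ID] varchar(40) '$.payload.trade.contextual.srcGlobalTradeId'
    )
'''

// Runs the insert on an already-open java.sql.Connection and returns the
// number of rows written. Closing the connection is left to the caller.
def insertTrade = { java.sql.Connection connection, String json ->
    def sql = new Sql(connection)
    sql.executeUpdate(insertSql, [json])
}
```

Inside the InputStreamCallback this would be called as insertTrade(sqlconnection, content), in place of the replace(...) and sql.execute(querystring) lines.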

I found the issue: the table was locked by an external process. Nothing is wrong with the NiFi ExecuteScript processor. Thanks for your response, Matt.
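
For anyone who lands here with the same symptom, one way to check whether another session is holding the lock is to query the SQL Server dynamic management views. This is only a sketch, assuming SQL Server and a login that is allowed to read these views (usually a server-level permission such as VIEW SERVER STATE); the names blockedRequestsSql and findBlockedRequests are made up for the example.

```groovy
import groovy.sql.Sql

// Lists requests that are currently waiting on another session, together with
// some details about the session that is blocking them.
def blockedRequestsSql = '''
    SELECT r.session_id,
           r.blocking_session_id,
           r.wait_type,
           r.wait_time,
           s.program_name,
           s.host_name,
           s.login_name
    FROM sys.dm_exec_requests r
    JOIN sys.dm_exec_sessions s
      ON s.session_id = r.blocking_session_id
    WHERE r.blocking_session_id <> 0
'''

// Returns one row per blocked request; an empty list means nothing is
// blocked at the moment the query runs.
def findBlockedRequests = { java.sql.Connection connection ->
    new Sql(connection).rows(blockedRequestsSql)
}
```

If the program_name or host_name on the blocking session does not belong to NiFi, the lock is coming from somewhere else, which is what turned out to be the case in this thread.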