Member since: 06-07-2017
Posts: 6
Kudos Received: 0
Solutions: 0
10-19-2017
08:48 PM
Hi All,

I have a scenario where I need to consume data from Elasticsearch and do some processing (filtering, enriching, etc.), then store the processed information in Redis. I have implemented the flow as Scroll ES processor --> ExecuteScript, and this is working well. As part of the ETL operation, I have to record the total passed and failed record counts at the end of the process. A few questions:

1. How can I track the total count of successes and failures?
2. How can I implement the workflow so that it waits until all the data has been consumed from Elasticsearch and then updates the success and failure counts in the DB?

Your help will be much appreciated.

Thanks, SK
Labels: Apache NiFi
06-27-2017
02:52 PM
I found the issue: the table was locked by an external process. There is nothing wrong with the NiFi ExecuteScript processor. Thanks for your response, Matt.
06-26-2017
07:41 PM
Thanks for your response. Please find the code snapshot below.
import groovy.sql.Sql
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.InputStreamCallback
def tquery= " IF OBJECT_ID('tempdb..#tmpTable') IS NOT NULL" +
" DROP TABLE #tmpTable " +
" SELECT -1 [ID] "+
" ,GETDATE() [AS_OF_DATETIME] ,"+
" 'test' [REDIS_KEY] ,"+
" [SRC_SYSTEM_CODE] ,"+
" [SRC_GLOBAL__ID] "+
" into #tmpTable"+
" FROM OPENJSON( %msg% ) "+
" with ("+
" [SRC_SYSTEM_CODE] [varchar](60) '\$.payload.trade.contextual.srcSystemCode',"+
" [SRC_GLOBAL_TRADE_ID] [varchar](40) '\$.payload.trade.contextual.srcGlobalTradeId',"+
" [SRC_PARENT_TRADE_ID] [varchar](40) '\$.payload.trade.contextual.srcParentTradeId')"+
" INSERT into [dbo].[ttable]"+
" "+
" SELECT (SELECT ISNULL((SELECT skt.SK_KEY FROM [dbo].keytable skt WHERE skt.SRC_SYSTEM_CODE =tt.[SRC_SYSTEM_CODE] and "+
" skt.NATURAL_KEY=tt.[SRC_GLOBAL_TRADE_ID]) , -1)) [XVA_TRADE_ID] "+
" ,[TRADE_UPDATE_DATETIME] [AS_OF_DATETIME] ,"+
" 'test' [REDIS_KEY] ,"+
" [SRC_SYSTEM_CODE] ,"+
" [SRC_GLOBAL_TRADE_ID] "
def sqlconnection

// Look up the DBCP controller service by name and borrow a connection from it
def getConnection = { ->
    try {
        def lookup = context.controllerServiceLookup
        def dbServiceName = databaseConnectionPoolName.value
        def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
            cs -> lookup.getControllerServiceName(cs) == dbServiceName
        }
        lookup.getControllerService(dbcpServiceId)?.getConnection()
    }
    catch (Exception ex) {
        log.error("could not get SQL connection: " + ex.toString())
    }
}

// Declared outside the try block so the catch block can still route it to failure
def flowFile = session.get()
if (!flowFile) return

try {
    session.read(flowFile, { inputStream ->
        try {
            log.info "opening up SQL connection"
            sqlconnection = getConnection()
            log.info "reading query from config"
            String querystring = tquery
            log.info querystring
            log.info "reading input stream"
            String content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
            log.info content
            // Substitute the flow file content (JSON) into the query as a string literal
            querystring = querystring.replace("%msg%", "'" + content + "'")
            log.info querystring
            log.info "inserting trade values to reporting DB"
            def sql = new Sql(sqlconnection)
            def xy = sql.execute(querystring)
            log.info "execute result: ${xy}"
        }
        catch (Exception ex) {
            log.info("error: " + ex.toString())
        }
    } as InputStreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
}
catch (Exception ex) {
    log.info 'error: ' + ex.toString()
    session.transfer(flowFile, REL_FAILURE)
}
finally {
    // Return the connection to the pool whether or not the insert succeeded
    sqlconnection?.close()
}
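A side note on the `%msg%` substitution in the script above: because the flow file content is pasted into the query as a quoted literal, any single quote inside the JSON would end the literal early. The following is only a minimal sketch of one way to guard against that, assuming T-SQL's rule that a quote inside a string literal is escaped by doubling it. The helper name `toSqlLiteral`, the shortened query template, and the sample JSON are hypothetical and not part of the original script; binding the JSON as a query parameter would likely be the more robust option where the driver supports it.

```groovy
// Hypothetical helper (not in the original script): wraps a value in single
// quotes and doubles every embedded single quote, which is how T-SQL escapes
// a quote inside a string literal.
String toSqlLiteral(String value) {
    "'" + value.replace("'", "''") + "'"
}

// Shortened stand-in for the real query template (assumption for illustration)
def template = "SELECT * FROM OPENJSON( %msg% )"
// Sample payload containing an apostrophe (made-up data)
def content = '{"trader":"O\'Brien"}'

// Naive substitution, as in the script above: the apostrophe ends the literal early
def naive = template.replace("%msg%", "'" + content + "'")

// Escaped substitution keeps the whole JSON document inside one literal
def escaped = template.replace("%msg%", toSqlLiteral(content))

println naive
println escaped
```

In the script, this would amount to replacing `"'" + content + "'"` with a call to the helper before the query is executed.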
06-26-2017
05:15 PM
Hi, I have a NiFi workflow with an ExecuteScript processor (Groovy). The script contains an INSERT statement that inserts rows into a SQL table. The flow works fine, but while it is running it locks the entire table and I cannot read from that table. Has anyone experienced the same issue? If so, please help me solve it.
Labels: Apache NiFi