03-24-2025 05:47 AM
@hus There are two controller services you are using for your map cache:

DistributedMapCacheServer - When started, this controller service creates a separate map cache server on every node in a NiFi cluster. These map cache servers do not share cached entries between them. In Apache NiFi 2.x+, "Distributed" has been removed from the name to avoid confusion. The "Max Cache Entries" and "Eviction Strategy" properties control how cached entries are removed from the cache.

DistributedMapCacheClientService - This controller service is used to write data to a specific map cache server (by server hostname). It also has "distributed" removed from its name as of Apache NiFi 2.x.

You are using the DetectDuplicate processor to interact with the above controller services. While DetectDuplicate has a configurable "Age Off Duration" setting, a cached entry is only removed at that configured age off when BOTH of the following conditions have been met:

1. At least one duplicate has been detected.
2. The age off duration has expired.

So any cached entry for which a duplicate has not yet been detected remains in the cache server until the "Max Cache Entries" and "Eviction Strategy" settings result in its removal. Depending on what data you are caching, the number set for "Max Cache Entries", and how many duplicates you detect, your cache server most likely keeps growing until it reaches the max, and only then does eviction start.

If you have a "Persistence Directory" configured, the cached data is also written to that directory so that it is not lost in the event the NiFi instance or the DistributedMapCache server is restarted. This also means that after a NiFi restart the persisted cache is loaded back into heap memory.

Keep in mind that there are other external cache server options that do have HA, are truly distributed, and would not consume NiFi's heap, or memory on the NiFi host, if installed on a different server/host:

RedisDistributedMapCacheClientService
SimpleRedisDistributedMapCacheClientService
HazelcastMapCacheClient
CouchbaseMapCacheClient - removed as of Apache NiFi 2.x
HBase_2_ClientMapCacheService - removed as of Apache NiFi 2.x
CassandraDistributedMapCache - removed as of Apache NiFi 2.x

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped. Thank you, Matt
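For readers who want to see what that DetectDuplicate/cache handshake looks like at the API level, here is a minimal Groovy sketch for an ExecuteScript processor that performs the same check by hand through the map cache client: getAndPutIfAbsent() returns null the first time a key is seen and the previously cached value on every later attempt. The controller service name "CacheClientService", the use of the uuid attribute as the cache key, and the routing choices are assumptions for illustration only, not from the original post.

import org.apache.nifi.controller.ControllerService
import org.apache.nifi.distributed.cache.client.Deserializer
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.Serializer
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (!flowFile) return

// Look up the map cache client controller service by its (assumed) name
def lookup = context.controllerServiceLookup
def serviceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    lookup.getControllerServiceName(it) == 'CacheClientService'
}
def cache = lookup.getControllerService(serviceId) as DistributedMapCacheClient

// Plain UTF-8 serializer/deserializer for String keys and values
def ser = { String s, out -> out.write(s.getBytes(StandardCharsets.UTF_8)) } as Serializer<String>
def deser = { byte[] b -> b == null ? null : new String(b, StandardCharsets.UTF_8) } as Deserializer<String>

// null means the key was not cached yet (first sighting); anything else is a duplicate
def key = flowFile.getAttribute('uuid')
def existing = cache.getAndPutIfAbsent(key, 'seen', ser, ser, deser)
session.transfer(flowFile, existing == null ? REL_SUCCESS : REL_FAILURE)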
02-14-2025 01:22 PM
Thank you very much for your answer 🙂 I tried it and it worked.
05-20-2022 11:20 AM
Hello everyone! At the company where I work we hit the same problem, and this time I had some time to review the script and do a proof of concept to try to solve it. In my test, after a few FlowFiles the processor generated the Oracle database connection error. Searching the Groovy documentation, I found that the example floating around the internet does not use the closure for the connection correctly (this incident, for example). The solution is to close the Sql instance along with the connection: the script as published only closes the connection declared at the beginning and ignores the Sql instance. In my tests, once the Sql instance was closed, the processor handled 30,000 FlowFiles without any problems, whereas with the original script it started generating connection errors after 14,000 FlowFiles. You can find more info in the Groovy documentation: Connecting to HSQLDB. The solution is as follows: you need to add a finally block and close sql.

//Imports
import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import groovy.sql.OutParameter
import groovy.sql.Sql
import java.sql.ResultSet
//Get the connection from the NiFi connection pool
def lookup = context.controllerServiceLookup
//This value is set as a dynamic property on the processor, holding the name of the connection's controller service
def dbServiceName = databaseConnectionPoolName.value
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
sql = Sql.newInstance(conn);
//Get the session values from the NiFi flow
try {
flowFile = session.get()
if(!flowFile) return
PRIMER_PARAMETRO = flowFile.getAttribute('PRIMER_PARAMETRO')
SEGUNDO_PARAMETRO = flowFile.getAttribute('SEGUNDO_PARAMETRO')
// Procedure with 2 input parameters and 2 output parameters
String sqlString ="""{call PRUEBA_CONCEPTO.PROC_DUMMY_DUAL(?, ?, ?, ?)}""";
//Output parameters
def CODIGO
def MENSAJE
// Procedure parameter list: input values first, then the types of the output parameters
def parametersList = [PRIMER_PARAMETRO, SEGUNDO_PARAMETRO, Sql.NUMERIC, Sql.VARCHAR];
//Execute the procedure; the closure receives the output parameters in the order declared
sql.call(sqlString, parametersList) {out_status_code, out_status_desc ->
CODIGO = out_status_code
MENSAJE = out_status_desc
};
//Write the procedure's output values to FlowFile attributes
def attrMap = ['status_code':String.valueOf(CODIGO), 'status_desc':MENSAJE]
flowFile = session.putAllAttributes(flowFile, attrMap)
session.transfer(flowFile, REL_SUCCESS)
} catch (e){
log.error('Scripting error', e)
flowFile = session.putAttribute(flowFile, "error", e.getMessage())
session.transfer(flowFile, REL_FAILURE)
} finally {
// The actual fix: close the Sql instance as well as the underlying connection
if (sql != null) sql.close();
if (conn != null) conn.close();
}

In my case I use Sql.newInstance with a DBCPConnectionPool, but the fix should be the same for other connection types. Outside NiFi, Groovy can handle this cleanup for you; see the Sql.withInstance sketch after the test block below.

Dummy procedure:

CREATE OR REPLACE
PACKAGE PRUEBA_CONCEPTO AS
/* TODO enter package declarations (types, exceptions, methods etc) here */
PROCEDURE PROC_DUMMY_DUAL ( PRIMER_PARAMETRO IN VARCHAR2
, SEGUNDO_PARAMETRO IN NUMBER
, CODIGO OUT NUMBER
, MENSAJE OUT VARCHAR2
);
END PRUEBA_CONCEPTO;
/
CREATE OR REPLACE
PACKAGE BODY PRUEBA_CONCEPTO AS
PROCEDURE PROC_DUMMY_DUAL ( PRIMER_PARAMETRO IN VARCHAR2
, SEGUNDO_PARAMETRO IN NUMBER
, CODIGO OUT NUMBER
, MENSAJE OUT VARCHAR2
) AS
BEGIN
-- TAREA: Se necesita implantación para PROCEDURE PRUEBA_CONCEPTO.PROC_DUMMY_DUAL
CODIGO := SEGUNDO_PARAMETRO;
MENSAJE := PRIMER_PARAMETRO;
END PROC_DUMMY_DUAL;
END PRUEBA_CONCEPTO;
/

Test procedure:

SET SERVEROUTPUT ON;
DECLARE
PRIMER_PARAMETRO VARCHAR2(100);
SEGUNDO_PARAMETRO NUMBER;
CODIGO NUMBER;
MENSAJE VARCHAR2(100);
BEGIN
PRIMER_PARAMETRO := 'This is a test message';
SEGUNDO_PARAMETRO := 1;
PRUEBA_CONCEPTO.PROC_DUMMY_DUAL(PRIMER_PARAMETRO, SEGUNDO_PARAMETRO, CODIGO, MENSAJE);
DBMS_OUTPUT.PUT_LINE('CODIGO : ' || CODIGO);
DBMS_OUTPUT.PUT_LINE('MENSAJE : ' || MENSAJE);
END;
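As mentioned above, when you are not tied to a NiFi DBCPConnectionPool, Groovy's Sql.withInstance does this cleanup for you: it creates the Sql instance, runs the closure, and always closes the instance (and its connection) afterwards, so the leak fixed above cannot happen. A minimal sketch; the in-memory HSQLDB URL, credentials, and query are assumptions for illustration, in the spirit of the "Connecting to HSQLDB" documentation page referenced earlier:

import groovy.sql.Sql

// withInstance creates the Sql instance, runs the closure, and always closes
// the instance afterwards, so neither the connection nor the instance can leak
Sql.withInstance('jdbc:hsqldb:mem:demo', 'sa', '', 'org.hsqldb.jdbc.JDBCDriver') { sql ->
    sql.eachRow('SELECT TABLE_NAME FROM INFORMATION_SCHEMA.SYSTEM_TABLES') { row ->
        println row.TABLE_NAME
    }
}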