Member since
09-04-2019
5
Posts
0
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2051 | 09-12-2019 01:41 AM |
12-04-2019
03:17 AM
Hi,
I tried to insert some data in postgresql db using PUTSQL processor.
Below example of sql statement in putSql properties:
INSERT INTO myTable(date_response, success, code, payload_response) values ('${now()}', '${status:toNumber()}', '${code:toNumber()}', ${payload});
The problem is when this insert is executed, i have this error:
org.postgresql.util.PSQLException: ERROR: column "zxlbavuzvmlvm2x6vedsemrdstzjrnnnzxlbavuzvmlvm2x6swpvz2v5qwlhv1f" does not exist.
What happen ? .... the payload_response column name is replace by the value of my flow file attribute ${payload}. Why ? any hints ?
... View more
Labels:
- Labels:
-
Apache NiFi
09-12-2019
01:41 AM
An issue was created in jira to implement the missing method that was not implemented but declared in Interface. In the meantime, i created a custom processor that use Jedis dependency in order to fetch all entries.
... View more
09-04-2019
10:32 AM
After discussing in Nifi slack, keySet is not implemented in Redis DMCC ... So what is the good way to have it ASAP in order to get all entries in Redis db ? any suggestions .? thanks by advance ....
... View more
09-04-2019
08:20 AM
Hi,
I'm trying to get all entries from a redis cache. When i try to remove, getbykey etc that work but when i call myCache.keySet, i face an issue with following exception: UnsupportedOperationEception on null.
I check it in git source repo and keySet method is implemented. May be there is a mistake in my deserializer ? I would like to know the point of view of Nifi community .
I'm currently stuck as i need to list all entries that are in redis in order to split them in other processor for a specific action in Nifi context.
BElow some piece of code :
- The deserializer
public static class StringDeserializer implements Deserializer<String>{
@Override
public String deserialize(byte[] input) throws DeserializationException, IOException {
if (input == null || input.length == 0) {
return null;
}
return input.toString();
}
}
and piece of code of my custom processor code:
final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE)
.asControllerService( DistributedMapCacheClient.class);
// get all entry in cache
try {
Set<String> entries = cache.keySet(keyDeSerializer); // <== HERE CODE GENERATE ERROR
logger.info("size ===> " + entries.size());
entries.forEach(e->{
logger.info( "********************* " + e.toString());
});
session.transfer(flowFile, REL_SUCCESS);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
session.transfer(flowFile, REL_FAILURE);
}
and to finish the stacktrace:
- failed to process session due to java.lang.UnsupportedOperationException;
- java.lang.UnsupportedOperationException: null at org.apache.nifi.distributed.cache.client.DistributedMapCacheClient.keySet(DistributedMapCacheClient.java:221) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:87) at com.sun.proxy.$Proxy99.keySet(Unknown Source) at com.**********.****.*****.processors.getRedisCache.FetchCacheWithoutInput.onTrigger(FetchCacheWithoutInput.java:121) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1162) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:209) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
... View more
Labels:
- Labels:
-
Apache NiFi