Created on 07-07-2017 11:41 PM - edited 08-17-2019 05:25 PM
Hi all!
NiFi newbie attacks again!
Today I have a question about using DistributedMapCacheServer.
We have the following scenario:
- We will have a lot of incoming data, but we don't want to process two (or more) objects with the same identifier at the same time. If a second object (FlowFile) with the same identifier arrives in the data flow, we have to discard it until the first one has run through the whole process.
Then we found the DetectDuplicate processor, and it works perfectly (as you can see in the image below; the template is here -> detect-duplicate-v1.xml).
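The rule described above is a claim/release pattern: the first object claims its identifier, duplicates are rejected while the claim is held, and the claim is released when processing ends. In this flow DetectDuplicate plus the DistributedMapCacheServer plays the "claim" role. The following is a minimal single-JVM sketch of that rule in plain Java. `IdentifierGate`, `tryClaim`, and `release` are hypothetical names used for illustration, not NiFi APIs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory illustration of the gating rule: an identifier may be
// "in flight" at most once; duplicates are rejected until it is released.
class IdentifierGate {
    // Thread-safe set of identifiers currently being processed.
    private final Set<String> inFlight = ConcurrentHashMap.newKeySet();

    // true if the caller claimed the identifier, false if it is already in flight.
    boolean tryClaim(String id) {
        return inFlight.add(id);
    }

    // Called when processing for this identifier has finished (the "event").
    void release(String id) {
        inFlight.remove(id);
    }

    public static void main(String[] args) {
        IdentifierGate gate = new IdentifierGate();
        System.out.println(gate.tryClaim("order-42")); // true: first object proceeds
        System.out.println(gate.tryClaim("order-42")); // false: duplicate is discarded
        gate.release("order-42");                      // first object finished
        System.out.println(gate.tryClaim("order-42")); // true: identifier is free again
    }
}
```

The missing piece this thread is about is the `release` step against the distributed cache.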
But the problem is that after the whole process executes, we have to free the identifier in the DistributedMapCacheServer.
We know that the DetectDuplicate processor has a property to clear the cache (Age Off Duration), but it clears entries after a period of time rather than in response to an event, so that property doesn't suit our use case.
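Why a time window does not fit can be shown with a small sketch. It models an age-off window roughly the way an Age Off Duration behaves (an entry older than the window no longer counts as a duplicate) alongside an explicit, event-driven release. The class and method names are hypothetical, times are passed in explicitly so the run is deterministic, and it is single-threaded for clarity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: time-based age-off vs. event-based release.
class AgeOffVsRelease {
    private final Map<String, Long> claimedAt = new HashMap<>();
    private final long ageOffMillis;

    AgeOffVsRelease(long ageOffMillis) {
        this.ageOffMillis = ageOffMillis;
    }

    // Time-based check: an entry older than the window is treated as absent.
    boolean tryClaim(String id, long nowMillis) {
        Long since = claimedAt.get(id);
        if (since != null && nowMillis - since < ageOffMillis) {
            return false; // inside the window: treated as a duplicate
        }
        claimedAt.put(id, nowMillis);
        return true;
    }

    // Event-based release: drop the entry as soon as processing finishes.
    void release(String id) {
        claimedAt.remove(id);
    }

    public static void main(String[] args) {
        AgeOffVsRelease cache = new AgeOffVsRelease(60_000); // 60 s window
        System.out.println(cache.tryClaim("id-1", 0));      // true
        System.out.println(cache.tryClaim("id-1", 30_000)); // false: duplicate inside window
        // The first object may still be in flight at 90 s, but the window has
        // expired, so a second object is admitted concurrently.
        System.out.println(cache.tryClaim("id-1", 90_000)); // true
        // With an explicit release, the identifier frees up exactly when work ends.
        cache.tryClaim("id-2", 0);
        cache.release("id-2");                              // processing finished at ~5 s
        System.out.println(cache.tryClaim("id-2", 6_000));  // true, without waiting 60 s
    }
}
```

Any fixed window is either too short (concurrent duplicates slip through) or too long (a finished identifier stays blocked). That is why an explicit remove is needed.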
I searched the NiFi docs and the internet and didn't find any processor that removes a cached identifier. The only thing I found was @Matt Burgess's article https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html, but it uses a script and I don't know how to use it.
Am I missing something?
Any help will be much appreciated!
Created 07-10-2017 01:47 AM
Hello @Gabriel Queiroz
I was surprised to learn that there's no existing processor that removes a key from the distributed map cache. Could you submit a JIRA issue to request that functionality, if possible?
In the meantime, when you run into gaps like this, you can usually address them by writing a custom processor with ExecuteScript or InvokeScriptedProcessor. These processors let you write a custom processor in your favorite scripting language.
I've written an example that uses Groovy to remove a key from the distributed map cache. I think it will work for your use case.
https://gist.github.com/ijokarumawak/14d560fec5a052b3a157b38a11955772
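The gist's code isn't reproduced in this thread, so the following is not that code. It is a plain-Java sketch of the per-FlowFile decisions a "remove from cache" processor makes, modeled on the Java processor posted later in this thread: a blank key goes to failure, a completed remove goes to success, and a communication error goes to failure. The `Cache` interface is a hypothetical stand-in for the NiFi cache client so the sketch runs without NiFi on the classpath.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

class RemoveKeyLogic {
    enum Route { SUCCESS, FAILURE }

    // Hypothetical stand-in for the cache client: remove() reports whether the
    // key existed and throws IOException if the cache cannot be reached.
    interface Cache {
        boolean remove(String key) throws IOException;
    }

    static Route process(String cacheKey, Cache cache) {
        // No usable key (attribute missing or empty): route to failure.
        if (cacheKey == null || cacheKey.trim().isEmpty()) {
            return Route.FAILURE;
        }
        try {
            // A missing key is not an error here; the remove still "completed".
            cache.remove(cacheKey);
            return Route.SUCCESS;
        } catch (IOException e) {
            // Could not talk to the cache server.
            return Route.FAILURE;
        }
    }

    public static void main(String[] args) {
        Set<String> stored = new HashSet<>(Set.of("id-1"));
        Cache inMemory = stored::remove;
        Cache unreachable = key -> { throw new IOException("cache server unreachable"); };
        System.out.println(process("id-1", inMemory));    // SUCCESS (key removed)
        System.out.println(process("id-1", inMemory));    // SUCCESS (already gone)
        System.out.println(process("  ", inMemory));      // FAILURE (blank key)
        System.out.println(process("id-1", unreachable)); // FAILURE (IOException)
    }
}
```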
Created 07-10-2017 06:09 PM
Hi @kkawamura,
I updated my template using your solution and it's working perfectly!
Here is the template -> detect-duplicate-v2-with-remove-cache.xml
How can I submit a Jira?
Thank you again!
Created on 07-10-2017 11:47 PM - edited 08-17-2019 05:25 PM
To submit a JIRA, go to the login page and sign up for an account. Then you'll see a red 'Create' button.
https://issues.apache.org/jira/login.jsp
Created 07-11-2017 12:25 PM
Great, thank you very much!
Created 01-10-2019 07:28 AM
I have created a processor that removes an entry from the cache on an event basis. Let me know if I can contribute it to NiFi.
Thanks,
Ghanashyam
Created on 02-26-2020 08:31 AM - edited 02-26-2020 08:34 AM
All,
If, like me, you had this script working perfectly until you upgraded to NiFi 1.9, where Groovy scripts don't seem to work, here is my custom processor:
1. Add the dependency to the NAR module:
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-nar</artifactId>
    <version>1.9.0</version>
    <type>nar</type>
</dependency>
2. Add the dependency to the processor module:
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-processors</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>
3. Repurpose the PutDistributedMapCache processor code to remove the entry:
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "remove", "delete", "distributed"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Removes an entry from a distributed map cache. The cache key is computed from the FlowFile's attributes; the FlowFile content is not read.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.FetchDistributedMapCache"})
public class RemoveDistributedMapCache extends AbstractProcessor
{
    // Identifies the distributed map cache client
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("The Controller Service used to connect to the distributed map cache")
            .required(true)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build();
    // Selects the FlowFile attribute whose value is used as the cache key
    public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("Cache Entry Identifier")
            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " + "be evaluated against a FlowFile in order to determine the cache key")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Any FlowFile whose remove request completes (whether or not the key was present) is routed to this relationship")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Any FlowFile with a blank cache key, or whose entry cannot be removed because the cache cannot be reached, is routed to this relationship")
            .build();
    private final Set<Relationship> relationships;
    private final Serializer<String> keySerializer = new StringSerializer();
    public RemoveDistributedMapCache()
    {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(rels);
    }
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors()
    {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(CACHE_ENTRY_IDENTIFIER);
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        return descriptors;
    }
    @Override
    public Set<Relationship> getRelationships()
    {
        return relationships;
    }
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException
    {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final ComponentLog logger = getLogger();
        // the cache key is computed from the 'Cache Entry Identifier' property, with Expression Language support
        final String cacheKey = context
                .getProperty(CACHE_ENTRY_IDENTIFIER)
                .evaluateAttributeExpressions(flowFile)
                .getValue();
        // if the computed value is null or blank, transfer the FlowFile to the failure relationship
        if (StringUtils.isBlank(cacheKey)) {
            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        // the cache client used to interact with the distributed cache
        final DistributedMapCacheClient cache = context
                .getProperty(DISTRIBUTED_CACHE_SERVICE)
                .asControllerService(DistributedMapCacheClient.class);
        try {
            // Remove the entry; remove() returns false if the key was absent, which is still routed to success
            cache.remove(cacheKey, keySerializer);
            session.transfer(flowFile, REL_SUCCESS);
        }
        catch (final IOException e) {
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
        }
    }
    public static class CacheValueSerializer implements Serializer<byte[]>
    {
        @Override
        public void serialize(final byte[] bytes, final OutputStream out) throws SerializationException, IOException
        {
            out.write(bytes);
        }
    }
    public static class CacheValueDeserializer implements Deserializer<byte[]>
    {
        @Override
        public byte[] deserialize(final byte[] input) throws DeserializationException, IOException
        {
            if (input == null || input.length == 0) {
                return null;
            }
            return input;
        }
    }
    /**
     * Simple string serializer, used for serializing the cache key
     */
    public static class StringSerializer implements Serializer<String>
    {
        @Override
        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException
        {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
    }
}
4. Have your NiFi admin put the NAR file in NiFi's lib folder and restart NiFi.
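One thing worth checking with this approach: the key passed to `remove` must serialize to exactly the same bytes as the key that was stored (by DetectDuplicate or PutDistributedMapCache). As I understand it, the cache server matches entries on those serialized bytes. The processor above writes keys as UTF-8. The sketch below shows why that matters for non-ASCII identifiers; `latin1` is a hypothetical mismatched serializer used only for contrast.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeySerializationCheck {
    // Same body as StringSerializer.serialize in the processor above: UTF-8 bytes.
    static void serializeUtf8(String value, OutputStream out) throws IOException {
        out.write(value.getBytes(StandardCharsets.UTF_8));
    }

    // Bytes the processor would send for this key.
    static byte[] utf8(String key) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            serializeUtf8(key, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // ByteArrayOutputStream does not actually throw
        }
        return out.toByteArray();
    }

    // Hypothetical serializer that stored keys as ISO-8859-1 instead.
    static byte[] latin1(String key) {
        return key.getBytes(StandardCharsets.ISO_8859_1);
    }

    // Keys match only if their serialized bytes are identical.
    static boolean sameKey(byte[] stored, byte[] requested) {
        return Arrays.equals(stored, requested);
    }

    public static void main(String[] args) {
        String ascii = "order-42";
        String accented = "a\u00e7\u00e3o"; // "ação"
        System.out.println(sameKey(utf8(ascii), latin1(ascii)));       // true: ASCII encodes identically
        System.out.println(sameKey(utf8(accented), utf8(accented)));   // true
        System.out.println(sameKey(latin1(accented), utf8(accented))); // false: remove would miss the entry
    }
}
```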
Created on 07-15-2024 03:53 AM - edited 07-15-2024 03:54 AM
This does not work with version 2.0.0-M3; I get the following error:
java.lang.NoClassDefFoundError: Could not initialize class RemoveCache: java.lang.NoClassDefFoundError: Could not initialize class RemoveCache - Caused by: java.lang.ExceptionInInitializerError: Exception groovy.lang.MissingMethodException
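The shape of this error is standard JVM behavior. The first time a class's static initialization throws, the JVM raises ExceptionInInitializerError. Every later use of that class then fails with "NoClassDefFoundError: Could not initialize class ...". On recent JDKs (18+), that error carries the original ExceptionInInitializerError as its cause, which matches the "Caused by" in the log. So the real problem is the groovy.lang.MissingMethodException raised while RemoveCache was being initialized, most likely a call whose method or signature doesn't exist in 2.0.0-M3. The thread doesn't confirm which call. The self-contained sketch below reproduces the chain with a hypothetical `FailingInit` class.

```java
// Hypothetical stand-in for a script class whose static setup throws
// (in the log above, the real cause was groovy.lang.MissingMethodException).
class FailingInit {
    static final String VALUE = setUp();

    static String setUp() {
        throw new IllegalStateException("simulated failure during static initialization");
    }
}

class InitFailureDemo {
    // Reads FailingInit.VALUE, forcing class initialization, and returns the Throwable raised.
    static Throwable touch() {
        try {
            System.out.println(FailingInit.VALUE);
            return null;
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        Throwable first = touch();  // ExceptionInInitializerError wrapping the IllegalStateException
        Throwable second = touch(); // NoClassDefFoundError: Could not initialize class FailingInit
        System.out.println("first:  " + first);
        System.out.println("second: " + second);
        // On JDK 18+ this is the original ExceptionInInitializerError; on older JDKs it is null.
        System.out.println("cause of second: " + second.getCause());
    }
}
```

In practice this means the fix lies in the script's initialization code, not in the class loading itself.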