How to Remove a Cache Entry Identifier From DistributedMapCacheServer

Hi all!

NiFi newbie attacks again!

Today I have a question about using DistributedMapCacheServer.

We have the following scenario:

- We will have a lot of incoming data, but we don't want to process two (or more) objects with the same identifier at the same time. If a second object with the same identifier comes from the data flow (FlowFile), we have to discard it until the first one has run through the whole process.
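To pin down the desired semantics, here is a minimal in-memory sketch in Java. It is only an illustration under stated assumptions: `InFlightRegistry`, `tryAcquire`, `release`, and `processIfFirst` are hypothetical names, and a local `ConcurrentHashMap` stands in for the DistributedMapCacheServer, so nothing here is a NiFi API.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the shared cache (not a NiFi API):
// it tracks identifiers that are currently being processed.
class InFlightRegistry {
    private final ConcurrentHashMap<String, Boolean> inFlight = new ConcurrentHashMap<>();

    // True if no other object with this identifier is in flight; the identifier
    // is then marked as in flight. False means "duplicate, discard it".
    boolean tryAcquire(String id) {
        return inFlight.putIfAbsent(id, Boolean.TRUE) == null;
    }

    // Frees the identifier (event-based release); returns false if it was not in flight.
    boolean release(String id) {
        return inFlight.remove(id) != null;
    }

    // Runs work only for the first object with a given identifier and frees the
    // identifier afterwards, even if the work throws. Returns false for a duplicate.
    boolean processIfFirst(String id, Runnable work) {
        if (!tryAcquire(id)) {
            return false;
        }
        try {
            work.run();
            return true;
        } finally {
            release(id);
        }
    }
}
```

Releasing in a `finally` block means a failed run still frees its identifier. The analogous point for the flow is that an identifier is only freed if its FlowFile actually reaches the removal step, including on failure paths.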

Then we found the DetectDuplicate processor, and it works perfectly (as you can see in the image below; the template is here -> detect-duplicate-v1.xml).

16756-flow-1.png

The problem is that after the whole process has executed, we need to free the identifier from the DistributedMapCacheServer.

We know that the DetectDuplicate processor has a property for clearing the cache (Age Off Duration), but it clears entries after a period of time rather than in response to an event, so that property doesn't suit our use case.

16760-detectduplicate-properties.png
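A small model shows why a time window fits this use case poorly. This is a hypothetical sketch (`AgeOffDuplicateCheck` is an invented name, and timestamps are passed in explicitly to keep it deterministic); it models the general idea of an age-off window, not DetectDuplicate's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of time-based age-off: an identifier counts as a duplicate
// only if it was first seen less than ageOffMillis ago. Not DetectDuplicate's real code.
class AgeOffDuplicateCheck {
    private final long ageOffMillis;
    private final Map<String, Long> firstSeen = new HashMap<>();

    AgeOffDuplicateCheck(long ageOffMillis) {
        this.ageOffMillis = ageOffMillis;
    }

    // Returns true if id is a duplicate at time nowMillis; otherwise records
    // nowMillis as the new first-seen time and returns false.
    boolean isDuplicate(String id, long nowMillis) {
        Long seen = firstSeen.get(id);
        if (seen != null && nowMillis - seen < ageOffMillis) {
            return true;
        }
        firstSeen.put(id, nowMillis);
        return false;
    }
}
```

With a 60-second window, a second `id-1` arriving at 90 s is accepted even if the first one is still being processed, while an identifier whose processing finishes in a few seconds stays blocked until the window expires. Removing the entry when processing finishes avoids both cases.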

Now we are trying to find a way to remove the identifier from the DistributedMapCacheServer, as in the flow below.

16757-flow-2-ideal-flow.png

I searched the NiFi docs and the internet but didn't find any processor that removes a cached identifier. The only thing I found was @Matt Burgess's article https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html, but it uses a script and I don't know how to use it.

Am I missing something?

Any help will be much appreciated!

ACCEPTED SOLUTION

Hello @Gabriel Queiroz

I'm surprised to learn that there's no existing processor that removes a key from a distributed map cache. Could you submit a JIRA issue to request that functionality, if possible?

In the meantime, when you run into gaps like this, you can usually address them by writing a custom processor with ExecuteScript or InvokeScriptedProcessor. These processors let you write custom processing logic in your favorite scripting language.

I've written an example that uses Groovy to remove a key from a distributed map cache. I think it will work for your use case.

https://gist.github.com/ijokarumawak/14d560fec5a052b3a157b38a11955772
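Whatever language the removal is written in, one detail matters: the client sends keys to the cache server as serialized bytes, so the serializer used for removal has to produce exactly the bytes that were used when the key was cached. The Java sketch below illustrates this with hypothetical stand-ins (`KeySerializer` only mirrors the shape of NiFi's `Serializer<T>`, and `ByteKeyedCache` plays the role of the cache server); it is not the gist's code and uses no NiFi classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical serializer interface shaped like serialize(value, out).
interface KeySerializer<T> {
    void serialize(T value, OutputStream out) throws IOException;
}

// Hypothetical cache that, like a remote server, only ever sees serialized keys.
class ByteKeyedCache {
    private final ConcurrentHashMap<ByteBuffer, Boolean> entries = new ConcurrentHashMap<>();

    // ByteBuffer equality and hashing depend only on content, so equal bytes match.
    private static <T> ByteBuffer toBytes(T key, KeySerializer<T> serializer) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            serializer.serialize(key, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ByteBuffer.wrap(out.toByteArray());
    }

    // Returns true if the key was not yet cached.
    <T> boolean putIfAbsent(T key, KeySerializer<T> serializer) {
        return entries.putIfAbsent(toBytes(key, serializer), Boolean.TRUE) == null;
    }

    // Returns true only if an entry with exactly the same serialized bytes existed.
    <T> boolean remove(T key, KeySerializer<T> serializer) {
        return entries.remove(toBytes(key, serializer)) != null;
    }
}
```

Caching `"id-1"` with a UTF-8 serializer and then trying to remove it with a UTF-16 serializer returns `false`; removing it with the same UTF-8 serializer returns `true`.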

Hi @kkawamura,

I updated my template using your solution and it's working perfectly!

Here is the template -> detect-duplicate-v2-with-remove-cache.xml

How can I submit a JIRA issue?

Thank you again!

Hi @Gabriel Queiroz

To submit a JIRA issue, go to the login page and sign up for an account. Once you're logged in, you'll see a red 'Create' button.

https://issues.apache.org/jira/login.jsp

19381-jira-sign-up.png

Great, thank you very much!

Hi @Gabriel Queiroz

I have created a processor that removes the entry from the cache on an event basis. Let me know if I can contribute it to NiFi.

Thanks,

Ghanashyam

All,

If you are like me, this script was working perfectly until you upgraded to NiFi 1.9, where Groovy scripts don't seem to work. Here is my custom processor:

1. Add the dependency to the NAR module:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-nar</artifactId>
    <version>1.9.0</version>
    <type>nar</type>
</dependency>

2. Add the dependency to the processor module:

<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-processors</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>

3. Re-purpose the PutDistributedMapCache processor code to remove the entry:

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@EventDriven
@SupportsBatching
@Tags({"map", "cache", "remove", "distributed"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Removes an entry from a distributed map cache. The cache key is computed from the Cache Entry "
        + "Identifier property; the FlowFile content is not read or modified.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService",
        "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
        "org.apache.nifi.processors.standard.FetchDistributedMapCache"})
public class RemoveDistributedMapCache extends AbstractProcessor {

    // Identifies the distributed map cache client
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("The Controller Service that is used to access the distributed map cache")
            .required(true)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build();

    // Selects the FlowFile attribute whose value is used as the cache key
    public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("Cache Entry Identifier")
            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will "
                    + "be evaluated against a FlowFile in order to determine the cache key to remove")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Any FlowFile whose cache key was removed from the cache (or was not present) "
                    + "will be routed to this relationship")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Any FlowFile with a blank cache key, or whose cache key could not be removed because "
                    + "the cache could not be reached, will be routed to this relationship")
            .build();

    private final Set<Relationship> relationships;

    // Must produce the same bytes as the serializer that originally stored the key
    private final Serializer<String> keySerializer = new StringSerializer();

    public RemoveDistributedMapCache() {
        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(rels);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(CACHE_ENTRY_IDENTIFIER);
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        return descriptors;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final ComponentLog logger = getLogger();

        // The cache key is computed from the 'Cache Entry Identifier' property, with Expression Language support
        final String cacheKey = context
                .getProperty(CACHE_ENTRY_IDENTIFIER)
                .evaluateAttributeExpressions(flowFile)
                .getValue();

        // If the computed key is null or blank, route the FlowFile to failure
        if (StringUtils.isBlank(cacheKey)) {
            logger.error("FlowFile {} has no value for the given Cache Entry Identifier", new Object[]{flowFile});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // The cache client used to interact with the distributed cache
        final DistributedMapCacheClient cache = context
                .getProperty(DISTRIBUTED_CACHE_SERVICE)
                .asControllerService(DistributedMapCacheClient.class);
        try {
            // remove() returns false if the key was not in the cache; both cases count as success
            final boolean removed = cache.remove(cacheKey, keySerializer);
            logger.debug("Cache key {} for {} was {}",
                    new Object[]{cacheKey, flowFile, removed ? "removed" : "not present"});
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final IOException e) {
            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * Simple string serializer, used for serializing the cache key as UTF-8 bytes
     */
    public static class StringSerializer implements Serializer<String> {

        @Override
        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
    }
}

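4. Have your NiFi admin put the NAR file in the lib folder and restart NiFi. (As with any custom processor, the class's fully qualified name must also be listed in src/main/resources/META-INF/services/org.apache.nifi.processor.Processor in the processor module so that NiFi can discover it.)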