<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>question How to Remove a Cache Entry Identifier From DistributedMapCacheServer in Support Questions</title>
    <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232433#M194272</link>
    <description>&lt;P&gt;Hi all!&lt;/P&gt;&lt;P&gt;NiFi newbie attacks again!&lt;/P&gt;&lt;P&gt;Today I have a question about using DistributedMapCacheServer.&lt;/P&gt;&lt;P&gt;We have the following scenario:&lt;/P&gt;&lt;P&gt;- We will have a lot of incoming data, but we don't want to process two (or more) objects with the same identifier at the same time. If a second object (FlowFile) with the same identifier comes in from the data flow, we have to discard it until the first one has run through the whole process.&lt;/P&gt;&lt;P&gt;Then we found the DetectDuplicate processor, and it works perfectly (as you can see in the image below; the template is here -&amp;gt; &lt;A href="https://community.cloudera.com/legacyfs/online/attachments/16759-detect-duplicate-v1.xml" target="_blank"&gt;detect-duplicate-v1.xml&lt;/A&gt;).&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="16756-flow-1.png" style="width: 1404px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/14837i687446946072AC83/image-size/medium?v=v2&amp;amp;px=400" role="button" title="16756-flow-1.png" alt="16756-flow-1.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;But the problem is that after the whole process has executed, we have to free the identifier from DistributedMapCacheServer.&lt;/P&gt;&lt;P&gt;We know that the DetectDuplicate processor has a property to clear the cache (Age Off Duration), but it clears the cache based on time rather than on an event, so that property doesn't suit our use case.&lt;/P&gt;&lt;DIV&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="16760-detectduplicate-properties.png" style="width: 801px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/14838i94386DFEECBFD2C1/image-size/medium?v=v2&amp;amp;px=400" role="button" title="16760-detectduplicate-properties.png" alt="16760-detectduplicate-properties.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;/DIV&gt;
&lt;DIV&gt;Now we are trying to find a way to remove the identifier from DistributedMapCacheServer, as in the flow below.&lt;/DIV&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="16757-flow-2-ideal-flow.png" style="width: 1466px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/14839i03AD1817506B3527/image-size/medium?v=v2&amp;amp;px=400" role="button" title="16757-flow-2-ideal-flow.png" alt="16757-flow-2-ideal-flow.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;I searched the NiFi docs and the internet and didn't find any processor that removes a cached identifier. The only thing I found was &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/103"&gt;@Matt&lt;/a&gt; Burgess's article &lt;A href="https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html" target="_blank" rel="nofollow noopener noreferrer"&gt;https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html&lt;/A&gt;, but it uses a script and I don't know how to use it.&lt;/P&gt;&lt;P&gt;Am I missing something?&lt;/P&gt;&lt;P&gt;Any help will be much appreciated!&lt;/P&gt;</description>
    <pubDate>Sun, 18 Aug 2019 00:25:53 GMT</pubDate>
    <dc:creator>gabrielfqueiroz</dc:creator>
    <dc:date>2019-08-18T00:25:53Z</dc:date>
    <item>
      <title>How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232433#M194272</link>
      <description>&lt;P&gt;Hi all!&lt;/P&gt;&lt;P&gt;NiFi newbie attacks again!&lt;/P&gt;&lt;P&gt;Today I have a question about using DistributedMapCacheServer.&lt;/P&gt;&lt;P&gt;We have the following scenario:&lt;/P&gt;&lt;P&gt;- We will have a lot of incoming data, but we don't want to process two (or more) objects with the same identifier at the same time. If a second object (FlowFile) with the same identifier comes in from the data flow, we have to discard it until the first one has run through the whole process.&lt;/P&gt;&lt;P&gt;Then we found the DetectDuplicate processor, and it works perfectly (as you can see in the image below; the template is here -&amp;gt; &lt;A href="https://community.cloudera.com/legacyfs/online/attachments/16759-detect-duplicate-v1.xml" target="_blank"&gt;detect-duplicate-v1.xml&lt;/A&gt;).&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="16756-flow-1.png" style="width: 1404px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/14837i687446946072AC83/image-size/medium?v=v2&amp;amp;px=400" role="button" title="16756-flow-1.png" alt="16756-flow-1.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;But the problem is that after the whole process has executed, we have to free the identifier from DistributedMapCacheServer.&lt;/P&gt;&lt;P&gt;We know that the DetectDuplicate processor has a property to clear the cache (Age Off Duration), but it clears the cache based on time rather than on an event, so that property doesn't suit our use case.&lt;/P&gt;&lt;DIV&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="16760-detectduplicate-properties.png" style="width: 801px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/14838i94386DFEECBFD2C1/image-size/medium?v=v2&amp;amp;px=400" role="button" title="16760-detectduplicate-properties.png" alt="16760-detectduplicate-properties.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;/DIV&gt;
&lt;DIV&gt;Now we are trying to find a way to remove the identifier from DistributedMapCacheServer, as in the flow below.&lt;/DIV&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="16757-flow-2-ideal-flow.png" style="width: 1466px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/14839i03AD1817506B3527/image-size/medium?v=v2&amp;amp;px=400" role="button" title="16757-flow-2-ideal-flow.png" alt="16757-flow-2-ideal-flow.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;I searched the NiFi docs and the internet and didn't find any processor that removes a cached identifier. The only thing I found was &lt;a href="https://community.cloudera.com/t5/user/viewprofilepage/user-id/103"&gt;@Matt&lt;/a&gt; Burgess's article &lt;A href="https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html" target="_blank" rel="nofollow noopener noreferrer"&gt;https://community.hortonworks.com/articles/71837/working-with-a-nifi-distributedmapcache.html&lt;/A&gt;, but it uses a script and I don't know how to use it.&lt;/P&gt;&lt;P&gt;Am I missing something?&lt;/P&gt;&lt;P&gt;Any help will be much appreciated!&lt;/P&gt;</description>
      <pubDate>Sun, 18 Aug 2019 00:25:53 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232433#M194272</guid>
      <dc:creator>gabrielfqueiroz</dc:creator>
      <dc:date>2019-08-18T00:25:53Z</dc:date>
    </item>
    <item>
      <title>Re: How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232434#M194273</link>
      <description>&lt;P&gt;Hello &lt;A rel="user" href="https://community.cloudera.com/users/20327/gabrielfqueiroz.html" nodeid="20327"&gt;@Gabriel Queiroz&lt;/A&gt;&lt;/P&gt;&lt;P&gt;I'm surprised to learn that there's no existing processor that removes a key from the distributed map cache. Could you submit a JIRA issue to request that functionality, if possible?&lt;/P&gt;&lt;P&gt;In the meantime, when you run into a gap like this, you can usually address it by writing custom logic with ExecuteScript or InvokeScriptedProcessor. Those processors let you write a custom processor using your favorite scripting engine.&lt;/P&gt;&lt;P&gt;I've written an example that uses Groovy to remove a key from the distributed map cache. I think it will work for your use case.&lt;/P&gt;&lt;P&gt;&lt;A href="https://gist.github.com/ijokarumawak/14d560fec5a052b3a157b38a11955772" target="_blank"&gt;https://gist.github.com/ijokarumawak/14d560fec5a052b3a157b38a11955772&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 10 Jul 2017 08:47:01 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232434#M194273</guid>
      <dc:creator>kkawamura</dc:creator>
      <dc:date>2017-07-10T08:47:01Z</dc:date>
    </item>
    <item>
      <title>Re: How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232435#M194274</link>
      <description>&lt;P&gt;Hi @&lt;A href="https://community.hortonworks.com/users/3908/kkawamura.html"&gt;kkawamura&lt;/A&gt;,&lt;/P&gt;&lt;P&gt;I updated my template using your solution and it's working perfectly!
&lt;/P&gt;&lt;P&gt;Here is the template -&amp;gt; &lt;A href="https://community.cloudera.com/legacyfs/online/attachments/16784-detect-duplicate-v2-with-remove-cache.xml"&gt;detect-duplicate-v2-with-remove-cache.xml&lt;/A&gt;&lt;/P&gt;&lt;P&gt;How can I submit a Jira?&lt;/P&gt;&lt;P&gt;Thank you again!&lt;/P&gt;</description>
      <pubDate>Tue, 11 Jul 2017 01:09:55 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232435#M194274</guid>
      <dc:creator>gabrielfqueiroz</dc:creator>
      <dc:date>2017-07-11T01:09:55Z</dc:date>
    </item>
    <item>
      <title>Re: How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232436#M194275</link>
      <description>&lt;P&gt;Hi &lt;A rel="user" href="https://community.cloudera.com/users/20327/gabrielfqueiroz.html" nodeid="20327" target="_blank"&gt;@Gabriel Queiroz&lt;/A&gt;&lt;/P&gt;&lt;P&gt;To submit a JIRA, please go to the login page and sign up for an account. Then you'll see a red 'Create' button.&lt;/P&gt;&lt;P&gt;&lt;A href="https://issues.apache.org/jira/login.jsp" rel="nofollow noopener noreferrer" target="_blank"&gt;https://issues.apache.org/jira/login.jsp&lt;/A&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="19381-jira-sign-up.png" style="width: 1144px;"&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/14836iD4564AF94CC03519/image-size/medium?v=v2&amp;amp;px=400" role="button" title="19381-jira-sign-up.png" alt="19381-jira-sign-up.png" /&gt;&lt;/span&gt;&lt;/P&gt;</description>
      <pubDate>Sun, 18 Aug 2019 00:25:35 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232436#M194275</guid>
      <dc:creator>kkawamura</dc:creator>
      <dc:date>2019-08-18T00:25:35Z</dc:date>
    </item>
    <item>
      <title>Re: How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232437#M194276</link>
      <description>&lt;P&gt;Hi &lt;A rel="user" href="https://community.cloudera.com/users/3908/kkawamura.html" nodeid="3908"&gt;@kkawamura&lt;/A&gt;,&lt;/P&gt;&lt;P&gt;I opened the Jira: &lt;A href="https://issues.apache.org/jira/browse/NIFI-4173"&gt;https://issues.apache.org/jira/browse/NIFI-4173&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Thank you!&lt;/P&gt;</description>
      <pubDate>Tue, 11 Jul 2017 19:24:29 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232437#M194276</guid>
      <dc:creator>gabrielfqueiroz</dc:creator>
      <dc:date>2017-07-11T19:24:29Z</dc:date>
    </item>
    <item>
      <title>Re: How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232438#M194277</link>
      <description>&lt;P&gt;Great, thank you very much!&lt;/P&gt;</description>
      <pubDate>Tue, 11 Jul 2017 19:25:59 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232438#M194277</guid>
      <dc:creator>kkawamura</dc:creator>
      <dc:date>2017-07-11T19:25:59Z</dc:date>
    </item>
    <item>
      <title>Re: How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232439#M194278</link>
      <description>&lt;P&gt;Hi &lt;A rel="user" href="https://community.cloudera.com/users/20327/gabrielfqueiroz.html" nodeid="20327"&gt;@Gabriel Queiroz&lt;/A&gt;&lt;/P&gt;&lt;P&gt;I have created a processor that removes an entry from the cache on an event basis. Let me know if I can add this to NiFi.&lt;/P&gt;&lt;P&gt;Thanks,&lt;/P&gt;&lt;P&gt;Ghanashyam&lt;/P&gt;</description>
      <pubDate>Thu, 10 Jan 2019 15:28:01 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/232439#M194278</guid>
      <dc:creator>ghanashyam90</dc:creator>
      <dc:date>2019-01-10T15:28:01Z</dc:date>
    </item>
    <item>
      <title>Re: How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/290547#M214944</link>
      <description>&lt;P&gt;All,&lt;/P&gt;&lt;P&gt;If, like me, you had this script working perfectly until you upgraded to NiFi 1.9, where Groovy scripts don't seem to work, here is my custom processor:&lt;/P&gt;&lt;P&gt;1. Add the dependency to the NAR module&lt;/P&gt;&lt;PRE&gt;&amp;lt;dependency&amp;gt;
    &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;nifi-standard-nar&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;1.9.0&amp;lt;/version&amp;gt;
    &amp;lt;type&amp;gt;nar&amp;lt;/type&amp;gt;
&amp;lt;/dependency&amp;gt;&lt;/PRE&gt;&lt;P&gt;2. Add the dependency to the processor module&lt;/P&gt;&lt;PRE&gt;&amp;lt;dependency&amp;gt;
    &amp;lt;groupId&amp;gt;org.apache.nifi&amp;lt;/groupId&amp;gt;
    &amp;lt;artifactId&amp;gt;nifi-standard-processors&amp;lt;/artifactId&amp;gt;
    &amp;lt;version&amp;gt;1.9.0&amp;lt;/version&amp;gt;
    &amp;lt;scope&amp;gt;provided&amp;lt;/scope&amp;gt;
&amp;lt;/dependency&amp;gt;&lt;/PRE&gt;&lt;P&gt;3. Re-purpose the PutDistributedMapCache processor code to remove an entry&lt;/P&gt;&lt;PRE&gt;import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

@EventDriven
@SupportsBatching
@Tags({"map", "cache", "remove", "distributed"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Removes an entry from a distributed map cache, using a cache key computed from FlowFile attributes")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.FetchDistributedMapCache"})
public class RemoveDistributedMapCache extends AbstractProcessor
{

    // Identifies the distributed map cache client
    public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
            .name("Distributed Cache Service")
            .description("The Controller Service used to communicate with the distributed map cache")
            .required(true)
            .identifiesControllerService(DistributedMapCacheClient.class)
            .build();

    // Selects the FlowFile attribute, whose value is used as cache key
    public static final PropertyDescriptor CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("Cache Entry Identifier")
            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will " + "be evaluated against a FlowFile in order to determine the cache key")
            .required(true)
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Any FlowFile for which the remove request completes without error will be routed to this relationship")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Any FlowFile whose cache key is blank, or for which the cache cannot be reached, will be routed to this relationship")
            .build();
    private final Set&amp;lt;Relationship&amp;gt; relationships;

    private final Serializer&amp;lt;String&amp;gt; keySerializer = new StringSerializer();


    public RemoveDistributedMapCache()
    {
        final Set&amp;lt;Relationship&amp;gt; rels = new HashSet&amp;lt;&amp;gt;();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(rels);
    }

    @Override
    protected List&amp;lt;PropertyDescriptor&amp;gt; getSupportedPropertyDescriptors()
    {
        final List&amp;lt;PropertyDescriptor&amp;gt; descriptors = new ArrayList&amp;lt;&amp;gt;();
        descriptors.add(CACHE_ENTRY_IDENTIFIER);
        descriptors.add(DISTRIBUTED_CACHE_SERVICE);
        return descriptors;
    }

    @Override
    public Set&amp;lt;Relationship&amp;gt; getRelationships()
    {
        return relationships;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException
    {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final ComponentLog logger = getLogger();
        // cache key is computed from attribute 'CACHE_ENTRY_IDENTIFIER' with expression language support
        final String cacheKey = context
                .getProperty(CACHE_ENTRY_IDENTIFIER)
                .evaluateAttributeExpressions(flowFile)
                .getValue();
        // if the computed value is null, or empty, we transfer the flow file to failure relationship
        if (StringUtils.isBlank(cacheKey)) {
            logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        // the cache client used to interact with the distributed cache
        final DistributedMapCacheClient cache = context
                .getProperty(DISTRIBUTED_CACHE_SERVICE)
                .asControllerService(DistributedMapCacheClient.class);
        try {
            // Remove the entry for this key; the boolean result of remove() is ignored,
            // so a key that was not in the cache is still routed to success.
            cache.remove(cacheKey, keySerializer);
            session.transfer(flowFile, REL_SUCCESS);

        }
        catch (final IOException e) {
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
        }
    }

    // Carried over from PutDistributedMapCache; not used by the remove operation.
    public static class CacheValueSerializer implements Serializer&amp;lt;byte[]&amp;gt;
    {

        @Override
        public void serialize(final byte[] bytes, final OutputStream out) throws SerializationException, IOException
        {
            out.write(bytes);
        }
    }

    // Carried over from PutDistributedMapCache; not used by the remove operation.
    public static class CacheValueDeserializer implements Deserializer&amp;lt;byte[]&amp;gt;
    {

        @Override
        public byte[] deserialize(final byte[] input) throws DeserializationException, IOException
        {
            if (input == null || input.length == 0) {
                return null;
            }
            return input;
        }
    }

    /**
     * Simple string serializer, used for serializing the cache key
     */
    public static class StringSerializer implements Serializer&amp;lt;String&amp;gt;
    {

        @Override
        public void serialize(final String value, final OutputStream out) throws SerializationException, IOException
        {
            out.write(value.getBytes(StandardCharsets.UTF_8));
        }
    }

}&lt;/PRE&gt;&lt;P&gt;4. Have your NiFi admin put the NAR file in the lib folder and restart NiFi.&lt;/P&gt;</description>
      <pubDate>Wed, 26 Feb 2020 16:34:23 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/290547#M214944</guid>
      <dc:creator>snuffnit</dc:creator>
      <dc:date>2020-02-26T16:34:23Z</dc:date>
    </item>
    <item>
      <title>Re: How to Remove a Cache Entry Identifier From DistributedMapCacheServer</title>
      <link>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/390345#M247248</link>
      <description>&lt;P&gt;This does not work in version 2.0.0-M3. I get the following error:&lt;/P&gt;&lt;PRE&gt;Java.lang.NoClassDefFoundError: Could not initialize class RemoveCache: java.lang.NoClassDefFoundError: Could not initialize class RemoveCache
- Caused by: java.lang.ExceptionInInitializerError: Exception groovy.lang.MissingMethodException&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 15 Jul 2024 10:54:27 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Support-Questions/How-to-Remove-a-Cache-Entry-Identifier-From/m-p/390345#M247248</guid>
      <dc:creator>Shakti</dc:creator>
      <dc:date>2024-07-15T10:54:27Z</dc:date>
    </item>
  </channel>
</rss>

