How to generate a variable under conditions for another processor?

Contributor

Hello, everybody.

I hope you can help me. I have to store data in many different Kafka topics, and I want to avoid having dozens of PublishKafka processors.

I thought I could use a variable in the Topic Name property. My question is this: the log lines I receive have to meet various conditions to be assigned to one topic or another. How can I generate the attribute that will serve as the variable for the PublishKafka processor?

Example:

- adult,dog,bulldog,9,23,male,brown,4,etc

If my log line contains two terms, say 'cat' and 'adult', it should generate the variable 'CAT1' for PublishKafka.

If my log line contains the term 'adult' or 'puppy' and also the term 'dog', it should generate the variable 'DOGALL' for PublishKafka.

I understand that I could use the ExecuteScript processor.

Can someone help me with this problem, please?

Greetings

1 ACCEPTED SOLUTION

Master Guru

Hi @xav webmaster, I think this answer will help you.

Sample Flow :-

GenerateFlowFile--> ExtractText-->UpdateAttribute-->PublishKafka

GenerateFlowFile:- I'm using this for testing purposes; in your case you will have some other processors here.

ExtractText Processor:- In this processor I'm extracting the content of the flowfile as an attribute.

Ex:- adult,dog,bulldog,9,23,male,brown,4,etc

By adding a new property to ExtractText, I extract the above flowfile content and keep it as an attribute of the flowfile

cnt_attr as

 (.*) //capture everything and add to ff as cnt_attr

Configs:-

39826-extract-text.png

Output of this processor:-

Every flowfile will have the cnt_attr attribute associated with it. We can use this attribute in the next processor.

UpdateAttribute Processor:-

To dynamically change the topic name based on the cnt_attr attribute, we need to use the Advanced usage of the UpdateAttribute processor.

Right-click the UpdateAttribute processor and click the Advanced button in the lower right corner.

Steps:-

39824-update-attr-adult-dog.png

Open the above screenshot in a new tab to see steps 1, 2, 3 and 4, and compare them with the steps below.

1. As shown in the above screenshot, click on FlowFile Policy and change it to

UseOriginal

2. Click on the + sign at Rules and name the rule adult_dog

3. Click on the + sign at Conditions and enter the condition to check

${cnt_attr:matches('.*adult.*dog.*')}

4. Click on the + sign at Actions and enter the attribute name as

kafka_topic

Value for the kafka_topic attribute as

adult_dog

New Rule:-

For cat_dog

The condition is ${cnt_attr:matches('.*cat.*dog.*')} and the action adds the attribute kafka_topic with the value cat_dog, following steps 2, 3 and 4 above.

To summarize the steps:-

Step 1: we use the original flowfile.
Step 2: we create a rule.
Step 3: we add a condition that checks the cnt_attr attribute.
Step 4: if the condition is satisfied, we add the kafka_topic attribute with the desired name.

In this way we can add as many rules as we want in the same UpdateAttribute processor; as you can see in my screenshot, I have added 2 rules (adult_dog, cat_dog).

This processor checks which rule is satisfied and sets the kafka_topic attribute to the name given in that rule.
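
For readers who want to check the rule conditions outside NiFi, here is a minimal Python sketch of the two rules above. It assumes that the Expression Language matches() function tests the whole attribute value against the pattern, which re.fullmatch mirrors here. The RULES table and the pick_topic function are hypothetical names used only for illustration, and returning the first matching rule is a simplification, not a statement of how UpdateAttribute resolves several matching rules.

```python
import re

# Hypothetical rule table mirroring the two rules above: (rule name, regex, topic).
RULES = [
    ("adult_dog", r".*adult.*dog.*", "adult_dog"),
    ("cat_dog", r".*cat.*dog.*", "cat_dog"),
]


def pick_topic(cnt_attr, rules=RULES):
    """Return the topic of the first rule whose regex matches the whole value, else None."""
    for _name, pattern, topic in rules:
        # fullmatch: the pattern must cover the entire string (assumed to mirror matches()).
        if re.fullmatch(pattern, cnt_attr):
            return topic
    return None


print(pick_topic("adult,dog,bulldog,9,23,male,brown,4,etc"))  # adult_dog
print(pick_topic("dog,adult"))  # None
```

Note that a pattern such as .*adult.*dog.* is order-sensitive: it only matches when 'adult' appears before 'dog' in the line, which is why the second call finds no topic.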

PublishKafka:-

Use the kafka_topic attribute in the Topic Name property of the processor

${kafka_topic}

39825-publish-kafka.png

Flow Screenshot:-

39827-flow-kafka.png

In this way we can use a single UpdateAttribute processor to set the value of kafka_topic dynamically and use that same attribute to publish messages to the respective topics.

8 REPLIES 8

Master Guru
@xav webmaster

We can do that by using a RouteOnContent processor followed by an UpdateAttribute processor.

RouteOnContent:-

Change the Match Requirement property to "content must contain match"

Add the properties

adult_dog as (.*adult.*dog.*)
cat_dog as (.*cat.*dog.*)

Config:-

39815-routecontent.png

Here we check the contents of the flowfile. Add a new property for each case that needs to be published to Kafka, then use

UpdateAttribute processor:

Add your desired topic name to the kafka_topic attribute of the flowfile.

Add a new property

kafka_topic as cat_dog

Configs:-

39816-update-attr.png

PublishKafka:-

Every flowfile will then carry a kafka_topic attribute holding its topic name; use kafka_topic in the PublishKafka processor.

Change Topic Name property to

${kafka_topic}

Configs:-

39817-publish-kafka.png

Flow screenshot:-

39818-flow-kafka.png

Flow Explanation:-

RouteOnContent //checks the content and transfers the flowfile to the matching relationship.
UpdateAttribute //adds the kafka_topic attribute with the topic name.
PublishKafka //uses the kafka_topic attribute and publishes the content to the respective topic dynamically.

This way we don't have to use any scripts and can publish to Kafka topics dynamically.

Contributor

Hello Shu, as usual your answer is very clear and well worked out. What you describe does work, but unfortunately it would not be practical in my particular case. It is true that with this method only one PublishKafka processor is required, but it needs as many UpdateAttribute processors as there are Kafka topics. Please excuse me if my question was not worded clearly. In the end, my goal is not to have dozens of processors. Thank you for your help; I always learn something from your answers.

Contributor

At the moment, with my limited knowledge of NiFi, I'm trying to generate an attribute from the ExecuteScript processor. I am able to read the content of the flowfile and have written conditions using 'if' and 'else'. Unfortunately, I am not yet able to generate the dynamic attribute, so that the flowfile comes out unchanged but with a dynamic attribute that lets PublishKafka route the content.
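
A minimal sketch of the condition logic described in the question, written as plain Python so it can be tried outside NiFi. The function name topic_for, the exact-field comparison after splitting on commas, the lowercasing, and checking the cat condition before the dog condition are all assumptions made for illustration. Inside an ExecuteScript processor the returned value would still have to be set on the flowfile as an attribute (for example with session.putAttribute) before transferring it; that part is not shown here.

```python
def topic_for(line):
    """Map one comma-separated log line to a topic name, or None if no condition applies."""
    # Assumption: terms are whole comma-separated fields, compared case-insensitively.
    terms = {field.strip().lower() for field in line.split(",")}

    # 'cat' and 'adult' together -> CAT1
    if {"cat", "adult"} <= terms:
        return "CAT1"

    # ('adult' or 'puppy') together with 'dog' -> DOGALL
    if "dog" in terms and terms & {"adult", "puppy"}:
        return "DOGALL"

    return None


print(topic_for("adult,dog,bulldog,9,23,male,brown,4,etc"))  # DOGALL
print(topic_for("adult,cat,siamese"))  # CAT1
```

Keeping the conditions in one function means a new topic only needs one more branch, rather than one more processor.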


Hi @xav webmaster

Have you looked at the rules feature in the UpdateAttribute processor? It's available in the advanced configuration section: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-update-attribute-nar/1.4.0/or...

You can extract the information you want and add it as an attribute (using extract processors). Once you have that information as an attribute, you can use UpdateAttribute with rules to add or update a 'topic' attribute and set its value according to your conditions.

Is this helpful?


@xav webmaster

To get an idea of how this works, look at this answer: https://community.hortonworks.com/questions/140060/nifi-how-to-load-a-value-in-memory-one-time-from-...

It's not the same subject, but it can give you an idea of how to use this feature.

Contributor

Hi Abdelkrim Hadjidj, thanks for the help. Unfortunately, it's not a solution to my problem, since my goal is to reduce the number of processors. With many Kafka topics, I think the solution may be to use the ExecuteScript processor. If I generate the attributes another way, I will end up with only one publisher but many UpdateAttribute processors. Greetings


@xav webmaster

You can use one UpdateAttribute with several rules: if x then topic = 'A', if y then topic = 'B', etc.

You can add rules and define an action for each rule that sets the correct topic value.
