- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
How to generate a variable under conditions for another processor?
- Labels:
-
Apache Kafka
-
Apache NiFi
Created 10-16-2017 08:35 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hello, everybody.
I hope you guys will please help me. I have to store data in many different kafka topics in order to not have dozens of publishkafta processors.
I thought I could use a variable in the topic name property. My question is as follows, if the log lines I receive have to meet various conditions to be assigned to a topic or other topic. How it could generate the attribute that will serve as a variable to the publkafka processor.
Example:
- adult,dog,bulldog,9,23,male,brown,4,etc
If my log line contains two terms, let's say 'cat' and 'adult' will have to generate the variable 'CAT1' for publishkafka.
If my log line contains the term 'adult' or 'puppy' and also the term 'dog' will have to generate the variable' DOGALL' for publkafka.
I understand that I could use the executescript processor.
Someone can help me with the problem, please.
Greetings
Created on 10-17-2017 01:51 AM - edited 08-17-2019 07:07 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi @xav webmaster, i think this answer will help you for sure,
Sample Flow :-
GenerateFlowFile--> ExtractText-->UpdateAttribute-->PublishKafka
GenerateFlowFile:- As for testing purpose i'm using this but in your case you are having some other processors
ExtractText Processor:- in this processor i'm extracting the contents of flowfile as attribute.
Ex:- adult,dog,bulldog,9,23,male,brown,4,etc
The above content of flowfile by adding new property to ExtractText i'm going to extract the content and keeping that as attribute of the flowfile
cnt_attr as
(.*) //capture everything and add to ff as cnt_attr
Configs:-
Output of this processor:-
Every flowfile will associated with the attribute called cnt_attr to it, we can use this attribute in
UpdateAttribute Processor:-
To dynamically change the topic names based on the cnt_attr attribute, for this case we need to use Advanced Usage of Update Attribute processor.
Right Click on UpdateAttribute processor and click on Advanced Button in the lower right corner.
Steps:-
open above screenshot in new tab to see 1,2,3,4 steps and refer them with below steps
1. As mentioned in the above screenshot click on FlowFile Policy change to
UseOriginal
2. Click on + sign at Rules and give name as adult_dog
3. Click on + sign at Conditions and give our check condition in it
${cnt_attrt:matches('.*adult.*dog.*')}
4. Click on + sign at Actions and give the attribute name as
kafka_topic
Value for the kafka_topic attribute as
adult_dog
New Rule:-
for cat_dog
conditions check is ${cnt_attr:matches('.*cat.*dog.*')} and Actions add attribute name as kafka_topic and value is cat_dog same as 2,3,4 steps above.
summarize all the steps:-
step 1 we are using original flowfile and step2 we are creating a rule and step3 adding conditions to check if the cnt_attr attribute satisfies or not step4 if it satisfies then adding kafka_topic attribute with desired name to it.
like this way we can add as many rules as we want in same UpdateAttribute Processor as you can see in my screenshot i have added 2 Rules(adult_dog,cat_dog).
This processor checks which Rule has satisfied and updates kafka_topic attribute with the mentioned name in it.
PublishKafka:-
use the kafka_topic attribute in Topic Name property of processor
${kafka_topic}
Flow Screenshot:-
In this way we can use only one UpdateAttribute to dynamically change the value of kafka_topic based on update attribute processor and use same kafka_topic attribute to publish messages to respective topics.
Created on 10-16-2017 09:58 AM - edited 08-17-2019 07:08 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
We can do that by using RouteonContent processor followed by UpdateAttribute processor.
Routeoncontent:-
change Match Requirement strategy to content must contain match
add the properties
adult_dog as (.*adult.*dog.*) cat_dog as (.*cat.*dog.*)
Config:-
Right now we are checking the contents of flowfile and add new properties for all the cases that you need to publish the data to kafka, then use
UpdateAttribute processor:
Add your deserved name to kafka_topic attribute of the flowfile.
add new property
kafka_topic as cat_dog
Configs:-
PublishKafka:-
every flowfile will have only kafka_topic as the attribute having different topic names in it, make use of kafka_topic in our publish kafka processor.
Change Topic Name property to
${kafka_topic}
Configs:-
Flow screenshot:-
Flow Explanation:-
Routeoncontent //check the content and transfers flowfile to matching relationships. UpdataAttribute //adding kafka_topic attribute with some name. PublishKafka //using kafka_topic attribute and publishing the contents to respective topics dynamically.
Like this way we don't have to use any scripts and dynamically publish into kafka topics.
Created 10-16-2017 10:10 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hello shu, as usual your answer is very clear and well worked. In fact what you say is effective, but unfortunately it would not be practical for my particular case. While it is true that using this method you describe, only one publkafka processor is required. It will generate as many updateattribute processors as there are kafka topics. Please excuse me if my way of expressing myself in the question was not the most appropriate. finally the goal I'm looking for is not to have dozens of processors. Thank you for your help, I always learn something from your answers.
Created 10-16-2017 10:16 AM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
At the moment with my little knowledge of nifi, I'm trying to generate an attribute from the executescript processor. At the moment I am able to read the content of the flowfile and have generated conditions using' if' and' else'. Unfortunately, at the moment I am not able to generate the dynamic attribute. so that the flowfile that comes out, comes out as it is, but with a dynamic attribute that allows pusblishkafka to derive the content.
Created 10-16-2017 01:27 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Have you looked to rules feature in the UpdateAttribute processor ? it's available in the advanced configuration section : https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-update-attribute-nar/1.4.0/or...
You can extract information that you want and add them as an attribute (using extract processors). Once you have information that you want as an attribute, you can use UpdateAttribute with rules to add/update a new attribute 'topic' and set the value following you conditions.
Is this helpful ?
Created 10-16-2017 01:43 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
to have an idea on how this works, look at this answer https://community.hortonworks.com/questions/140060/nifi-how-to-load-a-value-in-memory-one-time-from-...
It's not the same subject but it can give you an idea on how you can use this feature
Created 10-16-2017 01:52 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi Abdelkrim Hadjidj, thanks for the help. Unfortunately, it's not a solution to the problem I have. Since as a target I want to reduce the number of processors to use. Having many kafta topics, I think the solution may be to use the executescript processor. Since if I generate attributes in another way, I will end up having only one publisher, but many updateattribute. Greetings
Created 10-16-2017 01:57 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
You can use one UpdateAttribute with several rules. if x then topic = 'A', if y then topic = 'B' etc
You can add rules and define an action for each rule which set the correct topic value
Created on 10-17-2017 01:51 AM - edited 08-17-2019 07:07 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Hi @xav webmaster, i think this answer will help you for sure,
Sample Flow :-
GenerateFlowFile--> ExtractText-->UpdateAttribute-->PublishKafka
GenerateFlowFile:- As for testing purpose i'm using this but in your case you are having some other processors
ExtractText Processor:- in this processor i'm extracting the contents of flowfile as attribute.
Ex:- adult,dog,bulldog,9,23,male,brown,4,etc
The above content of flowfile by adding new property to ExtractText i'm going to extract the content and keeping that as attribute of the flowfile
cnt_attr as
(.*) //capture everything and add to ff as cnt_attr
Configs:-
Output of this processor:-
Every flowfile will associated with the attribute called cnt_attr to it, we can use this attribute in
UpdateAttribute Processor:-
To dynamically change the topic names based on the cnt_attr attribute, for this case we need to use Advanced Usage of Update Attribute processor.
Right Click on UpdateAttribute processor and click on Advanced Button in the lower right corner.
Steps:-
open above screenshot in new tab to see 1,2,3,4 steps and refer them with below steps
1. As mentioned in the above screenshot click on FlowFile Policy change to
UseOriginal
2. Click on + sign at Rules and give name as adult_dog
3. Click on + sign at Conditions and give our check condition in it
${cnt_attrt:matches('.*adult.*dog.*')}
4. Click on + sign at Actions and give the attribute name as
kafka_topic
Value for the kafka_topic attribute as
adult_dog
New Rule:-
for cat_dog
conditions check is ${cnt_attr:matches('.*cat.*dog.*')} and Actions add attribute name as kafka_topic and value is cat_dog same as 2,3,4 steps above.
summarize all the steps:-
step 1 we are using original flowfile and step2 we are creating a rule and step3 adding conditions to check if the cnt_attr attribute satisfies or not step4 if it satisfies then adding kafka_topic attribute with desired name to it.
like this way we can add as many rules as we want in same UpdateAttribute Processor as you can see in my screenshot i have added 2 Rules(adult_dog,cat_dog).
This processor checks which Rule has satisfied and updates kafka_topic attribute with the mentioned name in it.
PublishKafka:-
use the kafka_topic attribute in Topic Name property of processor
${kafka_topic}
Flow Screenshot:-
In this way we can use only one UpdateAttribute to dynamically change the value of kafka_topic based on update attribute processor and use same kafka_topic attribute to publish messages to respective topics.