Created 05-05-2018 11:33 PM
Attributes lost after running SelectHiveQL, do you know why? Is it a bug? And how to fix it?
I have some attributes updated before SelectHiveQL, but all the attributes set disappear after running SelectHiveQL, please help.
Created on 05-06-2018 05:54 PM - edited 08-18-2019 02:50 AM
As per the documentation of SelectHiveQL processor states that If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the select query. FlowFile attribute 'selecthiveql.row.count' indicates how many rows were selected. But you can file a jira addressing the issue found.
As a work around to fix this issue You can use PutDistributeCacheMap processor to keep all your attributes in cache server and fetch the attributes from cache server using FetchDistributeCacheMap processor.
Sample Flow Example:-
i'm using GenerateFlowFile processor and adding 3 attributes to the flowfile
attr1
56
attr2
67
attr3
89
ReplaceText Processor:-
Search Value
(?s)(^.*$)
Replacement Value
${allAttributes("attr1","attr2","attr3"):join("|")}
//i'm using allAttributes and keeping all the attributes with "|" pipe delimiter(output flowfile will be 1|2|3)
Maximum Buffer Size
1 MB //needs to change the size if the content is more than 1MB size.
Replacement Strategy
Always Replace
Evaluation Mode
Entire text
Use this link to evaluate multiple attributes.
UpdateAttribute:-
We are using this processor to change the filename of flowfile to UUID because We cannot not refer to use UUID as cache identifier reason is output from SelectHiveQL processor is having same filename but different UUID(i.e until selecthiveq processor flowfile having one uniqueid after selecthiveql processor different uuid).**
Add new property as
filename
${UUID()}
PutDistributeCache processor:-
Configure DistributedMapCacheServer,DistributedMapCacheClientService and enable them(you need to change cache number of entriesas per your needs,persistence directory if not mentioned then all the entries will be stored in memory).
Cache Entry Identifier
${filename}
Now we have changed the flowfile content and cached the output content with the filename.
SelectHiveQL processor:-
Feed success relation to SelectHiveQL processor once the processor outputs flowfile with content of the flowfile then feed the success relationship to
FetchDistributeCacheMap:-
Configs:-
Cache Entry Identifier
${filename:substringBefore('.')} //because based on output ff format we are going to have .avro/.csv extensions
Distributed Cache Service
DistributedMapCacheClientService
Put Cache Value In Attribute
cache_value //the cached content will be put in to this attribute instead putting into flowfile content
Max Length To Put In Attribute
256 //needs to change the value if max length is more than this value
Output:-
Flowfile will have attribute called cached.value then you can rebuild all your attributes by using getDelimitedField function
${cached.value:getDelimitedField(1, '|')} //will give attr1 field
Even without rebuilding all the attributes again by using above expression language you can directly pull the required attribute value and use them in your flow.
I have attached my sample flow.xml below, save and upload the xml to your instance and change the as per your needs.
selecthiveql-attributes-188338.xml
-
If the Answer helped to resolve your issue, Click on Accept button below to accept the answer, That would be great help to Community users to find solution quickly for these kind of issues.
Created on 05-06-2018 05:54 PM - edited 08-18-2019 02:50 AM
As per the documentation of SelectHiveQL processor states that If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the select query. FlowFile attribute 'selecthiveql.row.count' indicates how many rows were selected. But you can file a jira addressing the issue found.
As a work around to fix this issue You can use PutDistributeCacheMap processor to keep all your attributes in cache server and fetch the attributes from cache server using FetchDistributeCacheMap processor.
Sample Flow Example:-
i'm using GenerateFlowFile processor and adding 3 attributes to the flowfile
attr1
56
attr2
67
attr3
89
ReplaceText Processor:-
Search Value
(?s)(^.*$)
Replacement Value
${allAttributes("attr1","attr2","attr3"):join("|")}
//i'm using allAttributes and keeping all the attributes with "|" pipe delimiter(output flowfile will be 1|2|3)
Maximum Buffer Size
1 MB //needs to change the size if the content is more than 1MB size.
Replacement Strategy
Always Replace
Evaluation Mode
Entire text
Use this link to evaluate multiple attributes.
UpdateAttribute:-
We are using this processor to change the filename of flowfile to UUID because We cannot not refer to use UUID as cache identifier reason is output from SelectHiveQL processor is having same filename but different UUID(i.e until selecthiveq processor flowfile having one uniqueid after selecthiveql processor different uuid).**
Add new property as
filename
${UUID()}
PutDistributeCache processor:-
Configure DistributedMapCacheServer,DistributedMapCacheClientService and enable them(you need to change cache number of entriesas per your needs,persistence directory if not mentioned then all the entries will be stored in memory).
Cache Entry Identifier
${filename}
Now we have changed the flowfile content and cached the output content with the filename.
SelectHiveQL processor:-
Feed success relation to SelectHiveQL processor once the processor outputs flowfile with content of the flowfile then feed the success relationship to
FetchDistributeCacheMap:-
Configs:-
Cache Entry Identifier
${filename:substringBefore('.')} //because based on output ff format we are going to have .avro/.csv extensions
Distributed Cache Service
DistributedMapCacheClientService
Put Cache Value In Attribute
cache_value //the cached content will be put in to this attribute instead putting into flowfile content
Max Length To Put In Attribute
256 //needs to change the value if max length is more than this value
Output:-
Flowfile will have attribute called cached.value then you can rebuild all your attributes by using getDelimitedField function
${cached.value:getDelimitedField(1, '|')} //will give attr1 field
Even without rebuilding all the attributes again by using above expression language you can directly pull the required attribute value and use them in your flow.
I have attached my sample flow.xml below, save and upload the xml to your instance and change the as per your needs.
selecthiveql-attributes-188338.xml
-
If the Answer helped to resolve your issue, Click on Accept button below to accept the answer, That would be great help to Community users to find solution quickly for these kind of issues.