Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11133 | 04-15-2020 05:01 PM |
| | 7031 | 10-15-2019 08:12 PM |
| | 3075 | 10-12-2019 08:29 PM |
| | 11275 | 09-21-2019 10:04 AM |
| | 4193 | 09-19-2019 07:11 AM |
05-10-2018
11:40 PM
@adam chui
Sure. I have created a directory called nifi_test in the tmp directory.
[bash tmp]$ mkdir nifi_test
[bash tmp]$ cd nifi_test/
[bash nifi_test]$ touch test.txt
[bash nifi_test]$ touch test1.txt
[bash nifi_test]$ touch test2.txt
[bash nifi_test]$ ll
total 0
-rw-r--r-- 1 nifi nifi 0 May 10 19:16 test1.txt
-rw-r--r-- 1 nifi nifi 0 May 10 19:16 test2.txt
-rw-r--r-- 1 nifi nifi 0 May 10 19:16 test.txt
Make sure NiFi has access to read the files in the directory.
Let's assume you have a dynamically generated directory attribute with the value /tmp/nifi_test/ in the middle of the flow. Now we need to fetch all the files that are in the /tmp/nifi_test directory.
Flow:-
GenerateFlowFile configs:- I have added a new property:
directory
/tmp/nifi_test
Now I have a flowfile with a directory attribute whose value is /tmp/nifi_test.
ExecuteStreamCommand configs:- I pass the directory attribute as the command argument and list all the files in the directory (/tmp/nifi_test).
SplitText configs:- When there is more than one file in the directory, use this processor to split the listing into individual flowfiles. Change the property below:
Line Split Count
1
ExtractText configs:- We need to pull all the files from the directory dynamically, so use the ExtractText processor and add a new property:
filename
(.*)
This processor extracts the flowfile content and stores it in the filename attribute. We now have the directory and each filename in the directory as attributes.
FetchFile configs:- In the File to Fetch property we use the directory and filename attributes to fetch the file(s) from the directory. In the flow screenshot at the end you can see that 3 files were fetched from the directory.
This way we are able to pull files in the middle of the flow.
I have attached my flow.xml; save and upload the xml to your NiFi instance and test it out.
fetch-files-189935.xml
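The data movement in this flow can be sketched outside NiFi. The Python below is only an illustration under assumptions: the function names are hypothetical, `os.listdir` stands in for whatever listing command ExecuteStreamCommand runs, and the path join stands in for a File to Fetch value built from the directory and filename attributes. It is not how NiFi implements these processors.

```python
import os
import tempfile


def list_directory(directory):
    """Stand-in for ExecuteStreamCommand: return one file name per line."""
    names = sorted(
        name for name in os.listdir(directory)
        if os.path.isfile(os.path.join(directory, name))
    )
    return "\n".join(names)


def fetch_files(directory):
    """Split the listing into lines, treat each line as a filename, fetch it."""
    fetched = {}
    for line in list_directory(directory).splitlines():  # SplitText, Line Split Count = 1
        filename = line.strip()                           # ExtractText, filename = (.*)
        if not filename:
            continue
        path = os.path.join(directory, filename)          # FetchFile on directory + filename
        with open(path, "rb") as handle:
            fetched[filename] = handle.read()
    return fetched


def demo():
    """Recreate the three empty test files and return the fetched names."""
    with tempfile.TemporaryDirectory() as directory:
        for name in ("test.txt", "test1.txt", "test2.txt"):
            open(os.path.join(directory, name), "w").close()
        return sorted(fetch_files(directory))


if __name__ == "__main__":
    print(demo())
```

Each line of the listing becomes its own unit of work, which is why the split step matters when the directory holds more than one file.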
05-11-2018
05:03 PM
Thank you, Shu. This worked perfectly, thanks for the additional info as well.
05-09-2018
12:27 PM
Thanks, it worked perfectly!!
05-10-2018
11:34 AM
Thanks a lot, I really appreciate it. I was struggling to get this done; you saved me a lot of time.
05-08-2018
11:17 AM
1 Kudo
Thank you @Shu! I already have the processor configured with the URI instead of the EL, but I had assumed that I could use a variable to set the value of any property. Now, thanks to you, I know for sure that I have to check whether a property supports EL as its value.
06-06-2018
07:56 PM
@Mahmoud Shash Just wanted to follow up to see how things are progressing. Are you still seeing the issue? Did you try any of my suggestions above? I see no reason for having your InvokeHTTP processor scheduled to execute on the primary node only, since it is being triggered by incoming FlowFiles. If you switch it to "all nodes", do you still see the issue? What do you see when you perform a "List queue" action on the connection feeding your InvokeHTTP processor? Within the "Cluster" UI found under the global menu, which node is currently elected as the primary node? Does the data listed when you ran "List queue" belong to that same node?
- Thank you, Matt
05-06-2018
05:54 PM
1 Kudo
@adam chui
The documentation of the SelectHiveQL processor states that if it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the select query, and that the FlowFile attribute 'selecthiveql.row.count' indicates how many rows were selected. However, you can file a Jira for the issue you found.
As a workaround, you can use the PutDistributedMapCache processor to keep all your attributes in a cache server and fetch them back from the cache server using the FetchDistributedMapCache processor.
Sample Flow Example:-
I'm using a GenerateFlowFile processor and adding 3 attributes to the flowfile:
attr1
56
attr2
67
attr3
89
ReplaceText Processor:-
Search Value
(?s)(^.*$)
Replacement Value
${allAttributes("attr1","attr2","attr3"):join("|")} //I'm using allAttributes to join all the attributes with a "|" pipe delimiter (the output flowfile content will be 56|67|89)
Maximum Buffer Size
1 MB //change the size if the content is larger than 1 MB
Replacement Strategy
Always Replace
Evaluation Mode
Entire text
Use this link to evaluate multiple attributes.
UpdateAttribute:-
We use this processor to set the filename of the flowfile to a UUID. We cannot use the flowfile's own UUID as the cache identifier, because the output of the SelectHiveQL processor keeps the same filename but gets a different UUID (i.e. the flowfile has one unique id up to the SelectHiveQL processor and a different uuid after it).
Add a new property:
filename
${UUID()}
PutDistributedMapCache processor:-
Configure the DistributedMapCacheServer and DistributedMapCacheClientService and enable them (change the cache's number of entries as per your needs; if no persistence directory is specified, all the entries are stored in memory).
Cache Entry Identifier
${filename}
Now we have changed the flowfile content and cached it under the filename.
SelectHiveQL processor:-
Feed the success relationship to the SelectHiveQL processor. Once the processor outputs a flowfile with the query result as its content, feed the success relationship to FetchDistributedMapCache.
FetchDistributedMapCache configs:-
Cache Entry Identifier
${filename:substringBefore('.')} //because, depending on the output flowfile format, the filename will have a .avro/.csv extension
Distributed Cache Service
DistributedMapCacheClientService
Put Cache Value In Attribute
cache_value //the cached content is put into this attribute instead of into the flowfile content
Max Length To Put In Attribute
256 //change the value if the cached content is longer than this
Output:-
The flowfile will have an attribute called cache_value, and you can rebuild all your attributes from it with the getDelimitedField function:
${cache_value:getDelimitedField(1, '|')} //gives the attr1 field
Even without rebuilding all the attributes, you can use the above expression language to pull just the required attribute value and use it in your flow.
I have attached my sample flow.xml below; save and upload the xml to your instance and change it as per your needs.
selecthiveql-attributes-188338.xml
-
If the answer helped to resolve your issue, click on the Accept button below to accept the answer. That would be a great help to Community users looking for a quick solution to this kind of issue.
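The pack, cache, and unpack idea behind this workaround can be sketched in a few lines of Python. This is a sketch under assumptions, not NiFi code: the helper names are hypothetical, a plain dict stands in for the distributed map cache, the filename value is made up for the example, and the field lookup ignores the quoting and escaping rules that the real getDelimitedField function applies.

```python
def join_attributes(attributes, names, delimiter="|"):
    """Mimic joining the named attributes into one delimited string."""
    return delimiter.join(attributes[name] for name in names)


def get_delimited_field(value, index, delimiter="|"):
    """Return the field at a 1-based index, as in the expression shown above."""
    return value.split(delimiter)[index - 1]


def substring_before(value, marker):
    """Return the text before the first marker, or the whole value if absent."""
    return value.partition(marker)[0]


# Hypothetical stand-in for the cache server: key -> cached content.
cache = {}

attributes = {"attr1": "56", "attr2": "67", "attr3": "89"}
filename = "example-id"  # assumed value; the flow sets this from ${UUID()}

# Before the query: pack the attributes and cache them under the filename.
packed = join_attributes(attributes, ["attr1", "attr2", "attr3"])
cache[filename] = packed

# After the query: the output filename carries an extension such as .avro.
output_filename = filename + ".avro"
cache_value = cache[substring_before(output_filename, ".")]

if __name__ == "__main__":
    print(packed)
    print(get_delimited_field(cache_value, 1))
```

The key point is that the cache key must survive the query step, which is why the flow keys on the filename rather than on the flowfile's own UUID.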
05-04-2018
12:21 PM
5 Kudos
Short Description: This tutorial describes how to use the DeleteHBaseRow processor to delete one or more row keys in an HBase table.
Thanks a lot to @Ed Berezitsky for helping to write this article.
Introduction:
NiFi 1.6 introduced the DeleteHBaseRow processor, which deletes rows from an HBase table based on the row key values provided to it. The processor performs a deleteall operation on each row key that we intend to delete from the HBase table.
The processor works with row key values presented as FlowFile content (or) FlowFile attributes. The Row ID Location property selects one of the ways below; by default it is set to FlowFile content.
1. FlowFile content - Get the row key(s) from the flowfile content.
2. FlowFile attributes - Get the row key from an expression language statement.
2.1. One row key value associated with the flowfile.
2.2. More than one row key value associated with the flowfile.
DeleteHBaseRow Processor Params:
Client Controller: Configure and enable the HBase_1_1_2_ClientService controller service.
Table Name: HBase table name; supports Expression Language.
Row Identifier: Row key to be deleted when Row ID Location is set to ‘Flow file attribute’. The value is ignored if the location is set to ‘Content’. Note that it doesn’t support a list of values.
Row ID Location: Source of the Row ID(s) to be deleted, either content or attribute.
Flow File Fetch Count: Number of flow files to be consumed from the incoming connection(s) and combined in a single run.
Batch Size: Maximum number of deletes to send per batch. The actual batch size won’t exceed the number of row keys in the content of each flow file.
Delete Row Key Separator: Specifies the delimiter; supports regex and Expression Language.
Character Set: Character set used to encode the row key for HBase.
1. Delete HBase Row/s based on Flow File content:
Delete Row Key Separator specifies the delimiter for a list of row keys. It can be any value, including a newline character.
Example:
Flow:
Explanation:
Generate Row Key(s) to delete using GenerateFlowFile processor:
I have used the GenerateFlowFile processor with custom text that holds all the row keys, comma separated: 1,2,3,4,5,6. Then we feed the flow file to the DeleteHBaseRow processor.
DeleteHBaseRow Processor:
As we have a comma-separated list of row key values as the flowfile content, configure and enable the HBaseClientService, then configure the DeleteHBaseRow processor as following.
Once the deletion is done, the processor routes the flowfile(s) to the success relationship and adds two new attributes to each flowfile (these write attributes are added only when Row ID Location is flowfile content).
Write Attributes:
rowkey.start: The first rowkey in the flowfile. Only written when using the flowfile's content for the row IDs.
rowkey.end: The last rowkey in the flowfile. Only written when using the flowfile's content for the row IDs.
The rowkey.start and rowkey.end attributes are added to the flowfile with the first and last values of the flowfile content, i.e. our flowfile content is 1,2,3,4,5,6, so the rowkey.start value is 1 and the rowkey.end value is 6.
**Note** If we try to delete a row key that doesn’t exist in the HBase table, the processor won’t throw any error, i.e. if we specify 99 in our flowfile content and no row has 99 as its row key, the processor still doesn’t show any error message.
Reference flow.xml for DeleteHBaseRow from flowfile content
1delete-hbase-row-s-based-on-flow-file-content.xml
How to Configure DeleteHBaseRow processor for Other Separators/Delimiters?
With Multi Separator/Delimiter:
In this file we have a multi-character separator of colon and comma (:,)
1:,2:,3
DeleteHBaseRow Configs: Keep the Delete Row Key Separator value as :,
With Newline separator:-
Configure Delete Row Key Separator as shift+enter (or) \n
2. Row ID Location FlowFile Attributes:
2.1: If we have one row key value as an attribute of the flowfile:
If the row key to delete from the HBase table is a flowfile attribute, then we can use expression language.
Explanation:
GenerateFlowFile Configs:
Add new properties tab_name and row_key with values delete_hbase_demo and 1 to the flowfile.
DeleteHBaseRow Configs:
Now we can configure the DeleteHBaseRow processor with expression language so that the processor gets the tab_name and row_key values from the flowfile attributes and performs deletions dynamically.
Reference flow.xml for DeleteHBaseRow from flowfile attribute
2delete-hbase-row-from-flow-file-attribute.xml
2.2. If we have single/multiple row key values as an attribute of the flowfile:
The DeleteHBaseRow processor doesn’t support a comma-separated list of values presented as a flowfile attribute. Here is a workaround example of how to delete row keys without changing the flow file content.
- Using expression language with the indexOf and ifElse functions, loop through the list of row_keys values.
Flow:
Explanation:
GenerateFlowFile configs:
Add new properties tab_name and row_keys with values delete_hbase_demo and 1,2,3,4,5 to the flowfile.
RouteOnAttribute Configs:
Add a new property that checks whether the row_keys attribute value is null or empty, and auto-terminate this empty relationship.
Feed the unmatched relationship from the RouteOnAttribute processor to the DeleteHBaseRow processor.
DeleteHBaseRow Configs:
Configure and enable the controller service.
Configure the DeleteHBaseRow processor as following:
Row Identifier property value as
${row_keys:indexOf(','):equals(-1):ifElse('${row_keys}','${row_keys:substringBefore(",")}')} //check the index of the delimiter: if it equals -1 then use row_keys (there is one value in the row_keys attribute), else use the value before the "," and delete that row key in the HBase table.
Fork the success relationship from the DeleteHBaseRow processor.
Fork1 of Success relationship:-
UpdateAttribute Configs:-
Configure the processor as following: row_keys property with value as
${row_keys:indexOf(','):equals(-1):ifElse('','${row_keys:substringAfter(",")}')} //if the index of "," equals -1 then set '' (empty value), else update the row_keys attribute value with the substring after ","
This loop continues until all the row_keys values have been deleted from the HBase table.
Fork2-Success relationship:
Use this relationship for further processing.
Reference flow.xml for DeleteHBaseRow having a list of row_keys as flowfile attribute
22delete-list-of-row-keys-as-attribute-values.xml
Create and put data into hbase table:-
bash$ hbase shell
hbase> create 'delete_hbase_demo','cf'
hbase> put 'delete_hbase_demo','1','cf:name','foo'
hbase> put 'delete_hbase_demo','2','cf:name','bar'
hbase> put 'delete_hbase_demo','3','cf:name','foo'
hbase> put 'delete_hbase_demo','4','cf:name','bar'
hbase> put 'delete_hbase_demo','5','cf:name','foo'
hbase> put 'delete_hbase_demo','6','cf:name','bar'
hbase> scan 'delete_hbase_demo'
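The attribute loop from section 2.2 is easier to follow when traced step by step. The Python below is a rough simulation under assumptions: the function names are hypothetical, a dict stands in for the delete_hbase_demo table, and the two helpers mirror the Row Identifier and UpdateAttribute expressions rather than calling NiFi or HBase.

```python
def current_row_key(row_keys):
    """Mirror the Row Identifier expression: the value before the first comma,
    or the whole value when no comma is present."""
    if row_keys.find(",") == -1:
        return row_keys
    return row_keys.split(",", 1)[0]


def remaining_row_keys(row_keys):
    """Mirror the UpdateAttribute expression: the value after the first comma,
    or an empty string when no comma is present."""
    if row_keys.find(",") == -1:
        return ""
    return row_keys.split(",", 1)[1]


def delete_rows(table, row_keys):
    """Loop until row_keys is empty, deleting one row key per pass."""
    deleted = []
    while row_keys:                      # RouteOnAttribute stops the loop on empty
        key = current_row_key(row_keys)
        table.pop(key, None)             # a missing key is ignored, no error raised
        deleted.append(key)
        row_keys = remaining_row_keys(row_keys)
    return deleted


# Dict stand-in for the table created with the hbase shell commands above.
table = {str(i): ("foo" if i % 2 else "bar") for i in range(1, 7)}
deleted = delete_rows(table, "1,2,3,4,5")

if __name__ == "__main__":
    print(deleted)
    print(table)
```

Each pass removes exactly one key from the front of the list, so a list of five keys goes through the DeleteHBaseRow step five times before the empty check ends the loop.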
05-07-2018
11:09 AM
@B X
For reducing the number of fields and renaming fields we don't need to use the ConvertRecord processor as well; we can achieve it with a single UpdateRecord processor. The UpdateRecord processor expects at least one user-defined property (like swapping a field name); once we add that one property, we can reduce or rename the fields. Please see this article, where I'm reducing and renaming fields in the first UpdateRecord processor.
If you want to just reduce the number of fields without changing any contents, then we need to use the ConvertRecord processor.
05-02-2018
03:21 AM
@Paul Byrum
I don't think we can write to c:\, but can you use the commands below to check the files in the test directory (is there any setting that hides these files?).
Open a cmd terminal:
c:\Users>cd ..
c:\>cd c:\test
c:\test>dir
Then you are going to see all the files in the directory.
If you still don't see any files, then use this log entry:
PutFile[id=1967bc4d-0163-1000-1af5-b40c46be92b6] Produced copy of StandardFlowFileRecord[uuid=58d18018-a864-4c12-a98d-498bbf8d19d1,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1525145201278-982, container=default, section=982], offset=336986, length=168493],offset=0,name=3706261132832753,size=168493] at location c:/test/3706261132832753
Going by the above log, configure a GetFile processor with your test directory and, as the file filter, the same filename that you stored into the directory:
Input Directory
C:\test
File Filter
3706261132832753
Keep Source File
false