Member since
06-08-2017
1049
Posts
518
Kudos Received
312
Solutions
05-10-2018
11:40 PM
@adam chui
Sure. I created a directory called nifi_test in the /tmp directory:
[bash tmp]$ mkdir nifi_test
[bash tmp]$ cd nifi_test/
[bash nifi_test]$ touch test.txt
[bash nifi_test]$ touch test1.txt
[bash nifi_test]$ touch test2.txt
[bash nifi_test]$ ll
total 0
-rw-r--r-- 1 nifi nifi 0 May 10 19:16 test1.txt
-rw-r--r-- 1 nifi nifi 0 May 10 19:16 test2.txt
-rw-r--r-- 1 nifi nifi 0 May 10 19:16 test.txt
Make sure NiFi has permission to read the files in the directory.
Let's assume you have a dynamically generated directory attribute with the value /tmp/nifi_test/ in the middle of the flow, and you now need to fetch all the files in the /tmp/nifi_test directory.
Flow:-
GenerateFlowFile configs:- I added a new property:
directory
/tmp/nifi_test
The flowfile now has a directory attribute with /tmp/nifi_test as its value.
ExecuteStreamCommand configs:- Pass the directory attribute as a command argument to list all the files in the directory (/tmp/nifi_test).
SplitText configs:- When the directory contains more than one file, use this processor to split the listing into individual flowfiles. Change the property below:
Line Split Count
1
ExtractText configs:- We need to pick up every file in the directory dynamically, so add a new property to the ExtractText processor:
filename
(.*)
This extracts the flowfile content and stores it in the filename attribute. The flowfile now carries both the directory and the filename as attributes.
FetchFile configs:- In the File to Fetch property, use the directory and filename attributes to fetch the file(s) from the directory. In the flow screenshot at the end you can see that 3 files were fetched from the directory.
This is how you can pull files in the middle of a flow. I have attached my flow XML; save it, upload it to your NiFi instance, and test it out. fetch-files-189935.xml
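Outside NiFi, the same sequence (list the directory, split the listing into one line per file, treat each line as the filename, then fetch directory plus filename) can be sketched in Python. This only illustrates the logic and is not how NiFi implements it; the `fetch_all` helper and the temporary directory are made up for the example.

```python
import os
import tempfile


def fetch_all(directory):
    """Mimic the flow: list -> split per line -> extract filename -> fetch."""
    # ExecuteStreamCommand step: a newline-separated listing of the directory
    listing = "\n".join(sorted(os.listdir(directory)))
    fetched = {}
    # SplitText step (Line Split Count = 1): one entry per line
    for line in listing.splitlines():
        # ExtractText step: (.*) captures the whole line as the filename
        filename = line.strip()
        if not filename:
            continue
        # FetchFile step: directory and filename are combined into one path
        path = os.path.join(directory, filename)
        if os.path.isfile(path):
            with open(path, "rb") as handle:
                fetched[filename] = handle.read()
    return fetched


# Recreate the three empty files from the post in a throwaway directory
demo_dir = tempfile.mkdtemp(prefix="nifi_test_")
for name in ("test.txt", "test1.txt", "test2.txt"):
    open(os.path.join(demo_dir, name), "w").close()

result = fetch_all(demo_dir)
print(sorted(result))
```

Each key of `result` plays the role of one flowfile coming out of FetchFile.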
05-10-2018
11:08 PM
2 Kudos
@Abhinav Joshi
A controller service does not change its parameters based on flowfile attributes. Once the controller service is enabled with a parameter, that parameter applies to every job executed through that controller service.
You can add all the required parameters to the JDBC connection string in the Hive controller service, using ; (semicolon) as the delimiter.
Example:-
jdbc:hive2://<connection string>?tez.queue.name=<queue-name>;hive.support.quoted.identifiers=none
In the above connection string I have added two parameters to the connection pool service.
(or)
Declare the connection pool connection string without any parameters:
jdbc:hive2://<connection string>
Keep the parameters as attributes on the flowfile. Before the PutHiveQL processor, use a ReplaceText processor to put the set statements for those parameters in front of the statement (DDL/DML, etc.) that PutHiveQL executes.
Example:-
set tez.queue.name=<queue-name>;
set hive.support.quoted.identifiers=none;
insert into final table select * from staging table
The PutHiveQL processor has a Statement Delimiter property with the value ; so it executes all the statements at once, and the parameters are set for the session that was initialized.
The PutHiveStreaming processor supports expression language for the Database Name, Table Name, etc. properties, so those values can change based on flowfile attributes.
NOTE: If multiple concurrent tasks are configured for this processor, only one table can be written to at any time by a single thread. Additional tasks intending to write to the same table will wait for the current task to finish writing to the table.
For more information, take a look at this jira about dynamic properties for the connection pool.
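As a rough illustration of the two options, the sketch below builds both strings in Python: the connection string with the parameters appended, and the script with set statements in front of the query. The helper names, host, port, queue name, and table names are placeholders invented for the example, not values from the original flow.

```python
def with_hive_conf(base_url, params):
    """Option 1: append the parameters to the JDBC URL, separated by ';'."""
    conf = ";".join(f"{key}={value}" for key, value in params.items())
    return f"{base_url}?{conf}" if conf else base_url


def with_set_statements(statement, params):
    """Option 2: put one 'set key=value;' line per parameter before the statement."""
    sets = [f"set {key}={value};" for key, value in params.items()]
    return "\n".join(sets + [statement])


# Hypothetical values for illustration only
params = {"tez.queue.name": "etl", "hive.support.quoted.identifiers": "none"}
url = with_hive_conf("jdbc:hive2://hive-host:10000/default", params)
script = with_set_statements(
    "insert into final_table select * from staging_table", params
)
print(url)
print(script)
```

With the second option the parameter values can come from flowfile attributes, which is the part a controller service cannot do.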
05-10-2018
12:51 PM
2 Kudos
@aman mittal
You can use the RouteOnAttribute processor to route only the required tables to Azure Blob Storage, etc. Below is a sample flow for demonstration.
In this flow we list all the tables, generate SQL queries that fetch pages, and then execute the queries generated by the GenerateTableFetch processor with ExecuteSQL.
As you need to load all the tables to Google Cloud, fork the success relationship from ExecuteSQL; this branch stores all the tables in Google Cloud.
In the other fork from ExecuteSQL, feed the success relationship to a RouteOnAttribute processor and add a new property:
AzureBlob Tables
${db.table.name:in("tablename1","tablename2")} //db.table.name attribute is added by the ListDatabaseTables processor
(or)
${db.table.name:toLower():in("tablename1","tablename2")} //table names are matched case-insensitively in this EL, so list them in lower case
The equals function compares against a single value, so use the in function to match against a list of table names.
Use your own attribute that holds the table name in place of db.table.name, and add all the tables required for Azure to the list above. Then connect the relationship with the same name as the property to the Azure Blob Storage processor.
- If the answer addressed your question, click the Accept button below to accept it. That would be a great help to Community users looking for a quick solution to this kind of issue.
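The routing decision itself is small enough to sketch in Python. This is only a model of the rule described above, assuming the default RouteOnAttribute behaviour of sending a flowfile to the relationship named after the matching property and to unmatched otherwise; the `route` function and the table names are placeholders.

```python
# Tables that should additionally go to Azure (placeholder names, lower case)
AZURE_TABLES = {"tablename1", "tablename2"}


def route(attributes, case_insensitive=True):
    """Return the relationship a flowfile with these attributes would follow."""
    table = attributes.get("db.table.name", "")
    if case_insensitive:
        # Same idea as applying toLower() before the comparison
        table = table.lower()
    return "AzureBlob Tables" if table in AZURE_TABLES else "unmatched"


print(route({"db.table.name": "TableName1"}))
print(route({"db.table.name": "orders"}))
```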
05-10-2018
12:21 PM
3 Kudos
@adam chui
If your flow already has the fully qualified filename, including the directory, you can use the FetchFile processor. It accepts incoming connections, so you can pass the attributes (directory/filename) in the File to Fetch property to pull the file into the flow.
If you do not have the fully qualified filename, first list all the files in the directory with an ExecuteStreamCommand processor, passing the dynamically generated directory name as an argument, and then use the FetchFile processor to pull the required files into the data flow.
Please refer to this link, where I explain how to use the ExecuteStreamCommand processor to list all the files in a directory. To keep only the required filenames, add a RouteOnAttribute processor before the FetchFile processor.
05-09-2018
11:07 PM
@Winnie Philip
Make sure the prod_desc column is defined in the Avro schema/registry used by the Record Reader controller service of the UpdateRecord processor. If prod_desc is not defined there, the reader cannot read that field from the incoming JSON message, and the writer outputs a null value for prod_desc.
Instead of using two UpdateRecord processors, you can use the dynamic property value below:
substringBefore(substringAfter( /prod_desc, '=' ),'}')
UpdateRecord processor configs:
Sample Record Reader for the UpdateRecord processor:
Avro schema with the prod_desc column in it:
{
"namespace": "nifi",
"name": "person",
"type": "record",
"fields": [
{ "name": "created_at", "type": ["null","string"] },
{ "name": "id_store", "type": ["null","int"] },
-----
-----
{ "name": "prod_desc", "type": ["null","string" ]}]}
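To see what the nested expression does to a value, here is a small Python model of it. The two helpers are stand-ins written for this example, and they assume that the original value is returned unchanged when the search string is not found; the sample prod_desc value is invented.

```python
def substring_after(value, search):
    """Text after the first occurrence of search (assumed: unchanged if absent)."""
    _, separator, tail = value.partition(search)
    return tail if separator else value


def substring_before(value, search):
    """Text before the first occurrence of search (assumed: unchanged if absent)."""
    head, separator, _ = value.partition(search)
    return head if separator else value


# Hypothetical incoming value for the prod_desc field
prod_desc = "{desc=Blue widget}"
cleaned = substring_before(substring_after(prod_desc, "="), "}")
print(cleaned)
```

The inner call drops everything up to the first =, and the outer call drops the closing brace and anything after it, which is why a single UpdateRecord processor is enough.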
05-09-2018
11:09 AM
@Naeem Ullah Khan
Change the ReplaceText Replacement Strategy to Regex Replace, because you are using "|" (the or operator) to search for all the possible matches in your input CSV file. Set Evaluation Mode to Line-by-Line and set the Replacement Value property to an empty string.
Always Replace replaces the entire flowfile content (or each line, depending on the Evaluation Mode property) with the Replacement Value, without searching for anything.
ReplaceText Configs:-
Search Value (,Minutes.*\n|CellID|=|"|LogicRNCID|CGI|CellIndex|GCELL:LABEL)
Replacement Value Empty string set //we are searching for all the values above and replacing them with an empty string
Replacement Strategy Regex Replace //interprets the search value as a regex and replaces each match with the replacement value
Evaluation Mode Line-by-Line //runs the Replacement Strategy against each line separately
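The effect of this configuration can be tried outside NiFi with Python's re module. This is a sketch that applies the same search value to each line and replaces every match with an empty string; the sample line is invented, and NiFi uses Java regular expressions, so edge cases may differ.

```python
import re

# Same pattern as the Search Value property
SEARCH_VALUE = re.compile(
    r'(,Minutes.*\n|CellID|=|"|LogicRNCID|CGI|CellIndex|GCELL:LABEL)'
)


def replace_line_by_line(content):
    """Apply the regex to each line separately, keeping the line endings."""
    cleaned = [
        SEARCH_VALUE.sub("", line) for line in content.splitlines(keepends=True)
    ]
    return "".join(cleaned)


# Hypothetical input line for illustration
sample = 'GCELL:LABEL="A1",CellID=101,LogicRNCID=7\n'
print(repr(replace_line_by_line(sample)))
```

Note that the first alternative ends in \n, so a line that matches it loses its line ending as well.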
05-09-2018
10:44 AM
1 Kudo
@Prakhar Agrawal
Your flow status shows "stoppedCount":9918,"invalidCount":3239, i.e. about 13k processors are not running (either stopped or invalid). Even though these processors are not running, NiFi keeps validating them to check their status until they are manually disabled. Disable all the stopped/invalid processors in your NiFi instance.
How to disable 13k processors?
Method 1:- Changing flow.xml.gz
1. Stop the NiFi instance (to make sure no new processors are added while flow.xml is being changed).
2. Take a backup of flow.xml.gz.
3. gunzip flow.xml.gz
4. Search for <scheduledState>STOPPED</scheduledState> in flow.xml and replace it with <scheduledState>DISABLED</scheduledState>.
5. gzip flow.xml
6. Replicate the same flow.xml.gz to all the other nodes of your HDF cluster (or) just rename the flow.xml.gz on those nodes to some other name like bkp_flow.xml.gz_<time>.
7. Start NiFi.
(or)
Method 2:- Create a flow in the NiFi instance; this way you don't need to stop/start NiFi
1. Read flow.xml.gz.
2. Parse the XML to find the stopped processors and their IDs (as per above).
3. Use the NiFi REST API to change their state to disabled.
- For more details, refer to this article on improving the performance of the NiFi UI.
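The file manipulation in both methods can be sketched in Python. The example below works on in-memory bytes and assumes that each processor in flow.xml is a processor element with id and scheduledState children; check that against your own file before relying on it. The function names and the tiny sample flow are made up, and the REST call of Method 2 is left out.

```python
import gzip
import xml.etree.ElementTree as ET
from typing import List

STOPPED = "<scheduledState>STOPPED</scheduledState>"
DISABLED = "<scheduledState>DISABLED</scheduledState>"


def disable_stopped(flow_gz: bytes) -> bytes:
    """Method 1, steps 3-5: gunzip, replace STOPPED with DISABLED, gzip again.

    Like the manual search and replace, this touches every component that
    carries the STOPPED state, not only processors.
    """
    xml_text = gzip.decompress(flow_gz).decode("utf-8")
    return gzip.compress(xml_text.replace(STOPPED, DISABLED).encode("utf-8"))


def stopped_processor_ids(flow_gz: bytes) -> List[str]:
    """Method 2, steps 1-2: collect the IDs of processors that are STOPPED."""
    root = ET.fromstring(gzip.decompress(flow_gz))
    return [
        processor.findtext("id")
        for processor in root.iter("processor")
        if processor.findtext("scheduledState") == "STOPPED"
    ]


# Tiny stand-in for a real flow.xml.gz (assumed element layout)
sample_gz = gzip.compress(
    b"<flowController><rootGroup>"
    b"<processor><id>abc-1</id><scheduledState>STOPPED</scheduledState></processor>"
    b"<processor><id>abc-2</id><scheduledState>RUNNING</scheduledState></processor>"
    b"</rootGroup></flowController>"
)
print(stopped_processor_ids(sample_gz))
print(stopped_processor_ids(disable_stopped(sample_gz)))
```

For Method 1, run this only against a backed-up copy of the file while NiFi is stopped, as in the steps above.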
05-08-2018
10:11 PM
@vivek jain
Use the SplitContent processor with the configs below:
Byte Sequence Format
Text
Byte Sequence
} } //this sequence needs to match your json message exactly (i.e. in your question you have 1 tab before the first curly brace } and the other curly brace on a new line)
Keep Byte Sequence
true //this determines whether the byte sequence is kept in the splits or not
Byte Sequence Location
Trailing //if Keep Byte Sequence is set to true, this property adds the sequence to the end of each split
With this processor you can split the message shown into 2 individual messages.
Output flowfiles from the splits relationship:-
ff1:-
{
"version": "1",
"source": {
"type": "external",
"id": "456"
}
}
ff2:-
{
"version": "1",
"source": {
"type": "internal",
"id": "123"
}
}
In addition, if you want to turn these individual messages into a valid json array of messages, you can use the MergeContent processor with Defragment as the Merge Strategy and change the properties below:
Delimiter Strategy
Text
Header
[
Footer
]
Demarcator
,
In this processor we merge all the individual json messages back into one json array, with a comma as the demarcator.
Output flowfile:-
[{
"version": "1",
"source": {
"type": "internal",
"id": "123"
}
},
{
"version": "1",
"source": {
"type": "external",
"id": "456"
}
}]
Now you can use record-based processors (ConvertRecord, etc.) to work on chunks of data instead of one message at a time.
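The split-then-merge idea can be checked with a short Python sketch. It splits the text after every occurrence of the byte sequence (keeping the sequence at the end of each piece) and then joins the pieces with the header, footer, and demarcator from the MergeContent settings. The helper names are invented, the byte sequence is assumed to be a tab, a closing brace, a newline, and a closing brace, and whitespace-only leftovers are dropped as a simplification.

```python
import json


def split_keep_trailing(content, sequence):
    """Split after each occurrence of sequence, keeping it at the end of the piece."""
    pieces = []
    start = 0
    while True:
        index = content.find(sequence, start)
        if index == -1:
            break
        end = index + len(sequence)
        pieces.append(content[start:end])
        start = end
    # Simplification: ignore a leftover that contains only whitespace
    if content[start:].strip():
        pieces.append(content[start:])
    return pieces


def merge_as_array(fragments):
    """Header '[', footer ']', demarcator ',' as in the MergeContent settings."""
    return "[" + ",".join(fragments) + "]"


# Two concatenated messages, indented with tabs (assumed input layout)
messages = (
    '{\n\t"version": "1",\n\t"source": {\n\t\t"type": "external",\n\t\t"id": "456"\n\t}\n}\n'
    '{\n\t"version": "1",\n\t"source": {\n\t\t"type": "internal",\n\t\t"id": "123"\n\t}\n}\n'
)
fragments = split_keep_trailing(messages, "\t}\n}")
merged = json.loads(merge_as_array(fragments))
print(len(fragments), [message["source"]["id"] for message in merged])
```

Parsing the merged text with json.loads is a quick way to confirm that the header, footer, and demarcator really produce a valid array.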
05-08-2018
10:53 AM
1 Kudo
@Manuel Carnerero
The error occurs because the Broker URI property does not accept expression language. ${BrokerURI} is an expression language expression, so the processor complains that it is not valid URI syntax. Configure the processor with the actual broker value instead of the variable name, and the processor will no longer complain about the syntax.
05-07-2018
06:05 PM
@srini Use this DDL:
Create EXTERNAL TABLE SRGMSBI1417.json14_07_05_01(
purchaseid struct
<ticid:string,ticlocation:string,custnum :string,
Travleinfo:struct
<Trav:struct
<fname:string,lname:string,mname:string,
freq:struct
<fre:struct
<frequencynumber:string,frequnecystatus:string>>,
food :struct
<foodpref:array<struct<foodcode:string,foodcodeSegment:string>>>,
Seats :struct<Seat:array<struct<seatberth:string,loc:string>>>, stations :struct<station:array<struct<sationname:string,trainnum:string>>>>>,
Comments :struct<Comment:array<struct<commentno:string,desc:string,passengerseat :struct<intele :string>,passengerloc :struct<intele :string>>>>>)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
location '/user/srgmsbi1417/json14_07_05_01';
Don't use the array type for fre: it is a struct, not an array, and that is why you are getting exceptions.