Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11280 | 04-15-2020 05:01 PM |
| | 7175 | 10-15-2019 08:12 PM |
| | 3160 | 10-12-2019 08:29 PM |
| | 11619 | 09-21-2019 10:04 AM |
| | 4395 | 09-19-2019 07:11 AM |
04-08-2018
12:56 PM
2 Kudos
@Kunal Gaikwad Use the ReplaceText processor with Literal Replace as the Replacement Strategy, so that you can search for ORDER BY asc in your flowfile content and replace it with ORDER BY newid() asc.

ReplaceText configs:
- Search Value: ORDER BY asc
- Replacement Value: ORDER BY newid() asc
- Character Set: UTF-8
- Maximum Buffer Size: 1 MB
- Replacement Strategy: Literal Replace
- Evaluation Mode: Entire text

Input flowfile content:
SELECT * FROM (SELECT TOP 50000 *, ROW_NUMBER() OVER(ORDER BY asc) rnum FROM ABCD.dbo.DEFG) A WHERE rnum > 0 AND rnum <= 50000

Output flowfile content:
SELECT * FROM (SELECT TOP 50000 *, ROW_NUMBER() OVER(ORDER BY newid() asc) rnum FROM ABCD.dbo.DEFG) A WHERE rnum > 0 AND rnum <= 50000

In the output flowfile content we have replaced ORDER BY asc with ORDER BY newid() asc.

If this answer helped to resolve your issue, click the Accept button below to accept it; that helps community users find solutions to these kinds of issues quickly.
04-06-2018
06:20 PM
@Rui Pinheiro Please see my answer in the comments!
04-06-2018
06:19 PM
@Rui Pinheiro

"For this to be done, the Hive table must have a timestamp for the moment every single row was created, right?"

Yes, if your Hive table has a timestamp field then the process is pretty easy: get the last state and store it.

(or)

If your Hive table doesn't have a timestamp field, then we need some always-increasing numeric column in the dataset (like an id column that grows like a row number). After the SelectHiveQL processor, fork the result set: keep one fork for processing and send the other fork to a QueryRecord processor. In the QueryRecord processor add a max-row query property, e.g. max_row:

select * from FLOWFILE where id=(select MAX(id) from FLOWFILE)

Now we are selecting the max id value from the flowfile content; extract that id value and store it as your state.

QueryRecord processor reference:
https://community.hortonworks.com/articles/121794/running-sql-on-flowfiles-using-queryrecord-process.html

In addition, you can look into the Wait and Notify processors to make sure you only start processing the first forked dataset once the state has been stored.

Wait and Notify processors reference:
http://ijokarumawak.github.io/nifi/2017/02/02/nifi-notify-batch/

(or)

If you don't have any always-increasing column, then you need to look into Hive pagination, i.e. select rows with ROW_NUMBER() OVER (ORDER BY id) as rowid and filter on a rowid window such as rowid > 0 and rowid <= 20 (see the sketch at the end of this answer). Then store the max rowid value as your state, in this case 20. On the next run, 20 becomes the base value; add however many records you want to fetch to the base value to get the upper limit (you can use an UpdateAttribute processor with the plus function for that).

"3. ExtractText (extract the value that got stored and keep it as start_value) — does the value that I'm going to copy to start_value come from the current time?"

For the first run we can define start_value ourselves: check whether start_value is present (then leave it as is) or not (assign a default value like 1900-01-01 00:00:00). So start_value is 1900-01-01 00:00:00 and current_time is 2018-04-06 12:00:00. Then we store our current_time (i.e. 2018-04-06 12:00:00) into HDFS/DistributedCache etc. For the next run we pull the stored state (i.e. 2018-04-06 12:00:00) and assign that value as start_value.
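A minimal sketch of that pagination query, assuming a hypothetical table named mytable with an always-increasing id column; the rowid bounds (0 and 20 here) would come from the stored state plus your chosen batch size:

```sql
-- Hive pagination sketch: ROW_NUMBER() cannot be referenced in the same
-- query's WHERE clause, so wrap it in a subquery and filter on the alias.
SELECT *
FROM (
  SELECT t.*, ROW_NUMBER() OVER (ORDER BY id) AS rowid
  FROM mytable t
) ranked
WHERE rowid > 0      -- base value pulled from the stored state
  AND rowid <= 20;   -- base value + batch size
```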
04-06-2018
12:50 PM
@umang s
If you have the URL as an attribute of the flowfile, then you can use an UpdateAttribute processor with the replaceFirst function to replace the first 5 characters with an empty string.

Example:
I have a url attribute on the flowfile with the value https://community.hortonworks.com/questions/182384/how-to-remove-just-only-5-characters-from-url.html?childToView=183771#answer-183771

Now I use an UpdateAttribute processor and add a new property named url:

${url:replaceFirst('\w{5}', '')}
(or)
${url:replaceAll('^.{0,5}', '')}

Output flowfile attribute value: we have now removed the first 5 characters of the url attribute value by using the UpdateAttribute processor.
04-06-2018
12:37 PM
1 Kudo
@umang s
Try the below configuration of the ReplaceText processor:
- Search Value: ^.{0,5}
- Replacement Value: (empty string set)
- Character Set: UTF-8
- Maximum Buffer Size: 1 MB
- Replacement Strategy: Regex Replace
- Evaluation Mode: Entire text

Input flowfile content:
https://community.hortonworks.com/questions/182384/how-to-remove-just-only-5-characters-from-url.html

Output flowfile content:
://community.hortonworks.com/questions/182384/how-to-remove-just-only-5-characters-from-url.html

We removed the first 5 characters and replaced them with an empty string; as you can see above, https (the first 5 characters) has been removed.

If this answer helped to resolve your issue, click the Accept button below to accept it; that helps community users find solutions to these kinds of issues quickly.
04-06-2018
02:36 AM
@Jay Kumar SenSharma Thank you, I'm glad you enjoyed it. 🙂 I'd be more than happy to help!
04-06-2018
02:01 AM
2 Kudos
@Rui Pinheiro
The data duplication happens because the SelectHiveQL processor doesn't store state, so every time you execute the Hive query it returns the same data plus any newly added records; that's why you are getting duplicates. For this use case we need to store the last value somewhere (Hive/HBase/HDFS/DistributedCache); then, when you run the SelectHiveQL statement, you pull that state value, keep it as an attribute, and use it in your SelectHiveQL statement.

Example:
Let's say I have a table with the columns id (int), name (string), ts (timestamp) and I want to run the SelectHiveQL process incrementally. My Hive statement would be:

select * from table-name where ts > '${start_value}' and ts < '${current_time}'

We need to prepare the start_value and current_time attributes before selecting the Hive data.

Flow:
1. GenerateFlowFile // add a current_time attribute to the flowfile: right-click the processor, go to Configure, click the + sign and add current_time with the value ${now():format("yyyy-MM-dd HH:mm:ss")}
2. SelectHiveQL/FetchHBaseRow/FetchHDFS/FetchDistributedMapCache processor // depending on where you have stored your state value.
3. ExtractText/EvaluateJsonPath/UpdateAttribute processor // extract the stored value and keep it as the start_value attribute; this start_value will be the base value.
4. UpdateAttribute processor // check the start_value attribute; for the first run it will be empty, so assign it a default value (like 1900-01-01 00:00:00).
5. SelectHiveQL processor // select * from table-name where ts > '${start_value}' and ts < '${current_time}'; since the flowfile now carries the start_value and current_time attributes, the Hive statement runs with those attribute values (a filled-in sketch follows this answer).
6. Fork the dataset:
   6.1. Do your processing with the incremental Hive dataset.
   6.2. Store the current_time attribute value using PutHiveStreaming/PutHBaseCell/PutHDFS/PutDistributedMapCache processors // once you store the current_time value, the next run will pull this stored state and it becomes your start_value.

If this answer helped to resolve your issue, click the Accept button below to accept it; that helps community users find solutions to these kinds of issues quickly.
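A minimal sketch of the statement from step 5, assuming a hypothetical table named events with columns id, name and ts; ${start_value} and ${current_time} are the flowfile attributes prepared in steps 1-4 and are substituted by NiFi expression language before the query runs:

```sql
-- Hypothetical table: events(id INT, name STRING, ts TIMESTAMP).
-- On the first run start_value defaults to 1900-01-01 00:00:00.
SELECT *
FROM events
WHERE ts > '${start_value}'    -- last stored state
  AND ts < '${current_time}';  -- new state to store after this run
```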
04-04-2018
12:31 PM
1 Kudo
@Yassine
Run ALTER TABLE on all the tables to change each external table to an internal (managed) table, then drop the table.

Example: write a script that executes the statements below for every table in the warehouse directory.

hive> ALTER TABLE <table-name> SET TBLPROPERTIES('EXTERNAL'='False'); // change the table properties to make the table internal
hive> DROP TABLE <table-name>; // now that the table is internal, dropping it deletes the data automatically
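For illustration, the same pair of statements for a single, hypothetical table named web_logs; a wrapper script would simply repeat this for every table listed by SHOW TABLES:

```sql
-- Hypothetical example table: web_logs.
-- Flip the table from external to managed so that DROP also removes the data files.
ALTER TABLE web_logs SET TBLPROPERTIES ('EXTERNAL' = 'False');

-- Now that the table is managed, dropping it deletes the underlying data as well.
DROP TABLE web_logs;
```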
04-04-2018
12:21 PM
1 Kudo
@Rubén Carmona
Run the join on the source side:
You can use an ExecuteSQL processor to run a SQL join of the two tables on clientID, select all the required columns you need in the result set, then store the results into HDFS and create a Hive table on top of it.

(or)

Query the second table based on the clientid from the first table:
1. QueryDatabaseTable/GenerateTableFetch+ExecuteSQL processors // pull the first table's data (clientID, clientName); the output will be in Avro format.
2. ConvertAvroToJSON processor // now we have a JSON array holding all the records.
3. SplitJson processor // split each record into an individual flowfile.
4. EvaluateJsonPath processor // extract the required fields from the JSON message and keep them as flowfile attributes.
5. ExecuteSQL processor // use the extracted attributes to query the second table: select * from tablename where clientid=${clientID} (assuming the attribute name is clientID in the EvaluateJsonPath processor). Now we have queried the second table based on clientid; convert the ExecuteSQL output again.
6. ConvertAvroToJSON processor // again a JSON array holding all the records.
7. SplitJson processor // split each record into an individual flowfile.
8. EvaluateJsonPath processor // extract the required fields from the JSON message and keep them as flowfile attributes.
9. AttributesToJSON processor // build a new JSON message with all the required attributes.
10. InferAvroSchema processor // build an Avro schema for the JSON message, then ConvertJSONToAvro processor.
11. MergeContent processor // use the Avro merge format if you want to merge small flowfiles into one big flowfile.
12. ConvertAvroToORC processor // convert the merged Avro content to ORC format.
13. PutHDFS processor // store the data into HDFS and create a Hive table on top of the HDFS directory.

(or)

Run the join using Hive:
1. Ingest the two tables into HDFS using NiFi, then create Hive tables on top of them.
2. Once ingestion is complete, run the join on the Hive tables using a PutHiveQL processor and write the results to a new table (i.e. insert into the final table: select * from table1 join table2 on table1.clientid=table2.clientid, as in the sketch below).

Querying based on the clientid from the first table would involve a lot of processors in NiFi; it's easier to run the join on both tables either on the source side or on the Hadoop side.
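A minimal sketch of the Hive-side join, under assumed names (clients and orders as the two ingested tables, client_orders as a pre-created final table; swap in your actual table and column names):

```sql
-- Hypothetical staged tables: clients(clientID, clientName) and orders(clientID, orderID, amount).
-- PutHiveQL would execute a statement like this to materialize the joined result.
INSERT INTO TABLE client_orders
SELECT c.clientID,
       c.clientName,
       o.orderID,
       o.amount
FROM clients c
JOIN orders o
  ON c.clientID = o.clientID;
```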
04-03-2018
09:54 PM
@Omer Shahzad
Check the data before storing it into the staging area:
You can use the ValidateRecord processor with the Strict Type Checking property set to true to check the primary key columns' data before storing the data into the staging area. Please refer to the links below for the usage of the ValidateRecord processor:
https://community.hortonworks.com/articles/147198/using-apache-nifi-to-validate-records-adhere-to-a.html
https://community.hortonworks.com/articles/147200/using-apache-nifi-to-validate-that-records-adhere.html

(or)

Check the data before moving it from the staging area to the next stage:
In our Hive statements we can add a filter to check that the primary key columns are not null and that length(col-name) != 0 (not equal to zero) before moving the data to the next stage, as in the sketch below. For example:

insert into target_db_name.target_table_name1 partition(dt) select * from source_db_name.tablename1 where dt='partition_value' and <pk-col-name> is not null and length(<pk-col-name>) != 0

With the above Hive statement we ingest only the data whose primary key is present from the source db to the target db.
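For illustration, the same guard written out under assumed names (sales.orders_staging promoting into sales.orders, a string primary key column customer_id, and partition column dt):

```sql
-- Hypothetical names only; dynamic partitioning must be enabled for PARTITION (dt) with SELECT *,
-- and dt is assumed to be the last column of the staging table.
INSERT INTO TABLE sales.orders PARTITION (dt)
SELECT *
FROM sales.orders_staging
WHERE dt = '2018-04-03'          -- the partition being promoted
  AND customer_id IS NOT NULL    -- reject rows with a missing primary key
  AND length(customer_id) != 0;  -- reject rows with an empty-string primary key
```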