Member since: 06-08-2017
Posts: 1049
Kudos Received: 518
Solutions: 312
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 11257 | 04-15-2020 05:01 PM |
| | 7160 | 10-15-2019 08:12 PM |
| | 3139 | 10-12-2019 08:29 PM |
| | 11588 | 09-21-2019 10:04 AM |
| | 4372 | 09-19-2019 07:11 AM |
06-09-2018
10:21 PM
@Samant Thakur Check which version of curl you are using; if it is an older version, update to a newer one and run the curl command again. Some useful links on this are: link1 link2
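In case it helps, a quick way to confirm the installed version and upgrade it (the upgrade command assumes a yum-based system; adjust for your package manager):

curl --version
sudo yum update curl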
06-09-2018
10:14 PM
1 Kudo
@Alex Witte To add a row-number column, use an order by clause in the window: the whole data set is ordered within one reducer and each row is assigned a row_number value. You will then get the correctly ordered data set with the row column added: select row_number() over (order by col1) as rowid,* from source_table1;
06-09-2018
09:49 PM
@Samant Thakur Try without a space between Content-type: and text/xml: curl http://localhost:8886/solr/update?commit=true -H "Content-type:text/xml" --data-binary '<delete><query>*:*</query></delete>'
06-08-2018
08:45 PM
1 Kudo
@Raj ji We cannot load the sample table that you shared incrementally as-is. Let's say you used QueryDatabaseTable with the max value column set to id: the processor pulls id's 1,2,3 in the first run and stores the state as 3. On the second run the processor pulls only records with id > 3, so it can no longer pick up records 1 and 3 even though they were updated; it pulls only id=4 and updates the state to 4.
1. How can we pull data incrementally from the RDBMS table then?
Let's consider your table having one extra column, i.e. updated_at (timestamp type). This field is set to the system timestamp whenever a new record is inserted/created in the table (or) an existing record is updated. Initially you have 3 records created at 2017-01-01 12:00:00.00; then id's 1,3 are updated and id 4 is created, all with the new updated_at timestamp 2018-06-08 12:00:00.00. Now configure the QueryDatabaseTable processor with updated_at as the max value column: the first run pulls all the id's (1,2,3) and stores the state as 2017-01-01 12:00:00.00; the next run checks for any values updated after 2017-01-01 12:00:00.00, pulls those records if it finds any, and updates the state to 2018-06-08 12:00:00.00 (in our case). With this table structure we capture all the updates happening on the source table into the Hive table.
2. On the second load, we have to remove the value 3, update the value 1 and insert the value 4. How is that possible in NiFi? Is it possible at all?
Updates/deletes in Hive through NiFi are not directly possible (Hive plain tables natively don't support update/delete); everything is appended to the Hive table.
Approach 1: Look into the Hive ACID merge strategy described in this link; it will best fit your needs. The article describes a merge strategy: when a matching record is found in the final table, define which action to take, either Update (or) Delete; when the record is not matched in the final dataset, insert it. To use this merge strategy your source table needs to be designed so that you can capture all the updates happening in it, the final table in Hive needs to be an ACID-enabled table to support merge functionality, and make sure your Hive version supports the merge strategy. (A rough sketch of the merge and dedupe queries is shown at the end of this answer.)
Approach 2: Capture all the incremental data into a Hive table and then use a window function to get only the latest record from the table. It's always best practice to keep an ingestion_timestamp field on each record stored into Hive, etc. While retrieving the data from Hive, use the Row_Number() window function and filter on where row_number = 1 to get the most recent record. Use predicate push-downs while selecting data so that we minimize the data going into the window function. This way, even if we already have duplicated data, we still get only the most recent record. You can also run dedupe-style jobs (using row_number) on the Hive table, for example once a week, to clean up the table and eliminate duplicates. References regarding implementation of the row_number function: https://community.hortonworks.com/questions/58405/how-to-get-the-row-number-for-particular-values-fr.html
Approach 3: Even if you enable ACID transactions on the Hive table, we still need to prepare the Hive statements in NiFi (i.e. know how to identify which record was updated/inserted/deleted), i.e. update table <table-name> ... to update a record and a delete statement to delete a record, and then execute the statement(s) using the PutHiveQL processor. If the table has millions of records and we are trying to update/delete one record, this process will be very intense and resource consuming, as we issue insert/update/delete DML statements for each record.
- Click on the Accept button below if you feel the answer addressed your question..!!
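As a rough illustration of Approach 1 and Approach 2: the table and column names below (final_table, staging_table, appended_table, id, col1, updated_at, op_type) are made up for the example, and the MERGE statement needs Hive 2.2+ with an ACID-enabled target table, so verify the exact syntax against your Hive version.

-- Approach 1: merge the NiFi-landed increment into the ACID final table
MERGE INTO final_table AS t
USING staging_table AS s
ON t.id = s.id
WHEN MATCHED AND s.op_type = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET col1 = s.col1, updated_at = s.updated_at
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.col1, s.updated_at);

-- Approach 2: keep appending, then read back only the latest version per key
SELECT id, col1, updated_at
FROM (
  SELECT a.*,
         row_number() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rn
  FROM appended_table a
) latest
WHERE rn = 1;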
06-08-2018
12:13 PM
@Satya Nittala Thanks for the sample records..!! The pattern I commonly observe: everything after the second-from-last / needs to go into the partitionfields column, and everything before the second-from-last / needs to go into the location column.
Splitting on = alone will not give the desired results, but splitting on rownum= will.
Using the regexp_extract function:
1. Matching on rownum= and extracting:
hive> select regexp_extract('hdfs://servername/sds/sds/erg/rownum=123/columnnumber=456',"(.*)(rownum=.*)",1) Location,regexp_extract('hdfs://servername/sds/sds/erg/rownum=123/columnnumber=456',"(.*)(rownum=.*)",2) partitionfields;
+---------------------------------+------------------------------+--+
| location | partitionfields |
+---------------------------------+------------------------------+--+
| hdfs://servername/sds/sds/erg/ | rownum=123/columnnumber=456 |
+---------------------------------+------------------------------+--+
2. Matching on the forward slashes and extracting:
hive> select regexp_extract('hdfs://servername/sds/sds/erg/rownum=123/columnnumber=456',"(.*(?:\\/))(.*(?:\\/).*)",1) Location,regexp_extract('hdfs://servername/sds/sds/erg/rownum=123/columnnumber=456',"(.*(?:\\/))(.*(?:\\/).*)",2) partitionfields;
+---------------------------------+------------------------------+--+
| location | partitionfields |
+---------------------------------+------------------------------+--+
| hdfs://servername/sds/sds/erg/ | rownum=123/columnnumber=456 |
+---------------------------------+------------------------------+--+
Using the split function:
1. Split the string on rownum=. Because we split on rownum=, that prefix is dropped from the partitionfields data, so we need to concat rownum= back to get the correct value in the partitionfields column:
hive> select split('hdfs://servername/sds/sds/erg/rownum=123/columnnumber=456',"rownum=")[0] Location,concat('rownum=',split('hdfs://servername/sds/sds/erg/rownum=123/columnnumber=456',"rownum=")[1]) partitionfields;
+---------------------------------+------------------------------+--+
| location | partitionfields |
+---------------------------------+------------------------------+--+
| hdfs://servername/sds/sds/erg/ | rownum=123/columnnumber=456 |
+---------------------------------+------------------------------+--+
Use whichever of the above functions best fits your case; if you still want to split on =, tweak the above regexp_extract (or) split functions to get the desired results.
06-07-2018
10:07 PM
1 Kudo
@Raj ji Try without file:// in your Database Driver Location(s) value, i.e. /tmp/mysqljar/mysql-connector-java-5.1.46.jar. Check the permissions and that the driver is present on all the NiFi cluster nodes.
- Is it possible to insert updated values + new values from MySQL into Hive?
Yes, it's possible, but only if you have some sort of identifier field that tells you a record was updated in the source table. Usually, when a record gets updated/created in an RDBMS table, a current-timestamp column is set on that record; then in NiFi, configure the QueryDatabaseTable processor's Maximum-value Columns property with that timestamp field, and the processor will store the state and pull only the incremental records (updated records + new records). A sketch of such a column on the MySQL side follows below.
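A minimal sketch of maintaining that timestamp column on the source side, assuming a MySQL table (the table name employees is only an example):

ALTER TABLE employees
  ADD COLUMN updated_at TIMESTAMP
  DEFAULT CURRENT_TIMESTAMP
  ON UPDATE CURRENT_TIMESTAMP;

With this in place, point Maximum-value Columns at updated_at and QueryDatabaseTable will pick up both newly inserted and updated rows on each run.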
06-07-2018
12:39 PM
@Satya Nittala Please add some sample records with different string lengths, so that we can prepare the query that satisfies them.
06-07-2018
11:25 AM
1 Kudo
@aman mittal Great, good to know that..!! Another way of doing it is to check the length of the fragment.index attribute and use nested ifElse statements to decide whether to prepend 00 or 0 (a rough sketch follows below), but that expression becomes complex, so using the Advanced property is the better approach. If the answer addressed your question, take a moment to click on the Accept button below to accept it; that would be a great help to Community users looking for a quick solution to these kinds of issues.
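A rough, untested sketch of that nested-ifElse expression (shown only to illustrate why the Advanced UI rules are simpler; verify the syntax against the NiFi Expression Language guide):

${fragment.index:length():equals(1):ifElse(${fragment.index:prepend('00')}, ${fragment.index:length():equals(2):ifElse(${fragment.index:prepend('0')}, ${fragment.index})})}

Each branch checks the attribute length and left-pads with 00 or 0 so the index always sorts correctly as a fixed-width string.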
06-07-2018
10:52 AM
1 Kudo
@aman mittal For this case, use the ifElse function to check whether a value exists for the fragment.index attribute: if it exists, use the value of fragment.index; if it does not exist (i.e. the file size is less than 2GB), keep your default value 1. UpdateAttribute configs:
new_attribute
${tab_name}_${fragment.index:isEmpty():ifElse('1','${fragment.index}')}
Refer to this link for more details regarding the ifElse function of the NiFi expression language.
06-07-2018
10:46 AM
@Vladislav Shcherbakov It depends on how the file is stored on the FTP server side: if the file is in CSV format, the FetchFTP processor pulls it in CSV format. Fetching files does not change their format in NiFi. If the file is in CSV format on the source server, then in the PutDatabaseRecord processor you need to configure/enable a CSVReader controller service so that the processor reads the incoming data and executes SQL statements based on the Statement Type property value. A rough configuration sketch is shown below.
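A rough configuration sketch (property names recalled from NiFi 1.x; verify them against your version):

FetchFTP -> PutDatabaseRecord
PutDatabaseRecord:
    Record Reader                        = CSVReader (controller service)
    Statement Type                       = INSERT
    Database Connection Pooling Service  = your DBCPConnectionPool
    Table Name                           = target table
CSVReader:
    Treat First Line as Header           = true (if the CSV carries a header row)
    Schema Access Strategy               = Use String Fields From Header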