Upserts using Nifi ( updates + Inserts)


Explorer

Hi @Shu

I followed your tutorial here: https://community.hortonworks.com/articles/191760/create-dynamic-partitions-based-on-flowfile-conten...

I'm stuck with the attached error (nifi-predicateerror.jpg).

Also, is it possible to do update + insert in the flow described at the URL above using PutHiveQL? I am able to execute MERGE in HiveServer2, but because of the large amount of data, MERGE does not work for my dataset.

Using the approach from the URL above, can I achieve the use case below?

  • When a matching record is found in the final table, define which action to take: either Update or Delete.
  • If the record is not matched in the final dataset, insert the record.

Accepted Solution

Re: Upserts using Nifi ( updates + Inserts)

Super Guru

@Raj ji

Merge functionality (if matched then Update/Delete, if not matched then Insert) is very efficient when you run it from Hive.

We cannot run Update/Delete/Insert statements from NiFi to Hive.

In my article I create partitions in NiFi using the PartitionRecord processor and store them in HDFS directly. Then, using the PutHiveQL processor, I execute MSCK REPAIR TABLE to add the newly created partitions in the HDFS directory to the Hive table.

Try one of these approaches with the Merge strategy:

1. Create your source table as partitioned and your target table as partitioned too, then use the partition column in your Merge statement; this helps your Hive job run more efficiently. Set the hive.auto.convert.join property to false.

hive> set hive.auto.convert.join=false;

(or)
2. Create smaller chunks of files in HDFS instead of one big file, then when you run the merge use:

MERGE INTO merge_data.transactions AS T USING
(select * from source_table where input__file__name="<HDFS-file-path>") AS S

With the input__file__name predicate above, the Hive merge is performed only on that particular file, not on the full data set of the table.
If you follow this approach, first extract all distinct file names from the source table, then run a separate merge for each individual file.

(or)

3. Store all the data in the target table with a timestamp, then while processing the data get only the latest record by using a window function.
(or)

4. Store all the data in the target table, then recreate the table (by performing update/delete/insert) once a day after your ingestion is completed.
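
Option 3 could look roughly like the query below. This is only a sketch: the table `merge_data.transactions_history` and the columns `id` and `load_ts` are placeholder names, not objects from the article, and it assumes every ingested version of a record is appended together with the time it was loaded.

```sql
-- Hypothetical append-only table: each version of a record is inserted
-- with the timestamp at which it was loaded (load_ts).
SELECT *
FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (PARTITION BY h.id ORDER BY h.load_ts DESC) AS rn
  FROM merge_data.transactions_history h
) latest
WHERE latest.rn = 1;  -- rn = 1 is the most recent version of each id
```

If two versions of the same id share a load_ts, which one gets rn = 1 is arbitrary, so a tie-breaking column in the ORDER BY may be needed.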

Re: Upserts using Nifi ( updates + Inserts)

Explorer

Hi @Shu

I tried your steps and have a few questions.

Answer 1:

I cannot insert the partition key when the source and target tables are both partitioned.

Updating the partition column is not supported --> Error: Error while compiling statement: FAILED: SemanticException [Error 10292]: Updating values of partition columns is not supported (state=42000,code=10292)

Answer 2:

1. The raw data is JSON and lands in HDFS.

2. For merging, we convert the data into an ORC, transactional, bucketed table.

3. With ORC, how can I use INPUT__FILE__NAME? It is not possible to merge with the raw JSON data files, right?

Also, could you please elaborate on points 3 and 4?

Re: Upserts using Nifi ( updates + Inserts)

Super Guru

@Raj ji

Sorry to hear that :(

Q1?

I mean that you can create a temp table with a partition column and a final table that is partitioned by the same column.

Then in your merge statement use the T.<partition_column> = S.<partition_column> condition.

MERGE INTO merge_data.transactions AS T
USING merge_data.merge_source AS S
ON T.ID = S.ID and T.tran_date = S.tran_date and T.<partition_column> = S.<partition_column>
WHEN MATCHED...

This way we are not updating the partition column value; instead we use the partition column in the ON clause. Updating the partition column value is not possible once the partition has been created.

Refer to this link for more details on this question.
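
A fuller version of such a MERGE might look like the sketch below. The columns `amount` and `is_deleted` and the partition column `part_col` are made-up names for illustration. It also assumes the target is a transactional table whose columns are (ID, tran_date, amount) followed by the partition column, and that dynamic partition inserts are allowed in your session.

```sql
MERGE INTO merge_data.transactions AS T
USING merge_data.merge_source AS S
ON T.ID = S.ID
   AND T.tran_date = S.tran_date
   AND T.part_col = S.part_col   -- partition column is compared, never updated
-- When both DELETE and UPDATE are present, the first clause needs an AND condition.
WHEN MATCHED AND S.is_deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET amount = S.amount
-- Partition column value goes last in the INSERT value list.
WHEN NOT MATCHED THEN INSERT VALUES (S.ID, S.tran_date, S.amount, S.part_col);
```

The SET list deliberately leaves out `part_col`, which is what avoids Error 10292.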

Q2?

For the raw data table, use INPUT__FILE__NAME, a Hive virtual column, when selecting data from the table. This way you are not joining against the full data (in the HDFS directory) but performing the merge on a single file from that directory.

Refer to this link for more details on INPUT__FILE__NAME usage in Hive.

Yes, you can also use a JSON table for the merge.
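
The per-file approach could be sketched as follows. `merge_data.raw_source` is a placeholder name for the raw JSON table, the `amount` column is hypothetical, and `<HDFS-file-path>` stands for one value returned by the first query.

```sql
-- Step 1: list the files that currently back the raw table.
SELECT DISTINCT INPUT__FILE__NAME FROM merge_data.raw_source;

-- Step 2: run one MERGE per file, restricting the source to that file.
MERGE INTO merge_data.transactions AS T
USING (
  SELECT * FROM merge_data.raw_source
  WHERE INPUT__FILE__NAME = '<HDFS-file-path>'
) AS S
ON T.ID = S.ID AND T.tran_date = S.tran_date
WHEN MATCHED THEN UPDATE SET amount = S.amount
WHEN NOT MATCHED THEN INSERT VALUES (S.ID, S.tran_date, S.amount);
```

The path in step 2 has to match the string from step 1 exactly, since the predicate is a plain string comparison.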



Re: Upserts using Nifi ( updates + Inserts)

New Contributor

I get errors if the input contains duplicate records.
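
If the error here comes from Hive's cardinality check (MERGE rejects a source in which more than one row matches the same target row), one possible workaround is to deduplicate the source inside the USING clause. A sketch, assuming hypothetical columns `amount` and `load_ts`, where `load_ts` decides which duplicate wins:

```sql
MERGE INTO merge_data.transactions AS T
USING (
  SELECT ID, tran_date, amount
  FROM (
    SELECT s0.*,
           ROW_NUMBER() OVER (PARTITION BY s0.ID, s0.tran_date
                              ORDER BY s0.load_ts DESC) AS rn
    FROM merge_data.merge_source s0
  ) ranked
  WHERE rn = 1   -- keep one row per merge key
) AS S
ON T.ID = S.ID AND T.tran_date = S.tran_date
WHEN MATCHED THEN UPDATE SET amount = S.amount
WHEN NOT MATCHED THEN INSERT VALUES (S.ID, S.tran_date, S.amount);
```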

Re: Upserts using Nifi ( updates + Inserts)

Explorer

Accepted the answer, but it didn't work out well. :(
