Member since 06-04-2024
8 Posts | 11 Kudos Received | 0 Solutions
08-08-2024
12:34 AM
I have a Mongo aggregation query that runs in a "RunMongoAggregation" processor. The bulk result is then inserted into a Mongo collection in one go using "PutMongoRecord" (NiFi 1.25.0). The problem is that I need to run this aggregation query periodically, and every run inserts the same data again, so the collection ends up with duplicates. Each record has a unique ID called "PortfolioTransactionID". Here is a sample JSON:

{ "PortfolioBaseID": "371", "PortfolioBaseIDOrder": 37539, "PortfolioTransactionID": 1416807, "RecallableDistributedGainLocal": null, "SecTypeCode1": "mf", "SecTypeCode2": "ca", "SplitFactor": 1, "SpotRate": null, "ThruDate": "2016-12-28", "TradeAmount": "5000000.0", "TradeAmountLocal": 5000000.0 }

How do I avoid this duplication, using "PortfolioTransactionID" as the unique key, while still inserting the data in bulk?

Thanks.
Labels: Apache NiFi
06-14-2024
02:12 AM
1 Kudo
Hi @Thar11027, I stand corrected. Let's be more specific, and you can't get more specific than looking at the code itself on GitHub :). It turns out that PutDatabaseRecord uses a DatabaseAdapter, an interface that is implemented for each database engine type and passed through the DB service associated with this processor (DBCPConnectionPool). Those adapters are responsible for generating the SQL for each statement type (insert, update, delete, ...). For MySQL there is an adapter called MySQLDatabaseAdapter, and if you look at its getUpsertStatement method you will find that it uses the following syntax:

StringBuilder statementStringBuilder = new StringBuilder("INSERT INTO ")
        .append(table)
        .append("(").append(columns).append(")")
        .append(" VALUES ")
        .append("(").append(parameterizedInsertValues).append(")")
        .append(" ON DUPLICATE KEY UPDATE ")
        .append(parameterizedUpdateValues);
return statementStringBuilder.toString();

Notice the use of the "ON DUPLICATE KEY UPDATE" syntax. If you look up what that means in MySQL (https://blog.devart.com/mysql-upsert.html) you will find that yes, it checks whether the record key already exists and, if it does, performs an update instead. However, that only works when the key is the table's primary key (or is covered by a unique index).

In your case it works for the transaction table because, as you mentioned, transaction_id is the primary key and you are probably passing this column as part of the record data. For the other table, however, the id is set to auto increment, so you are probably not passing it as part of the record and are instead relying on the non-primary-key column id_from_core. I'm not sure whether it's possible to change your table so that this column is your primary key (or at least has a unique index on it). Otherwise you will find yourself having to do a lookup to check whether the row exists, maybe fetch its id, and then do your upsert with that id, but I'm not sure how this will work with auto increment being set.

Another option, which I tend to use in my case to avoid adding more processors/controller services, is to create a stored procedure that defers all of that update-or-insert checking to SQL, and then use the PutSQL processor to execute the stored procedure, passing all columns to it. This can be cumbersome if you have many columns, which seems to be your case. To avoid passing each column, you can pass the record as a JSON string and parse the JSON in MySQL to extract the column values. Hope that helps.
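To make the generated statement easier to picture, here is a minimal standalone sketch of the same string-building pattern. It is not the NiFi adapter code: the class name, the table name "transactions", the column "amount", and the "column = ?" form of the update list are assumptions for illustration only.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not NiFi's MySQLDatabaseAdapter.
public class MySqlUpsertSketch {

    // Builds an "INSERT ... ON DUPLICATE KEY UPDATE ..." statement with one
    // "?" placeholder per column in both the VALUES list and the update list.
    public static String buildUpsert(String table, List<String> columns) {
        String columnList = String.join(", ", columns);
        String insertPlaceholders = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));
        // Assumed form of the update list: "col = ?" for every column.
        String updateAssignments = columns.stream()
                .map(c -> c + " = ?")
                .collect(Collectors.joining(", "));
        return new StringBuilder("INSERT INTO ")
                .append(table)
                .append("(").append(columnList).append(")")
                .append(" VALUES ")
                .append("(").append(insertPlaceholders).append(")")
                .append(" ON DUPLICATE KEY UPDATE ")
                .append(updateAssignments)
                .toString();
    }

    public static void main(String[] args) {
        // Table and column names here are made up for the example.
        System.out.println(buildUpsert("transactions", List.of("transaction_id", "amount")));
    }
}
```

With the example arguments this prints INSERT INTO transactions(transaction_id, amount) VALUES (?, ?) ON DUPLICATE KEY UPDATE transaction_id = ?, amount = ?. MySQL takes the update branch only when the inserted row collides with an existing primary key or unique index value, which is why the key column has to be part of the record.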
06-12-2024
06:47 AM
Hi, just a word of advice so that your posts have a better chance of getting noticed and of someone offering a resolution: next time, try to shorten your JSON input so that it isolates only the problem. You don't have to post the 99% that works along with the 1% that doesn't, as long as trimming it doesn't affect the overall structure. For example, if you have 50 fields you can post just 1-2 of them, and if you have an array of 50 elements, 1-2 elements should be enough.

Going to your problem: if you know that you will always have only two amounts, as you specified, then you can intercept them in the first shift spec and assign the proper field names (Amount1, Amount2) as follows:

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "CustomFields": {
          "*": {
            "ViewName": {
              "Amount": {
                "@(2,Value)": "[&5].Amount1"
              },
              "*": {
                "@(2,Value)": "[&5].&1"
              }
            }
          }
        },
        "PortfolioSharing": {
          "*": {
            "@(0,PortfolioId)": "[&3].sharing_PortfolioId",
            "@(0,CustomerId)": "[&3].sharing_CustomerId",
            "@(0,PersonalId)": "[&3].sharing_PersonalId",
            "@(0,ReasonId)": "[&3].sharing_ReasonId",
            "@(0,ReasonProgId)": "[&3].sharing_ReasonProgId",
            "@(0,ReasonName)": "[&3].sharing_ReasonName",
            "@(0,Comment)": "[&3].sharing_Comment",
            "@(0,TypeId)": "[&3].sharing_TypeId",
            "@(0,TypeProgId)": "[&3].sharing_TypeProgId",
            "@(0,TypeName)": "[&3].sharing_TypeName"
          }
        },
        "Amount": "[&1].Amount2",
        "*": "[&1].&"
      }
    }
  },
  {
    "operation": "modify-default-beta",
    "spec": {
      "*": {
        // trx_customer
        "ResidentNonresident": "@(1,Resident/Non-resident)",
        "NationalityCountryofIncorporation": "@(1,Nationality/CountryofIncorporation)",
        "PermanantTownCity": "@(1,PermanantTown/City)",
        "SubsidiaryAssociateofanotherorganization": "@(1,Subsidiary/Associateofanotherorganization)",
        "Howdidyougettoknowaboutus": "@(1,Howdidyougettoknowaboutus?)",
        // trx_portfolio
        "UseBankAccountFromCustomer": "@(1,UseBankAccountFromCustomer?)"
      }
    }
  },
  {
    "operation": "remove",
    "spec": {
      "*": {
        // trx_customer
        "Resident/Non-resident": "",
        "Nationality/CountryofIncorporation": "",
        "PermanantTown/City": "",
        "Subsidiary/Associateofanotherorganization": "",
        "NatureOf_Business": "",
        "Howdidyougettoknowaboutus?": "",
        // trx_portfolio
        "UseBankAccountFromCustomer?": ""
      }
    }
  }
]

Hope that solves your problem. If you found this helpful, please accept the solution. Thanks
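For anyone who finds the shift spec hard to read, the intent of the two Amount rules can be sketched in plain Java. This is not Jolt and not something you would put in the processor; it only illustrates the renaming. The input shape (a CustomFields list of ViewName/Value pairs next to a top-level Amount) is inferred from the spec, and the class name, sample values, and the "Currency" field are made up. Handling of the other custom fields and of PortfolioSharing is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration of the Amount1/Amount2 renaming.
public class AmountRenameSketch {

    // The custom field whose ViewName is "Amount" becomes Amount1,
    // the top-level Amount becomes Amount2, other top-level fields are copied.
    public static Map<String, Object> flatten(Map<String, Object> record) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            String key = entry.getKey();
            if (key.equals("CustomFields")) {
                for (Object item : (List<?>) entry.getValue()) {
                    Map<?, ?> field = (Map<?, ?>) item;
                    if ("Amount".equals(field.get("ViewName"))) {
                        out.put("Amount1", field.get("Value"));
                    }
                }
            } else if (key.equals("Amount")) {
                out.put("Amount2", entry.getValue());
            } else {
                out.put(key, entry.getValue());
            }
        }
        return out;
    }

    // Builds a small made-up record with both amounts present.
    public static Map<String, Object> sampleRecord() {
        Map<String, Object> customField = new LinkedHashMap<>();
        customField.put("ViewName", "Amount");
        customField.put("Value", 100);
        List<Object> customFields = new ArrayList<>();
        customFields.add(customField);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("CustomFields", customFields);
        record.put("Amount", 250);
        record.put("Currency", "USD");
        return record;
    }

    public static void main(String[] args) {
        System.out.println(flatten(sampleRecord()));
    }
}
```

The point is only that the two values that share the name Amount are separated by where they sit in the input, which is what matching them at different levels of the shift spec achieves.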
06-06-2024
02:44 AM
1 Kudo
@Thar11027 yes, but it will be more complex: you can add a shift operation that uses the "*" wildcard. For example, if you do this:

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "*/*": "[&1].&", // with a '/' somewhere in the middle
        "/*": "[&1].&",  // with a '/' at the start, followed by something
        "*/": "[&1].&"   // with a '/' at the end, following something
      }
    }
  }
]

you will obtain all the desired fields, which makes the manual substitution easier (try it on the Jolt demo site). Note that if you want to make it fully automated, it will be a little more difficult, because you would then have to manipulate the field-name string itself. If you are interested in that, I really suggest looking at the last example in the guide I already sent you (My Guide).

(Maybe it would be more appropriate to open another question about this other problem: the topic has changed, and a separate question would be easier to find for someone searching with the same problem.)
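To give an idea of what "manipulating the string of the field" means, here is a rough plain-Java sketch of the automated version. It is not a Jolt spec, it would have to run outside Jolt (for example in a scripted step), and the class name, the rule "drop everything except letters, digits and underscore", and the sample value are my assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of automatic field-name cleanup, outside of Jolt.
public class KeySanitizerSketch {

    // Drops every character that is not a letter, digit or underscore,
    // so '/', '?' and '-' disappear from the field name.
    public static String sanitize(String key) {
        return key.replaceAll("[^A-Za-z0-9_]", "");
    }

    // Returns a copy of the record with sanitized keys. If two keys collapse
    // to the same name, the later one overwrites the earlier one.
    public static Map<String, Object> sanitizeKeys(Map<String, Object> record) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            out.put(sanitize(entry.getKey()), entry.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("Resident/Non-resident", "Resident"); // made-up value
        System.out.println(sanitizeKeys(record));
    }
}
```

For example, sanitize("Resident/Non-resident") gives "ResidentNonresident". The trade-off compared with the manual substitution is that you no longer have to list every field, but you also lose control over the exact target names.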