Created 03-30-2024 09:38 AM
Hi,
I searched for a solution, but I guess I'm using the wrong search terms. So far I haven't found anything that solves my problem.
I successfully set up a pipeline that reads data from an S3 bucket and uses upsert to put the data into a MySQL database.
The problem that I'm facing now is that sometimes an older update of a record arrives before a newer update.
Is there a way for the upsert to compare timestamps and perform the update only if the incoming record has a newer timestamp than the one in the database?
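To illustrate the behaviour I have in mind: in plain MySQL I would imagine something like a guarded INSERT ... ON DUPLICATE KEY UPDATE, where every column is only overwritten if the incoming timestamp is newer. Below is a rough Python sketch that only builds such a statement as a string. The function name and the table and column names (customer, id, name, updated_at) are made up for illustration, the %s placeholders assume a Python DB-API style MySQL driver, and I have not wired this into NiFi.

```python
def build_conditional_upsert(table, key_column, ts_column, data_columns):
    """Build a MySQL upsert that only overwrites a row when the incoming
    timestamp is newer than the stored one.

    All names are interpolated unquoted, so pass trusted identifiers only.
    """
    columns = [key_column, *data_columns, ts_column]
    placeholders = ", ".join(["%s"] * len(columns))

    # VALUES(col) refers to the value the INSERT tried to write;
    # the bare column name refers to the value already stored.
    is_newer = f"VALUES({ts_column}) > {ts_column}"

    # The timestamp column is assigned last so that the comparisons for the
    # data columns still see the stored timestamp (assumption: assignments
    # are evaluated left to right).
    assignments = [
        f"{col} = IF({is_newer}, VALUES({col}), {col})"
        for col in [*data_columns, ts_column]
    ]

    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {', '.join(assignments)}"
    )


if __name__ == "__main__":
    # Hypothetical table and columns, for illustration only.
    print(build_conditional_upsert("customer", "id", "updated_at", ["name"]))
```

This relies on the key column being a primary or unique key. As far as I know, VALUES() in this position is deprecated in recent MySQL 8.0 releases in favour of a row alias, so the statement may need adjusting depending on the server version.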
Thanks, Stefan
Created 04-01-2024 03:44 AM
@StefanG, Welcome to our community! To help you get the best possible answer, I have tagged in our NiFi experts @MattWho @cotopaul @SAMSAL @ckumar who may be able to assist you further.
Please feel free to provide any additional information or details about your query, and we hope that you will find a satisfactory solution to your question.
Regards,
Vidya Sargur
Created 04-01-2024 06:35 AM
Hi Vidya,
thanks a lot.
I will try to explain my problem in a bit more detail.
I'm importing data from an event-based system. I'm using ListS3 and FetchS3Object to download Parquet files from an AWS S3 bucket.
In this bucket every entity has its own directory, which is further split up by the date on which the entity was updated. I then use RouteOnAttribute to route the data into the corresponding table of a MySQL database.
The Parquet files contain updated records of the entities. Each record is not just the changes but the latest full state of the entity, so I could ignore older updates if I happen to process a newer update first.
The files in the bucket have random names. It seems that ListS3 lists them in some alphabetical order, and I didn't see any way to order the files by the time they were changed in the S3 bucket.
Every record contains a unique id of the entity and a timestamp that indicates the last update of the entity. Some of the entities also include a version number that I could use in addition to, or instead of, the timestamp.
So far I'm using PutDatabaseRecord with statement type UPSERT to put the data into the MySQL database.
My plan was to look up the latest update timestamp stored in the MySQL database for the record being processed. If no entry is found, I would perform an insert. If an entry is found and its timestamp is older, or its version number is lower, than that of the record currently being processed, I would perform an update. If the entry in the database is already newer, I would just skip the record.
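To make the plan concrete, here is a minimal Python sketch of the insert / update / skip decision. It uses a plain dict in place of the MySQL table, and the field names (id, name, updated_at) as well as the function names are hypothetical. The order_field parameter could just as well point to a version number instead of the timestamp.

```python
from datetime import datetime


def decide_action(existing, incoming, order_field="updated_at"):
    """Return 'insert', 'update' or 'skip' for one incoming record."""
    if existing is None:
        return "insert"  # nothing stored yet for this id
    if incoming[order_field] > existing[order_field]:
        return "update"  # incoming record is newer than the stored one
    return "skip"        # stored record is the same age or newer


def apply_records(table, records, order_field="updated_at"):
    """Apply records to an in-memory 'table' (dict keyed by id).

    Returns the list of actions taken, in processing order.
    """
    actions = []
    for record in records:
        action = decide_action(table.get(record["id"]), record, order_field)
        if action != "skip":
            table[record["id"]] = record
        actions.append(action)
    return actions


if __name__ == "__main__":
    table = {}
    # The newer update arrives first, the older one afterwards.
    batch = [
        {"id": 1, "name": "new", "updated_at": datetime(2024, 3, 30, 10, 0)},
        {"id": 1, "name": "old", "updated_at": datetime(2024, 3, 30, 9, 0)},
    ]
    print(apply_records(table, batch))
    print(table[1]["name"])
```

In this sketch a record with exactly the same timestamp as the stored one is skipped, which seems fine to me since every record carries the full state of the entity.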
br, Stefan