Member since: 07-29-2020
Posts: 530
Kudos Received: 272
Solutions: 159
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 311 | 09-07-2024 12:59 AM |
| | 418 | 09-03-2024 12:36 AM |
| | 344 | 09-03-2024 12:09 AM |
| | 256 | 08-30-2024 06:23 AM |
| | 944 | 08-26-2024 04:39 PM |
09-18-2024
04:39 PM
2 Kudos
Hi, let's start with the last thing you said, because I'm curious how one record ends up overriding the other. I assume you are using ExcelReader, correct? If so, I tried creating an Excel file with two sheets that share the same schema and hold the same records, with only some addresses differing (Sheet1: Address1, Sheet2: Address2). I read the file using a FetchFile processor, then passed the content to a ConvertRecord processor with ExcelReader as the reader and JsonRecordSetWriter as the writer, configured as follows. ExcelReader: I pass an Avro schema to assign the proper field names and types: {
"namespace": "nifi",
"name": "user",
"type": "record",
"fields": [
{ "name": "ID", "type": "int" },
{ "name": "NAME", "type": "string" },
{ "name": "ADDRESS", "type": "string" }
]
} For the JsonRecordSetWriter I kept the default settings, no changes. Here is what my output looked like; it accounts for all the records from both sheets, even the duplicate (1, sam, TX): [ {
"ID" : 1,
"NAME" : "sam",
"ADDRESS" : "TX"
}, {
"ID" : 2,
"NAME" : "Ali",
"ADDRESS" : "WA"
}, {
"ID" : 1,
"NAME" : "sam",
"ADDRESS" : "TX"
}, {
"ID" : 2,
"NAME" : "Ali",
"ADDRESS" : "FL"
} ] So I'm curious what is happening in your case that causes the record to be overwritten. Maybe once we figure that out, we can solve it as you said by using JOLT. I think Fork/Join Enrichment will still work, especially since the info you are trying to merge comes from the same file, but the flow is going to be different: you might need a separate ExcelReader for each sheet, which means reading the Excel file twice. At a high level, you can avoid having two ExcelReader services by passing the sheet name as a flowfile attribute, but you still need to read the file twice (once per sheet) and then join the results using whichever joining strategy works best for you. Hope that helps.
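For reference, this is roughly what the SQL merge strategy in JoinEnrichment could look like for the sample above. It is only a sketch: the join key (ID) and the ADDRESS1/ADDRESS2 aliases are assumptions based on my test data.

-- Joins the forked "original" and "enrichment" record sets on ID and keeps
-- both address columns side by side (column names assumed from the sample).
SELECT o.ID, o.NAME, o.ADDRESS AS ADDRESS1, e.ADDRESS AS ADDRESS2
FROM original o
LEFT OUTER JOIN enrichment e ON o.ID = e.ID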
09-18-2024
05:31 AM
1 Kudo
Hi @Crags , It depends. Are you getting this data from the different sources around the same time - maybe using a common trigger or cron schedule - or are you getting the data independently of each other? If it's the first case, you might be able to take advantage of ForkEnrichment/JoinEnrichment. JoinEnrichment provides different merge strategies which you can select from based on the structure and the order of your data. If it's the second case, and assuming you end up saving this information to a database, then you can apply the merging in the database, for example by passing the product data to a SQL stored procedure: the procedure checks whether data already exists for the given ID and, if it does, applies an UPDATE statement, otherwise an INSERT (a sketch of that logic follows below). Hope that helps.
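To illustrate the second option, here is a minimal sketch of such an upsert procedure. The procedure, table, and column names are hypothetical, and the syntax shown is SQL Server-style T-SQL, so adjust it for your database.

-- Checks for an existing row by ID, then updates or inserts accordingly.
CREATE PROCEDURE upsert_product @id INT, @name VARCHAR(100), @price DECIMAL(10,2)
AS
BEGIN
    IF EXISTS (SELECT 1 FROM product WHERE id = @id)
        UPDATE product SET name = @name, price = @price WHERE id = @id;
    ELSE
        INSERT INTO product (id, name, price) VALUES (@id, @name, @price);
END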
09-13-2024
06:34 AM
2 Kudos
Hi @moonspa , You might want to look into the ForkEnrichment/JoinEnrichment processors. Let's assume the CSV has the latest information that needs to be merged into the database. Using a processor like GetFile/FetchFile you get the CSV file, then you use the ForkEnrichment processor to fork the flow into two branches: the original (CSV) and, via the enrichment relationship, the DB side. You then connect the result of the DB query (using ExecuteSQLRecord, for example) and the original to the JoinEnrichment processor, where you decide on the merge strategy. For example, you can use the SQL merge strategy to link records by ID with a FULL JOIN. This gives you the full list from both the CSV and the DB; however, since this is not real SQL, you can't use functions like ISNULL or conditional statements to set the status. From there you can use a QueryRecord processor, where the SQL Calcite syntax lets you generate the status column (new, same, deleted, changed) - see the sketch below. Keep in mind, as the documentation on the Fork/JoinEnrichment processors indicates, that using the SQL strategy with a large amount of data can consume a lot of JVM heap and lead to out-of-memory errors. To avoid this, look into the section "More Complex Joining Strategies" toward the end of the documentation, which describes a different approach using the Wrapper strategy alongside the ScriptedTransformRecord processor. Hope that helps. Let me know if you have more questions. If you found this helpful please accept the solution. Thanks
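Here is a rough idea of the kind of QueryRecord (Calcite) statement that could derive the status column after the join. The column names (csv_id, db_id, csv_value, db_value) are assumptions about how the joined record might look, so adjust them to your actual schema.

-- Derives a status per joined record based on which side is missing and
-- whether the compared value changed (FLOWFILE is QueryRecord's table name).
SELECT
  COALESCE(csv_id, db_id) AS id,
  CASE
    WHEN db_id  IS NULL THEN 'new'
    WHEN csv_id IS NULL THEN 'deleted'
    WHEN csv_value <> db_value THEN 'changed'
    ELSE 'same'
  END AS status
FROM FLOWFILE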
09-11-2024
09:54 PM
Hi @mjmoore , I'm not a Gradle expert, but it seems there are Gradle plugins for NiFi: https://plugins.gradle.org/search?term=nifi Will that work?
09-07-2024
07:58 AM
2 Kudos
Hi @xtd , Welcome to the community. You do sound like a newbie :), but don't worry, that is common when you start with NiFi; with time and experience you will get better for sure. There are many tutorials out there to help you get started, and I recommend going through them to understand how NiFi works and what the best practices are. One crucial concept that every beginner should get familiar with early on is covered in a series of YouTube videos by Mark Payne, one of the creators of NiFi, about NiFi anti-patterns, which I recommend you watch.
I can see many issues with this flow. Above all, it is not doing things the NiFi way. Here are some notes on the main issues:
1- When you work with formats like CSV, JSON, XML, etc., you need to take advantage of NiFi's out-of-the-box capabilities to parse the data rather than doing it with ReplaceText, which can be inefficient.
2- It is not recommended to use ExtractText to extract the whole content of a flowfile into an attribute when you can't control how big the content can be. You will hear this a lot: attributes are stored in JVM heap, which is limited and expensive, and large content there can cause problems.
3- When passing parameters to your SQL query through flowfile attributes, the N placeholder in sql.args.N.value stands for the parameter's position in the query. For example, given the following insert statement: insert into Table1 (col1, col2) values (?,?) the processor expects two sets of sql attributes, the first parameter at N=1 and the second at N=2, as in sql.args.1.value, sql.args.2.value, etc.
How I would design this flow really depends on what you are trying to do after ExecuteSQL. If you pass in the list of IDs correctly, ExecuteSQL (depending on its configuration) returns a single flowfile with the list of records matching the IDs in Avro format, so what are you going to do with that result? My guess, based on similar scenarios, is that you want to process each record on its own, whether for data enrichment, an API call, etc. Based on that, here is how I would design this flow:
1- Get the file from the source (GetFile, GetSFTP, etc.).
2- Use SplitRecord to get every record into a single flowfile and convert it into a format that is easy to parse and extract attributes from, using methods other than regex in ExtractText. I convert to JSON because I can then extract the ID using a JSON path, as we will see in the next step. You can use the default settings for the reader and writer services.
3- Use EvaluateJsonPath to extract the ID into an attribute. This way the value we extract is a single integer, so we are not too concerned about the heap holding large content.
4- Fetch the record corresponding to each ID from SQL. You are probably thinking this will run a SQL transaction per ID instead of one select for all, and maybe that is the pitfall of this design, but those transactions are short and simple and will probably perform better, especially when you have a cluster with load balancing, or you can increase the number of threads on the processor itself.
As I mentioned above, ExecuteSQL gives you the result in Avro format, which you can't do much parsing with unless you convert it to another parsable format (JSON, CSV, XML, etc.) using ConvertRecord. However, you can avoid the need for ConvertRecord by using ExecuteSQLRecord instead of ExecuteSQL, where you can set the record writer to whatever format you want before the result is sent to the success relationship - for example, a CSV writer if you want the result back as CSV. A rough sketch of the per-ID query is shown below. Once you get the records in the intended format you can do whatever is needed afterward. I think you have enough to get going. Let me know if you have more questions. If you find this helpful please accept the solution. Thanks
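To make step 4 a bit more concrete, here is the kind of lookup query that could go into the ExecuteSQLRecord processor. It is only a sketch: the attribute name record.id, the table Table1, and the column names are assumptions, and the ${...} part is NiFi Expression Language resolving the attribute extracted in step 3.

-- Fetches the record matching the ID extracted by EvaluateJsonPath
-- (record.id is a hypothetical attribute name; adjust table/columns to your schema).
SELECT col1, col2
FROM Table1
WHERE id = ${record.id}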
09-07-2024
12:59 AM
4 Kudos
Hi @shiva239 , I don't think there is a problem with using the DuplicateFlowFile processor in a production environment. If it were really intended only for test environments, it would not be provided as an option to begin with. Actually, when I searched the community for questions around flowfile replication I found this post that mentions the processor as part of the solution, and I don't see any comments advising against it citing test vs. prod, regardless of whether it helped in that case or not. However, if you are not comfortable using the DuplicateFlowFile processor, there are of course other ways of doing the same thing in NiFi. Let's assume you wanted to write code to address this problem instead of using NiFi: how would you do it without repeating the same code for each target DB? One of the first things that comes to mind is a loop. You can create a loop in NiFi in multiple ways; the easiest I can think of is to use the RetryFlowFile processor. Although it is intended more for error handling, you can still use it to replicate a loop. All you have to do after getting the file is set the maximum number of retries to however many times (or steps) you want to execute, then route the retry relationship both back to itself and to the next step (assign DB & table based on index). Once the number of retries is exceeded, you can handle that via the retries_exceeded relationship, or simply terminate the relationship so the loop completes. RetryFlowFile sets a counter on each retried flowfile, which you can use to assign the DB and table accordingly. As an example, I built a simple flow that loops 5 times over a flowfile produced by GenerateFlowFile, with RetryFlowFile's Retry Attribute property storing the retry count in an attribute named "flowfile.retries". If you run GenerateFlowFile once and look at the LogMessage data provenance, you will see it executed 5 times against the same content, and if you check the attributes on each event's flowfile you will see flowfile.retries populated with the nth time the flowfile was retried. Keep in mind that data provenance shows the last event at the top, which means the first event's flowfile attribute will show the value 5. Hope that helps. If it does, please accept the solution. Thanks
09-03-2024
12:36 AM
3 Kudos
I think you are confusing the ExtractText and ReplaceText processors. ExtractText doesn't have Search Value & Replacement Value properties, but ReplaceText does. That is why I said posting a screenshot would be helpful: had I known it was ReplaceText, my answer would have been different. To get the desired result in this case, you need to specify the following pattern in the Search Value property: ^(.{5})(.{10}).* Basically, you need to match the full line of text that you want to replace with the matched groups. When you stopped at "^(.{5})(.{10})", it meant you only wanted to replace up to the 15th character of the full text with the result $1,$2, which is why you were getting the remainder of the text. Adding ".*" at the end replaces the whole line and not just up to the 15th character. I hope that makes sense.
09-03-2024
12:09 AM
1 Kudo
Hi,
My apologies. I think I forgot to mention that in both cases you need to set the Timestamp Format property on the CSVRecordSetWriter to the target format, since by default it writes the datetime as epoch time.
The point of the conversion in QueryRecord is to flag the field as a datetime; however, without setting the format in the writer, it was being converted back to epoch time, as the documentation states.
Setting the format there is critical to getting the desired output.
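For illustration, this is roughly how the two pieces fit together; the column name event_time and the format pattern are just assumptions for the example.

-- In QueryRecord (Calcite syntax), cast the string field to a timestamp:
SELECT CAST(event_time AS TIMESTAMP) AS event_time
FROM FLOWFILE
-- Then, on the CSVRecordSetWriter, set Timestamp Format to the target pattern,
-- e.g. yyyy-MM-dd HH:mm:ss, so the value is written in that pattern instead of epoch time.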
Hope that helps.
09-02-2024
04:53 PM
2 Kudos
Hi, It would have been helpful if you could provide some examples of the different scenarios, with what is expected vs. what you are getting. Providing screenshots of the processor(s) in question would also help confirm that you have the correct configuration for your case. One thing that is confusing to me is that you don't mention anything about whitespace and whether it counts as a character for the name or the address. Going with what you provided, assume we have the following line: smithaddress123AAAA where the name is expected to be smith (characters 1-5) and the address is address123 (characters 6-15). I configured the ExtractAddress processor accordingly (basically adding new dynamic properties to define the extracted attributes), and the output flowfile carries the expected attributes. The reason you are getting additional attributes with an index suffix is how the processor breaks up matching groups. You can read more about this here. If you find this helpful please accept the solution. Thanks
08-31-2024
10:19 AM
1 Kudo
Hi @NagendraKumar , I'm not sure you can use the function "DATE_FROM_UNIX_DATE", since according to the SQL Calcite documentation it is not a standard function. May I recommend two approaches to solve this problem:
1- Using the SQL Calcite function TIMESTAMPADD:
select TIMESTAMPADD(SECOND, 1724851471, cast('1970-01-01 00:00:00' as timestamp)) mytimestamp from flowfile
2- Using Expression Language:
select '${literal('1724851471'):multiply(1000):format('yyyy-MM-dd HH:mm:ss')}' mytimestamp from flowfile
In both cases, be aware of the timezone the timestamp is converted into; I believe one uses local time while the other uses GMT. Hope that helps. If it helps please accept the solution. Thanks