Member since: 07-29-2020
Posts: 574
Kudos Received: 323
Solutions: 176
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 3583 | 12-20-2024 05:49 AM |
| | 3826 | 12-19-2024 08:33 PM |
| | 3629 | 12-19-2024 06:48 AM |
| | 2364 | 12-17-2024 12:56 PM |
| | 3115 | 12-16-2024 04:38 AM |
01-29-2024
08:11 AM
1 Kudo
@jarviszzzz , @BigJames , I understand your concern, especially when you have many columns to check against and a large number of records, where indexing can make a big difference in response time. It seems that LookupRecord is intended for simpler cases where you are checking against one key that is not nullable. For your case I would suggest two options:

1. Use ExecuteSQL\ExecuteSQLRecord instead of LookupRecord. With this option you need to extract all the data used to filter, update, or insert into flowfile attributes, since the content will change after ExecuteSQL. You can then write the select statement (in the SQL Select Query property) with a where clause that checks against all fields. The property supports Expression Language, which means you can populate values dynamically from flowfile attributes, and using SQL and EL you can also handle null values accordingly. To check whether there is a match, use a simple RouteOnAttribute processor with one condition (dynamic property): IsRecordFound: ${fileSize:gt(0)}. The "fileSize" system attribute gives you the size of the flowfile content; if it's 0 then no content was returned, which means no match, otherwise there is a match.

2. Defer the match (update) vs. no-match (insert) decision to SQL. I like this approach because you make one trip to SQL instead of two. It also requires extracting all the data into flowfile attributes. Basically, you create a stored procedure that takes the record data as parameters (or as JSON, if your SQL engine can parse JSON) and run it with the PutSQL processor. The SQL Statement property supports Expression Language, which means you can pass the stored procedure parameter values dynamically. Inside the stored procedure you check whether the record exists and take the proper action. One thing to be careful about with this approach is making asynchronous calls to the stored procedure with multiple threads or on a cluster, as this might end up in a SQL deadlock exception, which you can retry from within NiFi or from the stored procedure itself. A sketch of both options is shown below.

I'm looping in other experts like @MattWho , @cotopaul to see if they can provide other input as well. Thanks
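A minimal sketch of both options, assuming a hypothetical table target_table with columns company_id, fiscal_year, and amount, and flowfile attributes of the same names extracted beforehand (all names are illustrative, and the procedure uses T-SQL-style syntax; adapt it to your database):

```sql
-- Option 1: value of ExecuteSQL's "SQL Select Query" property.
-- The OR branch treats an empty attribute value as a match on NULL,
-- one way to handle a nullable key.
SELECT 1
FROM target_table
WHERE company_id = '${company_id}'
  AND (fiscal_year = '${fiscal_year}'
       OR ('${fiscal_year}' = '' AND fiscal_year IS NULL));

-- Option 2: an upsert procedure; PutSQL's "SQL Statement" property could
-- call it as: EXEC upsert_record '${company_id}', '${fiscal_year}', '${amount}'
CREATE PROCEDURE upsert_record
    @company_id  VARCHAR(50),
    @fiscal_year VARCHAR(10),
    @amount      DECIMAL(18, 2)
AS
BEGIN
    IF EXISTS (SELECT 1 FROM target_table
               WHERE company_id = @company_id AND fiscal_year = @fiscal_year)
        -- match found: update the non-key data
        UPDATE target_table
        SET amount = @amount
        WHERE company_id = @company_id AND fiscal_year = @fiscal_year;
    ELSE
        -- no match: insert a new record
        INSERT INTO target_table (company_id, fiscal_year, amount)
        VALUES (@company_id, @fiscal_year, @amount);
END;
```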
01-26-2024
06:43 AM
1 Kudo
Hi @jarviszzzz , The LookupRecord processor is not supposed to work in a chain where the next lookup filters based on the result of the previous one. Each LookupRecord is an independent call to the database, and it returns a result from the whole dataset based on the specified key. If you want to look up a record by multiple keys, you probably need to create a new column that joins all of those key column values together and use that in your LookupRecord. For example, I would create a column called CompanyID_FiscalYear to join both values and use this column as my lookup key. In the dynamic property key of the LookupRecord you can use the concat function to concatenate values from different paths, as sketched below: https://nifi.apache.org/docs/nifi-docs/html/record-path-guide.html#concat If that helps please accept the solution. Thanks
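For example, the LookupRecord dynamic property (the lookup key) could use a RecordPath expression like the following, assuming hypothetical record fields CompanyID and FiscalYear:

```
concat(/CompanyID, '_', /FiscalYear)
```

The lookup table side would then need the matching combined CompanyID_FiscalYear column to compare against.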
01-26-2024
02:09 AM
2 Kudos
Hi @SandyClouds , I ran into this issue before, and after some research I found that when you do the ConvertJsonToSQL, NiFi assigns the timestamp data type (value = 93 in the sql.args.[n].type attribute). When PutSQL runs the generated SQL statement, it parses each value according to its assigned type and formats it accordingly. However, for timestamps it expects the value in the format "yyyy-MM-dd HH:mm:ss.SSS", so if the milliseconds are missing from the original datetime value it will fail with the specified error message. To resolve the issue, make sure to assign 000 milliseconds to your datetime value before PutSQL runs. You can do that in the source JSON itself, before the conversion to SQL, or after the conversion using UpdateAttribute; with the latter option you have to know which sql.args.[n].value holds the datetime and use Expression Language to reformat it. If that helps please accept the solution. Thanks
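A minimal sketch of the UpdateAttribute route, assuming the datetime ended up in sql.args.1.value and arrived without milliseconds (the argument index and input format are assumptions; adjust them to your data). Set a property named sql.args.1.value to:

```
${sql.args.1.value:toDate('yyyy-MM-dd HH:mm:ss'):format('yyyy-MM-dd HH:mm:ss.SSS')}
```

This parses the incoming value and re-emits it with the .000 milliseconds that PutSQL expects.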
01-26-2024
01:33 AM
2 Kudos
Hi @ALWOSABY , What is the value of FF_Content? Is it the entire JSON record? If so - as it appears from the specified path - why not use EvaluateJsonPath to get whatever values are needed and store them as attributes by setting the Destination property to flowfile-attribute, as sketched below? See the following post to learn more: https://community.cloudera.com/t5/Support-Questions/How-to-handle-json-using-EvaluateJsonPath-processor-in-NiFi/m-p/295335 If that helps please accept the solution. Thanks
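A minimal sketch of that EvaluateJsonPath configuration; the dynamic property names and JsonPaths are illustrative, since the actual field names in your JSON aren't shown:

```
# EvaluateJsonPath settings
Destination             = flowfile-attribute
order.id    (dynamic)   = $.order.id
order.total (dynamic)   = $.order.total
```

Each dynamic property becomes a flowfile attribute holding the value its JsonPath selects.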
01-21-2024
08:17 PM
1 Kudo
OK, I see what is happening here. The QueryRecord is not needed, and it's giving you the error because it expects JSON but is getting the insert SQL statement created by the ConvertJsonToSQL processor. Does the record get inserted after PutSQL executes the insert statement? If so, the new id should be written as the flowfile attribute "sql.generate.key"; it's not going to be part of the flowfile content. Each flowfile has content, which is the actual data it represents, and attributes, which represent its metadata. When you list the flowfiles, click the (i) icon in the first column and select the Attributes tab; the attribute should be there with the new ID value. I was confused because ConvertJsonToSQL is showing an error status in your screenshot.
01-20-2024
08:49 PM
Can you provide a screenshot of the ConvertAvroToJson processor? What output are you getting from this processor?
01-18-2024
06:13 PM
What seems to be the issue? Please provide more details. If you are getting any error messages, please share them.
01-17-2024
09:25 PM
Hi @jarviszzzz , If you use the PutSQL processor, there is a property called "Obtain Generated Keys", which is described as follows: "If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generate.key attribute. This may result in slightly slower performance and is not supported by all databases." So basically you don't have to do anything extra besides setting this property to true; the new id should be written back as a flowfile attribute called "sql.generate.key".

PutSQL is very flexible: you can convert the JSON to SQL using the ConvertJsonToSQL processor and then use PutSQL without specifying anything in the SQL Statement property. Keep in mind that with the ConvertJsonToSQL approach, the field names must match the target table column names and the data types must be compatible. If the field names don't match, and/or you need some flexibility in how the values are inserted, you can specify the SQL insert statement in the SQL Statement property and use Expression Language to reference the different JSON field values, but you first need to extract them into flowfile attributes using a processor like EvaluateJsonPath with the Destination set to flowfile-attribute; see the sketch below. If that helps please accept the solution. Thanks
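A minimal sketch of the second approach, assuming two hypothetical attributes customer.name and customer.email were extracted with EvaluateJsonPath (the table, columns, and attribute names are all illustrative). PutSQL's SQL Statement property would be:

```sql
-- Expression Language fills in the values per flowfile
INSERT INTO customers (name, email)
VALUES ('${customer.name}', '${customer.email}');
```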
01-14-2024
09:14 PM
2 Kudos
Hi @enam , The concept of templates has been removed in 2.0, per the following article: https://medium.com/cloudera-inc/getting-ready-for-apache-nifi-2-0-5a5e6a67f450 You can either use the NiFi Registry to share flows, as the article suggests, or right-click on a given process group and select Download Flow Definition, which will save the process group in JSON format. If that helps please accept the solution. Thanks
12-29-2023
05:42 AM
1 Kudo
Hi @Heeya8876 , I ran into the same situation. What worked for me was changing the setting in nifi.properties from nifi.python.command=pythons to nifi.python.command=python Also make sure the Python venv package is installed on your machine: python -m venv If that helps please accept the solution. Thanks
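A quick way to sanity-check both pieces; the properties file path and the throwaway directory are illustrative:

```sh
# confirm the corrected property value
grep 'nifi.python.command' conf/nifi.properties
# expected: nifi.python.command=python

# confirm the venv module works by creating a throwaway environment
python -m venv /tmp/nifi-venv-test
```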