Member since: 07-29-2020
Posts: 347
Kudos Received: 107
Solutions: 101

My Accepted Solutions
Title | Views | Posted
--- | --- | ---
 | 39 | 12-03-2023 12:02 PM
 | 53 | 12-01-2023 12:31 PM
 | 86 | 12-01-2023 05:37 AM
 | 110 | 11-27-2023 03:08 PM
 | 77 | 11-21-2023 06:32 AM
11-28-2023
03:13 PM
Hi, I have managed to download the latest NiFi 2.0.0 M1 and I'm trying to run it on my Windows 10 machine. Doing some preliminary testing, I ran into the following issues:

1- The system requirements page (https://nifi.apache.org/project-documentation.html) indicates that at minimum I need Java 17, but when I try to start NiFi using run.bat I get the following error:

Error: LinkageError occurred while loading main class org.apache.nifi.bootstrap.RunNiFi
java.lang.UnsupportedClassVersionError: org/apache/nifi/bootstrap/RunNiFi has been compiled by a more recent version of the Java Runtime (class file version 65.0), this version of the Java Runtime only recognizes class file versions up to 61.0

It turns out it needs Java 21. Not sure if the documentation has not been updated or if I'm missing something.

2- After upgrading to Java 21, I'm able to start NiFi using the default configuration. The log file doesn't show any errors and the default username and password are generated. However, when I try to browse to https://127.0.0.1:8443/nifi I get the following error:
Not sure if this is something local to my machine, but after some internet searching I replaced 127.0.0.1 with localhost in the URL and it worked, as I get to the login screen.

3- This is not related to 2.0, but I want to mention it in case someone else runs into the same issue. By default, the generated user doesn't have access to the security settings regarding Users & Policies. To enable this you need to set:
nifi.security.user.authorizer=managed-authorizer
and add the generated username to authorizers.xml as mentioned here: https://community.cloudera.com/t5/Support-Questions/No-show-Users-and-Policies-in-Global-Menu/td-p/339127

4- The ExecuteScript processor doesn't have the Python (Jython) script engine. It could be that it's deprecated, but that is not mentioned on the deprecated components site (https://cwiki.apache.org/confluence/display/NIFI/Deprecated+Components+and+Features). It only talks about removing support for Ruby and ECMAScript, but not Python. If it's deprecated, what is the alternative? Is it using the Python API?

5- A minor glitch I noticed when browsing NiFi using Chrome: for some reason the "Import from Registry" icon is not showing! It shows up in Edge, and it shows up if I open Chrome in private mode. Not sure if it's a caching issue or what.

Please advise. Thanks
Labels:
- Apache NiFi
11-28-2023
09:17 AM
2 Kudos
Hi @Rohit1997jio ,

Not sure if this can be done using FlowFile Expiration. However, if you are using NiFi 1.16 or higher you can take advantage of another approach using the "retry" option on the target processor's failure relationship, as follows:

The concept is to use the settings "Number of Retry Attempts", "Retry Back Off Policy" and "Retry Maximum Back Off Period" to configure how often and for how long the flowfile is retried before it gets pushed to the failure relationship queue, where you can then log the needed message. On every failed retry, the flowfile is pushed back to the upstream queue and waits the designated time before it is tried again. The challenge is how to set those values so that the flowfile is only kept for a certain period of time (1 hour in your case), especially since the flowfile will wait in the queue before it is tried again, depending on whether you set the policy to Penalize or Yield. That wait is a good thing, because you want some delay before the flowfile is tried again to avoid a lot of overhead.

For example, if you want the flowfile to expire in an hour and you want to retry it 60 times, waiting 1 min before each retry, you can set the values as follows:

Number of Retry Attempts: 60
Retry Back Off Policy: Penalize (set the Penalty Duration under the Settings tab to 1 min)
Retry Maximum Back Off Period: 1 min (this ensures the wait time in the queue doesn't exceed the initial penalty duration, because the penalty duration is doubled on every subsequent retry - not sure why)

In this case the flowfile will be retried 60 times upon failure, and each time it is pushed back to the upstream queue it waits at most 1 min before the next retry, which makes the total time the flowfile is retried = 60 * 1 min = 60 min = 1 hour. Depending on how often you want to retry and how long you want to wait between retries, you can adjust those numbers accordingly. Once all the retries are exhausted, the flowfile is moved to the failure relationship, where you can log the final message.

If that helps please accept solution. Thanks
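To illustrate the arithmetic, here is a minimal sketch (the values are the ones from the example above, not NiFi defaults):

import datetime

# assumed example values, matching the scenario above
penalty_duration = datetime.timedelta(minutes=1)  # Penalty Duration under the Settings tab
retry_attempts = 60                               # Number of Retry Attempts on the failure relationship

# With Retry Maximum Back Off Period capped at the penalty duration,
# every retry waits roughly the same amount of time, so the total
# window before the flowfile lands on the failure relationship is:
total_window = retry_attempts * penalty_duration
print(total_window)  # 1:00:00 -> about 1 hour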
11-27-2023
03:08 PM
The issue you are having is that when you try to read the parquet file using the ParquetReader, it fails on the invalid column names containing the illegal character "-". I don't know of a way to address this in NiFi itself; you probably have to fix it before you consume the file through NiFi. For example, you can use a pandas DataFrame in Python to remove illegal characters from the column names:

import pandas as pd

# read the source parquet file
df = pd.read_parquet('source.parquet', engine='fastparquet')
# replace hyphen with underscore in column names
df.columns = df.columns.str.replace("-", "_")
# write the cleaned copy
df.to_parquet("target.parquet", engine='fastparquet')

It's possible to do this through NiFi as well using ExecuteStreamCommand: https://community.cloudera.com/t5/Support-Questions/Can-anyone-provide-an-example-of-a-python-script-executed/td-p/192487

The steps will be like this:
1- Fetch the parquet file from S3
2- Save it to a staging area with a certain filename using PutFile
3- Run ExecuteStreamCommand and pass the filename and path to the py script. The script renames the columns as shown above and saves the final copy to a target folder (a sketch is shown at the end of this post)
4- Use FetchFile to get the final parquet file from the target folder using the same filename
5- ConvertRecord ...

If that helps please accept solution. Thanks
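For illustration, here is a minimal sketch of what the step 3 script could look like when called from ExecuteStreamCommand. The script name, paths, and argument handling are assumptions; adjust them to your setup:

import sys
import pandas as pd

# Hypothetical ExecuteStreamCommand configuration:
#   Command Path:      python
#   Command Arguments: /scripts/clean_parquet.py;/staging/${filename};/target/${filename}
# i.e. the source and target file paths are passed in as command-line arguments.
source_path = sys.argv[1]
target_path = sys.argv[2]

df = pd.read_parquet(source_path, engine='fastparquet')
# replace hyphens in column names with underscores
df.columns = df.columns.str.replace("-", "_")
df.to_parquet(target_path, engine='fastparquet')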
11-22-2023
09:23 AM
Hi @glad1 ,

Can you elaborate more on the data that you want to remove? For example, if the data is part of the CSV and has a unique value in one or more columns, then you can use the QueryRecord processor, where the query excludes records with that unique value. If the data is outside of the CSV - like header information - then depending on what this data looks like, and whether it is surrounded by special characters, you can use the ReplaceText processor with a regex that isolates those lines and replaces them with empty space, and so on. If you can provide some sample data it would help in figuring out the best solution for this scenario. Thanks
11-21-2023
08:09 AM
1 Kudo
Can you provide more details on what the issue is, what error message you are getting (if any), and which processor is causing the problem, based on the provided input and expected output? I have never used PutDynamoDB, but here are some links that can help:
https://www.youtube.com/watch?si=ctBH-f-JOzAPgKAJ&embeds_referring_euri=https%3A%2F%2Fwww.google.com%2F&source_ve_path=MzY4NDIsMTM5MTE3LDEzOTExNywyODY2NCwxNjQ1MDY&feature=emb_share&v=Aw6PCz8gbmA
https://stackoverflow.com/questions/45840156/how-putdynamodb-works-in-nifi
11-21-2023
06:32 AM
1 Kudo
Hi @scoutjohn ,

I think you are over-complicating the spec, unless I'm missing something. You can do the transformation in one shift spec if you are just looking to move the "serviceCharacteristic" elements to the upper level, where the service object remains as is and the value of "name" is used as the parent key for the "value" array. In this case you can use the following spec:

[
{
"operation": "shift",
"spec": {
"serviceOrderItem": {
"*": {
"*": "serviceOrderItem[&1].&",
"service": {
"href": "serviceOrderItem[&2].service.href",
"id": "serviceOrderItem[&2].service.id",
"serviceCharacteristic": {
"*": {
"value": "serviceOrderItem[&4].@(1,name)"
}
}
}
}
}
}
}
] If that helps please accept solution. Thanks
11-19-2023
08:14 AM
1 Kudo
Hi @simonsig ,

There is no straightforward, generic way to do this using Jolt alone. What you are looking for involves some regex manipulation that I don't think the Jolt spec supports. Maybe at some point it will be supported through the "modify-overwrite-beta" spec by adding a regexReplace function to the string functions.

Jolt can, however, support simple pattern matching. For example, if you use the key "*=*" when traversing the tags array values in the spec, it will match values that contain the "=" character. You can use this to account for all possible special characters and then direct those values to an InvalidTags object; whatever is left ("*") can be directed to the valid tags array. Then you can use another spec to remove the InvalidTags. The drawback of this is that you have to know all possible special characters and list them, as in the following spec:

[
{
"operation": "shift",
"spec": {
"tags": {
"*": {
// find all values with special character listed
// below and move to InvalidTags
"*\\\"*": {
"$": "InvalidTags[]"
},
"*-*": {
"$": "InvalidTags[]"
},
"*=*": {
"$": "InvalidTags[]"
},
"*:*": {
"$": "InvalidTags[]"
},
//The values that wont have any of the special
//characters above will be moved to tags
"*": {
"$": "tags[]"
}
}
}
}
},
{
"operation": "remove",
"spec": {
//Remove InvalidTags
"InvalidTags": ""
}
}
]

If you can't account for all possible special characters, then you can't rely on Jolt alone. The simplest way I can think of is to use an UpdateRecord processor before the Jolt, where you can use Expression Language (which supports regex replace functions) to replace all special characters, matched with the regex pattern "\W+", with a common character like "?". Then you can use the Jolt spec above, but list only "*?*" values to be moved to InvalidTags (a small illustration of this regex is shown at the end of this post). In the UpdateRecord, the value for the dynamic property /tags[*], which holds the path to the tag array values, is:

${field.value:replaceAll('\W+','?')}

Note: Based on your input, make sure the JsonRecordSetWriter Output Grouping property is set to "One Line Per Object". The Jolt spec in this case will be as follows:

[
{
"operation": "shift",
"spec": {
"tags": {
"*": {
"*?*": {
"$": "InvalidTags[]"
},
"*": {
"$": "tags[]"
}
}
}
}
},
{
"operation": "remove",
"spec": {
"InvalidTags": ""
}
}
]

This way you don't have to worry about which special characters you might end up with. If you find this helpful please accept solution. Thanks
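As a small illustration of what the replaceAll('\W+','?') step does (shown here with Python's re module purely to demonstrate the regex; the actual replacement happens in UpdateRecord via Expression Language, and the sample tags are made up):

import re

sample_tags = ['validTag', 'has-dash', 'has=equals', 'has "quote"', 'has:colon']

# \W+ matches one or more non-word characters (anything other than
# letters, digits and underscore), mirroring the Expression Language call
flagged = [re.sub(r'\W+', '?', tag) for tag in sample_tags]
print(flagged)
# ['validTag', 'has?dash', 'has?equals', 'has?quote?', 'has?colon']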
11-15-2023
11:06 AM
So if the goal is that at any given time the number of concurrent uploads is never more than 20, then ignore the last part of my response that talks about "Batch Processing at the Group Level". All you need is basically to get the records from the DB as I explained, and on the processor that does the upload set the Concurrent Tasks to 20, with the considerations listed above.

Also, since you have so many records, I would consider using GenerateTableFetch before ExecuteSQL, which will help generate SQL statements that partition the data into different pages, given that you have a numeric column such as a sequential, evenly distributed id. For more info see: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.GenerateTableFetch/

This way you can set up a schedule on GenerateTableFetch to generate the different partitioned queries, so that the fetched data downstream is processed without causing backpressure or out-of-memory exceptions.

You can accept the solution once all your questions are answered and the issue is resolved.
11-14-2023
11:55 AM
So I'm not sure what you are going to use for inserting the data into Elasticsearch/Solr. It's possible that you want to use concurrent curl API commands with the ExecuteStreamCommand processor. Alternatively, you can use the InvokeHttp processor to construct the API call, which comes equipped with a lot of options and parameters that make the API call much easier. NiFi also comes with out-of-the-box processors to interact with Elasticsearch and Solr, like PutElasticsearchHttp, PutElasticsearchJson, PutSolrRecord, PutSolrContentStream, etc. These can be much easier to set up than using the API directly (a minimal sketch of the raw API option is shown further below).

If you are looking to do multiple uploads concurrently, regardless of which processor you use, you can configure that on the processor itself: right-click the processor, select Configure, select the Scheduling tab, and set the proper value in the "Concurrent Tasks" box, for example 20. Keep the following in mind when you set this value:

1- Single node vs. cluster: if you are using a single node then you can set the value of Concurrent Tasks as high as needed, but if you have a cluster and you are using some load balancing before making the upload to the target system, then you are already doing some concurrency, where a given record might be uploaded at the same time by different nodes, and you have to take that into consideration. For example, if you have a 5 node cluster and 20 records per batch, then each node will take 4 records, and in this case Concurrent Tasks will have the value 4.

2- Make sure the "Maximum Timer Driven Thread Count" is set to >= 20 to allow NiFi to process 20 tasks concurrently; by default this value is set to 10. To get to this value, click the 3 bar icon on the top right, select Controller Settings, and the value should be under the first tab, "General". Recommendation on how to set this value: https://community.cloudera.com/t5/Support-Questions/How-to-configure-NiFi-to-maximize-the-usage-of-system-cores/td-p/189581

When it comes to batching the data from the source database, you can use a processor like ExecuteSQL or ExecuteSQLRecord to fetch the data out. In those processors' configuration you need to set up the DB Connection Pool service to create the connection; for more information refer to:
https://community.cloudera.com/t5/Support-Questions/how-to-configure-and-connect-mysql-with-nifi-and-perform/m-p/109478
https://www.youtube.com/watch?v=fuEosO24fgI

Also in the configuration you can specify that you want at most 20 records per flowfile via "Max Rows Per Flow File", so that you don't get all 2 million rows in one file, which might take a lot of time and memory to process and can result in an error depending on your heap size. ExecuteSQL will give you the result in Avro format. You can use the ConvertAvroToJson processor if you want to convert to JSON, or use the ExecuteSQLRecord processor and set the Record Writer to the desired target format.

If you want to ensure that you are processing 20 records at a time and you want to prevent any records from being processed before the older batch is complete, you can use Batch Processing at the Process Group level as described here: https://www.youtube.com/watch?v=kvJx8vQnCNE&t=296s The idea is to put the processing and uploading of the 20 records in a process group where you configure the "Process Group FlowFile Concurrency" as described in the video above. If your ExecuteSQL fetches 20 rows per flowfile, then you allow one flowfile at a time to enter the group. Inside the group you need to split the records and upload them concurrently.
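For illustration, here is a minimal sketch of the raw API option mentioned above: pushing one batch of 20 JSON records to Elasticsearch's _bulk endpoint from a Python script (for example one invoked via ExecuteStreamCommand). The endpoint URL, index name, and sample records are assumptions; InvokeHttp or PutElasticsearchJson would achieve the same without custom code:

import json
import requests

ES_URL = "http://localhost:9200/_bulk"   # assumed Elasticsearch endpoint
INDEX = "my_index"                       # hypothetical index name

def bulk_insert(records):
    # Build the newline-delimited bulk body: one action line per document.
    lines = []
    for rec in records:
        lines.append(json.dumps({"index": {"_index": INDEX}}))
        lines.append(json.dumps(rec))
    body = "\n".join(lines) + "\n"
    resp = requests.post(ES_URL, data=body,
                         headers={"Content-Type": "application/x-ndjson"})
    resp.raise_for_status()
    return resp.json()

# Example: one batch of 20 records, e.g. the rows from a single flowfile
batch = [{"id": i, "name": f"record-{i}"} for i in range(20)]
print(bulk_insert(batch))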
Hopefully that will at least get you going. If you have any questions please let me know. If you find this helpful please accept solution. Thanks
11-14-2023
09:45 AM
Hi @Sipping1n0s ,

Can you provide more information on the other system? How do you batch insert the 20 records? Is this an API call or some database? Thanks