Member since: 07-29-2020
Posts: 574
Kudos Received: 323
Solutions: 176
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 2155 | 12-20-2024 05:49 AM |
| | 2453 | 12-19-2024 08:33 PM |
| | 2201 | 12-19-2024 06:48 AM |
| | 1465 | 12-17-2024 12:56 PM |
| | 2110 | 12-16-2024 04:38 AM |
11-22-2023
09:23 AM
Hi @glad1 , Can you elaborate on the data that you want to remove? For example, if the data is part of the CSV and has a unique value in one or more columns, you can use the QueryRecord processor with a query that excludes records containing that value, as sketched below. If the data is outside the CSV records - like header information - then, depending on what it looks like and whether it is surrounded by special characters, you can use the ReplaceText processor with a regex that isolates those lines and replaces them with an empty string. If you can provide some sample data it would help in figuring out the best solution for this scenario. Thanks
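For illustration only, a minimal QueryRecord query of this kind might look like the following; the column name record_type and the value 'HEADER' are assumptions, not something from the original question:

```sql
-- Hypothetical QueryRecord query: keep every record except those whose
-- assumed "record_type" column carries the value marking rows to drop.
SELECT *
FROM FLOWFILE
WHERE record_type <> 'HEADER'
```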
11-21-2023
08:09 AM
1 Kudo
Can you provide more details on what the issue is, what error message you are getting (if any), and which processor is causing the problem, based on the provided input and expected output? I have never used PutDynamoDB, but here are some links that can help: https://www.youtube.com/watch?si=ctBH-f-JOzAPgKAJ&embeds_referring_euri=https%3A%2F%2Fwww.google.com%2F&source_ve_path=MzY4NDIsMTM5MTE3LDEzOTExNywyODY2NCwxNjQ1MDY&feature=emb_share&v=Aw6PCz8gbmA https://stackoverflow.com/questions/45840156/how-putdynamodb-works-in-nifi
11-21-2023
06:32 AM
1 Kudo
Hi @scoutjohn , I think you are overcomplicating the spec, unless I'm missing something. You can do the transformation in a single shift spec if you are just looking to move the "serviceCharacteristic" elements to the upper level, keeping the service object as is and using the value of "name" as the parent key for the "value" array. In this case you can use the following spec:
[
{
"operation": "shift",
"spec": {
"serviceOrderItem": {
"*": {
"*": "serviceOrderItem[&1].&",
"service": {
"href": "serviceOrderItem[&2].service.href",
"id": "serviceOrderItem[&2].service.id",
"serviceCharacteristic": {
"*": {
"value": "serviceOrderItem[&4].@(1,name)"
}
}
}
}
}
}
}
]

If that helps, please accept the solution. Thanks
11-19-2023
08:14 AM
1 Kudo
Hi @simonsig , There is no straightforward, generic way to do this using Jolt alone. What you are looking for involves some regex manipulation that I don't think the Jolt spec supports. Maybe at some point it will be supported through the "modify-overwrite-beta" spec by adding a regexReplace function to the string functions. Jolt does, however, support simple pattern matching. For example, if you use the matcher "*=*" when traversing the tags array values in the spec, it will match values that contain the "=" character. You can use this to account for all possible special characters and direct those values to an InvalidTags object, while whatever is left ("*") is directed to the valid tags array. Then you can use another spec to remove the InvalidTags. The drawback is that you have to know all possible special characters and list them, as in the following spec:
[
{
"operation": "shift",
"spec": {
"tags": {
"*": {
// find all values with special character listed
// below and move to InvalidTags
"*\\\"*": {
"$": "InvalidTags[]"
},
"*-*": {
"$": "InvalidTags[]"
},
"*=*": {
"$": "InvalidTags[]"
},
"*:*": {
"$": "InvalidTags[]"
},
//The values that wont have any of the special
//characters above will be moved to tags
"*": {
"$": "tags[]"
}
}
}
}
},
{
"operation": "remove",
"spec": {
//Remove InvalidTags
"InvalidTags": ""
}
}
]

If you can't account for all possible special characters, then you can't rely on Jolt alone. The simplest way I can think of is to use an UpdateRecord processor before the Jolt transform, where you can use Expression Language (which supports regex replace functions) to replace all special characters, matched with the regex pattern "\W+", with a common character like "?". Then you can use the Jolt spec above but list only "*?*" values to be moved to InvalidTags. In the UpdateRecord processor, the value of the dynamic property /tags[*] (the path to the tag array values) is: ${field.value:replaceAll('\W+','?')} Note: based on your input, make sure the JsonRecordSetWriter Output Grouping property is set to "One Line Per Object". The Jolt spec in this case will be as follows:
[
{
"operation": "shift",
"spec": {
"tags": {
"*": {
"*?*": {
"$": "InvalidTags[]"
},
"*": {
"$": "tags[]"
}
}
}
}
},
{
"operation": "remove",
"spec": {
"InvalidTags": ""
}
}
]

This way you don't have to worry about which special characters you might end up with. If you find this helpful, please accept the solution. Thanks
11-15-2023
11:06 AM
So if the goal is that at any given time the number of concurrent uploads is never more than 20, then ignore the last part of my response about "Batch Processing at the Group Level". All you need is basically to get the records from the DB as I explained, and on the processor that does the upload set Concurrent Tasks to 20, with the considerations listed above. Also, since you have so many records, I would consider using GenerateTableFetch before ExecuteSQL, which helps generate SQL statements that partition the data into pages, given that you have a numeric column such as an id that is sequential and evenly distributed (an illustrative example of such generated queries is sketched below). For more info see: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.GenerateTableFetch/ This way you can set up a schedule on GenerateTableFetch to generate the partitioned queries so that the fetched data is processed downstream without causing backpressure or out-of-memory exceptions. You can accept the solution once all your questions are answered and the issue is resolved.
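As a rough illustration only (the exact statements GenerateTableFetch emits depend on the configured database adapter, partition size, and column names, which are all assumptions here), the partitioned queries it produces look conceptually like this:

```sql
-- Conceptual sketch of paged fetch queries over an assumed table
-- "uploads" with a sequential numeric "id" column and a page size of
-- 10000 rows; each generated query becomes one outgoing FlowFile.
SELECT * FROM uploads WHERE id > 0     AND id <= 10000 ORDER BY id;
SELECT * FROM uploads WHERE id > 10000 AND id <= 20000 ORDER BY id;
SELECT * FROM uploads WHERE id > 20000 AND id <= 30000 ORDER BY id;
```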
11-14-2023
11:55 AM
So I'm not sure what you are going to use for inserting the data into Elasticsearch/Solr. It's possible that you want to run concurrent curl API commands with the ExecuteStreamCommand processor. Alternatively, you can use the InvokeHTTP processor to construct the API call; it comes equipped with a lot of options and parameters that make the API call much easier. NiFi also comes with out-of-the-box processors to interact with Elasticsearch & Solr, such as PutElasticsearchHttp, PutElasticsearchJson, PutSolrRecord, PutSolrContentStream, etc. These can be much easier to set up than calling the API directly.

If you are looking to do multiple uploads concurrently, regardless of which processor you use, you can configure that on the processor itself: right-click the processor, select Configure, select the Scheduling tab, and set the proper value in the "Concurrent Tasks" box, for example 20. Keep the following in mind when you set this value:

1- Single node vs. cluster: if you are using a single node, you can set Concurrent Tasks as high as needed, but if you have a cluster and are doing some load balancing before uploading to the target system, then you already have some concurrency, where records may be uploaded at the same time by different nodes, and you have to take that into consideration. For example, if you have a 5-node cluster and 20 records per batch, each node will take 4 records, and in that case Concurrent Tasks should be set to 4.

2- Make sure the "Maximum Timer Driven Thread Count" is set to >= 20 to allow NiFi to process 20 tasks concurrently; by default this value is 10. To find it, click the three-bar icon at the top right, select Controller Settings, and look under the first tab, "General". A recommendation on how to set this value: https://community.cloudera.com/t5/Support-Questions/How-to-configure-NiFi-to-maximize-the-usage-of-system-cores/td-p/189581

When it comes to batching the data from the source database, you can use a processor like ExecuteSQL or ExecuteSQLRecord to fetch the data (a minimal example query is sketched at the end of this post). In those processors' configuration you need to set up the DB connection pool service to create the connection; for more information refer to: https://community.cloudera.com/t5/Support-Questions/how-to-configure-and-connect-mysql-with-nifi-and-perform/m-p/109478 https://www.youtube.com/watch?v=fuEosO24fgI You can also set "Max Rows Per Flow File" to 20 so that you don't get all 2 million records in one FlowFile, which might take a lot of time and memory to process and can result in an error depending on your heap size. ExecuteSQL will give you the result in Avro format; you can use the ConvertAvroToJSON processor if you want to convert to JSON, or use ExecuteSQLRecord and set the Record Writer to the desired target format.

If you want to ensure that you are processing 20 records at a time and prevent any records from being processed before the previous batch is complete, you can use Batch Processing at the Group Level as described here: https://www.youtube.com/watch?v=kvJx8vQnCNE&t=296s The idea is that you put the processing and uploading of the 20 records in a process group where you configure the "Process Group FlowFile Concurrency" as described in the video above. If your ExecuteSQL fetches 20 rows per FlowFile, then you allow one FlowFile at a time to enter the group. Inside the group you split the records and upload them concurrently.
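For illustration only (the table and column names below are made up, not from the thread), the ExecuteSQL "SQL select query" for such a fetch could be as simple as:

```sql
-- Hypothetical fetch query for ExecuteSQL/ExecuteSQLRecord; with
-- "Max Rows Per Flow File" set to 20, the result set is split so each
-- outgoing FlowFile carries at most 20 records.
SELECT id, payload
FROM source_records
ORDER BY id
```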
Hopefully that will at least get you going. If you have any questions, please let me know. If you find this helpful, please accept the solution. Thanks
11-14-2023
09:45 AM
Hi @Sipping1n0s , Can you provide more information on the other system? How do you batch-insert the 20 records? Is it an API call or a database? Thanks
11-12-2023
07:06 AM
1 Kudo
Hi @skoch244 , You are getting this error because JoinEnrichment is supposed to be used in conjunction with the ForkEnrichment processor, as the documentation of those processors specifies: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html The error details indicate that it can't find the 'original' and 'enrichment' FlowFiles. Those are set up and created by the ForkEnrichment processor through its two relationships: enrichment & original. I'm not sure what your actual flow looks like or what the actual source of each dataset is, but just using GenerateRecord for both datasets won't work, because that processor can't take an upstream connection, which doesn't fit the Fork/JoinEnrichment pattern. The link above has an example that will help you understand how those processors work (a small SQL-join sketch also follows below). If you need more help with your actual flow, please provide more information on your actual scenario. If you find this helpful, please accept the solution. Thanks
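As a small illustration of the pattern (this only applies if you pick the SQL join strategy in JoinEnrichment, and the join key "id" and column "customer_name" are assumptions): the processor exposes the two forked streams as tables named original and enrichment, which you join with a query along these lines:

```sql
-- Sketch of a JoinEnrichment "SQL" join-strategy query; "id" and
-- "customer_name" are hypothetical columns used only for illustration.
SELECT original.*, enrichment.customer_name
FROM original
LEFT OUTER JOIN enrichment
    ON original.id = enrichment.id
```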
11-11-2023
06:40 AM
There is no magic solution for those scenarios, and no one-size-fits-all answer in NiFi that I can think of. You have to understand the nature of the input before you start consuming it, and you have to provide a solution catered to that input. Sometimes, if you are lucky, you can combine multiple scenarios into one flow, but that still depends on the complexity of the input. Even though in your first scenario the second option I proposed seemed simple enough and did the job, your second example is more complex, and I don't think the out-of-the-box GrokReader will be able to handle that complexity; therefore the first option, using the ExtractText processor, will work better because you can customize your regex as needed. For example, based on the text you provided:

JohnCena32 Male New York USA813668

I can use the following regex:

[A-Z][a-z]+[A-Z][a-z]+\d+\s(?:Male|Female|M|F)\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s[A-Za-z]+\d+

In the ExtractText processor I would define a dynamic property for each attribute (city, age, firstname, etc.) and surround the segment of the pattern that corresponds to the value with parentheses to extract it as a matching group. For example:

Age: [A-Z][a-z]+[A-Z][a-z]+(\d+)\s(?:Male|Female|M|F)\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s[A-Za-z]+\d+
FirstName: ([A-Z][a-z]+)[A-Z][a-z]+\d+\s(?:Male|Female|M|F)\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s[A-Za-z]+\d+
Gender: [A-Z][a-z]+[A-Z][a-z]+\d+\s((?:Male|Female|M|F))\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s[A-Za-z]+\d+
Country: [A-Z][a-z]+[A-Z][a-z]+\d+\s(?:Male|Female|M|F)\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s([A-Za-z]+)\d+

And so on. This should give you the attributes you need. Then you can use the AttributesToJSON processor to get the JSON output, and finally, if you want to convert the data to the proper types, you can use either JoltTransformJSON or QueryRecord with a cast, as shown earlier (a small cast sketch follows below). One final note: if you know how to use external libraries in, for example, Python or Groovy, or any of the scripting languages supported by the ExecuteScript processor, you can write custom code to create the required FlowFiles/attributes that will help you generate the final output downstream. If that helps, please accept the solution. Thanks
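For the final type conversion via QueryRecord, a minimal query of that kind might look like the following; the field names simply mirror the attributes extracted above, and it assumes a JSON record reader over the AttributesToJSON output:

```sql
-- Hypothetical QueryRecord query casting the extracted string fields;
-- assumes the FlowFile content is the JSON produced by AttributesToJSON.
SELECT FirstName,
       CAST(Age AS INTEGER) AS Age,
       Gender,
       City,
       Country
FROM FLOWFILE
```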
11-10-2023
09:10 AM
Hi @Chai09 , There are different options to do this:

1- You can do ExtractText -> AttributesToJSON. In ExtractText you specify the attributes (Name, Age, ...) and the regex to capture each attribute value. In the AttributesToJSON processor you list the attributes that you want to convert into JSON in the Attributes List property and set the Destination to "flowfile-content" to get the JSON you need. To learn how to use ExtractText please refer to: https://community.cloudera.com/t5/Support-Questions/How-to-ExtractText-from-flow-file-using-Nifi-Processor/m-p/190826 However, you might find that the Age and Country values come out as strings rather than integers. If that is OK with you, you don't need to do anything; if you need them to be integers, you have to use another processor, such as QueryRecord where you can cast the string to an integer, or JoltTransformJSON where you can do the conversion using the following Jolt spec:
[
{
"operation": "modify-overwrite-beta",
"spec": {
"Age": "=toInteger(@(1,Age))",
"Country": "=toInteger(@(1,Country))"
}
}
]

2- The easiest way I found, if you don't like to use regex, is QueryRecord with the Record Reader set to GrokReader and the Record Writer set to JsonRecordSetWriter, as follows. The JsonRecord dynamic property defines the relationship that will produce the JSON output you are looking for, and it has the following value:
select Name
,CAST(AgeN as SMALLINT) as Age
,Gender
,City
,CAST(CountryN as SMALLINT) as Country
from FLOWFILE

The GrokReader service is used to help you read unstructured data such as log files. Its Grok Expressions property is set to the following:

%{WORD:Name} %{NUMBER:AgeN} %{WORD:Gender} %{DATA:City} %{NUMBER:CountryN}

The Grok expression uses predefined regexes for the given types: WORD, NUMBER, DATA, etc. For more info: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.23.2/org.apache.nifi.grok.GrokReader/additionalDetails.html With the JsonRecordSetWriter service configured, this will produce the JSON you are looking for with the correct data types. Notice how I still needed to cast Age and Country to INT even though they are defined as NUMBER in the Grok expression; that is because JsonRecordSetWriter will convert everything to string unless you provide an Avro schema. If that helps, please accept the solution. Thanks