Member since: 07-29-2020
Posts: 350
Kudos Received: 109
Solutions: 105
My Accepted Solutions
Title | Views | Posted |
---|---|---|
 | 50 | 12-06-2023 12:31 PM |
 | 54 | 12-05-2023 03:49 AM |
 | 82 | 12-03-2023 04:54 PM |
 | 98 | 12-03-2023 12:02 PM |
 | 43 | 12-02-2023 06:49 AM |
11-15-2023
11:06 AM
So if the goal is that at any given time the number of concurrent uploads is never more than 20, then ignore the last part of my response about "Batch Processing at the Group Level". All you need is to fetch the records from the DB as I explained, and on the processor that does the upload set Concurrent Tasks to 20, with the considerations listed above.

Also, since you have so many records, I would consider using GenerateTableFetch before ExecuteSQL. It generates SQL statements that partition the data into pages, provided you have a numeric column, such as an ID, that is sequential and evenly distributed (a rough example of the generated statements is sketched at the end of this reply). For more info see: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.GenerateTableFetch/

This way you can set up a schedule on GenerateTableFetch to generate the different partitioned queries, so the fetched data is processed downstream without causing backpressure or out-of-memory exceptions.

You can accept the solution once all your questions are answered and the issue is resolved.
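As a rough illustration of what GenerateTableFetch produces (the table name, column, and page size are made up, and the exact SQL syntax depends on the Database Type adapter you select), each outgoing flowfile would contain one paged statement along the lines of:

SELECT * FROM my_table WHERE id <= 2000000 ORDER BY id LIMIT 10000 OFFSET 0
SELECT * FROM my_table WHERE id <= 2000000 ORDER BY id LIMIT 10000 OFFSET 10000
SELECT * FROM my_table WHERE id <= 2000000 ORDER BY id LIMIT 10000 OFFSET 20000

Each of these then feeds ExecuteSQL, which fetches one page at a time instead of the whole table.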
11-14-2023
11:55 AM
I'm not sure what you are going to use for inserting the data into Elasticsearch/Solr. It's possible you want to run concurrent curl API commands with the ExecuteStreamCommand processor. Alternatively, you can use the InvokeHttp processor to construct the API call; it comes with a lot of options and parameters that make the API call much easier (there is a rough sketch of such a batch API call further down in this reply). NiFi also comes with out-of-the-box processors to interact with Elasticsearch & Solr, such as PutElasticsearchHttp, PutElasticsearchJson, PutSolrRecord, PutSolrContentStream, etc. These can be much easier to set up than using the API directly.

If you are looking to do multiple uploads concurrently, regardless of which processor you use, you can configure that on the processor itself: right-click the processor, select Configure, select the Scheduling tab and set the desired value in the "Concurrent Tasks" box, for example 20. Keep in mind the following when you set this value:

1- Single node vs. cluster: if you are using a single node, you can set Concurrent Tasks as high as needed. But if you have a cluster and you are doing some load balancing before uploading to the target system, you are already getting some concurrency, since records may be uploaded at the same time by different nodes, and you have to take that into consideration. For example, with a 5-node cluster and 20 records per batch, each node will take 4 records, and in that case Concurrent Tasks should be set to 4.

2- Make sure the "Maximum Timer Driven Thread Count" is set to >= 20 to allow NiFi to process 20 tasks concurrently; by default this value is set to 10. To get to this value, click the three-bar icon at the top right, select Controller Settings, and the value is under the first tab, "General". Recommendation on how to set this value: https://community.cloudera.com/t5/Support-Questions/How-to-configure-NiFi-to-maximize-the-usage-of-system-cores/td-p/189581

When it comes to batching the data from the source database, you can use a processor like ExecuteSQL or ExecuteSQLRecord to fetch the data. In those processors' configuration you need to set up the DB Connection Pool service to create the connection; for more information refer to: https://community.cloudera.com/t5/Support-Questions/how-to-configure-and-connect-mysql-with-nifi-and-perform/m-p/109478 https://www.youtube.com/watch?v=fuEosO24fgI

Also in the configuration you can specify 20 max records per flowfile in "Max Rows Per Flow File", so that you don't get all 2 million rows in one file, which could take a lot of time and memory to process and can result in an error depending on your heap size. ExecuteSQL gives you the result in Avro format; you can use the ConvertAvroToJSON processor if you want to convert to JSON, or use the ExecuteSQLRecord processor and set the Record Writer to the desired target format.

If you want to ensure that you are processing 20 records at a time and prevent any records from being processed before the older batch is complete, you can use Batch Processing at the Process Group level as described here: https://www.youtube.com/watch?v=kvJx8vQnCNE&t=296s The idea is to put the processing & uploading of the 20 records in a process group where you configure the "Process Group FlowFile Concurrency" as described in the video above. If your ExecuteSQL fetches 20 rows per flowfile, then you allow one flowfile at a time to enter the group. Inside the group you split the records and upload them concurrently.
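To make the API option a bit more concrete: if the target is Elasticsearch and you go the curl/InvokeHttp route, a whole batch of records can be sent in a single call to the _bulk endpoint. This is just a sketch; the index name and fields are made up, and the body is newline-delimited JSON (Content-Type: application/x-ndjson) with an action line followed by each document:

POST /my_index/_bulk
{ "index": { "_id": "1" } }
{ "name": "record 1", "city": "New York" }
{ "index": { "_id": "2" } }
{ "name": "record 2", "city": "Boston" }

...and so on, up to 20 action/document pairs per request in your case. If you use one of the PutElasticsearch* processors instead, you don't have to build this body yourself.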
Hopefully that will at least get you going. If you have any questions please let me know. If you find this helpful please accept the solution. Thanks
11-14-2023
09:45 AM
Hi @Sipping1n0s , Can you provide more information on the other system? How do you batch insert 20 records? Is this an API call or some database? Thanks
11-12-2023
07:06 AM
Hi @skoch244 , You are getting this error because JoinEnrichment is supposed to be used in conjunction with the ForkEnrichment processor, as the documentation of those processors specifies: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.20.0/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html The error details indicate that it can't find the 'original' and 'enrichment' flowfiles. Those are set up and created by the ForkEnrichment processor through its relationships: enrichment & original. I'm not sure what your actual flow looks like or what the actual sources are for each dataset, but just using GenerateRecord for both datasets won't work, because that processor can't take an upstream relationship, which doesn't fit the pattern of using the Fork/JoinEnrichment processors. The link above has an example that will help you understand how those processors work (there is also a small sketch at the end of this reply). If you need more help with your actual flow, please provide more information on the actual scenario you have. If you find this helpful please accept the solution. Thanks
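To make the pattern a bit more concrete, here is a rough sketch of how the Fork/JoinEnrichment pair is usually wired (the lookup processor in the middle is just a placeholder for whatever produces your enrichment data):

Source -> ForkEnrichment
ForkEnrichment (original)   -----------------------------> JoinEnrichment
ForkEnrichment (enrichment) -> InvokeHttp / lookup step --> JoinEnrichment
JoinEnrichment (joined) -> downstream

Assuming the Wrapper join strategy, each joined record would then look roughly like {"original": { ...your record... }, "enrichment": { ...the looked-up data... }}. Please verify the details against the documentation linked above, since the exact output depends on the join strategy you pick.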
11-11-2023
06:40 AM
There is no magic solution for those scenarios, and no one solution out of NiFi fits all of them that I can think of. You have to understand the nature of the input before you start consuming it, and you have to build the solution catered to that input. Sometimes, if you are lucky, you can combine multiple scenarios into one flow, but that still depends on the complexity of the input.

Even though in your first scenario the second option I proposed seemed simple enough and did the job, your second example is more complex, and I don't think the out-of-the-box GrokReader will be able to handle that complexity. Therefore the first option of using the ExtractText processor will work better, because you can customize your regex as needed. For example, based on the text you provided:

JohnCena32 Male New York USA813668

I can use the following regex:

[A-Z][a-z]+[A-Z][a-z]+\d+\s(?:Male|Female|M|F)\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s[A-Za-z]+\d+

In the ExtractText processor I would define a dynamic property for each attribute (city, age, firstname, etc.) and surround the segment of the pattern that corresponds to the value with parentheses so it is extracted as a matching group. For example:

Age: [A-Z][a-z]+[A-Z][a-z]+(\d+)\s(?:Male|Female|M|F)\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s[A-Za-z]+\d+
FirstName: ([A-Z][a-z]+)[A-Z][a-z]+\d+\s(?:Male|Female|M|F)\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s[A-Za-z]+\d+
Gender: [A-Z][a-z]+[A-Z][a-z]+\d+\s((?:Male|Female|M|F))\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s[A-Za-z]+\d+
Country: [A-Z][a-z]+[A-Z][a-z]+\d+\s(?:Male|Female|M|F)\s[A-Z][a-z]+(?:\s[A-Z][a-z]+)?\s([A-Za-z]+)\d+

And so on. This should give you the attributes you need (a sample of the end result is sketched at the end of this reply). Then you can use the AttributesToJSON processor to get the JSON output, and finally, if you want to convert the data to the proper types, you can use either JoltTransformJSON or QueryRecord with a cast, as shown above.

One final note: if you know how to use external libraries in Python, Groovy, or any of the scripting languages supported by the ExecuteScript processor, you can write custom code to create the flowfiles/attributes that will help you downstream to generate the final output.

If that helps please accept the solution. Thanks
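To illustrate where the ExtractText -> AttributesToJSON route ends up for the sample text above: assuming you also define LastName and City properties following the same pattern (those two are my assumption, not shown above), the extracted attributes converted to JSON would look roughly like:

{
  "FirstName" : "John",
  "LastName" : "Cena",
  "Age" : "32",
  "Gender" : "Male",
  "City" : "New York",
  "Country" : "USA"
}

Note that everything is still a string at this point, which is why the QueryRecord/Jolt cast step mentioned above is needed if you want real numeric types.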
11-10-2023
09:10 AM
Hi @Chai09 , There are different options to do this:

1- You can do ExtractText -> AttributesToJSON. In ExtractText you define a property for each attribute (Name, Age, ...) with the regex that captures its value. In AttributesToJSON you list the attributes you want to convert into JSON in the Attributes List property and set the Destination to "flowfile-content" to get the JSON you need. To learn how to use ExtractText please refer to: https://community.cloudera.com/t5/Support-Questions/How-to-ExtractText-from-flow-file-using-Nifi-Processor/m-p/190826

However, you might find that the Age and Country values come out as strings rather than integers. If that is OK with you, you don't need to do anything else. If you need them to be integers, you have to use another processor such as QueryRecord, where you can cast the string into an integer, or JoltTransformJSON, where you can do the conversion using the following Jolt spec:

[
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      "Age": "=toInteger(@(1,Age))",
      "Country": "=toInteger(@(1,Country))"
    }
  }
]
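For example, if the JSON coming into JoltTransformJSON looks like the first snippet below (values made up), the spec above produces the second, with Age and Country converted to integers:

Input:
{
  "Name" : "John",
  "Age" : "32",
  "Gender" : "Male",
  "City" : "New York",
  "Country" : "12345"
}

Output:
{
  "Name" : "John",
  "Age" : 32,
  "Gender" : "Male",
  "City" : "New York",
  "Country" : 12345
}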
2- The easiest way I found, if you don't like to use regex, is QueryRecord with the Record Reader set to GrokReader and the Record Writer set to JsonRecordSetWriter. The JsonRecord dynamic property defines the relationship that will produce the JSON output you are looking for, and it has the following value:

select Name
, CAST(AgeN as SMALLINT) as Age
, Gender
, City
, CAST(CountryN as SMALLINT) as Country
from FLOWFILE

The GrokReader service is used to help you read unstructured data such as log files. Its Grok Expressions property is set to the following:

%{WORD:Name} %{NUMBER:AgeN} %{WORD:Gender} %{DATA:City} %{NUMBER:CountryN}

The Grok expression uses predefined regexes for the given types: WORD, NUMBER, DATA, etc. For more info: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.23.2/org.apache.nifi.grok.GrokReader/additionalDetails.html

The JsonRecordSetWriter service writes the result out as JSON. This will produce the JSON you are looking for with the correct data types (a sample input and output is sketched at the end of this reply). Notice how I still needed to cast Age and Country to an integer type despite them being defined as NUMBER in the Grok expression; that is because JsonRecordSetWriter will otherwise convert everything to string unless you provide an Avro schema.

If that helps please accept the solution. Thanks
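To make option 2 concrete, a quick sketch (the input line is made up but follows the format discussed earlier in the thread):

Input flowfile content:
John 32 Male New York 12345

Output of the JsonRecord relationship:
[ {
  "Name" : "John",
  "Age" : 32,
  "Gender" : "Male",
  "City" : "New York",
  "Country" : 12345
} ]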
11-09-2023
07:09 AM
Hi @PradNiFi1236 , It seems that in your spec, when you do the "modify-overwrite-beta", it creates the "AltriDatiGestionali" object on each array element regardless of the Natura value, and there does not seem to be an easy way to remove the unwanted "AltriDatiGestionali" without over-complicating the spec. However, if you can change the order in which you do things in the spec, you might be able to avoid this problem altogether. So instead of doing: shift-1, shift-2, modify-overwrite-beta, remove, cardinality, you can change the order as follows: shift-1, modify-overwrite-beta, remove, shift-2, cardinality. Basically, after you isolate the "AltriDatiGestionali" object in the first shift, you do the modification there, then you remove the unwanted fields, and finally you shift to the element where Natura = N3.5 and finish with the cardinality. I don't think the recursivelySquashNulls modify at the end is doing anything, so you can remove it. Here is what the spec looks like:

[
{
"operation": "shift",
"spec": {
"FatturaElettronicaBody": {
"DatiBeniServizi": {
"DettaglioLinee": {
"*": {
"AltriDatiGestionali": "FatturaElettronicaBody.DatiBeniServizi.AltriDatiGestionali",
"*": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&1].&"
}
}
}
}
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"FatturaElettronicaBody": {
"DatiBeniServizi": {
"AltriDatiGestionali": {
"splitRiferimentoTesto": "=split('-',@(1,RiferimentoTesto))",
"stringsize": "=size(@(1,splitRiferimentoTesto[0]))",
"RiferimentoTesto": "=substring(@(1,splitRiferimentoTesto[0]),26,@(1,stringsize))",
"stringsize1": "=size(@(1,splitRiferimentoTesto[1]))",
"RiferimentoD": "=substring(@(1,splitRiferimentoTesto[1]),11,@(1,stringsize1))",
"splitRiferimentoData": "=split('/',@(1,RiferimentoD))",
"RiferimentoData": "=concat(@(1,splitRiferimentoData[2]),'-',@(1,splitRiferimentoData[1]),'-',@(1,splitRiferimentoData[0]))"
}
}
}
}
}
,
{
"operation": "remove",
"spec": {
"FatturaElettronicaBody": {
"DatiBeniServizi": {
"AltriDatiGestionali": {
"splitRiferimentoTesto": "",
"splitRiferimentoData": "",
"stringsize": "",
"stringsize1": "",
"RiferimentoD": ""
}
}
}
}
}
,
{
"operation": "shift",
"spec": {
"FatturaElettronicaBody": {
"DatiBeniServizi": {
"DettaglioLinee": {
"*": {
"NumeroLinea": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&1].NumeroLinea",
"PrezzoTotale": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&1].PrezzoTotale",
"Descrizione": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&1].Descrizione",
"Natura": {
"N3.5": {
"$": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].Natura",
"@(2,NumeroLinea)": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].NumeroLinea",
"@(2,PrezzoTotale)": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].PrezzoTotale",
"@(2,Descrizione)": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].Descrizione",
"@(4,AltriDatiGestionali)": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].AltriDatiGestionali"
},
"*": {
"$": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].Natura",
"@(2,NumeroLinea)": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].NumeroLinea",
"@(2,PrezzoTotale)": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].PrezzoTotale",
"@(2,Descrizione)": "FatturaElettronicaBody.DatiBeniServizi.DettaglioLinee[&3].Descrizione"
}
}
}
}
}
}
}
}
,
{
"operation": "cardinality",
"spec": {
"FatturaElettronicaBody": {
"DatiBeniServizi": {
"DettaglioLinee": {
"*": {
"NumeroLinea": "ONE",
"Descrizione": "ONE",
"Quantita": "ONE",
"PrezzoUnitario": "ONE",
"ScontoMaggiorazione": "ONE",
"AltriDatiGestionali": "ONE",
"PrezzoTotale": "ONE",
"AliquotaIVA": "ONE"
}
}
}
}
}
}
]

If that helps please accept the solution. Thanks
11-09-2023
05:27 AM
Are you sure you are getting the correct input to the JoltTransformJSON processor? If you are able to test the spec using GenerateFlowFile and it works there, then something upstream might be changing or dropping the input. I would check the EvaluateJsonPath Destination property and make sure it is not set to flowfile-content; it should be set to flowfile-attribute (a small sketch of the difference is at the end of this reply). If you think everything is set correctly, can you include screenshots of all the processors' configurations in your reply? Thanks
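To illustrate the difference mentioned above (the content and property name here are made up): suppose the flowfile content is {"customer": {"id": 42}} and EvaluateJsonPath has a dynamic property custId = $.customer.id.

Destination = flowfile-attribute  ->  content stays {"customer": {"id": 42}}, new attribute custId = 42
Destination = flowfile-content    ->  content is replaced by 42, and the original JSON is gone

In the second case the downstream JoltTransformJSON no longer receives the structure the spec expects, which typically shows up as a null or failed transform.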
11-08-2023
01:14 PM
Hi @CE , Where exactly are you getting the null? Are you getting it after the Jolt processor in the success relationship queue? I ran the same spec against a simple GenerateFlowFile with the input you provided set in the Custom Text property, then used the Jolt processor with the provided spec, and I got the expected output. Can you try that and see if it works? Can you also provide a processor configuration screenshot? Thanks
11-02-2023
11:27 AM
Can you send me the spec, as it seems to be different from what I provided? Also, if you can simplify the input/spec and keep it isolated to the problem, that will save me some time.