Member since
07-29-2020
198
Posts
45
Kudos Received
49
Solutions
My Accepted Solutions
08-31-2022
02:17 PM
Hi, You can use the JoltTransformJSON processor for that. So let's assume we have the following input: [
{
"_id": "5d55246b9ac36012c14b5e7c",
"gtin": "4009602235038",
"articleNumber": "59218-376",
"styleGroupId": "59218",
"colorGroupId": "376",
"brand": {
"_id": "57f75950a56ab89d1973ee36"
}
}
] Add a JoltTransformJSON processor and set the Jolt Specification property to the following. Notice that this property allows Expression Language, so you can reference attributes of the flowfile: [
{
"operation": "modify-default-beta", // or "modify-overwrite-beta" to overwrite an existing value
"spec": {
"*": {
"target.sku": "${target.sku}"
}
}
}
] This will produce the following Json: [ {
"_id" : "5d55246b9ac36012c14b5e7c",
"gtin" : "4009602235038",
"articleNumber" : "59218-376",
"styleGroupId" : "59218",
"colorGroupId" : "376",
"brand" : {
"_id" : "57f75950a56ab89d1973ee36"
},
"target.sku" : "someValue"
} ] If you find this helpful please accept solution. Thanks
08-27-2022
04:28 PM
1 Kudo
Hi, You can use the ReplaceText Processor as follows: If you find this helpful please accept solution. Thanks
08-26-2022
09:00 AM
Hi, Won't every record you pull through GenerateTableFetch have the max value for the max-value column?
08-24-2022
12:18 PM
Hi, Thanks for providing the diagram. I'm not sure if you can have both InvokeHttp processors in one group, one after the other, instead of using two groups and input/output ports, but if you can, then you can take advantage of the ForkEnrichment/JoinEnrichment processors (available in 1.16 and higher) using the following pattern: If both InvokeHttp outputs have the records in the same order, you can use the Wrapper or Insert enrichment strategy; if the records are not in the same order, you can use the SQL strategy. For more information check the JoinEnrichment user manual: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.17.0/org.apache.nifi.processors.standard.JoinEnrichment/additionalDetails.html Hope that helps. If it does please accept solution.
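As a rough illustration of what the SQL enrichment strategy accomplishes conceptually (this is not NiFi code; the record shapes and the "id" join key are made up for the example):

```python
# Conceptual sketch: JoinEnrichment's SQL strategy joins the "original"
# records with the "enrichment" records on a shared key, so record order
# between the two streams doesn't matter.
def join_enrichment(original, enrichment, key="id"):
    """Merge each original record with the enrichment record sharing its key."""
    by_key = {rec[key]: rec for rec in enrichment}
    joined = []
    for rec in original:
        merged = dict(rec)
        merged.update(by_key.get(rec[key], {}))  # enrich if a match exists
        joined.append(merged)
    return joined

original = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
enrichment = [{"id": 2, "score": 9}, {"id": 1, "score": 7}]  # different order
print(join_enrichment(original, enrichment))
```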
08-23-2022
08:07 AM
Hi, The full picture of how your flow works is not clear. For example, where do you get the JSON that goes through the Jolt processor and the input port from? Is there a correlation between getting the JSON from the different sources, where one JSON is generated before the other, or are the two JSON sources independent of each other? If they are independent, then how do you know when to merge? For example, what if the JSON from the input port never arrives? Can you explain more about how your flow works and whether there is a dependency where one JSON input drives the other?
08-19-2022
07:20 AM
1 Kudo
Hi, A couple of options I can think of, depending on your flow and what you are trying to do with the data:
1- Get all the parameters you need through the whole flow from the XML after you fetch the file. Those parameters will be there on every generated flowfile later on, and at any step you can retrieve them and do whatever is needed.
2- If the data you are trying to get downstream from the XML is unrelated, you can always have multiple branches after the FetchFile and create a different flow for each branch that can run asynchronously.
3- If you are looking to do different API calls based on the same XML, InvokeHttp has an Original relationship besides the Response relationship, in case each response is independent of the others.
4- If you are using NiFi 1.16 or higher and you are looking for data enrichment of the base XML, you can look into the ForkEnrichment and JoinEnrichment processors.
08-18-2022
01:21 PM
Hi, In the ExecuteSQL or ExecuteSQLRecord you have Pre Query, Select Query, and Post Query properties. In the Pre Query you can declare variables to store the output parameters, run the stored proc passing those parameters, and then insert them into a temp table. The Select Query property can then read the values from the temp table, and that will translate into flowfiles. If you are using ExecuteSQLRecord, you can define the format you want the flowfile in by setting the Record Writer property. Hope that helps.
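A rough sketch of the pattern, using Python's built-in sqlite3 in place of your real database and made-up parameter names, just to show how the temp table bridges the Pre Query and the Select Query:

```python
import sqlite3

# Stand-in for the real DB; the proc_output table and its values are invented.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# -- "Pre Query" step: capture the stored proc's output parameters
#    into a temp table (here we fake the proc's outputs directly).
cur.execute("CREATE TEMP TABLE proc_output (param_name TEXT, param_value TEXT)")
cur.executemany(
    "INSERT INTO proc_output VALUES (?, ?)",
    [("status", "OK"), ("rows_affected", "42")],
)

# -- "Select Query" step: this result set is what becomes flowfile content.
cur.execute("SELECT param_name, param_value FROM proc_output")
print(cur.fetchall())
```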
08-17-2022
07:35 AM
Hi, Try the following Expression: ${since:toDate('yyyy-MM-dd'):toNumber():plus(86400000):format('yyyy-MM-dd')} If you find this helpful please accept solution. Thanks
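For reference, here is the same arithmetic sketched in Python as a sanity check of the expression (not NiFi code; the sample value for the "since" attribute is made up):

```python
from datetime import datetime, timedelta

# Mirror of the EL chain: parse 'yyyy-MM-dd', add 86400000 ms (one day),
# then format back to 'yyyy-MM-dd'.
since = "2022-08-16"  # example attribute value
next_day = datetime.strptime(since, "%Y-%m-%d") + timedelta(milliseconds=86_400_000)
result = next_day.strftime("%Y-%m-%d")
print(result)  # 2022-08-17
```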
08-16-2022
01:34 PM
1 Kudo
Hi, not sure if you are repeating the attribute name in the processor's dynamic property values. Make sure the different header attributes are set/added as follows:
08-10-2022
12:55 PM
2 Kudos
Hi, you can use ReplaceText by setting the Evaluation Mode property to "Line-by-Line", then setting the "Line-by-Line Evaluation Mode" to "Last Line". Set the Replacement Value to Empty String and leave the Search Value as "(?s)(^.*$)", as seen in the screenshot below: If you find this helpful please accept solution. Thanks
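To illustrate what those settings do, here is the same substitution sketched in Python, with the line-by-line mode applied to the last line only (the sample content is made up):

```python
import re

# Simulate ReplaceText in Line-by-Line / Last Line mode: the pattern
# (?s)(^.*$) matches the whole last line, which we replace with "".
content = "header\nrow1\nrow2\ntrailer"
lines = content.split("\n")
lines[-1] = re.sub(r"(?s)(^.*$)", "", lines[-1])  # blank out the last line
print("\n".join(lines))  # the trailer line is now empty
```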
08-08-2022
08:58 AM
The LookupRecord processor allows you to specify multiple lookup columns in the DatabaseRecordLookupService in case the id is not enough. I had a similar situation and I usually defer this kind of check to the DB, so my workflow calls a stored proc - with the record parameters - that checks whether the record exists and, based on that, decides whether to update or insert. This way I'm making only one connection to the DB.
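To sketch the idea of deferring the check to the DB, here is the update-or-insert decision in one round trip, using Python's sqlite3 and a made-up items table in place of your real stored procedure:

```python
import sqlite3

# Stand-in DB; in practice this logic lives inside the stored proc.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
cur.execute("INSERT INTO items VALUES (1, 'old')")

def upsert(cur, item_id, name):
    """Update the row if it exists; otherwise insert it."""
    cur.execute("UPDATE items SET name = ? WHERE id = ?", (name, item_id))
    if cur.rowcount == 0:  # no existing row matched -> insert instead
        cur.execute("INSERT INTO items VALUES (?, ?)", (item_id, name))

upsert(cur, 1, "updated")   # existing id -> update
upsert(cur, 2, "inserted")  # new id -> insert
cur.execute("SELECT id, name FROM items ORDER BY id")
print(cur.fetchall())  # [(1, 'updated'), (2, 'inserted')]
```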
08-05-2022
06:28 AM
Hi, What is the flowfile content format? Is there an Id you can check using the LookupRecord processor to see if the record already exists or not?
08-03-2022
07:24 PM
Can you explain how you resolved it with XMLRecordSetWriter? Thanks
07-29-2022
10:42 AM
2 Kudos
Hi, I'm not sure this can be fixed by changing some configuration on what you have, whether at the processor or the service level, since the root element is not considered in the conversion from XML to JSON. As a workaround, you can surround your XML with an arbitrary root element using the ReplaceText processor before the ConvertRecord processor: Search Value: (?s)(^.*$) Replacement Value: <root>$1</root> Evaluation Mode: Entire Text This will output the following XML: <root>
<a>
<b>45</b>
</a>
</root> And after the ConvertRecord processor: {"a": {"b":45}} Hope that helps. If it does, please Accept Solution. Thanks
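A quick Python sketch of the workaround (the sample XML is made up), showing that the same regex wrap produces a well-formed document:

```python
import re
import xml.etree.ElementTree as ET

# Wrap the entire XML in an arbitrary root element, mirroring the
# ReplaceText settings above (Entire Text mode, backreference $1 -> \1).
xml_in = "<a>\n  <b>45</b>\n</a>"
wrapped = re.sub(r"(?s)(^.*$)", r"<root>\1</root>", xml_in)

# Parsing proves the wrapped document is valid and the nesting survived.
root = ET.fromstring(wrapped)
print(root.tag, root.find("a/b").text)  # root 45
```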
07-28-2022
12:10 PM
1 Kudo
Hi, If you want to extract the value (3032) of the name "roll no" into a flowfile attribute called "roll no", you can accomplish this with two processors as follows: 1- EvaluateJsonPath: roll no = $.[?(@.name=='roll no')].value Make sure the Return Type is set to json, since the expression above will return the JSON array ["3032"]. 2- UpdateAttribute: which will extract the actual value from the result JSON above with the following attribute: roll no = ${'roll no':jsonPath('$[0]')} Also you can refer to: https://community.cloudera.com/t5/Support-Questions/Unable-to-return-a-scalar-value-for-the-expression-from-NIFI/td-p/213677 If that helps, please Accept Solution. Thanks
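The two steps can be sketched in plain Python (this simulates the JsonPath filter and the $[0] extraction; it is not NiFi's engine, and the sample records are made up):

```python
import json

# Step 1: filter objects whose "name" is "roll no", collect their "value"
#         (like $.[?(@.name=='roll no')].value returning ["3032"]).
records = json.loads('[{"name": "roll no", "value": "3032"}, {"name": "age", "value": "17"}]')
matches = [rec["value"] for rec in records if rec["name"] == "roll no"]

# Step 2: pull the scalar out of the one-element array (like $[0]).
roll_no = matches[0]
print(roll_no)  # 3032
```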
07-28-2022
11:27 AM
1 Kudo
Hi, the only way I can think of to avoid duplicating the content of the flowfile, and not have to worry about the Maximum Buffer Size in the ExtractText processor, is to use the ExecuteScript processor to rewrite the flowfile content into JSON format. Here is an example script using "ECMAScript" in the ExecuteScript processor's "Script Body" property that can do that: var StreamCallback = Java.type("org.apache.nifi.processor.io.StreamCallback");
var IOUtils = Java.type("org.apache.commons.io.IOUtils");
var StandardCharsets = Java.type("java.nio.charset.StandardCharsets");
var flowFile = session.get();
if(flowFile != null) {
// Create a new StreamCallback, passing in a function to define the interface method
flowFile = session.write(flowFile,
new StreamCallback(function(inputStream, outputStream) {
var text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
var obj = new Object();
obj.SomeKey = text;
var jsonString= JSON.stringify(obj);
outputStream.write(jsonString.getBytes(StandardCharsets.UTF_8))
}));
session.transfer(flowFile, REL_SUCCESS)
} Note the transfer happens inside the null check, so we never transfer a missing flowfile. Hope that helps. If it does please Accept Solution. Thanks
07-20-2022
01:48 PM
1 Kudo
Hi, Are you looking to copy the value from the parent tag's name attribute ("<OriginalQTY name=" 5.000">") to the value of the child tag "<OriginalORDEREDPURCHASEQUANTITY>" so that it's matching all the time? If so, you can try the TransformXML processor with the following stylesheet: <?xml version="1.0" encoding="UTF-8" ?>
<xsl:transform xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="2.0">
<xsl:template match="@*|node()">
<xsl:copy>
<xsl:apply-templates select="@*|node()"/>
</xsl:copy>
</xsl:template>
<xsl:template match="Document/PURCHPURCHASEORDERHEADERV2ENTITY/OriginalQTY/PURCHPURCHASEORDERLINEV2ENTITY/OriginalORDEREDPURCHASEQUANTITY">
<xsl:copy>
<xsl:value-of select="../../@name"/>
</xsl:copy>
</xsl:template>
</xsl:transform> If you find that helpful please accept solution. Thanks
07-15-2022
06:05 AM
In this case I would suggest the ReplaceText processor with the Search Value set to: (MapRecord|\[|\]) And the Replacement Value set to Empty String. This will produce the result: aaaa|"{country=CHINA, city=null, street=null, latitude=null, postalCode=null, geocodeAccuracy=null, state=null, longitude=null}"
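The same substitution, sketched in Python with a shortened sample value:

```python
import re

# Strip the literal "MapRecord" plus any square brackets,
# leaving the brace-delimited content intact.
value = 'aaaa|"MapRecord[{country=CHINA, city=null, street=null}]"'
cleaned = re.sub(r"MapRecord|\[|\]", "", value)
print(cleaned)  # aaaa|"{country=CHINA, city=null, street=null}"
```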
07-15-2022
05:39 AM
Not sure I understand the question; which processor/property are you talking about? Basically, if you want to access a flowfile attribute value in Expression Language, you can use ${A} in case you have an attribute called A.
07-15-2022
05:22 AM
Hi, You can use the FlattenJson processor to flatten your JSON before converting to CSV. The output JSON after flattening will be something like this:
{
"id": "aaaa",
"billingaddress.city": null,
"billingaddress.country": "CHINA",
"billingaddress.geocodeAccuracy": null,
"billingaddress.latitude": null,
"billingaddress.longitude": null,
"billingaddress.postalCode": null,
"billingaddress.state": null,
"billingaddress.street": null
}
Another option, if you don't like the "billingaddress." prefix, is to use JoltTransformJSON to flatten your JSON. You can use the following Jolt spec:
[
{
"operation": "shift",
"spec": {
"id": "id",
"billingaddress": {
"*": "&"
}
}
}
]
Which will produce the following JSON:
{
"id" : "aaaa",
"city" : null,
"country" : "CHINA",
"geocodeAccuracy" : null,
"latitude" : null,
"longitude" : null,
"postalCode" : null,
"state" : null,
"street" : null
}
If you think this solves the issue please accept solution. Thanks
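For what it's worth, the flattening behavior can be sketched in a few lines of Python (a conceptual illustration of dotted-key flattening, not NiFi's implementation; the sample record is trimmed down):

```python
# Collapse nested objects into dotted keys, the way FlattenJson's
# default separator does.
def flatten(obj, prefix=""):
    flat = {}
    for key, value in obj.items():
        full_key = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, f"{full_key}."))
        else:
            flat[full_key] = value
    return flat

record = {"id": "aaaa", "billingaddress": {"country": "CHINA", "city": None}}
print(flatten(record))  # {'id': 'aaaa', 'billingaddress.country': 'CHINA', 'billingaddress.city': None}
```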
07-13-2022
09:18 AM
Hi, If I understood you correctly you want to log some information when the QueryRecord is executed, in this case you can use the Original relationship coming out of the QueryRecord where you can log the information like filename,timestamp...etc.
07-12-2022
06:29 AM
1 Kudo
Hi, I'm not sure how you are splitting the CSV using UpdateAttribute. You probably need to use something like the QueryRecord processor, where you can query the records in a SQL-like language. There you create two routing properties: one for ValuesWithNull and another for ValuesWithoutNull, then you redirect each to be saved to the DB. Also, why are you using the ConvertRecord processor? Are you converting the CSV to another format like JSON?
07-12-2022
06:19 AM
Marfill, Backpressure is applied when the total number of flowfiles in a given queue reaches (#nodes * the limit per node). For example, if you have a cluster of 3 nodes and the threshold is set to 10,000, then backpressure will be applied when the total # of flowfiles = 30,000, and so on. Regarding ControlRate, I believe it's done per node. For example, if you have a control rate that allows 1 flowfile per hour, and the ControlRate processor is part of a load-balanced flow on a 3-node cluster, and you receive a total of 3 files for the first time, one on each node, then all 3 will get processed immediately.
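The cluster-wide backpressure arithmetic from above, as a quick sanity check:

```python
# Per-node threshold applies on each node independently, so the effective
# cluster-wide total is nodes * per-node threshold.
nodes = 3
per_node_threshold = 10_000
cluster_total = nodes * per_node_threshold
print(cluster_total)  # 30000
```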
07-11-2022
11:21 AM
You can't use Expression Language when setting values in the JsonPathReader processor. You need to get the value first, as-is, using the JSON path, and then use an UpdateAttribute processor on it with the expression above. In UpdateAttribute you can use the same attribute name or a different one.
07-11-2022
10:32 AM
Hi, Can you post the Expression Language you used? I'm seeing in the error message that it's complaining that the "literal" function doesn't exist. Did you use it with the variable name that contains the original date value? If so, remove the literal function and just keep the variable, since literal works with constants. So the expression should be something like this: ${date_variable:replaceAll('(IST)\s',''):toDate('EEE MMM dd HH:mm:ss yyyy'):format('yyyy-MM-dd HH:mm:ss')}
07-09-2022
12:45 PM
1 Kudo
Hi, If your configurations are similar to mine, then I would suggest that you test it on version 1.16.0 to see if it's a bug with version 1.16.2, which in that case needs to be reported. Another option for you is to write custom code using the ExecuteScript processor that will generate a different flowfile for each JSON record, since ExecuteProcess won't have this option and would write everything in the output stream into one flowfile. You can refer to the following post to help you generate multiple flowfiles: https://community.cloudera.com/t5/Support-Questions/Split-one-Nifi-flow-file-into-Multiple-flow-file-based-on/m-p/203387 Another option you can try, in case there is a problem with the SplitContent processor, is to create every JSON record on a different line and then split them using the SplitText processor with the property "Split Line Count" set to 1.
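The SplitText fallback boils down to this (sketched in Python with made-up records): one JSON record per line, split on the newline so each line becomes its own flowfile.

```python
# Simulate SplitText with "Split Line Count" = 1 on newline-delimited JSON.
content = '{"id": 1}\n{"id": 2}\n{"id": 3}'
flowfiles = content.split("\n")  # one "flowfile" per record
print(flowfiles)  # ['{"id": 1}', '{"id": 2}', '{"id": 3}']
```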
07-09-2022
06:34 AM
Hi, I tried the same scenario using SplitContent and the code you provided above, and it worked for me: I'm getting two JSON records. I'm using version 1.16.0. ExecuteProcess configuration: SplitContent configurations:
07-08-2022
12:51 PM
Hi, I ran into this issue before when dealing with the ISO time format, and it would not allow me to write to the DB. In this case you have to use an UpdateAttribute to convert this format to the format the DB can accept (yyyy-MM-dd HH:mm:ss); this worked for me: ${literal('Wed Oct 13 15:58:58 IST 2021'):replaceAll('(IST)\s',''):toDate('EEE MMM dd HH:mm:ss yyyy'):format('yyyy-MM-dd HH:mm:ss')} Hope it would help.
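The same conversion sketched in Python, as a sanity check of the format strings (not NiFi code):

```python
from datetime import datetime

# Drop the "IST " zone token (like replaceAll('(IST)\s','')), parse the
# remainder, and reformat into the DB-friendly layout.
raw = "Wed Oct 13 15:58:58 IST 2021"
stripped = raw.replace("IST ", "")
parsed = datetime.strptime(stripped, "%a %b %d %H:%M:%S %Y")
formatted = parsed.strftime("%Y-%m-%d %H:%M:%S")
print(formatted)  # 2021-10-13 15:58:58
```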
07-08-2022
09:01 AM
1 Kudo
Hi, Another option which doesn't require writing custom code is to use the AttributesToCSV and MergeContent processors as follows:
1- AttributesToCSV: This will convert an attribute - filename in your case - to CSV format, which will result in a single-value flowfile. Make sure to set the Destination property to "flowfile-content".
2- MergeContent: This will merge the content of the flowfiles from above into a single file, depending on your merge strategy and other properties like Max Entries Per Bin. You can also specify the Merge Delimiter, whether you want a new line or a comma.
3- PutEmail: This will put the merged content from above into an email and send it.
I think this is an easier and more straightforward solution than the suggestion above. Hope that helps.
07-08-2022
08:06 AM
1 Kudo
Hi, I'm not aware of any out-of-the-box processor that can help you with that. A suggestion would be to write custom code in an ExecuteScript processor that gets the filename attribute and stores it in a file with the required format in some staging directory. In this processor you can also decide how many flowfiles you want to process per file, and once that file reaches the limit (by file size or number of entries) you move the file to a final directory where a GetFile processor is reading from, and direct the content of the read file (filenames) to PutEmail. A new file will be created in the staging area to address any new entries after moving the older file to the final directory.