Member since
07-29-2020
574
Posts
323
Kudos Received
176
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| 2486 | 12-20-2024 05:49 AM | |
| 2870 | 12-19-2024 08:33 PM | |
| 2532 | 12-19-2024 06:48 AM | |
| 1706 | 12-17-2024 12:56 PM | |
| 2374 | 12-16-2024 04:38 AM |
12-11-2023
07:35 AM
Hi @Anderosn , Have you tried using ExecuteSQLRecord providing JsonRecordSetWriter as RecordWriter ? This will give you the result in json instead of Avro to begin with.
... View more
12-08-2023
11:02 AM
I know this sounds like batching the problem but after the UpdateRecord can you do JoltTransformRecord to transform "" to null for the target field? here is an example of spec that can do such thing: https://github.com/bazaarvoice/jolt/issues/667
... View more
12-08-2023
09:48 AM
Hi @cotopaul , I'm not sure if this will help but the reason you are getting the error: java.lang.NumberFormatException: For input string: "2012-05-21 23:59:35" is because the expression language you provided will generate formatted string date but the avro schema expects long, so is it possible to change EL where you add toNumber() at the end of toDate conversion, or change the data type in the avro schema to string if that works. Another thing to avoid all this confusion of having to worry about timezone and null values , is it possible to reformat the date fields and convert to long before you pull out of Oracle. Hope that helps in any way.
... View more
12-06-2023
12:31 PM
1 Kudo
Hi @Colombia , First of all you are right , the documentation seems to be lacking around this call. When I check the Nifi Rest API documentation under (Nifi-Api ) here is what I find for the upload api : For some reason the Name of all the parameters are not shown ! Not sure if they were missed by mistake or I'm missing something here @MattWho , @steven-matison might know. After trial and error using Postman I was able to get it working and here is the curl command as provided by postman: curl --location 'https://{url}:9443/nifi-api/process-groups/17baf155-018c-1000-7bde-586d185d1d0d/process-groups/upload' \
--header 'Content-Type: multipart/form-data' \
--header 'Authorization: Bearer ...' \
--form 'positionX="11.0"' \
--form 'clientId="4036074c-018c-1000-3e06-aaaaaaaaaaaa"' \
--form 'disconnectNode="true"' \
--form 'groupName="testapi"' \
--form 'positionY="557.0"' \
--form 'file=@"/C:/nifi-2.0.0-M1-bin/nifi-2.0.0-M1/test123.json"' If that helps please accept solution. Thanks
... View more
12-06-2023
07:03 AM
Hi @Fayza , You need to setup the Content-Type as required by the API. There should be "Request Content-Type" property where you can set the value. Also any custom header values can be added as Dynamic Property. The invokehttp processor should be very flexible to accommodate the different API requirements and request types.
... View more
12-05-2023
03:49 AM
@scoutjohn, I can see the full problem now. I wish I have considered it in my first response because it will change things a little bit. Since you have mentioned that you could have other arrays in each level, the current spec basically wont work unless you create different specs for different scenarios which is not a good idea. This can be resolved in two shift transformations: 1- Bring those array objects from different levels into "serviceCharacteristic" array. 2- Apply the name, value assignment based on each object array index in "serviceCharacteristic" array. In this case no need to figure out how many arrays objects you might have in the each level, nor you have to assign hard coded index. Here is the final spec: [
// First transformation, group array objects under service
// and serviceOrderItem into one array "serviceCharacteristic"
{
"operation": "shift",
"spec": {
"*": "&",
"serviceOrderItem": {
"*": {
"id": "serviceOrderItem.[&1].&",
"action": "serviceOrderItem.[&1].&",
"service": {
"id": "serviceOrderItem.[&2].service.&",
"state": "serviceOrderItem.[&2].service.&",
//group any array objects at the supportingService level into serviceCharacteristic
"*": "serviceOrderItem.[&2].service.serviceCharacteristic[].&"
},
//group any array objects at the modifyPath level into serviceCharacteristic
"*": "serviceOrderItem.[&1].service.serviceCharacteristic[].&"
}
}
}
},
// 2ed transformation to assign name, value for each
// serviceCharacteristic array object
{
"operation": "shift",
"spec": {
"*": "&",
"serviceOrderItem": {
"*": {
"*": "serviceOrderItem.[&1].&",
"service": {
"*": "serviceOrderItem.[&2].service.&",
"serviceCharacteristic": {
"*": {
"*": {
"$": "serviceOrderItem.[&5].service.serviceCharacteristic[&2].name",
"@": "serviceOrderItem.[&5].service.serviceCharacteristic[&2].value"
}
}
}
}
}
}
}
}
] Note: Not sure why you need the "recursivelySquashNulls" transformation. I did not see any use case for here but you can add it if it solves other issues. Hope that helps.
... View more
12-03-2023
04:54 PM
@ariel12, If I understood correctly you need to see if the record exist in the DB based on certain unique value\s before inserting it to prevent duplicates. If that is the case, then you can utilize processor called LookupRecord that can lookup record from the different sources including DB and then route the result to match or unmatched where you can decide the proper action. To see an example on how you can set up the lookupRecord and the lookupRecordService you can watch the following video: https://www.youtube.com/watch?v=17KG3kKQkOs More info about RecordLookup: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.23.2/org.apache.nifi.processors.standard.LookupRecord/index.html I Hope that will help. Thanks
... View more
12-03-2023
12:02 PM
2 Kudos
Hi @ariel12 , This is happening because the PutSQL expects the timestamp (if no format is provided) to be in the format of "yyyy-MM-dd HH:mm:ss.SSS" . That is the format passed to the Java timestamp constructor by default when trying to parse the date string. Since the date in your input json is missing the milliseconds ".SSS" and no format is provided the process will fail with the "Unparseable date" error. To fix this, you can use different processors as follows: 1- UpdateAttribute You need to find what sql.args.N.value attribute generated by the ConvertJsonToSQL processor that corresponds to the value of the alertDate, then you can update this value using to UpdateAttribute to the proper expected format by PUTSql. For example, if you find that the alertDate value is stored in sql.args.1.value then , then you can update it using Expression Language to the following: ${sql.args.1.value:toDate("yyyy-MM-dd HH:mm:ss"):format("yyyy-MM-dd HH:mm:ss.SSS")} And the UpdateAttribute will look like: Alternatively: Instead of updating the value, you can update the type for the sql.args.N.type to set it to be as varchar ( value of 12) instead of timestamp (value of 93), this will fix the problem too. Also you can assign format using sql.args.N.format -not provided by default - where you set the format value to "yyyy-MM-dd HH:mm:ss" , the PutSQL will respect the provided format and use it. 2- UpdateRecord: If you dont like purging default data , or figuring out which sql args the timestamp is getting stored into specially when you have large data set and many timestamp fields where it becomes challenging to do or figure out index based on changing order, Instead you want to do it on the source data before it gets converted to SQL, then you can use UpdateRecord where you have to provide Json reader\writer and add dynamic property that sets the path to the target field and set it accordingly. In your case the UpdateRecod will look like this: The Value field has the following: ${field.value:toDate("yyyy-MM-dd HH:mm:ss"):format("yyyy-MM-dd HH:mm:ss.SSS")} Note: the field.value in the expression is provided by this processor to reference the original value of the given path. please refer to UpdateRecod documentation under Additional Details. 3- Jolt Transformation: Since you asked about using Jolt, you can use it as well to set the data in the source. Since Jolt doesnt have datetime functions you just use string concat to add default value for the .SSS like .000. here is the spec : [
{
"operation": "modify-overwrite-beta",
"spec": {
"alertDate": "=concat(@(1,alertDate),'.000')"
}
}
] If that helps please accept solution. Thanks
... View more
12-02-2023
06:49 AM
Hi @Alevc , It seems the problem with the syntax "&[&3]" that is used inside the RUBRICS elements transformation. Notice how its always producing an array of three elements for each RUBRICS element index, this happen to matche the number of objects in the parent array at level &3. Since all parent object have at least 1 RUBRIC object at index 0, they will be grouped together under 0, however only the 3rd Parent object has more than one elements so when it try to group under index 1,2,3...10 only one object is found so its adds two null values to the array to keep total at 3. Im not sure why its doing this, I thought I understand jolt but sometimes I find that Im still scratching the surface :). The good news is I have seen this pattern before and I know how to resolve. The challenge here is how to combine both child and parent attributes together to form a unique object against each child. You can do this by creating a unique key to help you group each parent & child attributes under one object assigned to that key, that will be the first transformation. The second transformation is to dump all parent & child object into the final array leaving the assigned unique key behind. The unique key in this case can be combination of parent index and child index. I used underscore (_) to separate. [
// First transformation to group each parent\child attributes
// under a unique key using combination of parent index and
// chilld index , for example the first child under the first
// parent will have 0_0, first child of second parent will be 1_0,
// 2ed child of the 3rd parent will be 2_1 and so on
{
"operation": "shift",
"spec": {
"*": {
"RUBRICS": {
"*": {
"@name": "[#].&1_&3.RUBRIC",
"@business_unit": "[#].&1_&3.COMPENSATION_GROUP",
"@classification": "[#].&1_&3.CLASSIFICATION",
"@code": "[#].&1_&3.CODE",
"@amount": "[#].&1_&3.AMOUNT",
"@type": "[#].&1_&3.TYPE",
"@aggregation_key": "[#].&1_&3.AGGREGATION_KEY",
"@(2,ENTITY_TYPE)": "[#].&1_&3.ENTITY_TYPE",
"@(2,ENTITY_ID)": "[#].&1_&3.ENTITY_ID",
"@(2,ENTITY_DOCUMENT)": "[#].&1_&3.ENTITY_DOCUMENT",
"@(2,CALCULATION_DATE)": "[#].&1_&3.CALCULATION_DATE"
}
}
}
}
},
// Second trabsforamtion is to dump all combined parent &
// child objects into the final array without the keys.
{
"operation": "shift",
"spec": {
"*": {
"*": "[]"
}
}
}
] If you find this helpful please accept solution. Thanks
... View more
12-01-2023
12:31 PM
2 Kudos
@ChuckE, Regarding the state being persistent, actually you can clear the state by right click on the processor then select "View state" and then click on the "Clear state" link: This should reset the state to the initial value. Regarding your second question of being able to initialize more than one variable, you can define as many stateful variables as you need in one processor, however they all can have one initial value. If you are looking to define different initial values for different stateful variables then you have to create a different UpdateAttribute processors for the variables that have common initial value. Another option - I never tried - if you want to utilize one processor then you can use Advanced option to define rules that will set different initial value based on a common conidition, however you have to be careful how you set the first value on the first flowfile. For example, if you have two stateful variables Attr1 & Attr2 where the first flowfile has Attr1 = 0 & Attr2 = 1 then increment both afterward, you can define the UpdateAttribute as follows : Notice how I set the Initial Value to Empty String since its required to set some value when using stateful. Under Advanced I defined two rules: one to initialize Attr1 to 0 and another to initialize Attr2 to 1 when each is set to empty string : Rule for Attr1: Rule for Attr2: Make sure to set the FlowFile Policy top right to "use original" , otherwise it will duplicate flowfiles for each matched rules if "clone" is used. When setting the same attribute in Advanced mode and in basic mode, the Advanced will take precedence if rules are met, so the first time the increment wont run. The first flowfile will have the initial values since the rules are satisfied. The second flowfile the rules are not satisfied therefore the increment will happen: Depending on what you want to see on the first flowfile then you can adjust the initial values accordingly. Not sure if this will work for all scenarios but you can try, otherwise use different processor as I stated above. Also if anyone thinks that goes against best practices or might cause problem please advise. If you find this helpful please accept solution. Thanks
... View more