Member since
04-07-2022
30
Posts
7
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
1922 | 06-12-2024 08:32 PM |
12-11-2023
10:17 PM
we have nifi 1.23.2 running in k8s on cluster mode, we have 3 pods. we have been getting frequent flowSynchroniuzationException from the logs. This has been observed in previous versions also cluster due to: org.apache.nifi.controller.serialization.FlowSynchronizationException: Failed to connect node to cluster because local flow controller partially updated. Administrator should disconnect node and review flow for corruption."}
org.apache.nifi.controller.serialization.FlowSynchronizationException: Failed to connect node to cluster because local flow controller partially updated. Administrator should disconnect node and review flow for corruption.
at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:1059)
at org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:520)
at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:896)
at org.apache.nifi.NiFi.<init>(NiFi.java:172)
at org.apache.nifi.NiFi.<init>(NiFi.java:83)
at org.apache.nifi.NiFi.main(NiFi.java:332)
Caused by: org.apache.nifi.controller.serialization.FlowSynchronizationException: java.lang.IllegalStateException: A Parameter Context tries to inherit from another Parameter Context [5b8d6cf7-018c-1000-0000-00001849f526] that does not exist
at org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.synchronizeFlow(VersionedFlowSynchronizer.java:448)
at org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.sync(VersionedFlowSynchronizer.java:206)
at org.apache.nifi.controller.serialization.StandardFlowSynchronizer.sync(StandardFlowSynchronizer.java:42)
at org.apache.nifi.controller.FlowController.synchronize(FlowController.java:1530)
at org.apache.nifi.persistence.StandardFlowConfigurationDAO.load(StandardFlowConfigurationDAO.java:104)
at org.apache.nifi.controller.StandardFlowService.loadFromBytes(StandardFlowService.java:817)
at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:1028)
... 5 common frames omitted
Caused by: java.lang.IllegalStateException: A Parameter Context tries to inherit from another Parameter Context [5b8d6cf7-018c-1000-0000-00001849f526] that does not exist
at org.apache.nifi.controller.flow.AbstractFlowManager.withParameterContextResolution(AbstractFlowManager.java:559)
at org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.inheritParameterContexts(VersionedFlowSynchronizer.java:687)
at org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.synchronizeFlow(VersionedFlowSynchronizer.java:387)
... 11 common frames omitted pods except for 1 are all going to crash loop, we're unable to execute into the pod and delete the flowfiles. What could be the reason for this ? Please advise how to find the root cause and rectify this.
... View more
Labels:
- Labels:
-
Apache NiFi
12-05-2023
04:41 AM
@SAMSAL , Thank you. this works.
... View more
12-04-2023
09:35 PM
@SAMSAL , Sorry for the delay, Thank you for your solution. {
"relatedParty": [
{
"id": "B2BSmallandMediumBusiness",
"role": "InstanceConsumerGroup",
"@referredType": "InstanceConsumerGroup"
}
],
"serviceOrderItem": [
{
"id": "0965f63b-db26-4397-b222-190a891838f0",
"action": "modify",
"service": {
"id": "cust",
"state": "active",
"supportingService": [
{
"id": "e26205f4-e144-4ded-9e64-82ede0b26e99",
"href": "serviceInventory/v4/service"
}
]
},
"modifyPath": [
{
"op": "add",
"path": "$.supportingService"
}
]
}
]
} for the incoming request modifyPath will come outside the service object , supportingService array will come inside service object. the output should have serviceCharacteristic array inside service object with name and value ( value being the whole array moved from modifyPath in this in case), "serviceCharacteristic" : [ {
"name" : "modifyPath",
"value" : "[]"
}, { "name" : "supportingService", "value" : "[]" } ] like the attached screenshot The input could change , can also have other arrays that need to be moved into the serviceCharacteristic array, was trying to understand if it is possible to avoid the indexing on serviceCharacteristic in jolt transformation.
... View more
11-30-2023
03:50 AM
@joseomjr, Infact i want it in an array, as objects inside serviceCharacteristics array I was wondering if i could avoid the 1 inside serviceCharacteristics[1] rather put a more generic indexing
... View more
11-21-2023
08:45 AM
@SAMSAL thank you, that solved it
... View more
11-21-2023
05:55 AM
I am trying to transform a json using jolt where i am moving the array inside serviceCharacteristic to outside of service {}, eventually i plan to move to respective positions {
"serviceOrderItem": [
{
"action": "modify",
"id": "0965f63b-db26-4397-b222-190a891838f0",
"service": {
"href": "serviceInventory/v4/service/adaptiveNetworkSite/1b9a3fae-f50b-4310-887d-60988f8d5645",
"id": "adaptiveNetworkSite",
"serviceCharacteristic": [
{
"name": "modifyPath",
"value": [
{
"op": "add",
"path": "$.supportingService",
"value": {}
},
{
"op": "sub",
"path": "$.supportingService",
"value": {}
}
]
}
]
}
},
{
"action": "create",
"id": "0965f63b-db26-4397-b222-190a891838f0",
"service": {
"href": "serviceInventory/v4/service/adaptiveNetworkSite/1b9a3fae-f50b-4310-887d-60988f8d5645",
"id": "adaptiveNetworkSite",
"serviceCharacteristic": [
{
"name": "modifyPath",
"value": [
{
"op": "abcd",
"path": "$.supportingService",
"value": {
"state": "active"
}
},
{
"op": "efgh",
"path": "$.supportingService",
"value": {
"state": "active"
}
}
]
}
]
}
}
]
} desired output is {
"serviceOrderItem": [
{
"action": "modify",
"id": "0965f63b-db26-4397-b222-190a891838f0",
"service": {
"href": "serviceInventory/v4/service/adaptiveNetworkSite/1b9a3fae-f50b-4310-887d-60988f8d5645",
"id": "adaptiveNetworkSite"
}
"modifyPath": [
{
"op": "add",
"path": "$.supportingService",
"value": {}
},
{
"op": "sub",
"path": "$.supportingService",
"value": {}
}
]
},
{
"action": "create",
"id": "0965f63b-db26-4397-b222-190a891838f0",
"service": {
"href": "serviceInventory/v4/service/adaptiveNetworkSite/1b9a3fae-f50b-4310-887d-60988f8d5645",
"id": "adaptiveNetworkSite"
}
"modifyPath": [
{
"op": "abcd",
"path": "$.supportingService",
"value": {
"state": "active"
}
},
{
"op": "efgh",
"path": "$.supportingService",
"value": {
"state": "active"
}
}
]
}
}
]
} my Jolt is working only when there is single array in modifyPath [{
"operation": "shift",
"spec": {
"*": "&",
"serviceOrderItem": {
"*": {
"*": "serviceOrderItem[&1].&",
"service": {
"*": "serviceOrderItem[&2].service.&",
"serviceCharacteristic": {
"*": {
"value": "@(1,name)"
}
}
}
}
}
}
}, {
"operation": "shift",
"spec": {
"*": "&",
"modifyPath": {
"@(1,modifyPath)": "serviceOrderItem[0].modifyPath"
},
"supportingService": {
"@(1,supportingService)": "serviceOrderItem[0].service.supportingService"
}
}
}] when multiple array is given it shuffles the array, "serviceOrderItem" : [ {
"action" : "modify",
"id" : "0965f63b-db26-4397-b222-190a891838f0",
"modifyPath" : [ {
"op" : "add",
"path" : "$.supportingService",
"value" : { }
}, {
"op" : "sub",
"path" : "$.supportingService",
"value" : { }
}, [ {
"op" : "abcd",
"path" : "$.supportingService",
"value" : {
"state" : "active"
}
}, {
"op" : "efgh",
"path" : "$.supportingService",
"value" : {
"state" : "active"
}
} ] ],
"service" : {
"href" : "serviceInventory/v4/service/adaptiveNetworkSite/1b9a3fae-f50b-4310-887d-60988f8d5645",
"id" : "adaptiveNetworkSite"
}
}, {
"action" : "create",
"id" : "0965f63b-db26-4397-b222-190a891838f0",
"service" : {
"href" : "serviceInventory/v4/service/adaptiveNetworkSite/1b9a3fae-f50b-4310-887d-60988f8d5645",
"id" : "adaptiveNetworkSite"
}
} ]
} please advise , thank you for you time
... View more
Labels:
- Labels:
-
Apache NiFi
11-20-2023
09:52 PM
Changed the jolt to [
{
"operation": "shift",
"spec": {
"relatedParty": "relatedParty",
"serviceOrderItem": {
"*": {
"id": "serviceOrderItem[&1].id",
"action": "serviceOrderItem[&1].action",
"service": {
"*": "serviceOrderItem[&2].service.&",
"supportingService": {
"#supportingService": "serviceOrderItem[&3].service.serviceCharacteristic[1].name",
"*": "serviceOrderItem[&3].service.serviceCharacteristic[1].value"
}
},
"modifyPath": {
"#modifyPath": "serviceOrderItem[&2].service.serviceCharacteristic[1].name",
"@(1,modifyPath)": "serviceOrderItem[&2].service.serviceCharacteristic[1].value"
}
}
}
}
},
{
"operation": "modify-overwrite-beta",
"spec": {
"*": "=recursivelySquashNulls"
}
}] Is there a way to remove the indexing from serviceCharacteristic[1].name / serviceCharacteristic[0].name ?
... View more
11-20-2023
07:43 PM
I am new to Jolt, I am trying to transform JSON to a particular format using nifi, where i am moving some objects into another object, I think i am close This is my JSON {
"relatedParty": [
{
"id": "B2BSmallandMedium"
}
],
"serviceOrderItem": [
{
"id": "e26205f4-e144-4ded-9e64-82ede0b26f4",
"action": "add",
"service": {
"name": "cust",
"state": "active",
"supportingService": [
{
"id": "e26205f4-e144-4ded-9e64-82ede0b26e99",
"href": "serviceInventory/v4/service"
},
{
"id": "7313c885-a075-4188-a6bc-c3fbfb4c7422",
"href": "activationAndConfiguration/v4/service"
}
]
}
}
]
} I am new to Jolt, I am trying to transform JSON to a particular format, where i am moving some objects into another object, I think i am close. This is my JSON {
"relatedParty": [
{
"id": "B2BSmallandMedium"
}
],
"serviceOrderItem": [
{
"id": "e26205f4-e144-4ded-9e64-82ede0b26f4",
"action": "add",
"service": {
"name": "cust",
"state": "active",
"supportingService": [
{
"id": "e26205f4-e144-4ded-9e64-82ede0b26e99",
"href": "serviceInventory/v4/service"
},
{
"id": "7313c885-a075-4188-a6bc-c3fbfb4c7422",
"href": "activationAndConfiguration/v4/service"
}
]
}
}
]
}
I am using the JOLT [
{
"operation": "shift",
"spec": {
"relatedParty": "relatedParty",
"serviceOrderItem": {
"*": {
"id": "serviceOrderItem[&1].id",
"action": "serviceOrderItem[&1].action",
"service": {
"*": "serviceOrderItem[&2].service.&",
"#modifyPath": "serviceOrderItem[&2].service.serviceCharacteristic[0].name",
"@(1,modifyPath)": "serviceOrderItem[&2].service.serviceCharacteristic[0].value",
"#supportingService": "serviceOrderItem[&2].service.serviceCharacteristic[1].name",
"@(1,service.supportingService)": "serviceOrderItem[&2].service.serviceCharacteristic[1].value"
}
}
}
}
},
{
"operation": "modify-default-beta",
"spec": {
"serviceOrderItem": {
"*": {
"service": {
"serviceCharacteristic": {
"*": {
"value": []
}
}
}
}
}
}
},
{
"operation": "remove",
"spec": {
"serviceOrderItem": {
"*": {
"service": {
"supportingService": ""
}
}
}
}
}] this is now giving me "service" : {
"name" : "cust",
"serviceCharacteristic" : [ {
"name" : "modifyPath",
"value" : "[]"
}, {
"name" : "supportingService",
"value" : [ {
"href" : "serviceInventory/v4/service",
"id" : "e26205f4-e144-4ded-9e64-82ede0b26e99"
}, {
"href" : "activationAndConfiguration/v4/service",
"id" : "7313c885-a075-4188-a6bc-c3fbfb4c7422"
} ]
} ],
"state" : "active"
}
} ] This is working for me but, I would like to know if it is possible replace: "#modifyPath": "serviceOrderItem[&2].service.serviceCharacteristic[0].name" "#supportingService": "serviceOrderItem[&2].service.serviceCharacteristic[1].name" to update the Json only when the key is actually available. I do not want an empty array. Thank you for your time
... View more
Labels:
- Labels:
-
Apache NiFi
09-12-2023
07:03 AM
Turns out there is a workaround, there is an option to change the first processor to execute in single node, this will execute flowfiles from single node.
... View more
09-12-2023
04:24 AM
I have NiFi 1.23.2 running in cluster mode in K8s. As List SFTP does not take an incoming request have written a script that is called from ExecuteStreamCommand to count the number of files in a directory. I am using expect to make this happen. #!/bin/sh -e USER=$1 export PASSWORD="${SFTP_PASSWORD}" REMOTE_DIR=$3 HOST=$4 BATCH_FILE=$(mktemp) echo "spawn sftp $USER@$HOST" > $BATCH_FILE echo "expect \"*fingerprint*\"" >> $BATCH_FILE echo "send \"yes\\r\"" >> $BATCH_FILE echo "expect \"password:\"" >> $BATCH_FILE echo "send \"\$env(PASSWORD)\\r\"" >> $BATCH_FILE echo "expect \"sftp>\"" >> $BATCH_FILE echo "send \"ls -l $REMOTE_DIR \\r\"" >> $BATCH_FILE echo "expect \"sftp>\"" >> $BATCH_FILE echo "send \"exit\\r\"" >> $BATCH_FILE #expect $BATCH_FILE dirs=$(expect $BATCH_FILE| sed -n '/sftp>/, /sftp>/p' | grep -v '^sftp>'| awk '$0 ~ /^d/ {print $NF}') rm $BATCH_FILE total=0 SAVEIFS=$IFS IFS=$'\n' directories=($dirs) IFS=$SAVEIFS for value in "${directories[@]}"; do val=$(echo "$value" | tr -d '[:blank:]' | tr -d '[:space:]') if [[ "$val" == "Completed" || "$val" == "Failed" || "$val" == "Rejected" || "$val" == "Error" ]]; then dir=$REMOTE_DIR/$val BATCH_FILE=$(mktemp) echo "spawn sftp $USER@$HOST" > $BATCH_FILE echo "expect \"*fingerprint*\"" >> $BATCH_FILE echo "send \"yes\\r\"" >> $BATCH_FILE echo "expect \"password:\"" >> $BATCH_FILE echo "send \"\$env(PASSWORD)\\r\"" >> $BATCH_FILE echo "expect \"sftp>\"" >> $BATCH_FILE echo "send \"ls -l $dir\\r\"" >> $BATCH_FILE echo "expect \"sftp>\"" >> $BATCH_FILE echo "send \"exit\\r\"" >> $BATCH_FILE count=$(expect $BATCH_FILE| sed -n '/sftp>/, /sftp>/p' | grep -v '^sftp>'| wc -l) total=$((total + count)) rm $BATCH_FILE fi done echo "$total" This script is working fine when I run inside this manually with in the container, also is working fine when NiFi runs on standalone mode in 1.23.0 bash-4.4$ ./test_files.sh 4 test_files.sh has the same script but HOST, USER and REMOTE_DIR hardcoded. When I call the same script from NiFi processor I am getting execution.error send: spawn id exp3 not open while executing "send "yes\r"" (file "/tmp/tmp.eatmyc06B6" line 3) The fingerprint expects had to be added inside the container as it is being asked. when running outside NiFi it is not required. There are times when it works, in the list. one has an error , the other doesn't I have tried the header with both /bash and /sh. what could be the reason? Please advise.
... View more
Labels:
- Labels:
-
Apache NiFi
- « Previous
-
- 1
- 2
- Next »