Member since
04-07-2022
37
Posts
11
Kudos Received
2
Solutions
My Accepted Solutions
Title | Views | Posted |
---|---|---|
140 | 03-17-2025 01:46 AM | |
3629 | 06-12-2024 08:32 PM |
03-17-2025
01:46 AM
1 Kudo
Update : Solved it i was missing a '/' infront of the resources that i was providing it should have been "resource": "/data-transfer/output-ports/a2a202da-0195-1000-0000-000045d2086d", instead of "resource": "data-transfer/output-ports/a2a202da-0195-1000-0000-000045d2086d", comparing the policies created with UI and with API made me realize it
... View more
03-16-2025
10:51 PM
Update : i realized that I wasn't adding process-group id to the resources, so i added the process-group to the policy config json policy Group Config: {
"revision": {
"version": 0
},
"permissions": {
"canRead": true,
"canWrite": true
},
"component": {
"resource": "data-transfer/output-ports/a2a202da-0195-1000-0000-000045d2086d",
"action": "write",
"configurable": true,
"users": [],
"userGroups": [
{
"revision": {
"version": 0
},
"id": "3d872ce1-e08a-4794-85e5-66e5a1f2f4ac",
"permissions": {
"canRead": true,
"canWrite": true
},
"component": {
"id": "3d872ce1-e08a-4794-85e5-66e5a1f2f4ac",
"identity": "TenantID.john.nifi_superadmin",
"configurable": true
}
},
{
"revision": {
"version": 0
},
"id": "ebe9c88a-77d7-4070-bead-e24329b2e9c1",
"permissions": {
"canRead": true,
"canWrite": true
},
"component": {
"id": "ebe9c88a-77d7-4070-bead-e24329b2e9c1",
"identity": "nifi_superadmin",
"configurable": true
}
}
]
}
} "data-transfer/output-ports/a2a202da-0195-1000-0000-000045d2086d", where a2a202da-0195-1000-0000-000045d2086d is the id of the process group all the policies were created with 201 when i try to access the policies of the process group , i don't see the respective user-group having privilege to it, it just has super admins in the list
... View more
03-16-2025
06:16 AM
Note 2: I have also tried to fetch the policy id from the processor and appending it a policy configuration using nifi_api_request "policies/${action}${resource}" "GET" example policies/read/data/process-groups/9486a139-0195-1000-0000-00000ddc9b4f gives a json job will fetch the ID of the policy the call the policy nifi_api_request "policies/${policyId}" "GET" use policyConfig to append info where entity_id is the id of the user and entity_name is the name policyConfig=$(echo ${body} | jq ".component.userGroups[.component.userGroups | length] |= {\"revision\":{\"version\":0},\"id\":\"${entity_id}\",\"permissions\":{\"canRead\":${canRead},\"canWrite\":${canWrite}},\"component\":{\"id\":\"${entity_id}\",\"identity\":\"${entity_name}\",\"configurable\":true}}") This unfortunately started created large json and ended up assigning all the users access to all the process-groups
... View more
03-16-2025
06:08 AM
Hi All I have a list of users that i have in an array. I am trying to create a job in shell script that creates a process-group for each user and apply policy to that particular process group so that only that user and nifi's super user can access or operate in it here is how the polices config json looks like '[{"globalAccessPolicies":[{"resource":"flow","action":"read"},{"resource":"provenance","action":"read"}],"componentLevelAccessPolicies":[{"resource":"process-groups","action":"read"},{"resource":"process-groups","action":"write"},{"resource":"operation/process-groups","action":"write"},{"resource":"provenance-data/process-groups","action":"write"},{"resource":"provenance-data/process-groups","action":"read"},{"resource":"data/process-groups","action":"read"},{"resource":"data/process-groups","action":"write"},{"resource":"policies/process-groups","action":"read"},{"resource":"policies/process-groups","action":"write"},{"resource":"data-transfer/input-ports","action":"write"},{"resource":"data-transfer/output-ports","action":"write"}]}]' For each user in the array, the job iterates through globalAccessPolicies and componentLevelAccessPolicies and assigns permission to the user. before assigning the componentLevelAccessPolicies the job creates an empty process-group to in the nifi canvas using the api nifi_api_request "process-groups/root/process-groups" "POST" "Content-Type: application/json" "{\"revision\":{\"version\":0},\"component\":{\"name\":\"${tenant}\",\"position\":{\"x\":${x},\"y\":${y}},\"comments\":\"Processor group for ${tenant}\"}}" this returns a json from which the id of the process group is fetched then the Job uses the fetched ID, uses the componentLevelAccessPolicies array and tries to create a policy configuration using for policy in $(echo "${componentLevelAccessPolicies}" | jq -c '.[]'); do
resource=$(echo "${policy}" | jq -r '.resource')
action=$(echo "${policy}" | jq -r '.action')
policyConfig=$(echo '{
"revision": {
"version": 0
},
"component": {
"resource": "'${resource}'",
"action": "'${action}'",
"configurable": true,
"users": [],
"userGroups": []
}
}' | jq .)
policyConfig=$(echo "${policyConfig}" | jq ".component.userGroups += [{\"revision\":{\"version\":0},\"id\":\"${tenant_superadmin_id}\",\"permissions\":{\"canRead\":true,\"canWrite\":true},\"component\":{\"id\":\"${processor_group_id}\",\"identity\":\"${tenant_superadmin}\",\"configurable\":true}}]")
policyConfig=$(echo "${policyConfig}" | jq ".component.userGroups += [{\"revision\":{\"version\":0},\"id\":\"${tenant_readonly_id}\",\"permissions\":{\"canRead\":true,\"canWrite\":false},\"component\":{\"id\":\"${processor_group_id}\",\"identity\":\"${tenant_readonly}\",\"configurable\":true}}]")
policyConfig=$(echo "${policyConfig}" | jq ".component.userGroups += [{\"revision\":{\"version\":0},\"id\":\"${nifi_superadmin_id}\",\"permissions\":{\"canRead\":true,\"canWrite\":true},\"component\":{\"id\":\"${processor_group_id}\",\"identity\":\"${nifi_superadmin}\",\"configurable\":true}}]")
policyConfig=$(echo "${policyConfig}" | jq ".component.userGroups += [{\"revision\":{\"version\":0},\"id\":\"${nifi_readonly_id}\",\"permissions\":{\"canRead\":true,\"canWrite\":false},\"component\":{\"id\":\"${processor_group_id}\",\"identity\":\"${nifi_readonly}\",\"configurable\":true}}]")
echo "policy Group Config: ${policyConfig}"
nifi_api_request "policies/" "POST" "Content-Type: application/json" "${policyConfig}"
echo -e "status ${status}"
echo -e "body ${body}"
[[ ${status} -eq 201 ]] || exit 1
done The required Ids are fetched beforehand from the user groups Since job is trying to create policy to each processor group, i have set the http method as POST nifi_api_request "policies/" "POST" "Content-Type: application/json" "${policyConfig}"
in the Loop it creates a json paylod like this {
"revision": {
"version": 0
},
"component": {
"resource": "data-transfer/output-ports",
"action": "write",
"configurable": true,
"users": [],
"userGroups": [
{
"revision": {
"version": 0
},
"id": "9fd4eabd-5b6f-4a1d-8c5f-ca6049986d96",
"permissions": {
"canRead": true,
"canWrite": true
},
"component": {
"id": "9ef8a5e3-0195-1000-ffff-ffffb4a7b545",
"identity": "TenantID.john.nifi_superadmin",
"configurable": true
}
},
{
"revision": {
"version": 0
},
"id": "aba0f614-d09a-42ee-9081-3328c86fcd6e",
"permissions": {
"canRead": true,
"canWrite": false
},
"component": {
"id": "9ef8a5e3-0195-1000-ffff-ffffb4a7b545",
"identity": "TenantID.john.nifi_readonly",
"configurable": true
}
},
{
"revision": {
"version": 0
},
"id": "ebe9c88a-77d7-4070-bead-e24329b2e9c1",
"permissions": {
"canRead": true,
"canWrite": true
},
"component": {
"id": "9ef8a5e3-0195-1000-ffff-ffffb4a7b545",
"identity": "nifi_superadmin",
"configurable": true
}
},
{
"revision": {
"version": 0
},
"id": "95aedc49-20f6-4dad-95f5-c7311a66c353",
"permissions": {
"canRead": true,
"canWrite": false
},
"component": {
"id": "9ef8a5e3-0195-1000-ffff-ffffb4a7b545",
"identity": "nifi_readonly",
"configurable": true
}
}
]
}
}
the job did not throw any error for the first user in the loop ( though the process-group did not have the users assigned in the canvas PFA). but when it run for the second user it returns 400 bad request status 409
body Found multiple policies for 'process-groups' with 'read'.
Note: I have tried with configurations by setting the user-group id in the policy config json policyConfig=$(echo "${policyConfig}" | jq ".component.userGroups += [{\"revision\":{\"version\":0},\"id\":\"${tenant_superadmin_id}\",\"permissions\":{\"canRead\":true,\"canWrite\":true},\"component\":{\"id\":\"${tenant_superadmin_id}\",\"identity\":\"${tenant_superadmin}\",\"configurable\":true}}]") This did not work, because processor group id is not linked Can anyone please advise to how to set the policies uniquely for each process group so that only the user and super admins can access it? From the canvas it is achievable, when i select override as empty, But i need this to be done via API call Thanks much for your time.
... View more
Labels:
- Labels:
-
Apache NiFi
01-17-2025
01:39 AM
hi @satz , thanks for the reploy the encoding was the ISO-8859-1 I was able to make it possible for with a groovy script def Ids = flowFile.getAttribute('TestIDs')
def id = flowFile.getAttribute('ID')
if (Ids != null) {
def IdList = new ArrayList()
Ids.replaceAll("[\\[\\]]", "").split(",").each { tenant ->
IdList.add(tenant.trim())
}
headers['TestIDs'] = IdList
}
if (id != null) {
headers['ID'] = id
}
// Serialize the headersdef serializeHeader(headerValue) {
def byteArrayOutputStream = new ByteArrayOutputStream()
def objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
objectOutputStream.writeObject(headerValue)
objectOutputStream.flush()
return byteArrayOutputStream.toByteArray()
}
def serializedHeaders = [:]
headers.each { key, value ->
def serializedValue = serializeHeader(value)
serializedHeaders[key] = new String(serializedValue, 'ISO-8859-1')
}
// Update FlowFile with serialized headersserializedHeaders.each { key, value ->
flowFile = session.putAttribute(flowFile, "${key}", value)
}
session.transfer(flowFile, REL_SUCCESS)
... View more
11-06-2024
11:07 PM
1 Kudo
Hi All, This could be a silly question, Apologies for that. How do i serialize the headers sent from PublishKafka_2_6 processor ? Currently, the processor i am using is sending headers along with the payload as a string ID:N1,GRANTED-ID:[root,UI1] {
...
<Payload>
...
} we have other applications communicating with Kafka , which sends out headers along with payload as GRANTED-ID: ��♣sr‼java.util.ArrayListx��↔��a�♥☺I♦sizexp☻w♦☻t♦roott♦UI1x
ID: ��♣t♣N1{
...
<payload>
...
} Is it possible for NiFi also to send across as the above format?
... View more
Labels:
- Labels:
-
Apache NiFi
11-06-2024
10:47 PM
2 Kudos
@joseomjr , Thank you for responding, I instead chose a hard way approach. I thought why not create a custom Nar, which takes in 2 parameters, 1 for the json template with placeholders and 2 with its respective values. used sort of a recursion to create the final output. for and input like Template {"details":{"name":"${name}","age":"${age}","superpower":"${superpower}"}} value : {"name": "Clark Kent", "age": "35","superpower": "Superman"} gives output as { "details": { "name": "Clark Kent", "age": "35", "superpower": "Superman" } }
... View more
07-19-2024
07:26 AM
I have a python script that takes in text input in the format like this {"order":{"Testorder":{"operation":"create","specification":{"name":"${name}","version":"1.0.0-SNAPSHOT"},"parameters":{"controllerName":"${controllername}","parameters":{"parameter1":"${value1}","parameter2":"${value2}"}}}}}
{"name": "TestService", "controllername": "MyController_d","value1": "test","value2": "speed"} First line is a JSON Template with place holders. Second line another JSON string which holds the value to be replaced in the first line. (whole input (first line and second line together) are not a valid JSON) It creates an output like this {
"order" : {
"Testorder" : {
"operation" : "create",
"specification" : {
"name" : "TestService",
"version" : "1.0.2-SNAPSHOT"
},
"parameters" : {
"controllerName" : "MyController_d",
"parameters" : {
"parameter1" : "test",
"parameter2" : "strength"
}
}
}
}
} I have implemented this using a python script inside ExecuteScript processor import json
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from string import Template
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self, inputStream, outputStream):
try:
input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
incoming_flow = input_text.split('\n', 1)
if len(incoming_flow) < 2:
raise ValueError("Input data does not contain both template and values.")
template_str, values_str = incoming_flow
template_str = template_str.strip()
values_str = values_str.strip()
json.loads(template_str)
parameter_obj = json.loads(values_str)
tpl = Template(template_str)
json_string = tpl.substitute(parameter_obj)
# Replace placeholders and unwanted characters
replacements = {
"u'": "'",
'u"': '"',
'"{': '{',
'}"': '}',
"'": '"',
"False": "false",
"True": "true",
'"[': "[",
']"': "]"
}
for old, new in replacements.items():
json_string = json_string.replace(old, new)
outputStream.write(bytearray(json_string.encode('utf-8')))
except Exception as e:
traceback.print_exc(file=sys.stdout)
raise e
flowFile = session.get()
if flowFile is not None:
try:
flowFile = session.write(flowFile, PyStreamCallback())
session.putAttribute(flowFile, 'mime.type', 'application/json')
session.transfer(flowFile, REL_SUCCESS)
except ValueError as ve:
session.putAttribute(flowFile, 'error_val', str({"Script.Exception": str(ve)}))
session.transfer(flowFile, REL_FAILURE)
except Exception as e:
session.putAttribute(flowFile, 'error_val', str({"Script.Exception": str(e)}))
session.transfer(flowFile, REL_FAILURE) Though there are a lot of string manipulation, so far it is working fine. Since python is deprecated in the ExecuteScript Processor. Was wondering if it would be possible to do this same operations using any other processor in NiFi without writing any of custom script. we're using 1.24.0 version of NiFi. Can the ReplaceText processor be used to achieve this or is there any other recommendation?
... View more
Labels:
- Labels:
-
Apache NiFi
06-12-2024
08:32 PM
1 Kudo
Hi @MattWho , I have figured it out, I set the access policy recieve data via site-to-site and its has now started to work. i used an api call to set the value referring to this. Access Policies | CDP Private Cloud (cloudera.com) thank you so much for your help. TO Summarize, nifi.properties bash-4.4$ cat conf/nifi.properties | grep remote
nifi.remote.input.host=nifi-0.nifi-headless.namespace.svc.cluster.local
nifi.remote.input.secure=true
nifi.remote.input.socket.port=10443
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs
in another pod
nifi.remote.input.host=nifi-1.nifi-headless.namespace.svc.cluster.local nifi.web.https.host=nifi-0.nifi-headless.namespace.svc.cluster.local
nifi.web.https.port=9443
and respectively on another pod
nifi.web.https.host=nifi-1.nifi-headless.namespace.svc.cluster.local
nifi.web.https.port=9443 set access policies created reporting task url set is podname.svc/https port eg https://nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443/nifi set management controller service created an input port and remote group to send data
... View more
06-11-2024
10:39 PM
1 Kudo
Hi @MattWho , I tried a similar attempt with SiteToSiteBulletinReportingTask it complains that there is no port. and in the site to site component state it has
o.a.n.w.s.NiFiAuthenticationFilter Authentication Success [CN=nifi-api-admin] 10.255.14.191 GET https://nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443/nifi-api/site-to-site
o.a.n.r.SiteToSiteBulletinReportingTask SiteToSiteBulletinReportingTask[id=0adf334f-0190-1000-0000-00000230b61a] Error running task SiteToSiteBulletinReportingTask[id=0adf334f-0190-1000-0000-00000230b61a] due to org.apache.nifi.processor.exception.ProcessException: Failed to send Bulletins to destination due to IOException:Could not find Port with name 'prov' for remote NiFi instance" o.a.n.remote.StandardRemoteProcessGroup Unable to connect to RemoteProcessGroup[https://nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443/nifi] due to org.apache.http.NoHttpResponseException: nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443 failed to respond please let me know if you can help here. thank you
... View more