Member since: 04-07-2022
Posts: 32
Kudos Received: 10
Solutions: 1
My Accepted Solutions
Title | Views | Posted |
---|---|---|
2410 | 06-12-2024 08:32 PM |
11-06-2024
11:07 PM
1 Kudo
Hi all, this could be a silly question, apologies for that. How do I serialize the headers sent from the PublishKafka_2_6 processor? Currently, the processor I am using sends the headers along with the payload as plain strings: ID:N1,GRANTED-ID:[root,UI1] {
...
<Payload>
...
} We have other applications communicating with Kafka, which send headers along with the payload as GRANTED-ID: ��♣sr‼java.util.ArrayListx��↔��a�♥☺I♦sizexp☻w♦☻t♦roott♦UI1x
ID: ��♣t♣N1{
...
<payload>
...
} Is it possible for NiFi to send the headers in the above format as well?
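The binary header values above contain the class name java.util.ArrayList and start with bytes that look like the Java object serialization stream header, so the other applications are probably writing their headers with Java's ObjectOutputStream. That is an inference from the sample, not something confirmed in this thread. As a rough illustration of what that wire format looks like for a plain String (such as the ID header), the sketch below builds the bytes by hand: the stream magic 0xACED, the version 0x0005, the TC_STRING tag 0x74, a two-byte length, and the text. The function name is hypothetical, it only handles short ASCII strings, and it is not a NiFi feature.

```python
import struct

STREAM_MAGIC = 0xACED   # first two bytes of every Java serialization stream
STREAM_VERSION = 0x0005  # stream protocol version
TC_STRING = 0x74         # type tag for a (short) java.lang.String


def java_serialized_string(text):
    """Return the bytes ObjectOutputStream.writeObject() emits for a short ASCII String.

    Hypothetical helper for illustration only: Java uses "modified UTF-8",
    which matches plain ASCII except for the NUL character, so NUL and
    non-ASCII input are rejected here instead of being encoded incorrectly.
    """
    if "\x00" in text:
        raise ValueError("NUL is encoded differently in Java's modified UTF-8")
    data = text.encode("ascii")  # raises UnicodeEncodeError for non-ASCII text
    if len(data) > 0xFFFF:
        raise ValueError("TC_STRING only supports lengths up to 65535 bytes")
    # Big-endian: magic (2 bytes), version (2 bytes), tag (1 byte), length (2 bytes)
    header = struct.pack(">HHBH", STREAM_MAGIC, STREAM_VERSION, TC_STRING, len(data))
    return header + data


serialized_id = java_serialized_string("N1")
print(serialized_id.hex())
```

A list header such as GRANTED-ID is a serialized java.util.ArrayList, which has a more involved layout (class descriptor, field data, element records), so producing it is easier from Java or Groovy code than by assembling bytes manually.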
Labels: Apache NiFi
11-06-2024
10:47 PM
2 Kudos
@joseomjr, thank you for responding. I chose the hard way instead: I created a custom NAR that takes two parameters, the first being the JSON template with placeholders and the second the corresponding values, and it uses a form of recursion to build the final output. For an input like
Template: {"details":{"name":"${name}","age":"${age}","superpower":"${superpower}"}}
Value: {"name": "Clark Kent", "age": "35","superpower": "Superman"}
it gives the output
{ "details": { "name": "Clark Kent", "age": "35", "superpower": "Superman" } }
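The custom NAR itself is not shown in the post. As a minimal sketch of the recursive idea described above, the Python below walks the parsed template and replaces ${...} placeholders inside string values. The function name fill and the regular expression are assumptions made for illustration; this is not the author's Java code.

```python
import json
import re

# Matches ${placeholder} and captures the placeholder name.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def fill(node, values):
    """Recursively copy a parsed JSON template, substituting placeholders.

    Raises KeyError if the template names a placeholder that has no value.
    """
    if isinstance(node, dict):
        return {key: fill(child, values) for key, child in node.items()}
    if isinstance(node, list):
        return [fill(child, values) for child in node]
    if isinstance(node, str):
        return PLACEHOLDER.sub(lambda match: str(values[match.group(1)]), node)
    return node  # numbers, booleans and null pass through unchanged


template = json.loads(
    '{"details":{"name":"${name}","age":"${age}","superpower":"${superpower}"}}'
)
values = json.loads('{"name": "Clark Kent", "age": "35","superpower": "Superman"}')
result = fill(template, values)
print(json.dumps(result, indent=2))
```

Because the substitution happens on parsed values rather than on the raw text, quotes or braces inside a value cannot break the structure of the output JSON.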
07-19-2024
07:26 AM
I have a Python script that takes text input in the following format:
{"order":{"Testorder":{"operation":"create","specification":{"name":"${name}","version":"1.0.0-SNAPSHOT"},"parameters":{"controllerName":"${controllername}","parameters":{"parameter1":"${value1}","parameter2":"${value2}"}}}}}
{"name": "TestService", "controllername": "MyController_d","value1": "test","value2": "speed"}
The first line is a JSON template with placeholders. The second line is another JSON string that holds the values to be substituted into the first line. (The whole input, first and second line together, is not valid JSON.) It creates an output like this:
{
  "order" : {
    "Testorder" : {
      "operation" : "create",
      "specification" : {
        "name" : "TestService",
        "version" : "1.0.0-SNAPSHOT"
      },
      "parameters" : {
        "controllerName" : "MyController_d",
        "parameters" : {
          "parameter1" : "test",
          "parameter2" : "speed"
        }
      }
    }
  }
}
I have implemented this using a Python script inside the ExecuteScript processor:
import json
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from string import Template

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            # First line is the template, everything after it is the values JSON
            incoming_flow = input_text.split('\n', 1)
            if len(incoming_flow) < 2:
                raise ValueError("Input data does not contain both template and values.")
            template_str, values_str = incoming_flow
            template_str = template_str.strip()
            values_str = values_str.strip()
            # Fail early if the template line is not valid JSON
            json.loads(template_str)
            parameter_obj = json.loads(values_str)
            tpl = Template(template_str)
            json_string = tpl.substitute(parameter_obj)
            # Replace placeholders and unwanted characters
            replacements = {
                "u'": "'",
                'u"': '"',
                '"{': '{',
                '}"': '}',
                "'": '"',
                "False": "false",
                "True": "true",
                '"[': "[",
                ']"': "]"
            }
            for old, new in replacements.items():
                json_string = json_string.replace(old, new)
            outputStream.write(bytearray(json_string.encode('utf-8')))
        except Exception as e:
            traceback.print_exc(file=sys.stdout)
            raise

flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        # putAttribute returns the updated FlowFile, so keep the new reference
        flowFile = session.putAttribute(flowFile, 'mime.type', 'application/json')
        session.transfer(flowFile, REL_SUCCESS)
    except ValueError as ve:
        flowFile = session.putAttribute(flowFile, 'error_val', str({"Script.Exception": str(ve)}))
        session.transfer(flowFile, REL_FAILURE)
    except Exception as e:
        flowFile = session.putAttribute(flowFile, 'error_val', str({"Script.Exception": str(e)}))
        session.transfer(flowFile, REL_FAILURE)
Although there is a lot of string manipulation, it has worked fine so far. Since Python is deprecated in the ExecuteScript processor, I was wondering whether the same operation could be done with another NiFi processor, without writing a custom script. We are using NiFi 1.24.0. Can the ReplaceText processor be used to achieve this, or is there another recommendation?
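For checking the template logic outside NiFi, the core of the script can be reduced to a few lines of plain Python 3. This is only a sketch under stated assumptions: the function name render is hypothetical, every placeholder is assumed to sit inside a quoted JSON string, and the template is assumed to contain no other $ characters. Instead of the chain of string replacements, each value is JSON-escaped before substitution and the result is parsed again to confirm it is valid JSON.

```python
import json
from string import Template


def render(two_line_input):
    """Substitute the values (second line) into the JSON template (first line)."""
    parts = two_line_input.split("\n", 1)
    if len(parts) < 2:
        raise ValueError("Input data does not contain both template and values.")
    template_str, values_str = (part.strip() for part in parts)
    values = json.loads(values_str)
    # json.dumps adds surrounding quotes; strip them so only the escaped
    # content is inserted between the quotes already present in the template.
    escaped = {key: json.dumps(str(value))[1:-1] for key, value in values.items()}
    rendered = Template(template_str).substitute(escaped)
    return json.loads(rendered)  # raises ValueError if the result is not valid JSON


sample_input = (
    '{"order":{"Testorder":{"operation":"create","specification":'
    '{"name":"${name}","version":"1.0.0-SNAPSHOT"},"parameters":'
    '{"controllerName":"${controllername}","parameters":'
    '{"parameter1":"${value1}","parameter2":"${value2}"}}}}}\n'
    '{"name": "TestService", "controllername": "MyController_d",'
    '"value1": "test","value2": "speed"}'
)
order = render(sample_input)
print(json.dumps(order, indent=2))
```

Escaping the values first means a value containing a double quote or backslash still produces valid JSON, which the replacement table in the original script does not guarantee.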
Labels: Apache NiFi
06-12-2024
08:32 PM
1 Kudo
Hi @MattWho, I have figured it out. I set the access policy "receive data via site-to-site", and it has now started to work. I used an API call to set the value, referring to this: Access Policies | CDP Private Cloud (cloudera.com). Thank you so much for your help.
To summarize, nifi.properties on the first pod:
bash-4.4$ cat conf/nifi.properties | grep remote
nifi.remote.input.host=nifi-0.nifi-headless.namespace.svc.cluster.local
nifi.remote.input.secure=true
nifi.remote.input.socket.port=10443
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs
and on the other pod:
nifi.remote.input.host=nifi-1.nifi-headless.namespace.svc.cluster.local
The web properties on the first pod:
nifi.web.https.host=nifi-0.nifi-headless.namespace.svc.cluster.local
nifi.web.https.port=9443
and respectively on the other pod:
nifi.web.https.host=nifi-1.nifi-headless.namespace.svc.cluster.local
nifi.web.https.port=9443
Steps taken:
Set the access policies.
Created the reporting task. The URL is set to the pod's service host name and HTTPS port, e.g. https://nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443/nifi
Set the management controller service.
Created an input port and a remote process group to send data.
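The per-pod values in this summary follow one pattern: each pod advertises its own StatefulSet DNS name for both site-to-site and the web UI. A small sketch of that pattern is below. The function name and its parameters are hypothetical, and the defaults simply mirror the values quoted in this post; how the values actually reach nifi.properties (init script, Helm template, operator) is not covered by the thread.

```python
def s2s_properties(ordinal, namespace, statefulset="nifi", service="nifi-headless",
                   socket_port=10443, https_port=9443):
    """Return the per-pod site-to-site and web properties as a dict.

    Hypothetical helper: names and defaults are taken from the values shown
    in the post, not from any NiFi or Kubernetes API.
    """
    # Stable DNS name of a StatefulSet pod behind a headless service
    host = "{}-{}.{}.{}.svc.cluster.local".format(statefulset, ordinal, service, namespace)
    return {
        "nifi.remote.input.host": host,
        "nifi.remote.input.secure": "true",
        "nifi.remote.input.socket.port": str(socket_port),
        "nifi.remote.input.http.enabled": "true",
        "nifi.web.https.host": host,
        "nifi.web.https.port": str(https_port),
    }


for pod_ordinal in (0, 1):
    for key, value in s2s_properties(pod_ordinal, "namespace").items():
        print("{}={}".format(key, value))
```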
06-11-2024
10:39 PM
1 Kudo
Hi @MattWho, I tried a similar attempt with SiteToSiteBulletinReportingTask. It complains that there is no port, and in the site-to-site component state it has:
o.a.n.w.s.NiFiAuthenticationFilter Authentication Success [CN=nifi-api-admin] 10.255.14.191 GET https://nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443/nifi-api/site-to-site
o.a.n.r.SiteToSiteBulletinReportingTask SiteToSiteBulletinReportingTask[id=0adf334f-0190-1000-0000-00000230b61a] Error running task SiteToSiteBulletinReportingTask[id=0adf334f-0190-1000-0000-00000230b61a] due to org.apache.nifi.processor.exception.ProcessException: Failed to send Bulletins to destination due to IOException:Could not find Port with name 'prov' for remote NiFi instance
o.a.n.remote.StandardRemoteProcessGroup Unable to connect to RemoteProcessGroup[https://nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443/nifi] due to org.apache.http.NoHttpResponseException: nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443 failed to respond
Please let me know if you can help here. Thank you.
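One way to narrow down a "Could not find Port with name 'prov'" error is to look at what the /nifi-api/site-to-site endpoint (the one that appears in the log above) returns for the identity the reporting task uses, and check whether the port is listed. The sketch below only inspects an already-parsed response. The response shape used here (a controller object holding an inputPorts list with name fields) is an assumption about the API output, and the function names are hypothetical. A port missing from the list would be consistent with the fix reported in the 06-12-2024 post, where granting the "receive data via site-to-site" policy resolved the problem.

```python
def input_port_names(site_to_site_response):
    """Return the input port names found in a parsed site-to-site response.

    Assumed shape (not confirmed in this thread):
    {"controller": {"inputPorts": [{"name": "..."}, ...]}}
    """
    controller = site_to_site_response.get("controller", {})
    return [port.get("name") for port in controller.get("inputPorts", [])]


def has_input_port(site_to_site_response, port_name):
    """True if the named input port is visible to the requesting identity."""
    return port_name in input_port_names(site_to_site_response)


# Illustrative responses, written by hand for this example
visible = {"controller": {"inputPorts": [{"id": "abc", "name": "prov"}]}}
hidden = {"controller": {"inputPorts": []}}
print(has_input_port(visible, "prov"), has_input_port(hidden, "prov"))
```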
06-07-2024
10:27 PM
1 Kudo
Hi @MattWho, thank you for your response. I have tried with /nifi in the URLs as well, but unfortunately the data is still not coming in.
We are using HTTPS; there is no value set for HTTP. Instead we have:
nifi.web.https.host=nifi-0.nifi-headless.namespace.svc.cluster.local
nifi.web.https.port=9443
and respectively on the other pod:
nifi.web.https.host=nifi-1.nifi-headless.namespace.svc.cluster.local
nifi.web.https.port=9443
We also have a proxy host, which is the same for both pods:
nifi.web.proxy.context.path=/apigw/namespace/nifi
nifi.web.proxy.host=ckng.apps.ao0059.tre.nsn-rdnet.net:443, nifi-headless.namespace.svc.cluster.local:9443
Yes, the host name is reachable:
bash-4.4$ ping nifi-1.nifi-headless.namespace.svc.cluster.local
PING nifi-1.nifi-headless.namespace.svc.cluster.local (10.255.8.118) 56(84) bytes of data.
64 bytes from nifi-1.nifi-headless.namespace.svc.cluster.local (10.255.8.118): icmp_seq=1 ttl=64 time=0.019 ms
64 bytes from nifi-1.nifi-headless.namespace.svc.cluster.local (10.255.8.118): icmp_seq=2 ttl=64 time=0.027 ms
and for nifi-0:
bash-4.4$ ping nifi-0.nifi-headless.namespace.svc.cluster.local
PING nifi-0.nifi-headless.namespace.svc.cluster.local (10.255.8.118) 56(84) bytes of data.
64 bytes from nifi-0.nifi-headless.namespace.svc.cluster.local (10.255.8.118): icmp_seq=1 ttl=64 time=0.019 ms
64 bytes from nifi-0.nifi-headless.namespace.svc.cluster.local (10.255.8.118): icmp_seq=2 ttl=64 time=0.027 ms
The configurations match on both pods. Yes, the private key used has the proper EKUs and SAN entries.
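A ping only shows that the host name resolves and answers ICMP; site-to-site needs the TCP ports (9443 for HTTPS and 10443 for the RAW socket in this setup) to accept connections. A minimal sketch of such a check with the standard library is below. The function name is hypothetical, and a successful connect says nothing about TLS or authorization.

```python
import socket


def tcp_port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS resolution failures
        return False
```

Run from inside one pod, it could be called as tcp_port_open("nifi-1.nifi-headless.namespace.svc.cluster.local", 9443) and again with port 10443.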
06-06-2024
03:29 AM
Hello @MattWho, sorry to come back to this topic. I am trying to implement the same S2S reporting task in a Kubernetes environment. We have NiFi running in cluster mode with 2 pods (it is usually 3, on embedded ZooKeeper; we are just working with 2 nodes for the time being).
The configuration for S2S is set as follows:
bash-4.4$ cat conf/nifi.properties | grep remote
nifi.remote.input.host=nifi-0.nifi-headless.namespace.svc.cluster.local
nifi.remote.input.secure=true
nifi.remote.input.socket.port=10443
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs
I have tried with both the RAW and HTTP protocols. We have also given the user the "Retrieve site-to-site details" permission.
We have set the destination URL as https://<fullyqualifiedDNS>:portnumber:
https://nifi-0.nifi-headless.namespace.svc.cluster.local:9443
The host is nifi.remote.input.host=nifi-0.nifi-headless.doc-norc.svc.cluster.local and the port number we are using is nifi.web.https.port=9443, but the events are not coming into the port.
The logs say the authentication is successful:
{"type":"log", "facility":"25", "host":"ao0059-cjts5-worker-0-lqm8m", "level":"INFO", "event-type":"N_USER_OPER", "systemid":"nifi","neid":"706546b360714e94b74591ca351b0655", "system":"nifi-0", "time":"2024-06-06T12:06:13.189Z" ,"timezone":"UTC", "log":"[NiFi Web Server-1830] o.a.n.w.s.NiFiAuthenticationFilter Authentication Started 10.255.15.73 [CN=nifi-api-admin] GET https://nifi-0.nifi-headless.namespace.svc.cluster.local:9443/nifi-api/site-to-site"}
{"type":"log", "facility":"25", "host":"ao0059-cjts5-worker-0-lqm8m", "level":"INFO", "event-type":"N_USER_OPER", "systemid":"nifi","neid":"706546b360714e94b74591ca351b0655", "system":"nifi-0", "time":"2024-06-06T09:51:41.644Z" ,"timezone":"UTC", "log":"[NiFi Web Server-854] o.a.n.w.s.NiFiAuthenticationFilter Authentication Success [CN=nifi-api-admin] 10.255.15.73 GET https://nifi-0.nifi-headless.namespace.svc.cluster.local:9443/nifi-api/site-to-site"}
The log also says "No events to send due to 'events' being null or empty.":
{"type":"log", "host":"ao0059-cjts5-worker-0-lqm8m", "level":"DEBUG", "event-type":"N_USER_OPER", "systemid":"nifi","neid":"706546b360714e94b74591ca351b0655", "system":"nifi-0", "time":"2024-06-06T10:05:39.499Z" ,"timezone":"UTC", "log":"[Timer-Driven Process Thread-8] o.a.n.r.SiteToSiteProvenanceReportingTask SiteToSiteProvenanceReportingTask[id=ecacc388-018f-1000-ffff-ffff8c5138a7] Returning LOCAL State: StandardStateMap[version=-1, values={}]"}
{"type":"log", "host":"ao0059-cjts5-worker-0-lqm8m", "level":"DEBUG", "event-type":"N_USER_OPER", "systemid":"nifi","neid":"706546b360714e94b74591ca351b0655", "system":"nifi-0", "time":"2024-06-06T10:05:39.499Z" ,"timezone":"UTC", "log":"[Timer-Driven Process Thread-8] o.a.n.r.SiteToSiteProvenanceReportingTask SiteToSiteProvenanceReportingTask[id=ecacc388-018f-1000-ffff-ffff8c5138a7] No events to send due to 'events' being null or empty."}
{"type":"log", "host":"ao0059-cjts5-worker-0-lqm8m", "level":"DEBUG", "event-type":"N_USER_OPER", "systemid":"nifi","neid":"706546b360714e94b74591ca351b0655", "system":"nifi-0", "time":"2024-06-06T10:05:44.501Z" ,"timezone":"UTC", "log":"[Timer-Driven Process Thread-4] o.a.n.r.SiteToSiteProvenanceReportingTask SiteToSiteProvenanceReportingTask[id=ecacc388-018f-1000-ffff-ffff8c5138a7] Returning LOCAL State: StandardStateMap[version=-1, values={}]"}
{"type":"log", "host":"ao0059-cjts5-worker-0-lqm8m", "level":"DEBUG", "event-type":"N_USER_OPER", "systemid":"nifi","neid":"706546b360714e94b74591ca351b0655", "system":"nifi-0", "time":"2024-06-06T10:05:44.502Z" ,"timezone":"UTC", "log":"[Timer-Driven Process Thread-4] o.a.n.r.SiteToSiteProvenanceReportingTask SiteToSiteProvenanceReportingTask[id=ecacc388-018f-1000-ffff-ffff8c5138a7] No events to send due to 'events' being null or empty."}
There are entries in data provenance.
There was also an issue where the task complained that the input port is not available:
{"type":"log", "host":"ao0059-cjts5-worker-0-z5b82", "level":"ERROR", "event-type":"N_USER_OPER", "systemid":"nifi","neid":"174577b6145e4b87a86e5d9c397c8f75", "system":"nifi-0", "time":"2024-06-04T11:20:41.566Z" ,"timezone":"UTC", "log":"[Timer-Driven Process Thread-3] o.a.n.r.SiteToSiteProvenanceReportingTask SiteToSiteProvenanceReportingTask[id=e2e547c5-018f-1000-0000-00004876faee] Error running task SiteToSiteProvenanceReportingTask[id=e2e547c5-018f-1000-0000-00004876faee] due to org.apache.nifi.processor.exception.ProcessException: Failed to send Provenance Events to destination due to IOException:Could not find Port with name 'prov' for remote NiFi instance"}
but this no longer comes up, though we have not made any changes. There are no other errors in the logs; I have enabled debug logging for org.apache.nifi.reporting.
I tried the nifi-api endpoints https://nifi-0.nifi-headless.namespace.svc.cluster.local:9443/nifi-api/site-to-site/peers and https://nifi-0.nifi-headless.namespace.svc.cluster.local:9443/nifi-api/nifi-api/site-to-site
Can you please help us with this? Thank you for your time.
05-31-2024
04:51 AM
@MattWho, I was able to get this working by running NiFi on Windows, using the same configuration as yours. My Docker runs on WSL, and NiFi was not coming up when I changed the host name to localhost. Thank you so much for your time.
05-29-2024
10:03 PM
1 Kudo
Hello @MattWho, thank you for looking into it. I have set nifi.remote.input.secure=true:
# Site to Site properties 8c92690b14e6
nifi.remote.input.host=cd8e8c899db6
nifi.remote.input.secure=true
nifi.remote.input.socket.port=10000
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs
The S2SProvenanceReportingTask is not producing any bulletin messages, and there are no errors in the user log. I can see the StandardRestrictedSSLContextService and SiteToSiteProvenanceReportingTask started in the logs:
2024-05-30 04:48:43,666 INFO [Timer-Driven Process Thread-9] o.a.n.c.s.StandardControllerServiceNode Successfully enabled StandardControllerServiceNode[service=SSLContextService[id=c27f79ba-018f-1000-ada5-343b2ba8f4e2], name=StandardRestrictedSSLContextService, active=true]
2024-05-30 04:49:07,157 INFO [Timer-Driven Process Thread-1] o.a.n.c.s.TimerDrivenSchedulingAgent SiteToSiteProvenanceReportingTask[id=a971da9d-018f-1000-2b00-6824f28134d8] started.
I am using the out-of-the-box installation of NiFi and have not created any other certificates. To configure the StandardRestrictedSSLContextService, I used the same values as the nifi.security properties:
nifi.security.autoreload.enabled=false
nifi.security.autoreload.interval=10 secs
nifi.security.keystore=./conf/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=b465f3c4cb37f83f825a2166a656719f
nifi.security.keyPasswd=b465f3c4cb37f83f825a2166a656719f
nifi.security.truststore=./conf/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=e20ef7bb480f25c7e2446bbaffc1d95b
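Copying the nifi.security values into the SSL context service by hand is easy to get wrong, so a small script can read them from nifi.properties and print them next to the matching service fields. The sketch below is an assumption-laden illustration: the function names are hypothetical, and the field labels in the mapping are my understanding of the StandardRestrictedSSLContextService property names, so they should be checked against the processor's configuration dialog. The sample text uses placeholder passwords.

```python
# Assumed mapping from nifi.properties keys to SSL context service field labels
SECURITY_TO_SSL_SERVICE = {
    "nifi.security.keystore": "Keystore Filename",
    "nifi.security.keystoreType": "Keystore Type",
    "nifi.security.keystorePasswd": "Keystore Password",
    "nifi.security.keyPasswd": "Key Password",
    "nifi.security.truststore": "Truststore Filename",
    "nifi.security.truststoreType": "Truststore Type",
    "nifi.security.truststorePasswd": "Truststore Password",
}


def parse_properties(text):
    """Parse key=value lines, ignoring blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def ssl_service_settings(properties_text):
    """Return {service field label: value} for the security keys that are present."""
    props = parse_properties(properties_text)
    return {label: props[key]
            for key, label in SECURITY_TO_SSL_SERVICE.items() if key in props}


sample_properties = """\
# placeholder values for illustration
nifi.security.keystore=./conf/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=changeit-keystore
nifi.security.keyPasswd=changeit-keystore
nifi.security.truststore=./conf/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=changeit-truststore
"""
settings = ssl_service_settings(sample_properties)
for label, value in settings.items():
    print("{}: {}".format(label, value))
```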
05-28-2024
09:14 PM
Hi @MattWho, thank you for your response. I am not trying to send data unsecured over the network. The NiFi instance running on my local machine uses HTTPS, and I want it to stay that way, but I would also like to fetch the provenance data.
I was trying to configure my NiFi, running in standalone mode, based on what is described in this document: https://community.cloudera.com/t5/Community-Articles/Extracting-NiFi-Provenance-Data-using/ta-p/248469
I have set nifi.remote.input.http.enabled to true and also tried adding a StandardRestrictedSSLContextService, using the same truststore and keystore values that are in nifi.properties. I can see logs like this:
// Another save pending = false
2024-05-29 04:05:25,991 INFO [Timer-Driven Process Thread-7] o.a.n.c.s.TimerDrivenSchedulingAgent SiteToSiteProvenanceReportingTask[id=a971da9d-018f-1000-2b00-6824f28134d8] started.
2024-05-29 04:05:26,214 INFO [Flow Service Tasks Thread-1] o.a.nifi.controller.StandardFlowService Saved flow controller org.apache.nifi.controller.FlowController@c18025a // Another save pending = false
2024-05-29 04:05:36,347 INFO [pool-7-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2024-05-29 04:05:36,348 INFO [pool-7-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Successfully checkpointed FlowFile Repository with 23 records in 0 milliseconds
2024-05-29 04:05:37,621 INFO [Timer-Driven Process Thread-6] o.a.n.p.store.WriteAheadStorePartition Successfully rolled over Event Writer for Provenance Event Store Partition[directory=./provenance_repository] due to MAX_TIME_REACHED. Event File was 17.76 KB and contained 10 events.
2024-05-29 04:05:56,348 INFO [pool-7-thread-1] o.a.n.c.r.WriteAheadFlowFileRepository Initiating checkpoint of FlowFile Repository
2024-05-29 04:05:56,352 INFO [pool-7-thread-1] o.a.n.wali.SequentialAccessWriteAheadLog Checkpointed Write-Ahead Log with 24 Records and 0 Swap Files in 3 milliseconds (Stop-the-world time = 1 milliseconds), max Transaction ID 48
But unfortunately, I do not see any data flowing into the input port.