Member since: 04-07-2022
Posts: 38
Kudos Received: 11
Solutions: 2
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 838 | 03-17-2025 01:46 AM |
| | 5965 | 06-12-2024 08:32 PM |
05-07-2025
08:23 AM
Hi all,

I am looking for a simpler solution. I am running NiFi 1.28.1 and have an InvokeHTTP processor that queries an application by ID. The application returns an array of JSON objects of the same kind, in a paginated way. Each response contains an array of 50 JSON objects by default (of course, the page size can be increased), and there can be more than 100,000 JSON objects retrieved in total. The length of each JSON object stays the same; it just has the ID and its status.

I want to generate a summary file that contains all the JSON objects and save it to an SFTP location, and I am looking for the best approach to combine all the pages.

I tried using the MergeContent processor, but that did not work because the first page response was not stored anywhere when the second page came in.

Would it make sense to store the array in a distributed cache and merge it with the next page, and so forth, until all the pages are complete? The distributed cache location can be pointed to a PVC.

Or would it be better to store the file in SFTP, read the same file back from SFTP, append the new page, and overwrite the summary file with the new array, until all the pages are complete?

Looking for some advice here. Thank you.
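Independent of which NiFi processor ends up doing the work, the merge step itself can be thought of as appending each page's elements to one summary array and writing the result once at the end. The sketch below is plain Python, not a NiFi flow; `merge_pages` and the sample pages are hypothetical names and data used only for illustration.

```python
import json
from typing import Iterable, List


def merge_pages(pages: Iterable[str]) -> List[dict]:
    """Concatenate paginated JSON arrays into one list, preserving page order."""
    summary: List[dict] = []
    for page in pages:
        items = json.loads(page)
        if not isinstance(items, list):
            raise ValueError("each page is expected to be a JSON array")
        summary.extend(items)
    return summary


# Hypothetical sample pages; real pages would hold 50 objects each.
page_1 = '[{"id": "1", "status": "OK"}, {"id": "2", "status": "FAILED"}]'
page_2 = '[{"id": "3", "status": "OK"}]'

summary = merge_pages([page_1, page_2])
summary_json = json.dumps(summary)  # single array, ready to be written once
```

At roughly 100,000 small objects, holding the full list in memory is likely acceptable; for much larger results, writing each page to the output as it arrives would avoid that.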
Labels: Apache NiFi
03-17-2025
01:46 AM
1 Kudo
Update: Solved it. I was missing a '/' in front of the resource that I was providing. It should have been "resource": "/data-transfer/output-ports/a2a202da-0195-1000-0000-000045d2086d", instead of "resource": "data-transfer/output-ports/a2a202da-0195-1000-0000-000045d2086d". Comparing the policies created through the UI with those created through the API made me realize it.
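A small guard like the following could catch this kind of mistake before the policy request is sent. It is only a sketch: `normalize_resource` is a hypothetical helper, not part of any NiFi client, and it does nothing beyond ensuring the resource path starts with exactly one slash.

```python
def normalize_resource(resource: str) -> str:
    """Return the policy resource path with exactly one leading slash."""
    return "/" + resource.lstrip("/")


port_id = "a2a202da-0195-1000-0000-000045d2086d"
broken = f"data-transfer/output-ports/{port_id}"  # the value that did not work
fixed = normalize_resource(broken)                # the value that worked
```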
01-17-2025
01:39 AM
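Hi @satz, thanks for the reply. The encoding was ISO-8859-1. I was able to make it work with a Groovy script:
def flowFile = session.get()
if (flowFile == null) return
// Header values collected from the FlowFile attributes, keyed by header name
def headers = [:]
def Ids = flowFile.getAttribute('TestIDs')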
def id = flowFile.getAttribute('ID')
if (Ids != null) {
def IdList = new ArrayList()
Ids.replaceAll("[\\[\\]]", "").split(",").each { tenant ->
IdList.add(tenant.trim())
}
headers['TestIDs'] = IdList
}
if (id != null) {
headers['ID'] = id
}
// Serialize the headers
def serializeHeader(headerValue) {
def byteArrayOutputStream = new ByteArrayOutputStream()
def objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
objectOutputStream.writeObject(headerValue)
objectOutputStream.flush()
return byteArrayOutputStream.toByteArray()
}
def serializedHeaders = [:]
headers.each { key, value ->
def serializedValue = serializeHeader(value)
serializedHeaders[key] = new String(serializedValue, 'ISO-8859-1')
}
// Update FlowFile with serialized headers
serializedHeaders.each { key, value ->
flowFile = session.putAttribute(flowFile, "${key}", value)
}
session.transfer(flowFile, REL_SUCCESS)
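The reason ISO-8859-1 works for carrying serialized bytes in a string attribute is that it maps each of the 256 possible byte values to exactly one character, so decoding and re-encoding returns the original bytes. The following Python sketch (not part of the script above) illustrates that property with made-up sample bytes; only the first four bytes mirror the start of a Java serialization stream.

```python
# Sample bytes: a Java serialization stream begins with 0xAC 0xED 0x00 0x05.
# The remaining bytes simply cover every possible byte value.
raw = bytes([0xAC, 0xED, 0x00, 0x05]) + bytes(range(256))

# ISO-8859-1 maps every byte value to one character, so the round trip is lossless.
as_text = raw.decode("iso-8859-1")
restored = as_text.encode("iso-8859-1")

# UTF-8 rejects this input because 0xAC is not a valid leading byte.
try:
    raw.decode("utf-8")
    utf8_ok = True
except UnicodeDecodeError:
    utf8_ok = False
```

The consumer of the attribute has to encode the string back with the same charset before deserializing.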
11-06-2024
10:47 PM
2 Kudos
@joseomjr, thank you for responding. I chose the hard way instead. I thought, why not create a custom NAR that takes two parameters: one for the JSON template with placeholders and one for the respective values. It uses a sort of recursion to create the final output.

For an input like

Template: {"details":{"name":"${name}","age":"${age}","superpower":"${superpower}"}}
Value: {"name": "Clark Kent", "age": "35","superpower": "Superman"}

it gives the output

{ "details": { "name": "Clark Kent", "age": "35", "superpower": "Superman" } }
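The recursive substitution described here can be sketched roughly as follows. This is not the custom NAR's code (which is not shown in the post); it is a standalone Python illustration, and `fill` and `PLACEHOLDER` are hypothetical names. It assumes placeholders only appear inside string values.

```python
import json
import re
from typing import Any, Mapping

# Matches ${key} and captures the key name.
PLACEHOLDER = re.compile(r"\$\{([^}]+)\}")


def fill(node: Any, values: Mapping[str, Any]) -> Any:
    """Recursively replace ${key} placeholders in every string of a JSON tree."""
    if isinstance(node, dict):
        return {key: fill(child, values) for key, child in node.items()}
    if isinstance(node, list):
        return [fill(child, values) for child in node]
    if isinstance(node, str):
        # Unknown placeholders are left untouched rather than dropped.
        return PLACEHOLDER.sub(
            lambda m: str(values.get(m.group(1), m.group(0))), node
        )
    return node


template = json.loads(
    '{"details":{"name":"${name}","age":"${age}","superpower":"${superpower}"}}'
)
values = json.loads('{"name": "Clark Kent", "age": "35", "superpower": "Superman"}')
result = fill(template, values)
```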
06-12-2024
08:32 PM
1 Kudo
Hi @MattWho, I have figured it out. I set the access policy "receive data via site-to-site" and it has now started to work. I used an API call to set the value, referring to this: Access Policies | CDP Private Cloud (cloudera.com). Thank you so much for your help.

To summarize:

nifi.properties
bash-4.4$ cat conf/nifi.properties | grep remote
nifi.remote.input.host=nifi-0.nifi-headless.namespace.svc.cluster.local
nifi.remote.input.secure=true
nifi.remote.input.socket.port=10443
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs
in another pod
nifi.remote.input.host=nifi-1.nifi-headless.namespace.svc.cluster.local

nifi.web.https.host=nifi-0.nifi-headless.namespace.svc.cluster.local
nifi.web.https.port=9443
and respectively on another pod
nifi.web.https.host=nifi-1.nifi-headless.namespace.svc.cluster.local
nifi.web.https.port=9443

- set access policies
- created reporting task; the URL set is podname.svc/https port, e.g. https://nifi-0.nifi-headless.doc-norc.svc.cluster.local:9443/nifi
- set management controller service
- created an input port and remote group to send data
05-28-2024
03:18 AM
Could someone please help me with this? Fetch Provenance data using SiteToSiteProvenanceRe... - Cloudera Community - 388418
The site-to-site configuration is not working over HTTP when NiFi is running on HTTPS.
12-11-2023
10:17 PM
We have NiFi 1.23.2 running in cluster mode on Kubernetes, with 3 pods. We have been getting frequent FlowSynchronizationException errors in the logs. This has also been observed in previous versions.
cluster due to: org.apache.nifi.controller.serialization.FlowSynchronizationException: Failed to connect node to cluster because local flow controller partially updated. Administrator should disconnect node and review flow for corruption."}
org.apache.nifi.controller.serialization.FlowSynchronizationException: Failed to connect node to cluster because local flow controller partially updated. Administrator should disconnect node and review flow for corruption.
at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:1059)
at org.apache.nifi.controller.StandardFlowService.load(StandardFlowService.java:520)
at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:896)
at org.apache.nifi.NiFi.<init>(NiFi.java:172)
at org.apache.nifi.NiFi.<init>(NiFi.java:83)
at org.apache.nifi.NiFi.main(NiFi.java:332)
Caused by: org.apache.nifi.controller.serialization.FlowSynchronizationException: java.lang.IllegalStateException: A Parameter Context tries to inherit from another Parameter Context [5b8d6cf7-018c-1000-0000-00001849f526] that does not exist
at org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.synchronizeFlow(VersionedFlowSynchronizer.java:448)
at org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.sync(VersionedFlowSynchronizer.java:206)
at org.apache.nifi.controller.serialization.StandardFlowSynchronizer.sync(StandardFlowSynchronizer.java:42)
at org.apache.nifi.controller.FlowController.synchronize(FlowController.java:1530)
at org.apache.nifi.persistence.StandardFlowConfigurationDAO.load(StandardFlowConfigurationDAO.java:104)
at org.apache.nifi.controller.StandardFlowService.loadFromBytes(StandardFlowService.java:817)
at org.apache.nifi.controller.StandardFlowService.loadFromConnectionResponse(StandardFlowService.java:1028)
... 5 common frames omitted
Caused by: java.lang.IllegalStateException: A Parameter Context tries to inherit from another Parameter Context [5b8d6cf7-018c-1000-0000-00001849f526] that does not exist
at org.apache.nifi.controller.flow.AbstractFlowManager.withParameterContextResolution(AbstractFlowManager.java:559)
at org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.inheritParameterContexts(VersionedFlowSynchronizer.java:687)
at org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.synchronizeFlow(VersionedFlowSynchronizer.java:387)
... 11 common frames omitted
All pods except one are going into a crash loop, and we are unable to exec into the pods and delete the flowfiles. What could be the reason for this? Please advise how to find the root cause and rectify it.
Labels: Apache NiFi
12-05-2023
04:41 AM
@SAMSAL, thank you. This works.
11-21-2023
08:45 AM
@SAMSAL thank you, that solved it
09-12-2023
07:03 AM
Turns out there is a workaround: there is an option to change the first processor to execute on a single node (the "Primary node" execution setting), so the flowfiles originate from a single node.