Member since 02-24-2025
10 Posts
0 Kudos Received
1 Solution
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 1008 | 04-24-2025 09:39 PM |
04-28-2025 10:40 PM
@joseomjr your code did not work on my end. ChatGPT helped me with the code below, and it works on my end.

```groovy
import java.util.zip.ZipInputStream
import java.util.zip.ZipEntry
import java.io.ByteArrayOutputStream
// ExecuteScript normally pre-imports these; listed explicitly for clarity
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

def flowFile = session.get()
if (!flowFile) return

// Child FlowFiles are collected first so they can be removed if unzipping fails part-way
def children = []
try {
    session.read(flowFile, { inputStream ->
        ZipInputStream zipIn = new ZipInputStream(inputStream)
        ZipEntry entry = zipIn.nextEntry
        while (entry != null) {
            if (!entry.isDirectory()) {
                // Buffers the whole entry in memory, so large entries need enough heap
                ByteArrayOutputStream baos = new ByteArrayOutputStream()
                byte[] buffer = new byte[8192]
                int len
                while ((len = zipIn.read(buffer)) != -1) {
                    baos.write(buffer, 0, len)
                }
                def contentBytes = baos.toByteArray()
                def newFlowFile = session.create(flowFile)
                newFlowFile = session.write(newFlowFile, { outStream ->
                    outStream.write(contentBytes)
                } as OutputStreamCallback)
                // The entry name may include a directory prefix such as "dir/file.txt"
                newFlowFile = session.putAttribute(newFlowFile, 'filename', entry.getName())
                children << newFlowFile
            }
            zipIn.closeEntry()
            entry = zipIn.nextEntry
        }
    } as InputStreamCallback)
    // Only route the children once the whole archive has been read successfully
    session.transfer(children, REL_SUCCESS)
    session.remove(flowFile)
} catch (Exception e) {
    log.error("Failed to unzip FlowFile due to: ${e.message}", e)
    session.remove(children)
    session.transfer(flowFile, REL_FAILURE)
}
```

I hope there is no security concern with this code!
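On the security question, two points that are commonly raised for unzip code like this are that each entry is buffered fully in memory (a small archive can expand to a very large entry) and that the entry name is used as-is, including any directory prefix. The following is a minimal sketch in plain Java, outside NiFi, of one way to guard against both. The class name `BoundedUnzip`, the helpers `unzip`, `baseName`, and `zipOf`, and the size limit are all hypothetical names chosen for illustration, not part of the script above or of any NiFi API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

class BoundedUnzip {

    // Hypothetical helper: reads every file entry, failing as soon as one
    // entry grows beyond maxEntryBytes instead of buffering it without limit.
    static Map<String, byte[]> unzip(byte[] zipBytes, long maxEntryBytes) throws IOException {
        Map<String, byte[]> files = new LinkedHashMap<>();
        try (ZipInputStream zipIn = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            byte[] buffer = new byte[8192];
            ZipEntry entry;
            while ((entry = zipIn.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                long total = 0;
                int len;
                while ((len = zipIn.read(buffer)) != -1) {
                    total += len;
                    if (total > maxEntryBytes) {
                        throw new IOException("Entry exceeds limit of " + maxEntryBytes + " bytes: " + entry.getName());
                    }
                    out.write(buffer, 0, len);
                }
                // Entries that share a base name overwrite each other in this sketch.
                files.put(baseName(entry.getName()), out.toByteArray());
            }
        }
        return files;
    }

    // Keeps only the last path segment, so "dir/a.txt" or "../a.txt" becomes "a.txt".
    static String baseName(String entryName) {
        String normalized = entryName.replace('\\', '/');
        return normalized.substring(normalized.lastIndexOf('/') + 1);
    }

    // Builds a one-entry zip in memory so the sketch can be tried without a file.
    static byte[] zipOf(String name, String content) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zipOut = new ZipOutputStream(bytes)) {
            zipOut.putNextEntry(new ZipEntry(name));
            zipOut.write(content.getBytes(StandardCharsets.UTF_8));
            zipOut.closeEntry();
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Map<String, byte[]> files = unzip(zipOf("dir/a.txt", "hello"), 1024);
        System.out.println(files.keySet());
    }
}
```

The same two checks could be moved into the Groovy script: count bytes inside the read loop, and strip the directory part before setting the `filename` attribute. Whether the path prefix matters depends on how downstream processors use that attribute.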
04-25-2025 02:50 PM
OK, to put it in context: we're currently using version 1.11.4, which has worked fairly well for us, but sometimes the nodes become desynchronized and won't reconnect. I restarted the cluster to fix this issue.

However, we are now forced to change versions because we're loading data into BigQuery. When we run and activate the extraction processors "ExecuteSQL -> avroToJson" to release the queues, the BQ batch processor gives us this error: "Exceeded rated limit big query: too many table update operations for this table." This is because this NiFi version works with the old version of the API, which is deprecated. GCP recommended we switch to the new API, and I understand that NiFi has been using it since 1.16.3.

Yesterday I tried it this way:

OS: Cluster: 3 nodes on EC2 AWS instances. The instances are:
Operating System: Amazon Linux 2
CPE OS Name: cpe:2.3:o:amazon:amazon_linux:2
Kernel: Linux 4.14.322-246.539.amzn2.x86_64
Architecture: x86-64

NiFi in production:
1. Current NiFi version 1.11.4, with Java 1.8.0.382.b05 1.amzn2.0.1.x86_64, Apache-Zookeeper-3.6.3-bin

NiFi tested:
2. First NiFi tested version 1.16.3, with Java 1.8.0.382.b05 1.amzn2.0.1.x86_64, Apache-Zookeeper-3.6.3-bin
3. Second NiFi version 1.28.1, with Java 11 (java-11-amazon-corretto-headless), apache-zookeeper-3.6.3-bin

For points 2 and 3, I created this thread. Now I'm at point 3 and I have the same problem. I inferred that it could be the Java version, but it isn't. However, I had to roll back to 1.11.4, since we don't have a test environment, only a production one, and I couldn't stop the operation. It works without problems. I copied the configurations from my nodes to the others.
I tried to solve it, but the logs did not tell me much. I only copied the most visible entry: org.apache.nifi.web.server.jettyserver loading war ver-nar-1.16.3.nar-unpacked / nar-inf / bundled-dependencies / nifi-web-error-1.16.3.war. I checked the app, user, and bootstrap logs and searched each one specifically for errors, and nothing appeared. Yesterday I checked that the cluster was running with its 3 nodes; the NiFi status showed them running, and the nodes were communicating with each other. To resume this topic, I would have to wait for a problem to occur with the nodes to try again.

nifi.properties, node 1:

```
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Core Properties #
nifi.flow.configuration.file=./conf/flow.xml.gz
nifi.flow.configuration.json.file=./conf/flow.json.gz
nifi.flow.configuration.archive.enabled=true
nifi.flow.configuration.archive.dir=./conf/archive/
nifi.flow.configuration.archive.max.time=5 days
nifi.flow.configuration.archive.max.storage=500 MB
nifi.flow.configuration.archive.max.count=
nifi.flowcontroller.autoResumeState=true
nifi.flowcontroller.graceful.shutdown.period=10 sec
nifi.flowservice.writedelay.interval=500 ms
nifi.administrative.yield.duration=60 sec
# If a component has no work to do (is "bored"), how long should we wait before checking again for work?
nifi.bored.yield.duration=10 millis
nifi.queue.backpressure.count=10000
nifi.queue.backpressure.size=1 GB

nifi.authorizer.configuration.file=./conf/authorizers.xml
nifi.login.identity.provider.configuration.file=./conf/login-identity-providers.xml
nifi.templates.directory=./conf/templates
nifi.ui.banner.text=
nifi.ui.autorefresh.interval=30 sec
nifi.nar.library.directory=./lib
nifi.nar.library.autoload.directory=./extensions
nifi.nar.working.directory=./work/nar/
nifi.documentation.working.directory=./work/docs/components
nifi.nar.unpack.uber.jar=false

####################
# State Management #
####################
nifi.state.management.configuration.file=./conf/state-management.xml
# The ID of the local state provider
nifi.state.management.provider.local=local-provider
# The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
nifi.state.management.provider.cluster=zk-provider
# Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
nifi.state.management.embedded.zookeeper.start=false
# Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties

# Database Settings
#nifi.database.directory=./database_repository
nifi.database.directory=/data/database_repository

# Repository Encryption properties override individual repository implementation properties
nifi.repository.encryption.protocol.version=
nifi.repository.encryption.key.id=
nifi.repository.encryption.key.provider=
nifi.repository.encryption.key.provider.keystore.location=
nifi.repository.encryption.key.provider.keystore.password=

# FlowFile Repository
nifi.flowfile.repository.implementation=org.apache.nifi.controller.repository.WriteAheadFlowFileRepository
nifi.flowfile.repository.wal.implementation=org.apache.nifi.wali.SequentialAccessWriteAheadLog
# nifi.flowfile.repository.directory=./flowfile_repository
nifi.flowfile.repository.directory=/data/flowfile_repository
nifi.flowfile.repository.checkpoint.interval=2 mins
nifi.flowfile.repository.always.sync=false
nifi.flowfile.repository.retain.orphaned.flowfiles=true

nifi.swap.manager.implementation=org.apache.nifi.controller.FileSystemSwapManager
nifi.queue.swap.threshold=20000

# Content Repository
nifi.content.repository.implementation=org.apache.nifi.controller.repository.FileSystemRepository
nifi.content.claim.max.appendable.size=1 MB
#nifi.content.repository.directory.default=./content_repository
nifi.content.repository.directory.default=/data/content_repository
nifi.content.repository.archive.max.retention.period=12 hours
nifi.content.repository.archive.max.usage.percentage=50%
nifi.content.repository.archive.enabled=true
nifi.content.repository.always.sync=false
nifi.content.viewer.url=../nifi-content-viewer/

# Provenance Repository Properties
nifi.provenance.repository.implementation=org.apache.nifi.provenance.WriteAheadProvenanceRepository

# Persistent Provenance Repository Properties
#nifi.provenance.repository.directory.default=./provenance_repository
nifi.provenance.repository.directory.default=/data/provenance_repository
nifi.provenance.repository.max.storage.time=12 hours
nifi.provenance.repository.max.storage.size=50 GB
nifi.provenance.repository.rollover.time=10 mins
nifi.provenance.repository.rollover.size=100 MB
nifi.provenance.repository.query.threads=2
nifi.provenance.repository.index.threads=2
nifi.provenance.repository.compress.on.rollover=true
nifi.provenance.repository.always.sync=false
# Comma-separated list of fields. Fields that are not indexed will not be searchable. Valid fields are:
# EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, AlternateIdentifierURI, Relationship, Details
nifi.provenance.repository.indexed.fields=EventType, FlowFileUUID, Filename, ProcessorID, Relationship
# FlowFile Attributes that should be indexed and made searchable. Some examples to consider are filename, uuid, mime.type
nifi.provenance.repository.indexed.attributes=
# Large values for the shard size will result in more Java heap usage when searching the Provenance Repository
# but should provide better performance
nifi.provenance.repository.index.shard.size=500 MB
# Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from
# the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved.
nifi.provenance.repository.max.attribute.length=65536
nifi.provenance.repository.concurrent.merge.threads=2

# Volatile Provenance Respository Properties
nifi.provenance.repository.buffer.size=100000

# Component and Node Status History Repository
nifi.components.status.repository.implementation=org.apache.nifi.controller.status.history.VolatileComponentStatusRepository

# Volatile Status History Repository Properties
#nifi.components.status.repository.buffer.size=1440
nifi.components.status.repository.buffer.size=2880
nifi.components.status.snapshot.frequency=5 min

# QuestDB Status History Repository Properties
nifi.status.repository.questdb.persist.node.days=7
nifi.status.repository.questdb.persist.component.days=1
nifi.status.repository.questdb.persist.location=/data/status_repository

# Site to Site properties
nifi.remote.input.host=node-1
nifi.remote.input.secure=false
nifi.remote.input.socket.port=10000
nifi.remote.input.http.enabled=true
nifi.remote.input.http.transaction.ttl=30 sec
nifi.remote.contents.cache.expiration=30 secs

# web properties #
#############################################

# For security, NiFi will present the UI on 127.0.0.1 and only be accessible through this loopback interface.
# Be aware that changing these properties may affect how your instance can be accessed without any restriction.
# We recommend configuring HTTPS instead. The administrators guide provides instructions on how to do this.

nifi.web.http.host=node-1
nifi.web.http.port=8080
nifi.web.http.network.interface.default=

#############################################

nifi.web.https.host=
nifi.web.https.port=
nifi.web.https.network.interface.default=
nifi.web.https.application.protocols=http/1.1
nifi.web.jetty.working.directory=./work/jetty
nifi.web.jetty.threads=200
nifi.web.max.header.size=16 KB
nifi.web.proxy.context.path=
nifi.web.proxy.host=
nifi.web.max.content.size=
nifi.web.max.requests.per.second=30000
nifi.web.max.access.token.requests.per.second=25
nifi.web.request.timeout=60 secs
nifi.web.request.ip.whitelist=
nifi.web.should.send.server.version=true
nifi.web.request.log.format=%{client}a - %u %t "%r" %s %O "%{Referer}i" "%{User-Agent}i"

# Filter JMX MBeans available through the System Diagnostics REST API
nifi.web.jmx.metrics.allowed.filter.pattern=

# Include or Exclude TLS Cipher Suites for HTTPS
nifi.web.https.ciphersuites.include=
nifi.web.https.ciphersuites.exclude=

# security properties #
nifi.sensitive.props.key=1234567890ab
nifi.sensitive.props.key.protected=
nifi.sensitive.props.algorithm=NIFI_PBKDF2_AES_GCM_256
nifi.sensitive.props.additional.keys=

nifi.security.autoreload.enabled=false
nifi.security.autoreload.interval=10 secs
nifi.security.keystore=
nifi.security.keystoreType=
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=
nifi.security.truststoreType=
nifi.security.truststorePasswd=
nifi.security.user.authorizer=single-user-authorizer
nifi.security.allow.anonymous.authentication=false
nifi.security.user.login.identity.provider=single-user-provider
nifi.security.user.jws.key.rotation.period=PT1H
nifi.security.ocsp.responder.url=
nifi.security.ocsp.responder.certificate=

# OpenId Connect SSO Properties #
nifi.security.user.oidc.discovery.url=
nifi.security.user.oidc.connect.timeout=5 secs
nifi.security.user.oidc.read.timeout=5 secs
nifi.security.user.oidc.client.id=
nifi.security.user.oidc.client.secret=
nifi.security.user.oidc.preferred.jwsalgorithm=
nifi.security.user.oidc.additional.scopes=
nifi.security.user.oidc.claim.identifying.user=
nifi.security.user.oidc.fallback.claims.identifying.user=
nifi.security.user.oidc.claim.groups=groups
nifi.security.user.oidc.truststore.strategy=JDK
nifi.security.user.oidc.token.refresh.window=60 secs

# Apache Knox SSO Properties #
nifi.security.user.knox.url=
nifi.security.user.knox.publicKey=
nifi.security.user.knox.cookieName=hadoop-jwt
nifi.security.user.knox.audiences=

# SAML Properties #
nifi.security.user.saml.idp.metadata.url=
nifi.security.user.saml.sp.entity.id=
nifi.security.user.saml.identity.attribute.name=
nifi.security.user.saml.group.attribute.name=
nifi.security.user.saml.request.signing.enabled=false
nifi.security.user.saml.want.assertions.signed=true
nifi.security.user.saml.signature.algorithm=http://www.w3.org/2001/04/xmldsig-more#rsa-sha256
nifi.security.user.saml.authentication.expiration=12 hours
nifi.security.user.saml.single.logout.enabled=false
nifi.security.user.saml.http.client.truststore.strategy=JDK
nifi.security.user.saml.http.client.connect.timeout=30 secs
nifi.security.user.saml.http.client.read.timeout=30 secs

# Identity Mapping Properties #
# These properties allow normalizing user identities such that identities coming from different identity providers
# (certificates, LDAP, Kerberos) can be treated the same internally in NiFi. The following example demonstrates normalizing
# DNs from certificates and principals from Kerberos into a common identity string:
#
# nifi.security.identity.mapping.pattern.dn=^CN=(.*?), OU=(.*?), O=(.*?), L=(.*?), ST=(.*?), C=(.*?)$
# nifi.security.identity.mapping.value.dn=$1@$2
# nifi.security.identity.mapping.transform.dn=NONE
# nifi.security.identity.mapping.pattern.kerb=^(.*?)/instance@(.*?)$
# nifi.security.identity.mapping.value.kerb=$1@$2
# nifi.security.identity.mapping.transform.kerb=UPPER

# Group Mapping Properties #
# These properties allow normalizing group names coming from external sources like LDAP. The following example
# lowercases any group name.
#
# nifi.security.group.mapping.pattern.anygroup=^(.*)$
# nifi.security.group.mapping.value.anygroup=$1
# nifi.security.group.mapping.transform.anygroup=LOWER

# Listener Bootstrap properties #
# This property defines the port used to listen for communications from NiFi Bootstrap. If this property
# is missing, empty, or 0, a random ephemeral port is used.
nifi.listener.bootstrap.port=0

# cluster common properties (all nodes must have same values) #
nifi.cluster.protocol.heartbeat.interval=20 sec
nifi.cluster.protocol.heartbeat.missable.max=8
nifi.cluster.protocol.is.secure=false

# cluster node properties (only configure for cluster nodes) #
nifi.cluster.is.node=true
nifi.cluster.node.address=node-1
nifi.cluster.node.protocol.port=9089
nifi.cluster.node.protocol.max.threads=150
nifi.cluster.node.event.history.size=25
nifi.cluster.node.connection.timeout=90 sec
nifi.cluster.node.read.timeout=90 sec
nifi.cluster.node.max.concurrent.requests=100
nifi.cluster.firewall.file=
nifi.cluster.flow.election.max.wait.time=5 mins
nifi.cluster.flow.election.max.candidates=

# cluster load balancing properties #
nifi.cluster.load.balance.host=node-1
nifi.cluster.load.balance.port=6342
nifi.cluster.load.balance.connections.per.node=4
nifi.cluster.load.balance.max.thread.count=8
nifi.cluster.load.balance.comms.timeout=30 sec

# zookeeper properties, used for cluster management #
nifi.zookeeper.connect.string=node-1:2181,node-2:2181,node-3:2181
nifi.zookeeper.connect.timeout=60 secs
nifi.zookeeper.session.timeout=60 secs
nifi.zookeeper.root.node=/nifi
nifi.zookeeper.client.secure=false
nifi.zookeeper.security.keystore=
nifi.zookeeper.security.keystoreType=
nifi.zookeeper.security.keystorePasswd=
nifi.zookeeper.security.truststore=
nifi.zookeeper.security.truststoreType=
nifi.zookeeper.security.truststorePasswd=
nifi.zookeeper.jute.maxbuffer=

# Zookeeper properties for the authentication scheme used when creating acls on znodes used for cluster management
# Values supported for nifi.zookeeper.auth.type are "default", which will apply world/anyone rights on znodes
# and "sasl" which will give rights to the sasl/kerberos identity used to authenticate the nifi node
# The identity is determined using the value in nifi.kerberos.service.principal and the removeHostFromPrincipal
# and removeRealmFromPrincipal values (which should align with the kerberos.removeHostFromPrincipal and kerberos.removeRealmFromPrincipal
# values configured on the zookeeper server).
nifi.zookeeper.auth.type=
nifi.zookeeper.kerberos.removeHostFromPrincipal=
nifi.zookeeper.kerberos.removeRealmFromPrincipal=

# kerberos #
nifi.kerberos.krb5.file=

# kerberos service principal #
nifi.kerberos.service.principal=
nifi.kerberos.service.keytab.location=

# kerberos spnego principal #
nifi.kerberos.spnego.principal=
nifi.kerberos.spnego.keytab.location=
nifi.kerberos.spnego.authentication.expiration=12 hours

# external properties files for variable registry
# supports a comma delimited list of file locations
nifi.variable.registry.properties=

# analytics properties #
nifi.analytics.predict.enabled=false
nifi.analytics.predict.interval=3 mins
nifi.analytics.query.interval=5 mins
nifi.analytics.connection.model.implementation=org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares
nifi.analytics.connection.model.score.name=rSquared
nifi.analytics.connection.model.score.threshold=.90

# runtime monitoring properties
nifi.monitor.long.running.task.schedule=
nifi.monitor.long.running.task.threshold=

# Enable automatic diagnostic at shutdown.
nifi.diagnostics.on.shutdown.enabled=false

# Include verbose diagnostic information.
nifi.diagnostics.on.shutdown.verbose=false

# The location of the diagnostics folder.
nifi.diagnostics.on.shutdown.directory=./diagnostics

# The maximum number of files permitted in the directory. If the limit is exceeded, the oldest files are deleted.
nifi.diagnostics.on.shutdown.max.filecount=10

# The diagnostics folder's maximum permitted size in bytes. If the limit is exceeded, the oldest files are deleted.
nifi.diagnostics.on.shutdown.max.directory.size=10 MB

# Performance tracking properties
## Specifies what percentage of the time we should track the amount of time processors are using CPU, reading from/writing to content repo, etc.
## This can be useful to understand which components are the most expensive and to understand where system bottlenecks may be occurring.
## The value must be in the range of 0 (inclusive) to 100 (inclusive). A larger value will produce more accurate results, while a smaller value may be
## less expensive to compute.
## Results can be obtained by running "nifi.sh diagnostics <filename>" and then inspecting the produced file.
nifi.performance.tracking.percentage=0

# NAR Provider Properties #
# These properties allow configuring one or more NAR providers. A NAR provider retrieves NARs from an external source
# and copies them to the directory specified by nifi.nar.library.autoload.directory.
#
# Each NAR provider property follows the format:
# nifi.nar.library.provider.<identifier>.<property-name>
#
# Each NAR provider must have at least one property named "implementation".
#
# Example HDFS NAR Provider:
# nifi.nar.library.provider.hdfs.implementation=org.apache.nifi.flow.resource.hadoop.HDFSExternalResourceProvider
# nifi.nar.library.provider.hdfs.resources=/path/to/core-site.xml,/path/to/hdfs-site.xml
# nifi.nar.library.provider.hdfs.storage.location=hdfs://hdfs-location
# nifi.nar.library.provider.hdfs.source.directory=/nars
# nifi.nar.library.provider.hdfs.kerberos.principal=nifi@NIFI.COM
# nifi.nar.library.provider.hdfs.kerberos.keytab=/path/to/nifi.keytab
# nifi.nar.library.provider.hdfs.kerberos.password=
#
# Example NiFi Registry NAR Provider:
# nifi.nar.library.provider.nifi-registry.implementation=org.apache.nifi.registry.extension.NiFiRegistryNarProvider
# nifi.nar.library.provider.nifi-registry.url=http://localhost:18080
```
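Because the configuration was copied from one node to the others, it may be worth confirming that the files differ only in the host-specific keys. In the node 1 file above, `node-1` appears in `nifi.remote.input.host`, `nifi.web.http.host`, `nifi.cluster.node.address`, and `nifi.cluster.load.balance.host`. The sketch below is a small standalone Java helper, not a NiFi tool, that lists the keys whose values differ between two properties files. The class name `NifiPropertiesDiff` and its methods are hypothetical, and the file paths are passed on the command line.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

class NifiPropertiesDiff {

    // Parses key=value lines, ignoring comment lines that start with '#'.
    static Properties parse(Reader reader) throws IOException {
        Properties properties = new Properties();
        properties.load(reader);
        return properties;
    }

    // Returns, in sorted order, every key that is missing on one side
    // or has a different value on the two sides.
    static Set<String> differingKeys(Properties first, Properties second) {
        Set<String> allKeys = new TreeSet<>(first.stringPropertyNames());
        allKeys.addAll(second.stringPropertyNames());
        Set<String> differing = new TreeSet<>();
        for (String key : allKeys) {
            if (!Objects.equals(first.getProperty(key), second.getProperty(key))) {
                differing.add(key);
            }
        }
        return differing;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: NifiPropertiesDiff <node-a nifi.properties> <node-b nifi.properties>");
            return;
        }
        try (Reader a = Files.newBufferedReader(Paths.get(args[0]));
             Reader b = Files.newBufferedReader(Paths.get(args[1]))) {
            Properties first = parse(a);
            Properties second = parse(b);
            for (String key : differingKeys(first, second)) {
                System.out.println(key + ": " + first.getProperty(key) + " <-> " + second.getProperty(key));
            }
        }
    }
}
```

If a copied file still reports `node-1` for those host keys on another node, or if keys outside that group differ, that would be one place to look before the next upgrade attempt.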
04-25-2025 06:47 AM
@Shrink So I see you are trying to start a Process Group (which starts all the NiFi components within that process group). You are not set up with a production-ready certificate or a production-ready authentication and authorization configuration, which makes setting up the necessary authorizations impossible. You would need to switch to the managed authorizer, which allows you to use the file-user-group-provider. This provider lets you define your NiFi node certificate DN as a user, which you can then authorize as needed to make the rest-api call you want to make.

Have you looked at using the FlowFile Concurrency and Outbound Policy options available within the process group configuration to control the input and output of FlowFiles in and out of each process group? These settings would allow you to control the movement of FlowFiles from one PG to another and, I believe, achieve what you are trying to do by programmatically starting and stopping Process Groups via rest-api calls.

Configuring a Process Group
FlowFile Concurrency
Outbound Policy

Using rest-api calls requires you to constantly check that one PG is done processing all FlowFiles before you start the next. That is not an efficient design. You should try to design your dataflows so they are always running.

Please help our community grow. If you found any of the suggestions/solutions provided helped you with solving your issue or answering your question, please take a moment to login and click "Accept as Solution" on one or more of them that helped.

Thank you,
Matt
04-24-2025 07:42 PM
Thank you @MattWho. I tried the third option first, but it did not work on my side: NiFi shuts down saying no keystore file was found. The second option works well for me. Thanks. I have also found the articles below, which explain how to change the certificate:
https://www.linkedin.com/pulse/configuring-ssltls-authentication-apache-nifi-dhruv-sahu/
https://www.youtube.com/watch?v=9F6DnsD8TSo
02-26-2025 12:31 PM
Thank you @MattWho. Thanks for the brief explanation. I understand that the loop is causing the large queue. Let me redesign the flow. Thanks!
02-25-2025 05:49 AM
@dan_lucas From the exception, this most likely appears to be a configuration issue. You'll want to verify the NiFi Expression Language statement used in the PutFTP processor's "Remote Path" property. I assume you have something like ${absolute.path}/${airlinename} configured in that property? If you manually connect to the FTP server, can you successfully navigate the path?

Thank you,
Matt
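To see why the attribute values matter for the "Remote Path", the sketch below previews what a template such as ${absolute.path}/${airlinename} would expand to for a given set of FlowFile attributes. It is a toy substitution written for illustration, not NiFi's Expression Language implementation: the class `RemotePathPreview`, the `resolve` method, and the sample attribute values are hypothetical, and a missing attribute is assumed here to expand to an empty string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RemotePathPreview {

    // Matches simple ${name} references only; functions and nesting are not handled.
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with the attribute value, or with "" when the attribute is absent.
    static String resolve(String template, Map<String, String> attributes) {
        Matcher matcher = REFERENCE.matcher(template);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("absolute.path", "/data/out"); // sample value, not from the thread
        // "airlinename" is deliberately left unset to show the resulting path.
        System.out.println(resolve("${absolute.path}/${airlinename}", attributes));
    }
}
```

Comparing the previewed path with what can actually be navigated on the FTP server by hand is a quick way to tell a wrong attribute value apart from a permissions problem.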