Member since: 12-19-2022
Posts: 6
Kudos Received: 0
Solutions: 0
03-13-2023
09:11 AM
I have a NiFi pipeline (cluster with 3 nodes) that drops PostgreSQL constraints and triggers, transforms data in several tables, inserts the transformed data into a destination database, and then recreates the constraints and triggers. I need the pipeline to wait until all flowfiles have been transformed before releasing a single flowfile that recreates the constraints and triggers. I cannot skip this wait, because the first flowfile to reach the "recreate constraints" processor would recreate them while other flowfiles are still transforming and inserting data into the destination database.

Schema of what we need: [diagram]

What I have tried: I create a temporary table with a status column that is set to true when the data of a flowfile has been inserted into the destination database. A Groovy script then compares the number of flowfiles initially created with the count of rows whose status is true. If the two numbers are equal, the release identifier is created, so the last flowfile for which the equality holds passes and recreates the constraints and triggers. This does not work, because several flowfiles see the same count (I don't know why), so either several flowfiles pass the release, or none of them pass the release processor.

Do you have an idea how to hold the pipeline until all flowfiles have reached this point (the red bar on the schema above)?
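To make the attempt concrete, here is a minimal sketch (not my exact script) of the kind of check described above, as an ExecuteScript processor in Groovy. The attribute name expected.count, the table flowfile_status with its boolean status column, and the DBCP dynamic property pointing to a DBCPConnectionPool are illustrative names, not the real ones:

import java.sql.Connection
import org.apache.nifi.dbcp.DBCPService

def flowFile = session.get()
if (flowFile == null) return

try {
    // Total number of flowfiles initially created (hypothetical attribute name)
    long expected = Long.parseLong(flowFile.getAttribute('expected.count'))

    // Connection pool exposed to the script through a "DBCP" dynamic property
    def dbcp = DBCP.asControllerService(DBCPService)
    long done = 0
    Connection conn = dbcp.getConnection()
    try {
        def rs = conn.createStatement().executeQuery(
                'SELECT COUNT(*) FROM flowfile_status WHERE status = true')
        rs.next()
        done = rs.getLong(1)
    } finally {
        conn.close()
    }

    // Only a flowfile that observes the full count is marked for release
    flowFile = session.putAttribute(flowFile, 'release', (done >= expected).toString())
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    log.error('Release check failed', e)
    session.transfer(flowFile, REL_FAILURE)
}

Downstream, a RouteOnAttribute with a property like ${release:equals('true')} is what is supposed to let only one flowfile pass on to the processor that recreates the constraints and triggers.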
Labels:
- Apache NiFi
02-23-2023
04:47 AM
Hello, in K8s I have deployed NiFi with 2 nodes, with requests of 2 CPU / 4Gi RAM and limits of 8 CPU / 16Gi RAM. Heap init is 2G and heap max is 4G. My pipeline works when the data requested from the PostgreSQL database is less than 10 million rows (one table). When I want to fetch more data (e.g. 11 million rows) with ExecuteSQLRecord, my pipeline fails with this log:

2023-02-23 08:47:48,466 ERROR [Index Provenance Events-2] o.a.n.p.index.lucene.EventIndexTask Failed to index Provenance Events
org.apache.lucene.store.AlreadyClosedException: this IndexWriter is closed

and the next log is:

Caused by: java.nio.file.FileAlreadyExistsException: /opt/nifi/nifi-current/provenance_repository/lucene-8-index-1676991780200/_48.cfs

What can the reasons be? It looks like RAM/CPU are not enough, but I am not sure. 11 million rows does not seem to be a lot.
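For context, the heap and resource figures above are set in the container spec. A rough sketch of that part of the manifest, using the same NIFI_JVM_HEAP_INIT / NIFI_JVM_HEAP_MAX environment variables my deployment uses (the values below only restate the numbers quoted above):

resources:
  requests:
    cpu: "2"
    memory: 4Gi
  limits:
    cpu: "8"
    memory: 16Gi
env:
  # Heap settings quoted above (init 2G, max 4G)
  - name: NIFI_JVM_HEAP_INIT
    value: "2g"
  - name: NIFI_JVM_HEAP_MAX
    value: "4g"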
Labels:
- Apache NiFi
02-13-2023
08:23 AM
I work in Kubernetes. I deployed NiFi and an SFTP server in another environment and everything works there. In the other environment, though, there is a problem I don't understand, even though I applied the same deployment in both environments. Could the IP be blacklisted?
02-13-2023
06:36 AM
Hello, thank you for your advice. In nifi-app.log I only saw: "Stopping processor. Stopping scheduling". I never put SSH information in ListSFTP and it worked well before. Maybe the problem is on the SFTP side: when I SSH to this SFTP server from my laptop's shell, it says "sftp only...".
02-13-2023
06:06 AM
Hello, I have an error I don't understand with the ListSFTP processor. When I click "Verify properties" for this processor, it lists the SFTP folder correctly:

Successfully listed contents of Remote Directory [/data/folder_1/] on [hosname:port]. Found 2 objects. Of those, 2 match the filter.

But when I run the processor, I get an error I have never seen:

ListSFTP[id=X] Processing failed: org.apache.nifi.processors.standard.socket.ClientConnectException: SSH Client connection failed [hosname:port] - Caused by: java.net.SocketTimeoutException: connect timed out

Why can I list the directory with "Verify properties" while the processor itself does not work? (NiFi version: 1.17.0.) I changed nothing in NiFi and don't know what to do to correct the problem, or how to investigate it.
Labels:
- Apache NiFi
12-19-2022
04:27 AM
I am using external ZooKeeper and I have applied this YAML in order to have NiFi in cluster mode in Kubernetes (see below):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nifi
  labels:
    name: nifi
    app: nifi
  annotations:
    app.kubernetes.io/name: nifi
    app.kubernetes.io/part-of: nifi
spec:
  revisionHistoryLimit: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nifi
  template:
    metadata:
      labels:
        app: nifi
    spec:
      automountServiceAccountToken: false
      enableServiceLinks: false
      restartPolicy: Always
      securityContext:
        runAsGroup: 1000
        runAsUser: 1000
        runAsNonRoot: true
        seccompProfile:
          type: RuntimeDefault
      containers:
        - name: nifi
          image: XXX
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8080
              name: nifi
            - containerPort: 8082
              name: cluster
          env:
            - name: "NIFI_SENSITIVE_PROPS_KEY"
              value: "XXX"
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "no"
            - name: SINGLE_USER_CREDENTIALS_USERNAME
              value: XXX
            - name: SINGLE_USER_CREDENTIALS_PASSWORD
              value: XXX
            - name: NIFI_WEB_HTTP_HOST
              value: "0.0.0.0"
            - name: NIFI_WEB_HTTP_PORT
              value: "8080"
            # - name: NIFI_WEB_PROXY_HOST
            #   value: 0.0.0.0:8080
            # - name: HOSTNAME
            #   value: "nifi1"
            - name: NIFI_ANALYTICS_PREDICT_ENABLED
              value: "true"
            - name: NIFI_ELECTION_MAX_CANDIDATES
              value: "2"
            - name: NIFI_ELECTION_MAX_WAIT
              value: "1 min"
            - name: NIFI_CLUSTER_IS_NODE
              value: "true"
            - name: NIFI_JVM_HEAP_INIT
              value: "3g"
            - name: NIFI_JVM_HEAP_MAX
              value: "4g"
            - name: NIFI_CLUSTER_NODE_CONNECTION_TIMEOUT
              value: "2 min"
            - name: NIFI_CLUSTER_PROTOCOL_CONNECTION_HANDSHAKE_TIMEOUT
              value: "2 min"
            - name: NIFI_CLUSTER_NODE_PROTOCOL_MAX_THREADS
              value: "15"
            - name: NIFI_CLUSTER_NODE_PROTOCOL_PORT
              value: "8082"
            - name: NIFI_CLUSTER_NODE_READ_TIMEOUT
              value: "15"
            - name: NIFI_ZK_CONNECT_STRING
              value: "zookeeper:2181"
            - name: NIFI_CLUSTER_NODE_ADDRESS
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: HOSTNAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
          livenessProbe:
            exec:
              command:
                - pgrep
                - java
            initialDelaySeconds: 60
            periodSeconds: 30
            timeoutSeconds: 10
            failureThreshold: 3
            successThreshold: 1
          readinessProbe:
            exec:
              command:
                - pgrep
                - java
            initialDelaySeconds: 180
            periodSeconds: 30
            timeoutSeconds: 10
            failureThreshold: 3
            successThreshold: 1
          resources:
            requests:
              cpu: 400m
              memory: 1Gi
            limits:
              cpu: 500m
              memory: 2Gi
          securityContext:
            allowPrivilegeEscalation: false
            privileged: false
            capabilities:
              drop:
                - ALL
With this, I have one node in my NiFi cluster and its node address is 0.0.0.0:8080. I would like to have 3 nodes, and I think using 0.0.0.0:8080 as the node address is not good. I don't know how to deploy the 3 nodes in Kubernetes with different node addresses. How should I do it, and how should I modify my YAML?
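To illustrate the direction I am wondering about: a headless Service plus a StatefulSet, so that each of the 3 pods gets a stable DNS name that could be used as NIFI_CLUSTER_NODE_ADDRESS instead of 0.0.0.0 or the pod IP. This is only a sketch I have not tested; the name nifi-headless and the fieldRef on metadata.name are illustrative:

apiVersion: v1
kind: Service
metadata:
  name: nifi-headless
spec:
  clusterIP: None          # headless: gives each pod its own DNS record
  selector:
    app: nifi
  ports:
    - name: cluster
      port: 8082
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: nifi
spec:
  serviceName: nifi-headless
  replicas: 3
  selector:
    matchLabels:
      app: nifi
  template:
    metadata:
      labels:
        app: nifi
    spec:
      containers:
        - name: nifi
          image: XXX
          env:
            # Pods are named nifi-0, nifi-1, nifi-2; each resolves as
            # <pod>.nifi-headless inside the namespace.
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NIFI_CLUSTER_NODE_ADDRESS
              value: "$(POD_NAME).nifi-headless"
            # ...the rest of the env, ports, probes and resources as in the
            # Deployment above

The idea would be that each node registers in ZooKeeper with its own stable address instead of 0.0.0.0:8080, but I am not sure this is the right way.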
Labels:
- Apache NiFi
- Apache Zookeeper