Member since: 02-17-2022
Posts: 19
Kudos Received: 4
Solutions: 2
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 7703 | 03-03-2022 08:32 AM
 | 9430 | 02-21-2022 12:06 PM
02-10-2025
12:58 PM
Hello all - I'm trying to run the spark-shell command below from the bin directory of my extracted Spark 3.4.3 installation. I set the master to my Kubernetes cluster because I'd like the executors to run on k8s. I created a service account with all the necessary permissions:

# kubectl auth can-i create pod --as=system:serviceaccount:my-namespace:spark-sa -n my-namespace
yes

Command:

# export K8S_TOKEN=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='spark-sa')].data.token}" | base64 --decode)
# ./bin/spark-shell \
--master k8s://https://my-k8s-cluster:6443 \
--deploy-mode client \
--name spark-shell-poc \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.container.image=my-docker-hub/spark_poc:v1.4 \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.namespace=dynx-center-resources \
--conf spark.driver.pod.name=dynx-spark-driver \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
--conf spark.kubernetes.authenticate.submission.oauthToken=$K8S_TOKEN

Even though I specified the service account and its token, Spark always ends up using the 'system:anonymous' user to create pods in my k8s environment, and because of that I get the error below (snippet from a much larger stack trace).

25/02/06 14:36:32 WARN ExecutorPodsSnapshotsStoreImpl: Exception when notifying snapshot subscriber.
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://my-k8s-cluster:6443/api/v1/namespaces/dynx-center-resources/pods. Message: pods is forbidden: User "system:anonymous" cannot create resource "pods" in API group "" in the namespace "dynx-center-resources".
Received status: Status(apiVersion=v1, code=403, details=StatusDetails(causes=[], group=null, kind=pods, name=null, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=pods is forbidden: User "system:anonymous" cannot create resource "pods" in API group "" in the namespace "dynx-center-resources", metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Forbidden, status=Failure, additionalProperties={}).
    at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:538)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:558)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleCreate(OperationSupport.java:349)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:711)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:93)
    at io.fabric8.kubernetes.client.dsl.internal.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:42)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.create(BaseOperation.java:1113)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.create(BaseOperation.java:93)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$requestNewExecutors$1(ExecutorPodsAllocator.scala:440)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.requestNewExecutors(ExecutorPodsAllocator.scala:417)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$36(ExecutorPodsAllocator.scala:370)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$onNewSnapshots$36$adapted(ExecutorPodsAllocator.scala:363)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.onNewSnapshots(ExecutorPodsAllocator.scala:363)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3(ExecutorPodsAllocator.scala:134)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$3$adapted(ExecutorPodsAllocator.scala:134)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.org$apache$spark$scheduler$cluster$k8s$ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber$$processSnapshotsInternal(ExecutorPodsSnapshotsStoreImpl.scala:143)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl$SnapshotsSubscriber.processSnapshots(ExecutorPodsSnapshotsStoreImpl.scala:131)
    at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsSnapshotsStoreImpl.$anonfun$addSubscriber$1(ExecutorPodsSnapshotsStoreImpl.scala:85)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

As a part of troubleshooting, I ran the curl command below using the same service account token and got a result.
curl -X GET https://my-k8s-cluster:6443/api --header "Authorization: Bearer eyJhbGciOiJSUzI1NiIsImtpZCI6IjdWOXgwTjdIeUdCTGx2eEItOXZ3eDlSV1I1UXd1d0MtTXJENFBhXzNDTTgifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJkeW54LWNlbnRlci1yZXNvdXJjZXMiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlY3JldC5uYW1lIjoic3Bhcmstc2EiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoic3Bhcmstc2EiLCJrdWJlcm5ldGVzxxxxxxxxxxxxxxxxxxxxxmMS03NzI5LTQ5OTAtYWZkOC1mYjZiNzU4ZDg5YzAiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6ZHlueC1jZW50ZXItcmVzb3VyY2VzOnNwYXJrLXNhIn0.TWAQYmu_N-N1gnZ1hYYn_wvavs9f9w33v0P0Kgchd1eETO8TpHlYS_JSt8jzWlX6C4JF293Q8VRk8p1Nx3zRdqjZnYWmMvJYCaq5mBAyvXAW8fXW_ZtQD7HJPUEUb2ZDXUz3b2XLgvJoWui8vhqZBYUev67YgHHRspgkwDbLrRIB1oRPbx_2osYMQW3tPxoThyzUqdvyBij3hjW-syrsp_sR1ir-78XzIZpkV2OBFds7u8vd0IqoWLOtmnZwdq1RKCKtFk292VfWSbN0HYJUs_aJUeaqLpekopZLfDM2U_GT0ImwBUOL2EILpb-K1xdWr4-Jv4qPsFBLFh31S2OMAg" --insecure

{
  "kind": "APIVersions",
  "versions": [
    "v1"
  ],
  "serverAddressByClientCIDRs": [
    {
      "clientCIDR": "0.0.0.0/0",
      "serverAddress": "10.14.3.19:6443"
    }
  ]
}

However, if I run the spark-submit command in cluster deploy mode, it runs without any issue and produces the desired output.

# ./bin/spark-submit \
--master k8s://https://my-k8s-cluster:6443 \
--deploy-mode cluster \
--name spark-poc \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.container.image=my-docker-hub/spark_poc:v1.4 \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.namespace=dynx-center-resources \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
--conf spark.kubernetes.authenticate.submission.oauthToken=$K8S_TOKEN \
--class org.apache.spark.examples.SparkPi \
local:///opt/spark/examples/jars/spark-examples_2.12-3.4.3.jar 1000

Not sure what I'm missing. Appreciate any help on this.
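Since the API server reports the request as system:anonymous, one generic check is to decode the payload segment of the token and confirm the 'sub' claim names the expected service account. A minimal sketch follows; the token here is fabricated purely so the commands are runnable, and in real use you would feed in $K8S_TOKEN instead:

```shell
# Build a fake JWT whose payload carries a 'sub' claim, as a stand-in
# for a real service-account token (which is base64url-encoded).
payload='{"sub":"system:serviceaccount:my-namespace:spark-sa"}'
b64=$(printf '%s' "$payload" | base64 | tr -d '\n=' | tr '+/' '-_')
token="eyJhbGciOiJSUzI1NiJ9.$b64.signature"

# Extract the 2nd dot-separated segment and undo the base64url encoding.
seg=$(printf '%s' "$token" | cut -d. -f2 | tr '_-' '/+')
# Restore stripped '=' padding so base64 -d accepts it.
case $(( ${#seg} % 4 )) in 2) seg="$seg==" ;; 3) seg="$seg=" ;; esac
sub=$(printf '%s' "$seg" | base64 -d)
echo "$sub"
```

If the decoded 'sub' is not system:serviceaccount:&lt;namespace&gt;:spark-sa, the wrong secret was picked up by the jsonpath query.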
Labels:
-
Apache Spark
12-27-2022
08:59 AM
Hello - I'm facing a connection issue with my Flink setup. My Flink cluster has 3 nodes with identical configurations. The task manager and job manager are not connecting/communicating with each other in the cluster; however, the ZooKeeper nodes are communicating with each other fine. When Flink is configured as a standalone setup, everything works, but when high-availability is set to zookeeper, the task manager and job manager do not communicate with each other. I can confirm the task manager and job manager processes are running; however, the job manager port 8081 is not listening on 2 of the nodes. One node does have this port listening, and I can view the UI from the browser there. I can see 1 task manager, the one running on that same node, listed under available slots.

Flink: 1.14.4
Zookeeper: 3.4.14

Appreciate any inputs/help on this. -Prabu.
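For context, a ZooKeeper HA setup in Flink 1.14 hinges on a handful of flink-conf.yaml keys. A minimal sketch, assuming hypothetical ZooKeeper hosts zk1-zk3 and a shared storage directory (both placeholders, not from the thread):

```yaml
# flink-conf.yaml -- minimal ZooKeeper HA sketch (hostnames/paths are examples)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
high-availability.cluster-id: /my-flink-cluster
# With HA enabled, the REST port can be given as a range so that
# JobManagers on different nodes do not clash on a single fixed port:
rest.bind-port: 8081-8091
```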
Labels:
-
Apache Flink
09-12-2022
07:40 AM
Hi @araujo, My apologies for coming back late. The job reads from 2 Kafka sources and sends the data to a Kinesis stream (sink). Thanks, Prabu.
08-29-2022
12:19 PM
@araujo
1. The error is recurring in the logs.
2. To start/stop Flink, I use 'service flink-jobmanager start/stop'.
3. To start the Flink job, I use: $FLINK_HOME/bin/flink run -d /opt/flink-jobs/lib/job1.jar --pathToPropertyFile /opt/flink-jobs/conf/job1.properties
4. To cancel the Flink job, I get the ID from the 'flink list' command and use: $FLINK_HOME/bin/flink cancel <ID>

Please let me know if you need anything else. Appreciate your help on this. Thanks, Prabu.
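As an aside, the cancel step can be scripted instead of copying the ID by hand. A hypothetical helper (not from the thread) that parses 'flink list' output by job name:

```shell
# 'flink list' prints lines shaped roughly like:
#   29.08.2022 12:19:00 : a1b2c3d4e5f60718293a4b5c6d7e8f90 : job1 (RUNNING)
# Split on " : " and print the ID field for the first matching job name.
extract_job_id() {  # $1 = job name; reads 'flink list' output on stdin
  awk -v n="$1" -F' : ' '$3 ~ n {print $2; exit}'
}

# Usage sketch (assumes $FLINK_HOME points at the Flink install):
#   id=$("$FLINK_HOME/bin/flink" list -r | extract_job_id job1)
#   "$FLINK_HOME/bin/flink" cancel "$id"
```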
08-16-2022
11:11 AM
Hello, I am running Flink 1.14.4 with 3 jobs in it. JVM: openjdk version "1.8.0_312". All jobs are running without any issues (at least none obviously noticeable); however, I see the below exception in the log and I'm not able to understand the reason for it. Appreciate any help on this.

Exception: java.nio.file.InvalidPathException: Nul character not allowed (full stack trace below)

2022-08-15 16:35:15.350 [flink-rest-server-netty-worker-thread-5] WARN org.apache.flink.runtime.rest.FileUploadHandler - File upload failed.
java.nio.file.InvalidPathException: Nul character not allowed: bash' '-c' #cmdlinux})).(#p new java.lang.ProcessBuilder(#cmds)).(#p.redirectErrorStream(true)).(#process #p.start()).(#ros (@org.apache.struts2.ServletActionContext@getResponse().getOutputStream())).(@org.apache.commons.io.IOUtils@copy(#process.getInputStream() #ros)).(#ros.flush())}b
at sun.nio.fs.UnixPath.checkNotNul(UnixPath.java:93)
at sun.nio.fs.UnixPath.normalizeAndCheck(UnixPath.java:83)
at sun.nio.fs.UnixPath.<init>(UnixPath.java:71)
at sun.nio.fs.UnixFileSystem.getPath(UnixFileSystem.java:281)
at sun.nio.fs.AbstractPath.resolve(AbstractPath.java:53)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:189)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:71)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

Thanks, Prabu.
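For anyone reproducing this check: how often the warning recurs can be counted straight from the jobmanager log. A small sketch with a stand-in log file (the real path depends on the installation, e.g. under $FLINK_HOME/log):

```shell
# Create a throwaway sample log standing in for the real jobmanager log.
log=$(mktemp)
cat > "$log" <<'EOF'
2022-08-15 16:35:15.350 [flink-rest-server-netty-worker-thread-5] WARN org.apache.flink.runtime.rest.FileUploadHandler - File upload failed.
2022-08-15 17:01:02.114 [flink-rest-server-netty-worker-thread-2] WARN org.apache.flink.runtime.rest.FileUploadHandler - File upload failed.
EOF

# Count occurrences; on a real node point this at flink-*-jobmanager-*.log.
count=$(grep -c "File upload failed" "$log")
echo "$count"
rm -f "$log"
```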
Labels:
-
Apache Flink
03-04-2022
06:18 AM
1 Kudo
Hello @araujo - Yes, that is the issue. From Java 11 onwards, we may need to explicitly set the keystore type. Thanks, Prabu.
03-03-2022
08:32 AM
2 Kudos
Hi @araujo, I figured out the issue. It seems we have to provide the keystore type and truststore type along with the keystore and truststore files and passwords. Once I added them to the source-building code, I was able to connect to NiFi without any issues. Now my Flink flow is connected to NiFi and processing data. Sharing the NiFiSource creation logic for reference:

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .urls(new HashSet<>(Arrays.asList(config.getString(clusterId + ".url").split(","))))
        .portName(config.getString(clusterId + ".port"))
        .requestBatchCount(config.getInt("batch-count"))
        .truststoreFilename(config.getString("truststore-file"))
        .truststorePass(config.getString("truststore-password"))
        .keystoreFilename(config.getString("keystore-file"))
        .keystorePass(config.getString("keystore-password"))
        .keystoreType(KeystoreType.JKS)
        .truststoreType(KeystoreType.JKS)
        .buildConfig();

org.apache.flink.streaming.api.functions.source.SourceFunction<org.apache.flink.streaming.connectors.nifi.NiFiDataPacket> nifiSource =
        new org.apache.flink.streaming.connectors.nifi.NiFiSource(clientConfig);

Appreciate your help and patience. Thanks, Prabu.
03-02-2022
07:26 AM
Hi @araujo - Have you had a chance to look into this? Please let me know your thoughts/suggestions. Appreciate your help and patience. Thanks, Prabu.
02-25-2022
10:44 AM
Hi @araujo: Here you go:

Andre: You upgraded Flink *and* NiFi versions at once. I don't think the Flink version would make a difference, but there have been some security changes in NiFi 1.15. I'm not sure how those would affect your case, but there's a possibility that they could require some additional configuration.

Prabu: What would be the additional configuration that I may require? Please advise.

Andre: The 401 error (authentication error) seen in your logs is probably related to the anonymous login. The anonymous login indicates to me that there's something wrong with your keystore. Are you able to share the keystore and truststore details I asked for before?

Prabu: The authentication request sent from the Flink node does not have the user name filled in. That was NOT the case when I ran Flink 1.8.0. Please refer to the log below. When running Flink 1.8.0, instead of 'anonymous' it had the 'owner' of the certificate. In my certificate, I set nifi_node1 as the owner and the other Flink and NiFi nodes as SANs (SubjectAlternativeName). Keystore details are added at the bottom.

2022-02-22 21:27:39,381 INFO [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authentication Started 16.13.8.6 [<anonymous>] GET https://nifi_node1:8443/nifi-api/site-to-site

Andre: It's also weird that you had to add the truststore contents to the default Java cacerts. You should not need to do that if the truststore was being passed correctly to the job.

Prabu: I added the truststore to the default Java cacerts as part of troubleshooting. The SSLHandshakeException was resolved once the keystore was imported into the default Java cacerts. I do set the truststore using the nifi.truststore-file property and the keystore using the nifi.keystore-file property.

Andre: Are you running your job in cluster mode? Is that on YARN? Wherever that job is running, you must ensure that the keystore and truststore are available for the job to read, at the right location.

Prabu: Ideally I will run Flink and NiFi in separate clusters. To narrow down the issue, I'm currently running Flink on a standalone node with NiFi running in a cluster (3 nodes). The keystore and truststore files are in place and configured correctly. I had the same configuration when Flink was running 1.8.0 and did not face this issue there.

Andre: Could you please also share the command line that you use to submit the job?

Prabu: I submit the job by running: FLINK_HOME/bin/flink run -d BASE_DIR/flows/FLOW_NAME/FLOW_NAME-FLOW_VERSION.jar --pathToPropertyFile BASE_DIR/flows/FLOW_NAME/FLOW_NAME.properties. The truststore and keystore properties are set in the FLOW_NAME.properties file.

[prabu@prabus prabu]# keytool -list -v -keystore nifi-keystore.jks
Enter keystore password:
Keystore type: jks
Keystore provider: SUN
Your keystore contains 2 entries
Alias name: my-nifi
Creation date: Feb 22, 2022
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=nifi_node1, O=xxxxx, L=yyyyy, ST=zzzz, C=US
Issuer: CN=COMODO RSA Organization Validation Secure Server CA, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Serial number: bdcb82652ab865e438a32e6fca785deb
Valid from: Fri Feb 18 00:00:00 UTC 2022 until: Sat Feb 18 23:59:59 UTC 2023
Certificate fingerprints:
SHA1: 6B:8C:65:89:2A:FA:A6:11:0C:FC:22:DA:B9:70:22:25:16:88:9C:20
SHA256: 93:C2:62:34:CD:88:EA:6B:F6:7F:CF:0C:29:1D:96:3D:20:41:AB:D8:68:AC:D5:31:72:46:55:48:F3:D6:6C:FF
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3
Extensions:
#1: ObjectId: 1.3.6.1.4.1.11129.2.4.2 Criticality=false
0000: 04 82 01 69 01 67 00 76 00 AD F7 BE FA 7C FF 10 ...i.g.v........
0010: C8 8B 9D 3D 9C 1E 3E 18 6A B4 67 29 5D CF B1 0C ...=..>.j.g)]...
0020: 24 CA 85 86 34 EB DC 82 8A 00 00 01 7F 0E 76 7D $...4.........v.
0030: 71 00 00 04 03 00 47 30 45 02 21 00 A6 B5 8D 45 q.....G0E.!....E
0040: E2 A7 7B 59 88 B4 21 26 6E 10 89 D0 FA 8F 22 75 ...Y..!&n....."u
0050: 61 20 0F 33 BD 98 C3 E4 33 A6 D7 EF 02 20 77 E7 a .3....3.... w.
0060: FE 4A C3 5A 01 97 A0 92 A8 92 E2 67 5A E7 14 53 .J.Z.......gZ..S
0070: 7B 54 86 C0 BD D1 4C 47 F2 A6 8C CA 9C FB 00 75 .T....LG.......u
0080: 00 7A 32 8C 54 D8 B7 2D B6 20 EA 38 E0 52 1E E9 .z2.T..-. .8.R..
0090: 84 16 70 32 13 85 4D 3B D2 2B C1 3A 57 A3 52 EB ..p2..M;.+.:W.R.
00A0: 52 00 00 01 7F 0E 76 7D 2C 00 00 04 03 00 46 30 R.....v.,.....F0
00B0: 44 02 20 2F B5 FE 82 E0 9D CC 14 27 44 34 68 0E D. /.......'D4h.
00C0: 0F F5 15 20 A6 F7 F4 67 98 FC E9 9F 7F 82 46 9A ... ...g......F.
00D0: B6 43 3A 02 20 4D 74 21 1A 90 8A B2 44 4E 3D 0E .C:. Mt!....DN=.
00E0: EA 56 9B 9B 71 25 2D 04 AE A9 D9 9D AC 96 DC FD .V..q%-.........
00F0: 86 92 47 7A AD 00 76 00 E8 3E D0 DA 3E F5 06 35 ..Gz..v..>..>..5
0100: 32 E7 57 28 BC 89 6B C9 03 D3 CB D1 11 6B EC EB 2.W(..k......k..
0110: 69 E1 77 7D 6D 06 BD 6E 00 00 01 7F 0E 76 7D 07 i.w.m..n.....v..
0120: 00 00 04 03 00 47 30 45 02 20 22 F9 A7 9A C8 C5 .....G0E. ".....
0130: 60 6D 25 80 14 F2 6C 78 24 82 24 72 0F 88 24 45 `m%...lx$.$r..$E
0140: EE 5D 86 68 79 BC 7C A0 B4 FD 02 21 00 EA 19 9D .].hy......!....
0150: 91 81 F5 EC 90 8C AB 0C D1 09 80 30 72 4C 1D CF ...........0rL..
0160: DD 2C 7E 7B FD 3E EC BE 01 9D 60 AD 0C .,...>....`..
#2: ObjectId: 1.3.6.1.5.5.7.1.1 Criticality=false
AuthorityInfoAccess [
[
accessMethod: caIssuers
accessLocation: URIName: http://crt.comodoca.com/COMODORSAOrganizationValidationSecureServerCA.crt
,
accessMethod: ocsp
accessLocation: URIName: http://ocsp.comodoca.com
]
]
#3: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 9A F3 2B DA CF AD 4F B6 2F BB 2A 48 48 2A 12 B7 ..+...O./.*HH*..
0010: 1B 42 C1 24 .B.$
]
]
#4: ObjectId: 2.5.29.19 Criticality=true
BasicConstraints:[
CA:false
PathLen: undefined
]
#5: ObjectId: 2.5.29.31 Criticality=false
CRLDistributionPoints [
[DistributionPoint:
[URIName: http://crl.comodoca.com/COMODORSAOrganizationValidationSecureServerCA.crl]
]]
#6: ObjectId: 2.5.29.32 Criticality=false
CertificatePolicies [
[CertificatePolicyId: [1.3.6.1.4.1.6449.1.2.1.3.4]
[PolicyQualifierInfo: [
qualifierID: 1.3.6.1.5.5.7.2.1
qualifier: 0000: 16 12 68 74 74 70 73 3A 2F 2F 73 65 63 74 69 67 ..https://sectig
0010: 6F 2E 63 6F 6D 2F 43 50 53 o.com/CPS
]] ]
[CertificatePolicyId: [2.23.140.1.2.2]
[] ]
]
#7: ObjectId: 2.5.29.37 Criticality=false
ExtendedKeyUsages [
serverAuth
clientAuth
]
#8: ObjectId: 2.5.29.15 Criticality=true
KeyUsage [
DigitalSignature
Key_Encipherment
]
#9: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
DNSName: nifi_node1
DNSName: flink_node1
DNSName: flink_node2
DNSName: flink_node3
DNSName: nifi_node2
DNSName: nifi_node3
]
#10: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: F6 EF 7D 85 31 FA 04 A9 12 F8 BB 1D DB 6D 24 B0 ....1........m$.
0010: 1E 7D 95 31 ...1
]
]
*******************************************
*******************************************
Alias name: inter
Creation date: Feb 22, 2022
Entry type: trustedCertEntry
Owner: CN=COMODO RSA Organization Validation Secure Server CA, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Issuer: CN=COMODO RSA Certification Authority, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Serial number: 36825e7fb5a481937a8a9b9f6d1736bb93ca6
Valid from: Wed Feb 12 00:00:00 UTC 2014 until: Sun Feb 11 23:59:59 UTC 2029
Certificate fingerprints:
SHA1: 10:4C:63:D2:54:6B:80:21:DD:10:5E:9F:BA:5A:8D:78:16:9F:6B:32
SHA256: 11:10:06:37:8A:FB:E8:E9:9B:B0:2B:A8:73:90:CA:42:9F:CA:27:73:F7:4D:7F:7E:B5:74:4D:AD:DF:68:01:4B
Signature algorithm name: SHA384withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3
Extensions:
#1: ObjectId: 1.3.6.1.5.5.7.1.1 Criticality=false
AuthorityInfoAccess [
[
accessMethod: caIssuers
accessLocation: URIName: http://crt.comodoca.com/COMODORSAAddTrustCA.crt
,
accessMethod: ocsp
accessLocation: URIName: http://ocsp.comodoca.com
]
]
#2: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: BB AF 7E 02 3D FB C6 F1 3C 84 9E AD EE 38 98 EC ....=...<....8..
0010: D9 32 32 D4 .22.
]
]
#3: ObjectId: 2.5.29.19 Criticality=true
BasicConstraints:[
CA:true
PathLen:0
]
#4: ObjectId: 2.5.29.31 Criticality=false
CRLDistributionPoints [
[DistributionPoint:
[URIName: http://crl.comodoca.com/COMODORSACertificationAuthority.crl]
]]
#5: ObjectId: 2.5.29.32 Criticality=false
CertificatePolicies [
[CertificatePolicyId: [2.5.29.32.0]
[] ]
[CertificatePolicyId: [2.23.140.1.2.2]
[] ]
]
#6: ObjectId: 2.5.29.37 Criticality=false
ExtendedKeyUsages [
serverAuth
clientAuth
]
#7: ObjectId: 2.5.29.15 Criticality=true
KeyUsage [
DigitalSignature
Key_CertSign
Crl_Sign
]
#8: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 9C F3 2B DD CD BD 4F B6 2F BB 2A 48 48 2A 12 B7 ..+...O./.*HH*..
0010: 1B 42 C1 24 .B.$
]
]
*******************************************
*******************************************
Thanks, Prabu.