Support Questions
Find answers, ask questions, and share your expertise

Authentication Issue: X509CertificateExtractor No client certificate found in request.

Explorer

Hello all,

I am running into an issue with authentication when trying to connect my Flink (version 1.14.3) server with NiFi. NiFi 1.15.3 is running in secure mode in cluster with 3 nodes. When my Flink flow try to connect to NiFi, it throws:

 

Flink Log:

 

org.apache.flink.util.SerializedThrowable: Tried all cluster URLs but none of those was accessible. Last Exception was org.apache.nifi.remote.util.SiteToSiteRestApiClient$HttpGetFailedException: response code 401:Unauthorized with explanation: null
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getController(SiteToSiteRestApiClient.java:372)
	at org.apache.nifi.remote.client.SiteInfoProvider.refreshRemoteInfo(SiteInfoProvider.java:69)
	at org.apache.nifi.remote.client.SiteInfoProvider.getPortIdentifier(SiteInfoProvider.java:220)
	at org.apache.nifi.remote.client.SiteInfoProvider.getOutputPortIdentifier(SiteInfoProvider.java:204)
	at org.apache.nifi.remote.client.socket.SocketClient.getPortIdentifier(SocketClient.java:79)
	at org.apache.nifi.remote.client.socket.SocketClient.createTransaction(SocketClient.java:121)
	at org.apache.flink.streaming.connectors.nifi.NiFiSource.run(NiFiSource.java:91)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
Caused by: org.apache.flink.util.SerializedThrowable: response code 401:Unauthorized with explanation: null
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.execute(SiteToSiteRestApiClient.java:1203)
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.execute(SiteToSiteRestApiClient.java:1237)
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.fetchController(SiteToSiteRestApiClient.java:419)
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getController(SiteToSiteRestApiClient.java:394)
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getController(SiteToSiteRestApiClient.java:361)
	... 9 common frames omitted

 

In my NiFi logs:

 

2022-02-22 21:27:39,381 DEBUG [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authenticating [null]
2022-02-22 21:27:39,381 DEBUG [NiFi Web Server-1203] o.a.n.w.s.x509.X509CertificateExtractor No client certificate found in request.
2022-02-22 21:27:39,381 DEBUG [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authenticating [null]
2022-02-22 21:27:39,381 DEBUG [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authenticating [null]
2022-02-22 21:27:39,381 INFO [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authentication Started 16.13.8.6 [<anonymous>] GET https://nifi_node1:8443/nifi-api/site-to-site
2022-02-22 21:27:39,381 WARN [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authentication Failed 16.13.8.6 GET https://nifi_node1:8443/nifi-api/site-to-site [Anonymous authentication has not been configured.]
2022-02-22 21:27:39,382 DEBUG [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authentication Failed
org.apache.nifi.web.security.InvalidAuthenticationException: Anonymous authentication has not been configured.
	at org.apache.nifi.web.security.anonymous.NiFiAnonymousAuthenticationProvider.authenticate(NiFiAnonymousAuthenticationProvider.java:46)
	at org.springframework.security.authentication.ProviderManager.authenticate(ProviderManager.java:182)
	at org.apache.nifi.web.security.NiFiAuthenticationFilter.authenticate(NiFiAuthenticationFilter.java:73)
	at org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:56)
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:336)
	at org.apache.nifi.web.security.NiFiAuthenticationFilter.authenticate(NiFiAuthenticationFilter.java:94)
	at org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:56)
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:336)
	at org.springframework.security.oauth2.server.resource.web.BearerTokenAuthenticationFilter.doFilterInternal(BearerTokenAuthenticationFilter.java:121)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:336)
	at org.apache.nifi.web.security.NiFiAuthenticationFilter.authenticate(NiFiAuthenticationFilter.java:94)
	at org.apache.nifi.web.security.NiFiAuthenticationFilter.doFilter(NiFiAuthenticationFilter.java:56)
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:336)
	at org.springframework.security.web.csrf.CsrfFilter.doFilterInternal(CsrfFilter.java:117)
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
	at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:336)
	at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:211)
	at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:183)
	at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358)
	at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.apache.nifi.web.filter.TimerFilter.doFilter(TimerFilter.java:51)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.apache.nifi.web.filter.ExceptionFilter.doFilter(ExceptionFilter.java:46)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:201)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.eclipse.jetty.servlets.DoSFilter.doFilterChain(DoSFilter.java:487)
	at org.eclipse.jetty.servlets.DoSFilter.doFilter(DoSFilter.java:336)
	at org.eclipse.jetty.servlets.DoSFilter.doFilter(DoSFilter.java:301)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.apache.nifi.web.security.headers.StrictTransportSecurityFilter.doFilter(StrictTransportSecurityFilter.java:48)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.apache.nifi.web.security.headers.XContentTypeOptionsFilter.doFilter(XContentTypeOptionsFilter.java:48)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.apache.nifi.web.security.headers.XSSProtectionFilter.doFilter(XSSProtectionFilter.java:48)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.apache.nifi.web.security.headers.ContentSecurityPolicyFilter.doFilter(ContentSecurityPolicyFilter.java:47)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.apache.nifi.web.security.headers.XFrameOptionsFilter.doFilter(XFrameOptionsFilter.java:48)
	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
	at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:600)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
	at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763)
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:191)
	at org.eclipse.jetty.server.handler.HandlerList.handle(HandlerList.java:59)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
	at org.eclipse.jetty.server.Server.handle(Server.java:516)
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:400)
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:645)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:392)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
	at org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.onFillable(SslConnection.java:555)
	at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:410)
	at org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:164)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
	at java.lang.Thread.run(Thread.java:748)

 

NiFi Properties (grepped for http and pasted the output):

 

nifi.remote.input.http.enabled=false
nifi.remote.input.http.transaction.ttl=30 sec
nifi.web.http.host=
nifi.web.http.port=
nifi.web.http.network.interface.default=
nifi.web.https.host=nifi_node1
nifi.web.https.port=8443
nifi.web.https.network.interface.default=
nifi.web.https.ciphersuites.include=
nifi.web.https.ciphersuites.exclude=
nifi.security.user.saml.signature.algorithm=http://www.w3.org/2001/04/xmldsig-more#rsa-sha256
nifi.security.user.saml.signature.digest.algorithm=http://www.w3.org/2001/04/xmlenc#sha256
nifi.security.user.saml.http.client.truststore.strategy=JDK
nifi.security.user.saml.http.client.connect.timeout=30 secs
nifi.security.user.saml.http.client.read.timeout=30 secs

 

I'm using same certificates in all NiFi nodes and Flink node. The certs are all in place and I confirmed there is one PrivateKeyEntry with ExtendedKeyUsages set to serverAuth and clientAuth. I searched for solution and I could not able to find one in the forum.

Note: The IP: 16.13.8.6 in the log is my Flink node

Please let me know what I'm missing.

Appreciate any help on this!

Thanks,
Prabu.

 

 

1 ACCEPTED SOLUTION

Explorer

Hi @araujo,

 

I figured out the issue. It seems we have to provide the keystore type and truststore type along with the keystore and truststore file and passwords. Once I have them added to the source building code, I was able to connect to NiFi without any issues. Now my Flink flow is connected to NiFi and processing data.

 

Sharing the NiFiSource creation logic for reference:

org.apache.flink.streaming.api.functions.source.SourceFunction<org.apache.flink.streaming.connectors.nifi.NiFiDataPacket> nifiSource;
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.urls(new HashSet(Arrays.asList(config.getString(clusterId + ".url").split(","))))
.portName(config.getString(clusterId + ".port"))
.requestBatchCount(config.getInt("batch-count"))
.truststoreFilename(config.getString("truststore-file"))
.truststorePass(config.getString("truststore-password"))
.keystoreFilename(config.getString("keystore-file"))
.keystorePass(config.getString("keystore-password"))
.keystoreType(KeystoreType.JKS)
.truststoreType(KeystoreType.JKS)
.buildConfig();
nifiSource = new org.apache.flink.streaming.connectors.nifi.NiFiSource(clientConfig);

 

Appreciate your help and patience.

 

Thanks,

Prabu. 

View solution in original post

15 REPLIES 15

Master Collaborator

Hi, @spserd ,

 

I believe this issue is due to your Flink configuration, not NiFi.

Your Flink job must be configured to use a keystore, besides the truststore. The keystore will be used by the Flink application to authenticate with the NiFi nodes.

 

Can you please check (and share) the Flink job configuration?

 

Cheers,

André

 

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Explorer

Hi @araujo ,

I do use the keystore and truststore in my Flink job configuration.

Here are the Flink job configuration:

 

flow-version=1.2.1
flow-name=lease-flow
flow.parallelism=2
flow.check-pointing-interval=180000

nifi.batch-count=10
nifi.central.cluster-name=NiFi
nifi.central.url=https://nifi_node1:8443/nifi,https://nifi_node2:8443/nifi,https://nifi_node3:8443/nifi
nifi.central.port=LogAudit
nifi.keystore-password=xxxxx
nifi.truststore-password=yyyyy
nifi.keystore-file=/home/prabu/cyclone-nifi-keystore.jks
nifi.truststore-file=/home/prabu/truststore.jks

 

Also, I do have another Flink 1.8.0 version running in another cluster which connects to NiFi 1.15.3 with no issues. This cluster Im upgrading the Flink to 1.14.3 and facing this issue.

 

Appreciate your help!

Thanks,

Prabu.

Master Collaborator

@spserd ,

 

Would you be able to share the output of these commands:

keytool -keystore /home/prabu/cyclone-nifi-keystore.jks -list -v
keytool -keystore /home/prabu/truststore.jks -list -v

 

André

 

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Explorer

Hello @araujo,

 

I will share the keytool details soon. I noticed that the Flink 1.14.3 sends the S2S request with anonymous user.

NiFi user log:

2022-02-23 15:59:00,166 INFO [NiFi Web Server-16] o.a.n.w.s.NiFiAuthenticationFilter Authentication Started 15.17.8.6 [<anonymous>] GET https://nifi_node2:8443/nifi-api/site-to-site
2022-02-23 15:59:00,171 WARN [NiFi Web Server-16] o.a.n.w.s.NiFiAuthenticationFilter Authentication Failed 15.17.8.6 GET https://nifi_node2:8443/nifi-api/site-to-site [Anonymous authentication has not been configured.]
2022-02-23 15:59:00,173 DEBUG [NiFi Web Server-16] o.a.n.w.s.NiFiAuthenticationFilter Authentication Failed

 

I downgraded the Flink version to 1.8.0 and with same certificate and NiFi conf. I noticed this time Flink sends S2S with user (owner of the certificate). I get the below logs in NiFi user log.

 

2022-02-23 15:29:30,650 INFO [NiFi Web Server-17] o.a.n.w.s.NiFiAuthenticationFilter Authentication Started 15.17.8.6 [CN=nifi_node1, O=xxxx, L=yyyy, ST=zzzz, C=US] GET https://nifi_node2:8443/nifi-api/site-to-site
2022-02-23 15:29:30,650 INFO [NiFi Web Server-17] o.a.n.w.s.NiFiAuthenticationFilter Authentication Success [CN=nifi_node1, O=xxxx, L=yyyy, ST=zzzz, C=US] 15.17.8.6 GET https://nifi_node2:8443/nifi-api/site-to-site

 

Note: I have created certificate with nifi_node1 as owner with other NiFi and Flink nodes as alternative hosts.

 

My question is why the Flink 1.14.3 version does not send the user id when it tries to authenticate itself with the NiFi? Is something changed with authentication logic wise between 1.8.0 and 1.14.3?

 

Appreciate your help and patience!

 

Thanks,

Prabu.

Master Collaborator

@spserd 

 

Interesting findings. Could you please share the differences in you pom file between the 2 runs? What are the Flink and NiFi artifacts you're using and their exact versions?

 

André

 

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Explorer

Hi @araujo,

 

Flink 1.8.0 with NiFi artifacts (org.apache.nifi) in 1.11.6  - SUCCESS case

Flink 1.14.3 with NiFi artifacts (org.apache.nifi) in 1.15.3 - FAILED case

 

I miss to specify, I do face the SSLHandshakeException when first started the Flink

javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target

So I added keystore and truststore to default Java's cacerts. Once keystores added and restarted, the SSLHandshakeException is not seen. After this I started seeing the below error (which I specified in my earlier reply as well)

 

org.apache.flink.util.SerializedThrowable: Tried all cluster URLs but none of those was accessible. Last Exception was org.apache.nifi.remote.util.SiteToSiteRestApiClient$HttpGetFailedException: response code 401:Unauthorized with explanation: null
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getController(SiteToSiteRestApiClient.java:370)
	at org.apache.nifi.remote.client.SiteInfoProvider.refreshRemoteInfo(SiteInfoProvider.java:69)
	at org.apache.nifi.remote.client.SiteInfoProvider.getPortIdentifier(SiteInfoProvider.java:220)
	at org.apache.nifi.remote.client.SiteInfoProvider.getOutputPortIdentifier(SiteInfoProvider.java:204)
	at org.apache.nifi.remote.client.socket.SocketClient.getPortIdentifier(SocketClient.java:79)
	at org.apache.nifi.remote.client.socket.SocketClient.createTransaction(SocketClient.java:121)
	at org.apache.flink.streaming.connectors.nifi.NiFiSource.run(NiFiSource.java:91)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
Caused by: org.apache.flink.util.SerializedThrowable: response code 401:Unauthorized with explanation: null
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.execute(SiteToSiteRestApiClient.java:1177)
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.execute(SiteToSiteRestApiClient.java:1211)
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.fetchController(SiteToSiteRestApiClient.java:417)
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getController(SiteToSiteRestApiClient.java:392)
	at org.apache.nifi.remote.util.SiteToSiteRestApiClient.getController(SiteToSiteRestApiClient.java:359)
	... 9 common frames omitted

 

Please let me know for any questions.

 

Appreciate your help!

Thanks,

Prabu.

Master Collaborator

Hi, @spserd ,

 

Interesting what you mentioned below:


Flink 1.8.0 with NiFi artifacts (org.apache.nifi) in 1.11.6  - SUCCESS case

Flink 1.14.3 with NiFi artifacts (org.apache.nifi) in 1.15.3 - FAILED case

You upgraded Flink *and* NiFi versions at once. I don't think the Flink version would make a difference, but there's been some security changes in NiFi 1.15. I'm not sure how those would affect your case, but there's a possibility that they could require some additional configuration.

 

The 401 error (authentication error) seen in your logs is probably related to the anonymous login. The anonymous login indicates to me that there's something wrong with your keystore. Are you able to share the keystore and truststore details I asked before?

 

It's also weird that you had to add the truststore contents to the default Java cacerts. You should not need to do that if the truststore was being passed correctly to the job.

 

Are you running your job in cluster mode? Is that on YARN? Wherever that job is running you must ensure that the keystore and truststore are available for the job to read them, at the right location.

 

Could you please also share the command line that you use to submit the job?

 

Cheers,

André

 

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Explorer

Andre: You upgraded Flink *and* NiFi versions at once. I don't think the Flink version would make a difference, but there's been some security changes in NiFi 1.15. I'm not sure how those would affect your case, but there's a possibility that they could require some additional configuration.

Prabu: What would be the additional configuration that I may require? Please advise.

 

Andre: The 401 error (authentication error) seen in your logs is probably related to the anonymous login. The anonymous login indicates to me that there's something wrong with your keystore. Are you able to share the keystore and truststore details I asked before?

Prabu: The authentication request sent from Flink node does not have the user name filled in. It was NOT the case when I run Flink in 1.8.0 version. Please refer the below log. When running in Flink 1.8.0 version, instead of 'anonymous' it had the 'owner' of the certificate added. In my certificate, I set the nifi_node1 as owner and other Flink and NiFi nodes as SAN (SubjectAlternativeName). Please see below for keystore details.

2022-02-22 21:27:39,381 INFO [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authentication Started 16.13.8.6 [<anonymous>] GET https://nifi_node1:8443/nifi-api/site-to-site

 

Andre: It's also weird that you had to add the truststore contents to the default Java cacerts. You should not need to do that if the truststore was being passed correctly to the job.

Prabu: I did add the truststore to default Java cacerts as a part of troubleshooting the issue. The SSLHandshakeException issue resolved once the keystore is imported to default Java cacerts. I do set the truststore using nifi.truststore-file property and nifi.keystore-file property to set keystore file.

 

Andre: Are you running your job in cluster mode? Is that on YARN? Wherever that job is running you must ensure that the keystore and truststore are available for the job to read them, at the right location.

Prabu: Ideally I will run the Flink and NiFi in separate cluster. To resolve the issues, Im currently running Flink in standalone node with NiFi running in cluster (3 nodes). I have the keystore and truststore files are in place and configured correctly. I have had the same configuration when my Flink was running in 1.8.0 version where I did not face this issue.

 

Andre: Could you please also share the command line that you use to submit the job?

Prabu: I submit the job by running: FLINK_HOME/bin/flink run -d BASE_DIR/flows/FLOW_NAME/FLOW_NAME-FLOW_VERSION.jar --pathToPropertyFile BASE_DIR/flows/FLOW_NAME/FLOW_NAME.properties. The truststore and keystore properties are set in FLOW_NAME.properties file.

 

 

[prabu@prabus prabu]# keytool -list -v -keystore nifi-keystore.jks
Enter keystore password:  
Keystore type: jks
Keystore provider: SUN

Your keystore contains 2 entries

Alias name: my-nifi
Creation date: Feb 22, 2022
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=nifi_node1, O=xxxxx, L=yyyyy, ST=zzzz, C=US
Issuer: CN=COMODO RSA Organization Validation Secure Server CA, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Serial number: bdcb82652ab865e438a32e6fca785deb
Valid from: Fri Feb 18 00:00:00 UTC 2022 until: Sat Feb 18 23:59:59 UTC 2023
Certificate fingerprints:
	 SHA1: 6B:8C:65:89:2A:FA:A6:11:0C:FC:22:DA:B9:70:22:25:16:88:9C:20
	 SHA256: 93:C2:62:34:CD:88:EA:6B:F6:7F:CF:0C:29:1D:96:3D:20:41:AB:D8:68:AC:D5:31:72:46:55:48:F3:D6:6C:FF
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 1.3.6.1.4.1.11129.2.4.2 Criticality=false
0000: 04 82 01 69 01 67 00 76   00 AD F7 BE FA 7C FF 10  ...i.g.v........
0010: C8 8B 9D 3D 9C 1E 3E 18   6A B4 67 29 5D CF B1 0C  ...=..>.j.g)]...
0020: 24 CA 85 86 34 EB DC 82   8A 00 00 01 7F 0E 76 7D  $...4.........v.
0030: 71 00 00 04 03 00 47 30   45 02 21 00 A6 B5 8D 45  q.....G0E.!....E
0040: E2 A7 7B 59 88 B4 21 26   6E 10 89 D0 FA 8F 22 75  ...Y..!&n....."u
0050: 61 20 0F 33 BD 98 C3 E4   33 A6 D7 EF 02 20 77 E7  a .3....3.... w.
0060: FE 4A C3 5A 01 97 A0 92   A8 92 E2 67 5A E7 14 53  .J.Z.......gZ..S
0070: 7B 54 86 C0 BD D1 4C 47   F2 A6 8C CA 9C FB 00 75  .T....LG.......u
0080: 00 7A 32 8C 54 D8 B7 2D   B6 20 EA 38 E0 52 1E E9  .z2.T..-. .8.R..
0090: 84 16 70 32 13 85 4D 3B   D2 2B C1 3A 57 A3 52 EB  ..p2..M;.+.:W.R.
00A0: 52 00 00 01 7F 0E 76 7D   2C 00 00 04 03 00 46 30  R.....v.,.....F0
00B0: 44 02 20 2F B5 FE 82 E0   9D CC 14 27 44 34 68 0E  D. /.......'D4h.
00C0: 0F F5 15 20 A6 F7 F4 67   98 FC E9 9F 7F 82 46 9A  ... ...g......F.
00D0: B6 43 3A 02 20 4D 74 21   1A 90 8A B2 44 4E 3D 0E  .C:. Mt!....DN=.
00E0: EA 56 9B 9B 71 25 2D 04   AE A9 D9 9D AC 96 DC FD  .V..q%-.........
00F0: 86 92 47 7A AD 00 76 00   E8 3E D0 DA 3E F5 06 35  ..Gz..v..>..>..5
0100: 32 E7 57 28 BC 89 6B C9   03 D3 CB D1 11 6B EC EB  2.W(..k......k..
0110: 69 E1 77 7D 6D 06 BD 6E   00 00 01 7F 0E 76 7D 07  i.w.m..n.....v..
0120: 00 00 04 03 00 47 30 45   02 20 22 F9 A7 9A C8 C5  .....G0E. ".....
0130: 60 6D 25 80 14 F2 6C 78   24 82 24 72 0F 88 24 45  `m%...lx$.$r..$E
0140: EE 5D 86 68 79 BC 7C A0   B4 FD 02 21 00 EA 19 9D  .].hy......!....
0150: 91 81 F5 EC 90 8C AB 0C   D1 09 80 30 72 4C 1D CF  ...........0rL..
0160: DD 2C 7E 7B FD 3E EC BE   01 9D 60 AD 0C           .,...>....`..


#2: ObjectId: 1.3.6.1.5.5.7.1.1 Criticality=false
AuthorityInfoAccess [
  [
   accessMethod: caIssuers
   accessLocation: URIName: http://crt.comodoca.com/COMODORSAOrganizationValidationSecureServerCA.crt
, 
   accessMethod: ocsp
   accessLocation: URIName: http://ocsp.comodoca.com
]
]

#3: ObjectId: 2.5.9.55 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 9A F3 2B DA CF AD 4F B6   2F BB 2A 48 48 2A 12 B7  ..+...O./.*HH*..
0010: 1B 42 C1 24                                        .B.$
]
]

#4: ObjectId: 2.5.9.95 Criticality=true
BasicConstraints:[
  CA:false
  PathLen: undefined
]

#5: ObjectId: 2.5.9.85 Criticality=false
CRLDistributionPoints [
  [DistributionPoint:
     [URIName: http://crl.comodoca.com/COMODORSAOrganizationValidationSecureServerCA.crl]
]]

#6: ObjectId: 2.5.9.23 Criticality=false
CertificatePolicies [
  [CertificatePolicyId: [1.3.6.1.4.1.6449.1.2.1.3.4]
[PolicyQualifierInfo: [
  qualifierID: 1.3.6.1.7.5.7.2.1
  qualifier: 0000: 16 12 68 74 74 70 73 3A   2F 2F 73 65 63 74 69 67  ..https://sectig
0010: 6F 2E 63 6F 6D 2F 43 50   53                       o.com/CPS

]]  ]
  [CertificatePolicyId: [2.23.140.1.2.2]
[]  ]
]

#7: ObjectId: 2.5.9.3 Criticality=false
ExtendedKeyUsages [
  serverAuth
  clientAuth
]

#8: ObjectId: 2.5.9.5 Criticality=true
KeyUsage [
  DigitalSignature
  Key_Encipherment
]

#9: ObjectId: 2.5.9.7 Criticality=false
SubjectAlternativeName [
  DNSName: nifi_node1
  DNSName: flink_node1
  DNSName: flink_node2
  DNSName: flink_node3
  DNSName: nifi_node2
  DNSName: nifi_node3
]

#10: ObjectId: 2.5.9.4 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: F6 EF 7D 85 31 FA 04 A9   12 F8 BB 1D DB 6D 24 B0  ....1........m$.
0010: 1E 7D 95 31                                        ...1
]
]



*******************************************
*******************************************


Alias name: inter
Creation date: Feb 22, 2022
Entry type: trustedCertEntry

Owner: CN=COMODO RSA Organization Validation Secure Server CA, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Issuer: CN=COMODO RSA Certification Authority, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Serial number: 36825e7fb5a481937a8a9b9f6d1736bb93ca6
Valid from: Wed Feb 12 00:00:00 UTC 2014 until: Sun Feb 11 23:59:59 UTC 2029
Certificate fingerprints:
	 SHA1: 10:4C:63:D2:54:6B:80:21:DD:10:5E:9F:BA:5A:8D:78:16:9F:6B:32
	 SHA256: 11:10:06:37:8A:FB:E8:E9:9B:B0:2B:A8:73:90:CA:42:9F:CA:27:73:F7:4D:7F:7E:B5:74:4D:AD:DF:68:01:4B
Signature algorithm name: SHA384withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 1.3.6.1.5.5.7.1.1 Criticality=false
AuthorityInfoAccess [
  [
   accessMethod: caIssuers
   accessLocation: URIName: http://crt.comodoca.com/COMODORSAAddTrustCA.crt
, 
   accessMethod: ocsp
   accessLocation: URIName: http://ocsp.comodoca.com
]
]

#2: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: BB AF 7E 02 3D FB C6 F1   3C 84 9E AD EE 38 98 EC  ....=...<....8..
0010: D9 32 32 D4                                        .22.
]
]

#3: ObjectId: 2.5.23.11 Criticality=true
BasicConstraints:[
  CA:true
  PathLen:0
]

#4: ObjectId: 2.5.29.31 Criticality=false
CRLDistributionPoints [
  [DistributionPoint:
     [URIName: http://crl.comodoca.com/COMODORSACertificationAuthority.crl]
]]

#5: ObjectId: 2.5.29.32 Criticality=false
CertificatePolicies [
  [CertificatePolicyId: [2.5.2.3.0]
[]  ]
  [CertificatePolicyId: [2.23.14.1.2.2]
[]  ]
]

#6: ObjectId: 2.5.29.37 Criticality=false
ExtendedKeyUsages [
  serverAuth
  clientAuth
]

#7: ObjectId: 2.5.29.15 Criticality=true
KeyUsage [
  DigitalSignature
  Key_CertSign
  Crl_Sign
]

#8: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 9C F3 2B DD CD BD 4F B6   2F BB 2A 48 48 2A 12 B7  ..+...O./.*HH*..
0010: 1B 42 C1 24                                        .B.$
]
]



*******************************************
*******************************************

Explorer

Hi @araujo: Here you go:

 

Andre: You upgraded Flink *and* NiFi versions at once. I don't think the Flink version would make a difference, but there's been some security changes in NiFi 1.15. I'm not sure how those would affect your case, but there's a possibility that they could require some additional configuration.

Prabu: What would be the additional configuration that I may require? Please advise.

 

Andre: The 401 error (authentication error) seen in your logs is probably related to the anonymous login. The anonymous login indicates to me that there's something wrong with your keystore. Are you able to share the keystore and truststore details I asked before?

Prabu: The authentication request sent from Flink node does not have the user name filled in. It was NOT the case when I run Flink in 1.8.0 version. Please refer the below log. When running in Flink 1.8.0 version, instead of 'anonymous' it had the 'owner' of the certificate added. In my certificate, I set the nifi_node1 as owner and other Flink and NiFi nodes as SAN (SubjectAlternativeName).

2022-02-22 21:27:39,381 INFO [NiFi Web Server-1203] o.a.n.w.s.NiFiAuthenticationFilter Authentication Started 16.13.8.6 [<anonymous>] GET https://nifi_node1:8443/nifi-api/site-to-site

 

Keystore details added at the bottom.

 

Andre: It's also weird that you had to add the truststore contents to the default Java cacerts. You should not need to do that if the truststore was being passed correctly to the job.

Prabu: I did add the truststore to default Java cacerts as a part of troubleshooting the issue. The SSLHandshakeException issue resolved once the keystore is imported to default Java cacerts. I do set the truststore using nifi.truststore-file property and nifi.keystore-file property to set keystore file.

 

Andre: Are you running your job in cluster mode? Is that on YARN? Wherever that job is running you must ensure that the keystore and truststore are available for the job to read them, at the right location.

Prabu: Ideally I will run the Flink and NiFi in separate cluster. To resolve the issues, Im currently running Flink in standalone node with NiFi running in cluster (3 nodes). I have the keystore and truststore files are in place and configured correctly. I have had the same configuration when my Flink was running in 1.8.0 version where I did not face this issue.

 

Andre: Could you please also share the command line that you use to submit the job?

Prabu: I submit the job by running: FLINK_HOME/bin/flink run -d BASE_DIR/flows/FLOW_NAME/FLOW_NAME-FLOW_VERSION.jar --pathToPropertyFile BASE_DIR/flows/FLOW_NAME/FLOW_NAME.properties. The truststore and keystore properties are set in FLOW_NAME.properties file.

 

[prabu@prabus prabu]# keytool -list -v -keystore nifi-keystore.jks
Enter keystore password:  
Keystore type: jks
Keystore provider: SUN

Your keystore contains 2 entries

Alias name: my-nifi
Creation date: Feb 22, 2022
Entry type: PrivateKeyEntry
Certificate chain length: 1
Certificate[1]:
Owner: CN=nifi_node1, O=xxxxx, L=yyyyy, ST=zzzz, C=US
Issuer: CN=COMODO RSA Organization Validation Secure Server CA, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Serial number: bdcb82652ab865e438a32e6fca785deb
Valid from: Fri Feb 18 00:00:00 UTC 2022 until: Sat Feb 18 23:59:59 UTC 2023
Certificate fingerprints:
	 SHA1: 6B:8C:65:89:2A:FA:A6:11:0C:FC:22:DA:B9:70:22:25:16:88:9C:20
	 SHA256: 93:C2:62:34:CD:88:EA:6B:F6:7F:CF:0C:29:1D:96:3D:20:41:AB:D8:68:AC:D5:31:72:46:55:48:F3:D6:6C:FF
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 1.3.6.1.4.1.11129.2.4.2 Criticality=false
0000: 04 82 01 69 01 67 00 76   00 AD F7 BE FA 7C FF 10  ...i.g.v........
0010: C8 8B 9D 3D 9C 1E 3E 18   6A B4 67 29 5D CF B1 0C  ...=..>.j.g)]...
0020: 24 CA 85 86 34 EB DC 82   8A 00 00 01 7F 0E 76 7D  $...4.........v.
0030: 71 00 00 04 03 00 47 30   45 02 21 00 A6 B5 8D 45  q.....G0E.!....E
0040: E2 A7 7B 59 88 B4 21 26   6E 10 89 D0 FA 8F 22 75  ...Y..!&n....."u
0050: 61 20 0F 33 BD 98 C3 E4   33 A6 D7 EF 02 20 77 E7  a .3....3.... w.
0060: FE 4A C3 5A 01 97 A0 92   A8 92 E2 67 5A E7 14 53  .J.Z.......gZ..S
0070: 7B 54 86 C0 BD D1 4C 47   F2 A6 8C CA 9C FB 00 75  .T....LG.......u
0080: 00 7A 32 8C 54 D8 B7 2D   B6 20 EA 38 E0 52 1E E9  .z2.T..-. .8.R..
0090: 84 16 70 32 13 85 4D 3B   D2 2B C1 3A 57 A3 52 EB  ..p2..M;.+.:W.R.
00A0: 52 00 00 01 7F 0E 76 7D   2C 00 00 04 03 00 46 30  R.....v.,.....F0
00B0: 44 02 20 2F B5 FE 82 E0   9D CC 14 27 44 34 68 0E  D. /.......'D4h.
00C0: 0F F5 15 20 A6 F7 F4 67   98 FC E9 9F 7F 82 46 9A  ... ...g......F.
00D0: B6 43 3A 02 20 4D 74 21   1A 90 8A B2 44 4E 3D 0E  .C:. Mt!....DN=.
00E0: EA 56 9B 9B 71 25 2D 04   AE A9 D9 9D AC 96 DC FD  .V..q%-.........
00F0: 86 92 47 7A AD 00 76 00   E8 3E D0 DA 3E F5 06 35  ..Gz..v..>..>..5
0100: 32 E7 57 28 BC 89 6B C9   03 D3 CB D1 11 6B EC EB  2.W(..k......k..
0110: 69 E1 77 7D 6D 06 BD 6E   00 00 01 7F 0E 76 7D 07  i.w.m..n.....v..
0120: 00 00 04 03 00 47 30 45   02 20 22 F9 A7 9A C8 C5  .....G0E. ".....
0130: 60 6D 25 80 14 F2 6C 78   24 82 24 72 0F 88 24 45  `m%...lx$.$r..$E
0140: EE 5D 86 68 79 BC 7C A0   B4 FD 02 21 00 EA 19 9D  .].hy......!....
0150: 91 81 F5 EC 90 8C AB 0C   D1 09 80 30 72 4C 1D CF  ...........0rL..
0160: DD 2C 7E 7B FD 3E EC BE   01 9D 60 AD 0C           .,...>....`..


#2: ObjectId: 1.3.6.1.5.5.7.1.1 Criticality=false
AuthorityInfoAccess [
  [
   accessMethod: caIssuers
   accessLocation: URIName: http://crt.comodoca.com/COMODORSAOrganizationValidationSecureServerCA.crt
, 
   accessMethod: ocsp
   accessLocation: URIName: http://ocsp.comodoca.com
]
]

#3: ObjectId: 2.5.9.55 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: 9A F3 2B DA CF AD 4F B6   2F BB 2A 48 48 2A 12 B7  ..+...O./.*HH*..
0010: 1B 42 C1 24                                        .B.$
]
]

#4: ObjectId: 2.5.9.95 Criticality=true
BasicConstraints:[
  CA:false
  PathLen: undefined
]

#5: ObjectId: 2.5.9.85 Criticality=false
CRLDistributionPoints [
  [DistributionPoint:
     [URIName: http://crl.comodoca.com/COMODORSAOrganizationValidationSecureServerCA.crl]
]]

#6: ObjectId: 2.5.9.23 Criticality=false
CertificatePolicies [
  [CertificatePolicyId: [1.3.6.1.4.1.6449.1.2.1.3.4]
[PolicyQualifierInfo: [
  qualifierID: 1.3.6.1.7.5.7.2.1
  qualifier: 0000: 16 12 68 74 74 70 73 3A   2F 2F 73 65 63 74 69 67  ..https://sectig
0010: 6F 2E 63 6F 6D 2F 43 50   53                       o.com/CPS

]]  ]
  [CertificatePolicyId: [2.23.140.1.2.2]
[]  ]
]

#7: ObjectId: 2.5.9.3 Criticality=false
ExtendedKeyUsages [
  serverAuth
  clientAuth
]

#8: ObjectId: 2.5.9.5 Criticality=true
KeyUsage [
  DigitalSignature
  Key_Encipherment
]

#9: ObjectId: 2.5.9.7 Criticality=false
SubjectAlternativeName [
  DNSName: nifi_node1
  DNSName: flink_node1
  DNSName: flink_node2
  DNSName: flink_node3
  DNSName: nifi_node2
  DNSName: nifi_node3
]

#10: ObjectId: 2.5.9.4 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: F6 EF 7D 85 31 FA 04 A9   12 F8 BB 1D DB 6D 24 B0  ....1........m$.
0010: 1E 7D 95 31                                        ...1
]
]



*******************************************
*******************************************


Alias name: inter
Creation date: Feb 22, 2022
Entry type: trustedCertEntry

Owner: CN=COMODO RSA Organization Validation Secure Server CA, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Issuer: CN=COMODO RSA Certification Authority, O=COMODO CA Limited, L=Salford, ST=Greater Manchester, C=GB
Serial number: 36825e7fb5a481937a8a9b9f6d1736bb93ca6
Valid from: Wed Feb 12 00:00:00 UTC 2014 until: Sun Feb 11 23:59:59 UTC 2029
Certificate fingerprints:
	 SHA1: 10:4C:63:D2:54:6B:80:21:DD:10:5E:9F:BA:5A:8D:78:16:9F:6B:32
	 SHA256: 11:10:06:37:8A:FB:E8:E9:9B:B0:2B:A8:73:90:CA:42:9F:CA:27:73:F7:4D:7F:7E:B5:74:4D:AD:DF:68:01:4B
Signature algorithm name: SHA384withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions: 

#1: ObjectId: 1.3.6.1.5.5.7.1.1 Criticality=false
AuthorityInfoAccess [
  [
   accessMethod: caIssuers
   accessLocation: URIName: http://crt.comodoca.com/COMODORSAAddTrustCA.crt
, 
   accessMethod: ocsp
   accessLocation: URIName: http://ocsp.comodoca.com
]
]

#2: ObjectId: 2.5.29.35 Criticality=false
AuthorityKeyIdentifier [
KeyIdentifier [
0000: BB AF 7E 02 3D FB C6 F1   3C 84 9E AD EE 38 98 EC  ....=...<....8..
0010: D9 32 32 D4                                        .22.
]
]

#3: ObjectId: 2.5.23.11 Criticality=true
BasicConstraints:[
  CA:true
  PathLen:0
]

#4: ObjectId: 2.5.29.31 Criticality=false
CRLDistributionPoints [
  [DistributionPoint:
     [URIName: http://crl.comodoca.com/COMODORSACertificationAuthority.crl]
]]

#5: ObjectId: 2.5.29.32 Criticality=false
CertificatePolicies [
  [CertificatePolicyId: [2.5.2.3.0]
[]  ]
  [CertificatePolicyId: [2.23.14.1.2.2]
[]  ]
]

#6: ObjectId: 2.5.29.37 Criticality=false
ExtendedKeyUsages [
  serverAuth
  clientAuth
]

#7: ObjectId: 2.5.29.15 Criticality=true
KeyUsage [
  DigitalSignature
  Key_CertSign
  Crl_Sign
]

#8: ObjectId: 2.5.29.14 Criticality=false
SubjectKeyIdentifier [
KeyIdentifier [
0000: 9C F3 2B DD CD BD 4F B6   2F BB 2A 48 48 2A 12 B7  ..+...O./.*HH*..
0010: 1B 42 C1 24                                        .B.$
]
]



*******************************************
*******************************************

 

Thanks,

Prabu.

Explorer

Hi @araujo - Have you had a chance to look into this? Please let me know for your thoughts/suggestions.

 

Appreciate your help and patience.

 

Thanks,

Prabu.

 

PS: Please ignore the reply on 02-25-2022 at 10:41 AM which was incomplete. I don't see a delete option to delete it. 

 

 

Master Collaborator

@spserd ,

 

Are the NiFi 1.11 and 1.15 clusters different clusters or the same cluster that you upgraded?

Is the application using the same keystore to connect to both clusters?

Would you be able to share the part of your application code where you construct your NiFiSource?

 

Cheers,

André

 

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Explorer

Hi @araujo,

 

I figured out the issue. It seems we have to provide the keystore type and truststore type along with the keystore and truststore file and passwords. Once I have them added to the source building code, I was able to connect to NiFi without any issues. Now my Flink flow is connected to NiFi and processing data.

 

Sharing the NiFiSource creation logic for reference:

org.apache.flink.streaming.api.functions.source.SourceFunction<org.apache.flink.streaming.connectors.nifi.NiFiDataPacket> nifiSource;
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
.urls(new HashSet(Arrays.asList(config.getString(clusterId + ".url").split(","))))
.portName(config.getString(clusterId + ".port"))
.requestBatchCount(config.getInt("batch-count"))
.truststoreFilename(config.getString("truststore-file"))
.truststorePass(config.getString("truststore-password"))
.keystoreFilename(config.getString("keystore-file"))
.keystorePass(config.getString("keystore-password"))
.keystoreType(KeystoreType.JKS)
.truststoreType(KeystoreType.JKS)
.buildConfig();
nifiSource = new org.apache.flink.streaming.connectors.nifi.NiFiSource(clientConfig);

 

Appreciate your help and patience.

 

Thanks,

Prabu. 

Community Manager

@spserd 

I'm happy to see you resolved your issue. Please mark the appropriate reply as the solution, as it will make it easier for others to find the answer in the future. 

 

 

 

Screen Shot 2019-08-06 at 1.54.47 PM.png

 

 


Cy Jervis, Manager, Community Program
Was your question answered? Make sure to mark the answer as the accepted solution.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Master Collaborator

Ah! Thanks a lot for taking the time to post the solution, @spserd !

 

I suspect that there was one detail that wasn't mentioned: did you also upgrade Java from 8 to 11 between the two clusters?

 

In Java 8 the default keystore/truststore type used to be JKS and in Java 11 the default changed to PKCS12. If the store type isn't set explicitly, this error would certainly happen when upgrading Java.

 

Cheers,

André

 

--
Was your question answered? Please take some time to click on "Accept as Solution" below this post.
If you find a reply useful, say thanks by clicking on the thumbs up button.

Explorer

Hello @araujo - Yes. That is the issue. From Java 11, we may need to explicitly set the keystore type.

 

Thanks,

Prabu

; ;