PutElasticsearchHttp: Connection reset by peer: socket write error

Expert Contributor

Hi

I am reading a CSV file, converting it to JSON, and splitting it into individual records. When I try to write these records to Elasticsearch, I get the following error:

2016-12-04 19:42:18,008 ERROR [Timer-Driven Process Thread-10] o.a.n.p.e.PutElasticsearchHttp PutElasticsearchHttp[id=ca053faa-0158-1000-8397-413cc421f011] PutElasticsearchHttp[id=ca053faa-0158-1000-8397-413cc421f011] failed to process due to org.apache.nifi.processor.exception.ProcessException: java.net.SocketException: Connection reset by peer: socket write error; rolling back session: org.apache.nifi.processor.exception.ProcessException: java.net.SocketException: Connection reset by peer: socket write error
2016-12-04 19:42:18,017 ERROR [Timer-Driven Process Thread-10] o.a.n.p.e.PutElasticsearchHttp
org.apache.nifi.processor.exception.ProcessException: java.net.SocketException: Connection reset by peer: socket write error
    at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:315) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.jar:1.0.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_101]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_101]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method) ~[na:1.8.0_101]
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) ~[na:1.8.0_101]
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153) ~[na:1.8.0_101]
    at okio.Okio$1.write(Okio.java:80) ~[okio-1.8.0.jar:na]
    at okio.AsyncTimeout$1.write(AsyncTimeout.java:180) ~[okio-1.8.0.jar:na]
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.java:171) ~[okio-1.8.0.jar:na]
    at okio.RealBufferedSink.write(RealBufferedSink.java:41) ~[okio-1.8.0.jar:na]
    at okhttp3.internal.http.Http1xStream$FixedLengthSink.write(Http1xStream.java:286) ~[okhttp-3.3.1.jar:na]
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.java:171) ~[okio-1.8.0.jar:na]
    at okio.RealBufferedSink.write(RealBufferedSink.java:91) ~[okio-1.8.0.jar:na]
    at okhttp3.RequestBody$2.writeTo(RequestBody.java:96) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.HttpEngine$NetworkInterceptorChain.proceed(HttpEngine.java:756) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.HttpEngine.readResponse(HttpEngine.java:613) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.getResponse(RealCall.java:244) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:201) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:163) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.execute(RealCall.java:57) ~[okhttp-3.3.1.jar:na]
    at org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.sendRequestToElasticsearch(AbstractElasticsearchHttpProcessor.java:166) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:313) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    ... 12 common frames omitted

Any help is highly appreciated.

1 ACCEPTED SOLUTION

Rising Star

@Arsalan Siddiqi Awesome, the port can be tricky. From your log, it looks like there may be a small typo: 9002 instead of 9200. Can you switch the port to 9200 and check?

Your log shows 9002:
Failed to connect to localhost/127.0.0.1:9002;

9 REPLIES

Rising Star

Are you getting any errors/logs from Elasticsearch ($ES_HOME/logs/elasticsearch.log)? Also, I assume you're hitting the default ES port 9200 with HTTP? Are you able to actually reach Elasticsearch with the HTTP processor?

In the past I've hit plenty of errors from sending malformed JSON to Elasticsearch, and from port/iptables/firewall problems.
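
One way to check whether the configured endpoint really speaks HTTP, independent of NiFi, is to request the Elasticsearch root URL and read the version it reports. The following is a minimal Python sketch under stated assumptions: the function names are hypothetical, and `http://localhost:9200` is only the default HTTP address, so substitute whatever host and port the processor is configured with.

```python
import json
import urllib.request


def parse_es_root(body: bytes) -> str:
    """Return the version number from the JSON that Elasticsearch serves at '/'."""
    info = json.loads(body.decode("utf-8"))
    return info["version"]["number"]


def probe_es_http(base_url: str = "http://localhost:9200", timeout: float = 5.0) -> str:
    """Fetch the root endpoint and return the reported Elasticsearch version.

    base_url is an assumption; use the host and port configured in NiFi.
    Raises urllib.error.URLError when nothing accepts the connection, and
    a parsing error when the port answers with something other than the
    expected JSON (for example, when it is not the HTTP port).
    """
    with urllib.request.urlopen(base_url + "/", timeout=timeout) as resp:
        return parse_es_root(resp.read())
```

Calling `probe_es_http()` against a healthy node should return a version string; an exception instead points at the port, the firewall, or the protocol rather than at the JSON being indexed.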

Expert Contributor

Hi @Devin Pinkston

Thanks for your reply. I had made the mistake of using port 9300 instead of 9200. How can I resolve firewall issues? At the moment I have disabled the firewall, but it has not helped. I have checked the Elasticsearch logs and I don't see any errors there.

10084-elasticsearchhttp.png

Now that I have changed the port to 9200, I get the following error:

2016-12-05 14:45:39,130 ERROR [Timer-Driven Process Thread-6] o.a.n.p.e.PutElasticsearchHttp PutElasticsearchHttp[id=cef732f0-0158-1000-4c78-e6fd2c642944] PutElasticsearchHttp[id=cef732f0-0158-1000-4c78-e6fd2c642944] failed to process due to org.apache.nifi.processor.exception.ProcessException: java.net.ConnectException: Failed to connect to localhost/127.0.0.1:9002; rolling back session: org.apache.nifi.processor.exception.ProcessException: java.net.ConnectException: Failed to connect to localhost/127.0.0.1:9002
2016-12-05 14:45:39,130 ERROR [Timer-Driven Process Thread-6] o.a.n.p.e.PutElasticsearchHttp
org.apache.nifi.processor.exception.ProcessException: java.net.ConnectException: Failed to connect to localhost/127.0.0.1:9002
    at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:315) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.jar:1.0.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.net.ConnectException: Failed to connect to localhost/127.0.0.1:9002
    at okhttp3.internal.io.RealConnection.connectSocket(RealConnection.java:187) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.io.RealConnection.buildConnection(RealConnection.java:170) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.io.RealConnection.connect(RealConnection.java:111) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.StreamAllocation.findConnection(StreamAllocation.java:187) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.StreamAllocation.findHealthyConnection(StreamAllocation.java:123) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.StreamAllocation.newStream(StreamAllocation.java:93) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.HttpEngine.connect(HttpEngine.java:296) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.HttpEngine.sendRequest(HttpEngine.java:248) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.getResponse(RealCall.java:243) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:201) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:163) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.execute(RealCall.java:57) ~[okhttp-3.3.1.jar:na]
    at org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.sendRequestToElasticsearch(AbstractElasticsearchHttpProcessor.java:166) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:313) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    ... 12 common frames omitted

Rising Star

@Arsalan Siddiqi Awesome, the port can be tricky. From your log, it looks like there may be a small typo: 9002 instead of 9200. Can you switch the port to 9200 and check?

Your log shows 9002:
Failed to connect to localhost/127.0.0.1:9002;
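
Spotting a transposed port in a long stack trace is easy to miss by eye, so it can help to pull the endpoint out of the log mechanically. This is a rough Python sketch, not part of NiFi: the function names are made up for illustration, and the pattern only assumes the `Failed to connect to host/ip:port` wording seen in the log above. A port outside the Elasticsearch defaults (9200 for HTTP, 9300 for transport) is only a hint of a typo, since a cluster can be configured to listen elsewhere.

```python
import re

# Default Elasticsearch ports: 9200 for HTTP/REST, 9300 for the transport protocol.
DEFAULT_PORTS = {9200: "HTTP/REST", 9300: "transport"}

# Matches the okhttp message format seen in the NiFi log, e.g.
# "Failed to connect to localhost/127.0.0.1:9002"
_ENDPOINT = re.compile(
    r"Failed to connect to (?P<host>[^/\s]+)/(?P<ip>[\d.]+):(?P<port>\d+)"
)


def connect_failures(log_text):
    """Return the distinct (host, port) pairs the log reports as unreachable."""
    return {(m.group("host"), int(m.group("port"))) for m in _ENDPOINT.finditer(log_text)}


def unusual_ports(log_text):
    """Return failed ports that are not an Elasticsearch default, sorted ascending."""
    ports = {port for _, port in connect_failures(log_text)}
    return sorted(port for port in ports if port not in DEFAULT_PORTS)
```

Feeding the log excerpt above into `unusual_ports` would flag 9002, which is neither default port.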

Expert Contributor

Ahhh, thank you so much, it works. I will try the PutElasticsearch processor now that this one is working.

Rising Star

@Arsalan Siddiqi Perfect. Keep in mind that one ES processor, PutElasticsearch, uses port 9300 for the transport protocol.

Master Guru

Both PutElasticsearch and FetchElasticsearch use the transport client. There are Http versions of both (PutElasticsearchHttp and FetchElasticsearchHttp) that use the REST API.
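
The split described above can be written down as a small lookup that warns when a processor is pointed at the other protocol's port. This is only an illustrative Python sketch: the names `PROCESSOR_PROTOCOL`, `DEFAULT_PORT`, and `port_warning` are hypothetical, and the ports are the Elasticsearch defaults, which a given cluster may override.

```python
from typing import Optional

TRANSPORT, REST = "transport", "REST"

# Which protocol each NiFi processor uses, as described in this thread.
PROCESSOR_PROTOCOL = {
    "PutElasticsearch": TRANSPORT,
    "FetchElasticsearch": TRANSPORT,
    "PutElasticsearchHttp": REST,
    "FetchElasticsearchHttp": REST,
}

# Elasticsearch defaults; a cluster may be configured to listen elsewhere.
DEFAULT_PORT = {TRANSPORT: 9300, REST: 9200}


def port_warning(processor: str, port: int) -> Optional[str]:
    """Return a warning when the port is not the default for the processor's protocol."""
    protocol = PROCESSOR_PROTOCOL[processor]
    expected = DEFAULT_PORT[protocol]
    if port == expected:
        return None
    return (
        f"{processor} uses the {protocol} protocol (default port {expected}), "
        f"but port {port} is configured"
    )
```

For example, `port_warning("PutElasticsearchHttp", 9300)` returns a message, which matches the original mistake in this thread, while `port_warning("PutElasticsearch", 9300)` returns `None`.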

Expert Contributor

I got the PutElasticsearchHttp processor to work, but when I try to use the PutElasticsearch processor with the same data, I get the following error:

2016-12-05 15:06:23,209 ERROR [Timer-Driven Process Thread-8] o.a.n.p.elasticsearch.PutElasticsearch PutElasticsearch[id=ce4b90b8-0158-1000-4eab-a028c319db21] Failed to insert into Elasticsearch due to None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{localhost/127.0.0.1:9300}]. More detailed information may be available in the NiFi logs.: NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{localhost/127.0.0.1:9300}]]
2016-12-05 15:06:23,209 ERROR [Timer-Driven Process Thread-8] o.a.n.p.elasticsearch.PutElasticsearch
org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{localhost/127.0.0.1:9300}]
    at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:290) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:207) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:283) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:347) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.apache.nifi.processors.elasticsearch.PutElasticsearch.onTrigger(PutElasticsearch.java:212) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.jar:1.0.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

And in the Elasticsearch logs:

[2016-12-05T15:06:23,113][WARN ][o.e.t.n.Netty4Transport ] [KXBLBSs] exception caught on transport layer [[id: 0xeb720e09, L:/127.0.0.1:9300 - R:/127.0.0.1:54211]], closing connection
java.lang.IllegalStateException: Received message from unsupported version: [2.0.0] minimal compatible version is: [5.0.0]
    at org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1199) ~[elasticsearch-5.0.2.jar:5.0.2]
    at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:74) ~[transport-netty4-5.0.2.jar:5.0.2]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-codec-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) [netty-codec-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) [netty-codec-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) [netty-codec-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:513) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:467) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) [netty-common-4.1.5.Final.jar:4.1.5.Final]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]

10085-putelasticsearch.png

Master Guru

@Arsalan Siddiqi PutElasticsearch and FetchElasticsearch support Elasticsearch 2.x clusters, but it looks like you are trying to use ES 5.x. NiFi 1.1.0 and HDF 2.1.0 will have Elasticsearch 5 processors (PutElasticsearch5 and FetchElasticsearch5). In the meantime, the Http processors should connect to both ES 2.x and 5.x clusters.
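
The version mismatch is stated directly in the Elasticsearch log, so it can be detected without reading the whole trace. Below is a minimal Python sketch; the function names are hypothetical, and the pattern assumes only the message wording shown in the Elasticsearch log earlier in this thread.

```python
import re
from typing import Optional, Tuple

# Matches the server-side message, e.g.
# "Received message from unsupported version: [2.0.0] minimal compatible version is: [5.0.0]"
_MISMATCH = re.compile(
    r"Received message from unsupported version: \[(?P<client>[\d.]+)\] "
    r"minimal compatible version is: \[(?P<minimum>[\d.]+)\]"
)


def version_tuple(version: str) -> Tuple[int, ...]:
    """Turn '5.0.2' into (5, 0, 2) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def find_version_mismatch(log_text: str) -> Optional[Tuple[str, str]]:
    """Return (client_version, minimum_version) if the log reports a mismatch."""
    m = _MISMATCH.search(log_text)
    return (m.group("client"), m.group("minimum")) if m else None


def is_compatible(client: str, minimum: str) -> bool:
    """True when the client's wire version is at least the server's minimum."""
    return version_tuple(client) >= version_tuple(minimum)
```

Applied to the log above, `find_version_mismatch` yields the pair `2.0.0` and `5.0.0`, and `is_compatible` is false for it, which is consistent with a 2.x transport client talking to a 5.x node.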

Expert Contributor

Thank you @Matt Burgess, I will use the Http processors in the meantime then!