Created 12-04-2016 06:58 PM
Hi
I am reading a CSV file, converting it to JSON, and splitting it into individual records. When I try to write these records to Elasticsearch, I get the following error:
2016-12-04 19:42:18,008 ERROR [Timer-Driven Process Thread-10] o.a.n.p.e.PutElasticsearchHttp PutElasticsearchHttp[id=ca053faa-0158-1000-8397-413cc421f011] PutElasticsearchHttp[id=ca053faa-0158-1000-8397-413cc421f011] failed to process due to org.apache.nifi.processor.exception.ProcessException: java.net.SocketException: Connection reset by peer: socket write error; rolling back session: org.apache.nifi.processor.exception.ProcessException: java.net.SocketException: Connection reset by peer: socket write error
2016-12-04 19:42:18,017 ERROR [Timer-Driven Process Thread-10] o.a.n.p.e.PutElasticsearchHttp
org.apache.nifi.processor.exception.ProcessException: java.net.SocketException: Connection reset by peer: socket write error
    at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:315) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.jar:1.0.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_101]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_101]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_101]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_101]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_101]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method) ~[na:1.8.0_101]
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) ~[na:1.8.0_101]
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153) ~[na:1.8.0_101]
    at okio.Okio$1.write(Okio.java:80) ~[okio-1.8.0.jar:na]
    at okio.AsyncTimeout$1.write(AsyncTimeout.java:180) ~[okio-1.8.0.jar:na]
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.java:171) ~[okio-1.8.0.jar:na]
    at okio.RealBufferedSink.write(RealBufferedSink.java:41) ~[okio-1.8.0.jar:na]
    at okhttp3.internal.http.Http1xStream$FixedLengthSink.write(Http1xStream.java:286) ~[okhttp-3.3.1.jar:na]
    at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.java:171) ~[okio-1.8.0.jar:na]
    at okio.RealBufferedSink.write(RealBufferedSink.java:91) ~[okio-1.8.0.jar:na]
    at okhttp3.RequestBody$2.writeTo(RequestBody.java:96) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.HttpEngine$NetworkInterceptorChain.proceed(HttpEngine.java:756) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.HttpEngine.readResponse(HttpEngine.java:613) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.getResponse(RealCall.java:244) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:201) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:163) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.execute(RealCall.java:57) ~[okhttp-3.3.1.jar:na]
    at org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.sendRequestToElasticsearch(AbstractElasticsearchHttpProcessor.java:166) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:313) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    ... 12 common frames omitted
Any help is highly appreciated.
Created 12-05-2016 01:08 PM
Are you getting any errors or log messages from Elasticsearch ($ES_HOME/logs/elasticsearch.log)? Also, I'm assuming you're hitting the default ES port 9200 over HTTP? Are you actually able to reach Elasticsearch with the HTTP processor?
In the past I've hit plenty of errors from sending malformed JSON to Elasticsearch, and also from port/iptables/firewall problems.
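A quick way to rule out port or firewall problems is to test whether a plain TCP connection to the Elasticsearch port succeeds from the NiFi host. The following is only a minimal sketch: the helper name `port_is_open` and the 2-second timeout are my own choices for illustration, and it only proves that something is listening on the port, not that it is a healthy Elasticsearch node.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False
```

For example, calling `port_is_open("localhost", 9200)` on the machine running NiFi should return True if the HTTP port is reachable; a False result points at the port, bind address, or firewall rather than at the JSON payload.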
Created on 12-05-2016 01:49 PM - edited 08-19-2019 01:47 AM
Thanks for your reply. I had made the mistake of using port 9300 instead of 9200. How can I resolve firewall issues? At the moment I have disabled the firewall, but it has not helped. I have checked the Elasticsearch logs and I don't see any errors there.
Now that I have changed the port to 9200, I get the following error:
2016-12-05 14:45:39,130 ERROR [Timer-Driven Process Thread-6] o.a.n.p.e.PutElasticsearchHttp PutElasticsearchHttp[id=cef732f0-0158-1000-4c78-e6fd2c642944] PutElasticsearchHttp[id=cef732f0-0158-1000-4c78-e6fd2c642944] failed to process due to org.apache.nifi.processor.exception.ProcessException: java.net.ConnectException: Failed to connect to localhost/127.0.0.1:9002; rolling back session: org.apache.nifi.processor.exception.ProcessException: java.net.ConnectException: Failed to connect to localhost/127.0.0.1:9002
2016-12-05 14:45:39,130 ERROR [Timer-Driven Process Thread-6] o.a.n.p.e.PutElasticsearchHttp
org.apache.nifi.processor.exception.ProcessException: java.net.ConnectException: Failed to connect to localhost/127.0.0.1:9002
    at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:315) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.jar:1.0.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
Caused by: java.net.ConnectException: Failed to connect to localhost/127.0.0.1:9002
    at okhttp3.internal.io.RealConnection.connectSocket(RealConnection.java:187) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.io.RealConnection.buildConnection(RealConnection.java:170) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.io.RealConnection.connect(RealConnection.java:111) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.StreamAllocation.findConnection(StreamAllocation.java:187) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.StreamAllocation.findHealthyConnection(StreamAllocation.java:123) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.StreamAllocation.newStream(StreamAllocation.java:93) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.HttpEngine.connect(HttpEngine.java:296) ~[okhttp-3.3.1.jar:na]
    at okhttp3.internal.http.HttpEngine.sendRequest(HttpEngine.java:248) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.getResponse(RealCall.java:243) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall$ApplicationInterceptorChain.proceed(RealCall.java:201) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:163) ~[okhttp-3.3.1.jar:na]
    at okhttp3.RealCall.execute(RealCall.java:57) ~[okhttp-3.3.1.jar:na]
    at org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.sendRequestToElasticsearch(AbstractElasticsearchHttpProcessor.java:166) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp.onTrigger(PutElasticsearchHttp.java:313) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    ... 12 common frames omitted
Created 12-05-2016 01:52 PM
@Arsalan Siddiqi awesome, the port can be tricky. From your log, it looks like you may have made a small typo: 9002 instead of 9200. Can you switch the port to 9200 and check?
Your log shows 9002: Failed to connect to localhost/127.0.0.1:9002;
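Typos like this are easy to miss in a long stack trace, so it can help to pull the host and port out of the NiFi log mechanically. Here is a rough sketch, assuming the message format matches the "Failed to connect to host/ip:port" line quoted above; the function name `failed_targets` is made up for this example.

```python
import re

# Matches messages such as "Failed to connect to localhost/127.0.0.1:9002".
CONNECT_FAILURE = re.compile(
    r"Failed to connect to (?P<host>[^/\s]+)/(?P<ip>[\d.]+):(?P<port>\d+)"
)


def failed_targets(log_text):
    """Return the distinct (host, port) pairs that the log reports as unreachable."""
    found = {(m["host"], int(m["port"])) for m in CONNECT_FAILURE.finditer(log_text)}
    return sorted(found)
```

Running this over nifi-app.log and comparing the reported ports with the one you intended (9200 for the HTTP processors) makes a transposed digit stand out immediately.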
Created 12-05-2016 01:59 PM
Ahhhh, thank you so much, it works! I will try the PutElasticsearch processor now that this one is working.
Created 12-05-2016 02:06 PM
@Arsalan Siddiqi perfect. Keep in mind that one of the ES processors (PutElasticsearch) uses port 9300 for transport.
Created 12-05-2016 02:23 PM
Both PutElasticsearch and FetchElasticsearch use the transport client. There are Http versions of both (PutElasticsearchHttp and FetchElasticsearchHttp) that use the REST API.
Created on 12-05-2016 02:12 PM - edited 08-19-2019 01:47 AM
I got the PutElasticsearchHttp processor to work, but when I try to use the PutElasticsearch processor with the same data, I get the following error:
2016-12-05 15:06:23,209 ERROR [Timer-Driven Process Thread-8] o.a.n.p.elasticsearch.PutElasticsearch PutElasticsearch[id=ce4b90b8-0158-1000-4eab-a028c319db21] Failed to insert into Elasticsearch due to None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{localhost/127.0.0.1:9300}]. More detailed information may be available in the NiFi logs.: NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{localhost/127.0.0.1:9300}]]
2016-12-05 15:06:23,209 ERROR [Timer-Driven Process Thread-8] o.a.n.p.elasticsearch.PutElasticsearch
org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes are available: [{#transport#-1}{127.0.0.1}{localhost/127.0.0.1:9300}]
    at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:290) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:207) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.client.transport.support.TransportProxyClient.execute(TransportProxyClient.java:55) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.client.transport.TransportClient.doExecute(TransportClient.java:283) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.client.support.AbstractClient.execute(AbstractClient.java:347) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:85) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:59) ~[elasticsearch-2.1.0.jar:2.1.0]
    at org.apache.nifi.processors.elasticsearch.PutElasticsearch.onTrigger(PutElasticsearch.java:212) ~[nifi-elasticsearch-processors-1.0.0.jar:1.0.0]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) [nifi-api-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.jar:1.0.0]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.jar:1.0.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_111]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_111]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
And in the Elasticsearch logs:
[2016-12-05T15:06:23,113][WARN ][o.e.t.n.Netty4Transport ] [KXBLBSs] exception caught on transport layer [[id: 0xeb720e09, L:/127.0.0.1:9300 - R:/127.0.0.1:54211]], closing connection
java.lang.IllegalStateException: Received message from unsupported version: [2.0.0] minimal compatible version is: [5.0.0]
    at org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1199) ~[elasticsearch-5.0.2.jar:5.0.2]
    at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:74) ~[transport-netty4-5.0.2.jar:5.0.2]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) [netty-codec-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) [netty-codec-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) [netty-codec-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) [netty-codec-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:610) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:513) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:467) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) [netty-transport-4.1.5.Final.jar:4.1.5.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) [netty-common-4.1.5.Final.jar:4.1.5.Final]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Created 12-05-2016 02:26 PM
@Arsalan Siddiqi PutElasticsearch and FetchElasticsearch support Elasticsearch 2.x clusters, but it looks like you are trying to use ES 5.x. NiFi 1.1.0 and HDF 2.1.0 will have Elasticsearch 5 processors (PutElasticsearch5 and FetchElasticsearch5). In the meantime, the Http processors should connect to both ES 2.x and 5.x clusters.
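To make the compatibility rule above concrete, here is a small sketch that maps an Elasticsearch version string to the Put processors mentioned in this thread. It is an illustration only: the function name `suggest_put_processors` is hypothetical, the mapping covers just the 2.x and 5.x cases discussed here, and it returns an empty list for any other version rather than guessing.

```python
from typing import List


def suggest_put_processors(es_version: str) -> List[str]:
    """Map an Elasticsearch version string (e.g. "5.0.2") to candidate NiFi Put processors.

    Only the cases described in this thread are covered; other versions yield [].
    """
    major = int(es_version.split(".")[0])
    if major == 2:
        # Transport-client processor (port 9300) or the REST-based one (port 9200).
        return ["PutElasticsearch", "PutElasticsearchHttp"]
    if major == 5:
        # PutElasticsearch5 is expected with NiFi 1.1.0 / HDF 2.1.0, per the post above.
        return ["PutElasticsearch5", "PutElasticsearchHttp"]
    return []
```

With the cluster from the Elasticsearch log above (5.0.2), this yields PutElasticsearch5 and PutElasticsearchHttp, which matches the advice to stay on the Http processor until the ES 5 processors are available.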
Created 12-05-2016 03:20 PM
Thank you @Matt Burgess, I will use the Http processors in the meantime then!