- Subscribe to RSS Feed
- Mark Question as New
- Mark Question as Read
- Float this Question for Current User
- Bookmark
- Subscribe
- Mute
- Printer Friendly Page
Using KafkaAvroSerializer in Storm causes java.net.ConnectException: Connection refused (Connection refused)
- Labels:
-
Apache Kafka
-
Apache Spark
-
Schema Registry
Created ‎03-20-2017 07:40 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
In Storm, my KafkaBolt is setup with the following value-serializer:
com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
I receive the following tracestack when data is ingested into the bolt. The same topology works fine if using the default StringSerializer from Kafka. The same environment is also able to successfully serializer/deserialize using the default Avro serdes via Schema Registry. In other words, this seems to be Kafka-serdes specific.
2017-03-20 19:02:19.984 o.a.s.d.executor [ERROR] javax.ws.rs.ProcessingException: java.net.ConnectException: Connection refused (Connection refused) at org.glassfish.jersey.client.internal.HttpUrlConnector.apply(HttpUrlConnector.java:287) ~[stormjar.jar:?] at org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:255) ~[stormjar.jar:?] at org.glassfish.jersey.client.JerseyInvocation$2.call(JerseyInvocation.java:700) ~[stormjar.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:315) ~[stormjar.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:297) ~[stormjar.jar:?] at org.glassfish.jersey.internal.Errors.process(Errors.java:228) ~[stormjar.jar:?] at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:444) ~[stormjar.jar:?] at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:696) ~[stormjar.jar:?] at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:448) ~[stormjar.jar:?] at org.glassfish.jersey.client.JerseyInvocation$Builder.post(JerseyInvocation.java:349) ~[stormjar.jar:?] at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.postEntity(SchemaRegistryClient.java:541) ~[stormjar.jar:?] at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.doRegisterSchemaMetadata(SchemaRegistryClient.java:228) ~[stormjar.jar:?] at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.registerSchemaMetadata(SchemaRegistryClient.java:221) ~[stormjar.jar:?] at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.addSchemaVersion(SchemaRegistryClient.java:250) ~[stormjar.jar:?] at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotSerializer.serialize(AbstractSnapshotSerializer.java:50) ~[stormjar.jar:?] at com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[stormjar.jar:?] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:463) ~[stormjar.jar:?] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440) ~[stormjar.jar:?] at org.apache.storm.kafka.bolt.KafkaBolt.execute(KafkaBolt.java:143) [stormjar.jar:?] at org.apache.storm.daemon.executor$fn__5211$tuple_action_fn__5213.invoke(executor.clj:728) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10] at org.apache.storm.daemon.executor$mk_task_receiver$fn__5132.invoke(executor.clj:460) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10] at org.apache.storm.disruptor$clojure_handler$reify__4647.onEvent(disruptor.clj:40) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10] at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:453) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10] at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:432) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10] at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10] at org.apache.storm.daemon.executor$fn__5211$fn__5224$fn__5277.invoke(executor.clj:847) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10] at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10] at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111] Caused by: java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_111] at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_111] at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_111] at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_111] at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_111] at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_111] at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_111] at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) ~[?:1.8.0_111] at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) ~[?:1.8.0_111] at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) ~[?:1.8.0_111] at sun.net.www.http.HttpClient.New(HttpClient.java:308) ~[?:1.8.0_111] at sun.net.www.http.HttpClient.New(HttpClient.java:326) ~[?:1.8.0_111] at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1202) ~[?:1.8.0_111] at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1138) ~[?:1.8.0_111] at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1032) ~[?:1.8.0_111] at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:966) ~[?:1.8.0_111] at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1316) ~[?:1.8.0_111] at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1291) ~[?:1.8.0_111] at org.glassfish.jersey.client.internal.HttpUrlConnector$4.getOutputStream(HttpUrlConnector.java:385) ~[stormjar.jar:?] at org.glassfish.jersey.message.internal.CommittingOutputStream.commitStream(CommittingOutputStream.java:200) ~[stormjar.jar:?] at org.glassfish.jersey.message.internal.CommittingOutputStream.commitStream(CommittingOutputStream.java:194) ~[stormjar.jar:?] at org.glassfish.jersey.message.internal.CommittingOutputStream.write(CommittingOutputStream.java:228) ~[stormjar.jar:?] at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$UnCloseableOutputStream.write(WriterInterceptorExecutor.java:299) ~[stormjar.jar:?] at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:1982) ~[stormjar.jar:?] at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:995) ~[stormjar.jar:?] at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:932) ~[stormjar.jar:?] at com.fasterxml.jackson.jaxrs.base.ProviderBase.writeTo(ProviderBase.java:635) ~[stormjar.jar:?] at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:265) ~[stormjar.jar:?] at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:250) ~[stormjar.jar:?] at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:162) ~[stormjar.jar:?] at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1130) ~[stormjar.jar:?] at org.glassfish.jersey.client.ClientRequest.writeEntity(ClientRequest.java:502) ~[stormjar.jar:?] at org.glassfish.jersey.client.internal.HttpUrlConnector._apply(HttpUrlConnector.java:388) ~[stormjar.jar:?] at org.glassfish.jersey.client.internal.HttpUrlConnector.apply(HttpUrlConnector.java:285) ~[stormjar.jar:?] ... 28 more
Created ‎03-22-2017 04:36 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
StringSerializer does not use schema-registry and it simply converts bytes using string#getBytes(encoding).
It seems the configured schema registry url on producer is not available. You may want to check whether the given URL is right.
Did you set property with key as SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name() with respective schema-registry url?
Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // rootUrl is of http://<host>:<port>/api/v1 config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), rootUrl); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
Created ‎03-22-2017 04:19 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
@Satish Duggana can you please help answer this questions
Created ‎03-22-2017 04:21 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
Can you make your schema registry server accessible from Storm's Supervisor nodes.
Created ‎03-22-2017 04:36 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
StringSerializer does not use schema-registry and it simply converts bytes using string#getBytes(encoding).
It seems the configured schema registry url on producer is not available. You may want to check whether the given URL is right.
Did you set property with key as SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name() with respective schema-registry url?
Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // rootUrl is of http://<host>:<port>/api/v1 config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), rootUrl); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
Created ‎03-22-2017 11:41 PM
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
The configs were the issue, thanks! I had tracked this down before and made the appropriate change, though kept running into the same problem. Debugging showed that the Kafka serializer was being intialized/configured correctly, though the issue kept coming up. The registry webservice was accessible from Storm supervisor nodes, and serializers showed the correct configs. After reading these comments, I booted up a fresh cluster, deployed the same topology with the same configs ... and wallah.
I'll have to keep an eye open for what changes could have caused this, or do in the future. Thanks for prompting me to double check all of this!
