Support Questions

Find answers, ask questions, and share your expertise

Using KafkaAvroSerializer in Storm causes java.net.ConnectException: Connection refused (Connection refused)

avatar

In Storm, my KafkaBolt is setup with the following value-serializer:

com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer

I receive the following tracestack when data is ingested into the bolt. The same topology works fine if using the default StringSerializer from Kafka. The same environment is also able to successfully serializer/deserialize using the default Avro serdes via Schema Registry. In other words, this seems to be Kafka-serdes specific.

2017-03-20 19:02:19.984 o.a.s.d.executor [ERROR]
javax.ws.rs.ProcessingException: java.net.ConnectException: Connection refused (Connection refused)
        at org.glassfish.jersey.client.internal.HttpUrlConnector.apply(HttpUrlConnector.java:287) ~[stormjar.jar:?]
        at org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:255) ~[stormjar.jar:?]
        at org.glassfish.jersey.client.JerseyInvocation$2.call(JerseyInvocation.java:700) ~[stormjar.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:315) ~[stormjar.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:297) ~[stormjar.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:228) ~[stormjar.jar:?]
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:444) ~[stormjar.jar:?]
        at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:696) ~[stormjar.jar:?]
        at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:448) ~[stormjar.jar:?]
        at org.glassfish.jersey.client.JerseyInvocation$Builder.post(JerseyInvocation.java:349) ~[stormjar.jar:?]
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.postEntity(SchemaRegistryClient.java:541) ~[stormjar.jar:?]
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.doRegisterSchemaMetadata(SchemaRegistryClient.java:228) ~[stormjar.jar:?]
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.registerSchemaMetadata(SchemaRegistryClient.java:221) ~[stormjar.jar:?]
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.addSchemaVersion(SchemaRegistryClient.java:250) ~[stormjar.jar:?]
        at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotSerializer.serialize(AbstractSnapshotSerializer.java:50) ~[stormjar.jar:?]
        at com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[stormjar.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:463) ~[stormjar.jar:?]
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.bolt.KafkaBolt.execute(KafkaBolt.java:143) [stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__5211$tuple_action_fn__5213.invoke(executor.clj:728) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__5132.invoke(executor.clj:460) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
        at org.apache.storm.disruptor$clojure_handler$reify__4647.onEvent(disruptor.clj:40) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:453) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:432) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
        at org.apache.storm.daemon.executor$fn__5211$fn__5224$fn__5277.invoke(executor.clj:847) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
        at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_111]
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_111]
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_111]
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_111]
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_111]
        at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_111]
        at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_111]
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) ~[?:1.8.0_111]
        at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) ~[?:1.8.0_111]
        at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) ~[?:1.8.0_111]
        at sun.net.www.http.HttpClient.New(HttpClient.java:308) ~[?:1.8.0_111]
        at sun.net.www.http.HttpClient.New(HttpClient.java:326) ~[?:1.8.0_111]
        at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1202) ~[?:1.8.0_111]
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1138) ~[?:1.8.0_111]
        at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1032) ~[?:1.8.0_111]
        at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:966) ~[?:1.8.0_111]
        at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1316) ~[?:1.8.0_111]
        at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1291) ~[?:1.8.0_111]
        at org.glassfish.jersey.client.internal.HttpUrlConnector$4.getOutputStream(HttpUrlConnector.java:385) ~[stormjar.jar:?]
        at org.glassfish.jersey.message.internal.CommittingOutputStream.commitStream(CommittingOutputStream.java:200) ~[stormjar.jar:?]
        at org.glassfish.jersey.message.internal.CommittingOutputStream.commitStream(CommittingOutputStream.java:194) ~[stormjar.jar:?]
        at org.glassfish.jersey.message.internal.CommittingOutputStream.write(CommittingOutputStream.java:228) ~[stormjar.jar:?]
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$UnCloseableOutputStream.write(WriterInterceptorExecutor.java:299) ~[stormjar.jar:?]
        at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:1982) ~[stormjar.jar:?]
        at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:995) ~[stormjar.jar:?]
        at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:932) ~[stormjar.jar:?]
        at com.fasterxml.jackson.jaxrs.base.ProviderBase.writeTo(ProviderBase.java:635) ~[stormjar.jar:?]
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:265) ~[stormjar.jar:?]
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:250) ~[stormjar.jar:?]
        at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:162) ~[stormjar.jar:?]
        at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1130) ~[stormjar.jar:?]
        at org.glassfish.jersey.client.ClientRequest.writeEntity(ClientRequest.java:502) ~[stormjar.jar:?]
        at org.glassfish.jersey.client.internal.HttpUrlConnector._apply(HttpUrlConnector.java:388) ~[stormjar.jar:?]
        at org.glassfish.jersey.client.internal.HttpUrlConnector.apply(HttpUrlConnector.java:285) ~[stormjar.jar:?]
        ... 28 more
1 ACCEPTED SOLUTION

avatar
Contributor
@Edgar Orendain

StringSerializer does not use schema-registry and it simply converts bytes using string#getBytes(encoding).

It seems the configured schema registry url on producer is not available. You may want to check whether the given URL is right.

Did you set property with key as SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name() with respective schema-registry url?

Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// rootUrl is of http://<host>:<port>/api/v1
config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), rootUrl);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

View solution in original post

4 REPLIES 4

avatar
Super Collaborator

@Satish Duggana can you please help answer this questions

avatar
@Edgar Orendain

Can you make your schema registry server accessible from Storm's Supervisor nodes.

avatar
Contributor
@Edgar Orendain

StringSerializer does not use schema-registry and it simply converts bytes using string#getBytes(encoding).

It seems the configured schema registry url on producer is not available. You may want to check whether the given URL is right.

Did you set property with key as SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name() with respective schema-registry url?

Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// rootUrl is of http://<host>:<port>/api/v1
config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), rootUrl);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

avatar

@Satish Duggana

@Sriharsha Chintalapani

The configs were the issue, thanks! I had tracked this down before and made the appropriate change, though kept running into the same problem. Debugging showed that the Kafka serializer was being intialized/configured correctly, though the issue kept coming up. The registry webservice was accessible from Storm supervisor nodes, and serializers showed the correct configs. After reading these comments, I booted up a fresh cluster, deployed the same topology with the same configs ... and wallah.

I'll have to keep an eye open for what changes could have caused this, or do in the future. Thanks for prompting me to double check all of this!