Created 03-20-2017 07:40 PM
In Storm, my KafkaBolt is set up with the following value serializer:
com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
I receive the following stack trace when data is ingested into the bolt. The same topology works fine with Kafka's default StringSerializer. The same environment can also serialize and deserialize successfully using the default Avro serdes via Schema Registry. In other words, this seems to be specific to the Kafka serdes.
2017-03-20 19:02:19.984 o.a.s.d.executor [ERROR] javax.ws.rs.ProcessingException: java.net.ConnectException: Connection refused (Connection refused)
    at org.glassfish.jersey.client.internal.HttpUrlConnector.apply(HttpUrlConnector.java:287) ~[stormjar.jar:?]
    at org.glassfish.jersey.client.ClientRuntime.invoke(ClientRuntime.java:255) ~[stormjar.jar:?]
    at org.glassfish.jersey.client.JerseyInvocation$2.call(JerseyInvocation.java:700) ~[stormjar.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:315) ~[stormjar.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:297) ~[stormjar.jar:?]
    at org.glassfish.jersey.internal.Errors.process(Errors.java:228) ~[stormjar.jar:?]
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:444) ~[stormjar.jar:?]
    at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:696) ~[stormjar.jar:?]
    at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:448) ~[stormjar.jar:?]
    at org.glassfish.jersey.client.JerseyInvocation$Builder.post(JerseyInvocation.java:349) ~[stormjar.jar:?]
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.postEntity(SchemaRegistryClient.java:541) ~[stormjar.jar:?]
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.doRegisterSchemaMetadata(SchemaRegistryClient.java:228) ~[stormjar.jar:?]
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.registerSchemaMetadata(SchemaRegistryClient.java:221) ~[stormjar.jar:?]
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.addSchemaVersion(SchemaRegistryClient.java:250) ~[stormjar.jar:?]
    at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotSerializer.serialize(AbstractSnapshotSerializer.java:50) ~[stormjar.jar:?]
    at com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[stormjar.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:463) ~[stormjar.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440) ~[stormjar.jar:?]
    at org.apache.storm.kafka.bolt.KafkaBolt.execute(KafkaBolt.java:143) [stormjar.jar:?]
    at org.apache.storm.daemon.executor$fn__5211$tuple_action_fn__5213.invoke(executor.clj:728) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__5132.invoke(executor.clj:460) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
    at org.apache.storm.disruptor$clojure_handler$reify__4647.onEvent(disruptor.clj:40) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:453) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:432) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
    at org.apache.storm.daemon.executor$fn__5211$fn__5224$fn__5277.invoke(executor.clj:847) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.0.2.2.1.2.0-10.jar:1.0.2.2.1.2.0-10]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_111]
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_111]
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_111]
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_111]
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_111]
    at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_111]
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175) ~[?:1.8.0_111]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) ~[?:1.8.0_111]
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) ~[?:1.8.0_111]
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) ~[?:1.8.0_111]
    at sun.net.www.http.HttpClient.New(HttpClient.java:308) ~[?:1.8.0_111]
    at sun.net.www.http.HttpClient.New(HttpClient.java:326) ~[?:1.8.0_111]
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1202) ~[?:1.8.0_111]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1138) ~[?:1.8.0_111]
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1032) ~[?:1.8.0_111]
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:966) ~[?:1.8.0_111]
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1316) ~[?:1.8.0_111]
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1291) ~[?:1.8.0_111]
    at org.glassfish.jersey.client.internal.HttpUrlConnector$4.getOutputStream(HttpUrlConnector.java:385) ~[stormjar.jar:?]
    at org.glassfish.jersey.message.internal.CommittingOutputStream.commitStream(CommittingOutputStream.java:200) ~[stormjar.jar:?]
    at org.glassfish.jersey.message.internal.CommittingOutputStream.commitStream(CommittingOutputStream.java:194) ~[stormjar.jar:?]
    at org.glassfish.jersey.message.internal.CommittingOutputStream.write(CommittingOutputStream.java:228) ~[stormjar.jar:?]
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$UnCloseableOutputStream.write(WriterInterceptorExecutor.java:299) ~[stormjar.jar:?]
    at com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:1982) ~[stormjar.jar:?]
    at com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:995) ~[stormjar.jar:?]
    at com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:932) ~[stormjar.jar:?]
    at com.fasterxml.jackson.jaxrs.base.ProviderBase.writeTo(ProviderBase.java:635) ~[stormjar.jar:?]
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.invokeWriteTo(WriterInterceptorExecutor.java:265) ~[stormjar.jar:?]
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor$TerminalWriterInterceptor.aroundWriteTo(WriterInterceptorExecutor.java:250) ~[stormjar.jar:?]
    at org.glassfish.jersey.message.internal.WriterInterceptorExecutor.proceed(WriterInterceptorExecutor.java:162) ~[stormjar.jar:?]
    at org.glassfish.jersey.message.internal.MessageBodyFactory.writeTo(MessageBodyFactory.java:1130) ~[stormjar.jar:?]
    at org.glassfish.jersey.client.ClientRequest.writeEntity(ClientRequest.java:502) ~[stormjar.jar:?]
    at org.glassfish.jersey.client.internal.HttpUrlConnector._apply(HttpUrlConnector.java:388) ~[stormjar.jar:?]
    at org.glassfish.jersey.client.internal.HttpUrlConnector.apply(HttpUrlConnector.java:285) ~[stormjar.jar:?]
    ... 28 more
Created 03-22-2017 04:36 PM
StringSerializer does not use the schema registry; it simply converts the string to bytes using String#getBytes(encoding).
It seems the schema registry URL configured on the producer is not reachable. You may want to check that the given URL is correct.
Did you set the property whose key is SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name() to the respective schema registry URL?
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// rootUrl has the form http://<host>:<port>/api/v1
config.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), rootUrl);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
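Since the question uses a KafkaBolt rather than a raw producer, the same settings would normally travel as producer Properties. Below is a minimal, dependency-free sketch of that idea. The class name, host names, and ports are hypothetical, the literal "schema.registry.url" is my assumption of what SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name() resolves to, and leaving the key serializer as StringSerializer is also an assumption.

```java
import java.util.Properties;

public class KafkaBoltProducerProps {
    // Assumption: this literal matches SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name().
    // Prefer the constant itself when the registry client is on the classpath.
    static final String SCHEMA_REGISTRY_URL_KEY = "schema.registry.url";

    /** Builds producer properties that include the schema registry root URL. */
    static Properties build(String bootstrapServers, String rootUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: keys stay plain strings; only values go through the Avro serializer.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        // Without this entry the serializer has no configured registry to contact.
        props.put(SCHEMA_REGISTRY_URL_KEY, rootUrl);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical hosts and ports, for illustration only.
        Properties props = build("broker-host:6667", "http://registry-host:9090/api/v1");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
        // In the topology these would be handed to the bolt, e.g.
        // new KafkaBolt<String, Object>().withProducerProperties(props)
    }
}
```

Printing the properties at submit time is a cheap way to confirm which registry URL the workers will actually receive.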
Created 03-22-2017 04:19 PM
@Satish Duggana, can you please help answer this question?
Created 03-22-2017 04:21 PM
Can you make your schema registry server accessible from Storm's Supervisor nodes?
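One rough way to test that from a Supervisor node is a plain TCP connect to the registry's host and port, which exercises the same step that fails with "Connection refused" in the trace. This is only a sketch using the JDK; the class name and the default URL are hypothetical, and a successful connect shows only that the port is open, not that the registry API is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

public class RegistryReachability {
    /** Returns true if a TCP connection to the URL's host and port succeeds within timeoutMs. */
    static boolean isReachable(String rootUrl, int timeoutMs) {
        URI uri = URI.create(rootUrl);
        if (uri.getHost() == null) {
            return false; // e.g. the scheme is missing, so no host could be parsed
        }
        // Fall back to the scheme's default port when the URL does not name one.
        int port = uri.getPort() != -1 ? uri.getPort()
                : ("https".equals(uri.getScheme()) ? 443 : 80);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(uri.getHost(), port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical default; pass the real registry root URL as the first argument.
        String rootUrl = args.length > 0 ? args[0] : "http://localhost:9090/api/v1";
        System.out.println(rootUrl + " reachable: " + isReachable(rootUrl, 2000));
    }
}
```

Running it on each Supervisor host with the exact URL from the producer config helps separate a network problem from a configuration problem.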
Created 03-22-2017 11:41 PM
The configs were the issue, thanks! I had tracked this down before and made the appropriate change, but kept running into the same problem. Debugging showed that the Kafka serializer was being initialized and configured correctly, yet the error persisted. The registry web service was accessible from the Storm supervisor nodes, and the serializers showed the correct configs. After reading these comments, I booted up a fresh cluster, deployed the same topology with the same configs ... and voilà.
I'll have to keep an eye out for what changes could have caused this, or could cause it in the future. Thanks for prompting me to double-check all of this!