Support Questions

Find answers, ask questions, and share your expertise

Kafka Connect with sample configuration not working

avatar
New Contributor

I am trying to use the Kafka Connect examples of write out to a file or console using the configuration files from within kafka's config folder [connect-console-sink.properties, config/connect-file-sink.properties]

I have messages in the topics, made these changes in the config files too.

Here's the last few entries from the log:

[2016-05-24 23:18:15,241] INFO Finished creating connector local-console-source (org.apache.kafka.connect.runtime.Worker:193)

[2016-05-24 23:18:15,245] INFO TaskConfig values:

task.class = class org.apache.kafka.connect.file.FileStreamSinkTask

(org.apache.kafka.connect.runtime.TaskConfig:165)

[2016-05-24 23:18:15,245] INFO Creating task local-console-source-0 (org.apache.kafka.connect.runtime.Worker:256)

[2016-05-24 23:18:15,246] INFO Instantiated task local-console-source-0 with version 0.9.0.2.4.0.0-169 of type org.apache.kafka.connect.file.FileStreamSinkTask (org.apache.kafka.connect.runtime.Worker:267)

[2016-05-24 23:18:15,253] INFO ConsumerConfig values:

request.timeout.ms = 40000

check.crcs = true

retry.backoff.ms = 100

ssl.truststore.password = null

ssl.keymanager.algorithm = SunX509

receive.buffer.bytes = 32768

ssl.cipher.suites = null

ssl.key.password = null

sasl.kerberos.ticket.renew.jitter = 0.05

ssl.provider = null

sasl.kerberos.service.name = null

session.timeout.ms = 30000

sasl.kerberos.ticket.renew.window.factor = 0.8

bootstrap.servers = [localhost:9092]

client.id =

fetch.max.wait.ms = 500

fetch.min.bytes = 1024

key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

sasl.kerberos.kinit.cmd = /usr/bin/kinit

auto.offset.reset = earliest

value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]

ssl.endpoint.identification.algorithm = null

max.partition.fetch.bytes = 1048576

ssl.keystore.location = null

ssl.truststore.location = null

ssl.keystore.password = null

metrics.sample.window.ms = 30000

metadata.max.age.ms = 300000

security.protocol = PLAINTEXT

auto.commit.interval.ms = 5000

ssl.protocol = TLS

sasl.kerberos.min.time.before.relogin = 60000

connections.max.idle.ms = 540000

ssl.trustmanager.algorithm = PKIX

group.id = connect-local-console-source

enable.auto.commit = false

metric.reporters = []

ssl.truststore.type = JKS

send.buffer.bytes = 131072

reconnect.backoff.ms = 50

metrics.num.samples = 2

ssl.keystore.type = JKS

heartbeat.interval.ms = 3000

(org.apache.kafka.clients.consumer.ConsumerConfig:165)

[2016-05-24 23:18:15,286] INFO Kafka version : 0.9.0.2.4.0.0-169 (org.apache.kafka.common.utils.AppInfoParser:82)

[2016-05-24 23:18:15,286] INFO Kafka commitId : 29fa247911f6823b (org.apache.kafka.common.utils.AppInfoParser:83)

[2016-05-24 23:18:15,290] INFO Created connector local-console-source (org.apache.kafka.connect.cli.ConnectStandalone:82)

^C

[2016-05-24 23:19:01,375] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)

[2016-05-24 23:19:01,375] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:136)

[2016-05-24 23:19:01,381] INFO Stopped ServerConnector@55ecdfba{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)

[2016-05-24 23:19:01,388] INFO Stopped o.e.j.s.ServletContextHandler@3b907954{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)

[2016-05-24 23:19:01,390] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:147)

[2016-05-24 23:19:01,390] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:62)

[2016-05-24 23:19:01,390] INFO Stopping task local-console-source-0 (org.apache.kafka.connect.runtime.Worker:305)

[2016-05-24 23:19:01,391] INFO Starting graceful shutdown of thread WorkerSinkTask-local-console-source-0 (org.apache.kafka.connect.util.ShutdownableThread:119)

[2016-05-24 23:19:01,391] ERROR Sink task WorkerSinkTask{id=local-console-source-0} was stopped before completing join group. Task initialization and start is being skipped (org.apache.kafka.connect.runtime.WorkerSinkTask:150)

Exception in thread "Thread-1" java.lang.NullPointerException

at org.apache.kafka.connect.file.FileStreamSinkTask.stop(FileStreamSinkTask.java:88)

at org.apache.kafka.connect.runtime.WorkerSinkTask.awaitStop(WorkerSinkTask.java:119)

at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:311)

at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.removeConnectorTasks(StandaloneHerder.java:238)

at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.stop(StandaloneHerder.java:68)

at org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71)

at org.apache.kafka.connect.runtime.Connect$ShutdownHook.run(Connect.java:93)

[root@sandbox bin]#

1 ACCEPTED SOLUTION

avatar
New Contributor

I figured out the problem. The default configurations use 'bootstrap.servers=sandbox.hortonworks.com:6667' instead of 'bootstrap.servers=localhost:9092' as the URL usage is required.

View solution in original post

1 REPLY 1

avatar
New Contributor

I figured out the problem. The default configurations use 'bootstrap.servers=sandbox.hortonworks.com:6667' instead of 'bootstrap.servers=localhost:9092' as the URL usage is required.