Archives of Support Questions (Read Only)

This is an archived board for historical reference. Information and links may no longer be available or relevant
Announcements
This board is archived and read-only for historical reference. To ask a new question, please post a new topic on the appropriate active board.

kafka connect file import and export

avatar

Hi - i'm trying for kafka file import and export but its failing with timed out.

ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding messages, 1 left ({ProducerRecord(topic=newtest, partition=null, key=[B@63d673d3, value=[B@144e54da=ProducerRecord(topic=newtest, partition=null, key=[B@63d673d3, value=[B@144e54da}) (org.apache.kafka.connect.runtime.WorkerSourceTask:239)

[2017-01-02 05:51:08,891] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)

i check both kafka server and zookeeper and those are running fine.

no other error am seeing in logs..

please help me in fixing the issue

thanks,sathish

1 ACCEPTED SOLUTION

avatar
@sathish jeganathan

You need to add below properties in /etc/kafka/conf/connect-standalone.properties if you are using SASL.

producer.security.protocol=SASL_PLAINTEXT
producer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.kerberos.service.name=kafka

View solution in original post

19 REPLIES 19

avatar

below are my source and sink properties.

producer:

name=local-file-source

connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector

tasks.max=1

file=/tmp/test.txt (with 777 mod)

topic=newtest

Sink:

name=local-file-sink

connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector

tasks.max=1

file=/tmp/test.sink.txt (with 777 mod)

topics=newtest

avatar
@sathish jeganathan

Check connect-standalone.properties file and see if you have the valid broker hostname and port (bootstrap.servers).

avatar

yes.. its localhost:6667. initially it was localhost:9092 but i changed to 6667

thanks,

sathish

avatar

@sathish jeganathan Please replace localhost with the broker hostname and re-run the connect command.

avatar
@Sandeep Nemuri

yup., i've tried that too.. but still same error

thanks,

sathish

avatar

Can you change the log4j properties to DEBUG mode in connect-log4j.properties and see what is the error it is throwing ?

avatar

Here is the content of connect-log4j.properties

log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.zookeeper=DEBUG
log4j.logger.org.I0Itec.zkclient=DEBUG

avatar

there are no such parms for kafak connect-log4j.properties.. can you please let me know the parameter

thanks,

sathish

avatar

see the below error msg's ..it says connection refused for the broker host.

DEBUG Connection with tstr400367.abc-test.com/10.246.131.35 disconnected (org.apache.kafka.common.network.Selector:307)

java.net.ConnectException: Connection refused

at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)

at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)

at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)

at org.apache.kafka.common.network.Selector.poll(Selector.java:274)

at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)

at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)

at java.lang.Thread.run(Thread.java:745)

i've mentioned proper node name with port id, now am not sure what to check

thanks,

sathish