Kafka Connect file import and export

Hi, I'm trying the Kafka Connect file import and export, but it is failing with a timeout.

ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding messages, 1 left ({ProducerRecord(topic=newtest, partition=null, key=[B@63d673d3, value=[B@144e54da=ProducerRecord(topic=newtest, partition=null, key=[B@63d673d3, value=[B@144e54da}) (org.apache.kafka.connect.runtime.WorkerSourceTask:239)

[2017-01-02 05:51:08,891] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)

I checked both the Kafka server and ZooKeeper, and both are running fine.

I am not seeing any other errors in the logs.

Please help me fix the issue.

Thanks,
sathish

1 ACCEPTED SOLUTION

@sathish jeganathan

If you are using SASL, you need to add the properties below to /etc/kafka/conf/connect-standalone.properties.

producer.security.protocol=SASL_PLAINTEXT
producer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.kerberos.service.name=kafka
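
To confirm that a worker file actually contains these four overrides, a small check along the following lines may help. This is only a sketch: the helper names `load_properties` and `missing_sasl_settings` are made up for illustration, and the parser handles plain `key=value` lines only (no line continuations or escapes).

```python
# Expected values, copied from the four properties listed above.
REQUIRED = {
    "producer.security.protocol": "SASL_PLAINTEXT",
    "producer.sasl.kerberos.service.name": "kafka",
    "consumer.security.protocol": "SASL_PLAINTEXT",
    "consumer.sasl.kerberos.service.name": "kafka",
}


def load_properties(text):
    """Parse simple key=value lines; skip blanks and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_sasl_settings(text):
    """Return the required keys that are absent or set to another value."""
    props = load_properties(text)
    return sorted(k for k, v in REQUIRED.items() if props.get(k) != v)
```

Reading the file with `open("/etc/kafka/conf/connect-standalone.properties").read()` and passing the text to `missing_sasl_settings` returns an empty list when all four settings are present.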

19 REPLIES

Below are my source and sink properties.

Source:

name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test.txt
topic=newtest

Sink:

name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.txt
topics=newtest

Both files have their permissions set to 777.

@sathish jeganathan

Check the connect-standalone.properties file and confirm that it has a valid broker hostname and port (bootstrap.servers).
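
One way to see exactly what the worker will use is to pull the `bootstrap.servers` entries out of the file. The snippet below is a minimal sketch; the function name `bootstrap_servers` is hypothetical, and it assumes every entry is written as `host:port`.

```python
def bootstrap_servers(text):
    """Return (host, port) pairs from the bootstrap.servers line, if any."""
    for raw in text.splitlines():
        line = raw.strip()
        if line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        if key.strip() != "bootstrap.servers":
            continue
        servers = []
        for entry in value.split(","):
            # Split on the last colon so the port is always the final part.
            host, _, port = entry.strip().rpartition(":")
            servers.append((host, int(port)))
        return servers
    return []
```

Any pair whose host is `localhost` is worth a second look when the broker is registered under its real hostname.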

Yes, it's localhost:6667. Initially it was localhost:9092, but I changed it to 6667.

Thanks,
sathish

@sathish jeganathan Please replace localhost with the broker hostname and re-run the connect command.

@Sandeep Nemuri

Yup, I've tried that too, but I still get the same error.

Thanks,
sathish

Can you change the log level to DEBUG in connect-log4j.properties and see what error it throws?

Here is the content of connect-log4j.properties

log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.zookeeper=DEBUG
log4j.logger.org.I0Itec.zkclient=DEBUG

There are no such parameters in my Kafka connect-log4j.properties. Can you please tell me which parameter to set?

Thanks,
sathish

See the error messages below. They say the connection was refused for the broker host.

DEBUG Connection with tstr400367.abc-test.com/10.246.131.35 disconnected (org.apache.kafka.common.network.Selector:307)
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)

I've specified the proper node name and port, so now I'm not sure what to check.

Thanks,
sathish
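
A "Connection refused" at the socket level means the TCP connection itself was rejected, before any Kafka protocol exchange took place. A quick way to test a host and port from the machine running Connect is a plain TCP connect, sketched below. The function name `can_connect` is made up for illustration, and the check says nothing about SASL or any other Kafka-level setting.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, calling `can_connect("tstr400367.abc-test.com", 6667)` and then the same host with port 9092 shows which of the two ports mentioned in this thread, if either, accepts connections.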