Created 01-02-2017 06:05 AM
Hi - i'm trying for kafka file import and export but its failing with timed out.
ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding messages, 1 left ({ProducerRecord(topic=newtest, partition=null, key=[B@63d673d3, value=[B@144e54da=ProducerRecord(topic=newtest, partition=null, key=[B@63d673d3, value=[B@144e54da}) (org.apache.kafka.connect.runtime.WorkerSourceTask:239)
[2017-01-02 05:51:08,891] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)
i check both kafka server and zookeeper and those are running fine.
no other error am seeing in logs..
please help me in fixing the issue
thanks,sathish
Created 01-02-2017 12:06 PM
You need to add below properties in /etc/kafka/conf/connect-standalone.properties if you are using SASL.
producer.security.protocol=SASL_PLAINTEXT producer.sasl.kerberos.service.name=kafka consumer.security.protocol=SASL_PLAINTEXT consumer.sasl.kerberos.service.name=kafka
Created 01-02-2017 06:12 AM
below are my source and sink properties.
producer:
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test.txt (with 777 mod)
topic=newtest
Sink:
name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.txt (with 777 mod)
topics=newtest
Created 01-02-2017 08:58 AM
Check connect-standalone.properties file and see if you have the valid broker hostname and port (bootstrap.servers).
Created 01-02-2017 09:06 AM
yes.. its localhost:6667. initially it was localhost:9092 but i changed to 6667
thanks,
sathish
Created 01-02-2017 09:10 AM
@sathish jeganathan Please replace localhost with the broker hostname and re-run the connect command.
Created 01-02-2017 09:13 AM
Created 01-02-2017 09:19 AM
Can you change the log4j properties to DEBUG mode in connect-log4j.properties and see what is the error it is throwing ?
Created 01-02-2017 10:00 AM
Here is the content of connect-log4j.properties
log4j.rootLogger=DEBUG, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n log4j.logger.org.apache.zookeeper=DEBUG log4j.logger.org.I0Itec.zkclient=DEBUG
Created 01-02-2017 09:57 AM
there are no such parms for kafak connect-log4j.properties.. can you please let me know the parameter
thanks,
sathish
Created 01-02-2017 10:11 AM
see the below error msg's ..it says connection refused for the broker host.
DEBUG Connection with tstr400367.abc-test.com/10.246.131.35 disconnected (org.apache.kafka.common.network.Selector:307)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Thread.java:745)
i've mentioned proper node name with port id, now am not sure what to check
thanks,
sathish