
Kafka Connect file import and export

Hi - I'm trying Kafka Connect file import and export, but it's failing with a timeout.

ERROR Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding messages, 1 left ({ProducerRecord(topic=newtest, partition=null, key=[B@63d673d3, value=[B@144e54da=ProducerRecord(topic=newtest, partition=null, key=[B@63d673d3, value=[B@144e54da}) (org.apache.kafka.connect.runtime.WorkerSourceTask:239)

[2017-01-02 05:51:08,891] ERROR Failed to commit offsets for WorkerSourceTask{id=local-file-source-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:112)

I checked both the Kafka server and ZooKeeper, and they are running fine.

I'm not seeing any other errors in the logs.

Please help me fix this issue.

Thanks, Sathish

1 ACCEPTED SOLUTION

@sathish jeganathan

You need to add the properties below in /etc/kafka/conf/connect-standalone.properties if you are using SASL.

producer.security.protocol=SASL_PLAINTEXT
producer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.kerberos.service.name=kafka
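
For context, a minimal sketch of the full worker config on a Kerberized HDP cluster might look like the block below; the broker hostname is a placeholder, and the converter/offset settings are the stock defaults shipped with Kafka:

# /etc/kafka/conf/connect-standalone.properties (sketch; broker host is a placeholder)
bootstrap.servers=broker-host.example.com:6667
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
# Connect's embedded producer and consumer do not inherit the worker's security
# settings; they must be set explicitly with the producer./consumer. prefixes:
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.kerberos.service.name=kafka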


19 REPLIES

Below are my source and sink properties.

Source:

name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/test.txt (mode 777)
topic=newtest

Sink:

name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/test.sink.txt (mode 777)
topics=newtest
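
For reference, the standalone worker is started with the worker config followed by one or more connector property files; the script path below assumes an HDP layout and may differ on your install:

# Start Connect in standalone mode with both connectors (paths are placeholders).
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh \
  /etc/kafka/conf/connect-standalone.properties \
  local-file-source.properties local-file-sink.properties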

@sathish jeganathan

Check the connect-standalone.properties file and see if you have a valid broker hostname and port (bootstrap.servers).
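
For example, on HDP the broker typically listens on 6667 rather than the Apache default 9092; the hostname below is a placeholder:

# In connect-standalone.properties; replace with your actual broker host and port.
bootstrap.servers=broker-host.example.com:6667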

Yes, it's localhost:6667. Initially it was localhost:9092, but I changed it to 6667.

Thanks,

Sathish

@sathish jeganathan Please replace localhost with the broker hostname and re-run the connect command.

@Sandeep Nemuri

Yup, I've tried that too, but I still get the same error.

Thanks,

Sathish

Can you change the log level to DEBUG in connect-log4j.properties and see what error it is throwing?

Here is the content of connect-log4j.properties

log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.zookeeper=DEBUG
log4j.logger.org.I0Itec.zkclient=DEBUG

There are no such params in my Kafka connect-log4j.properties. Can you please let me know the parameters to set?

Thanks,

Sathish

See the error messages below; it says connection refused for the broker host.

DEBUG Connection with tstr400367.abc-test.com/10.246.131.35 disconnected (org.apache.kafka.common.network.Selector:307)
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
    at java.lang.Thread.run(Thread.java:745)

I've specified the proper node name and port, so now I'm not sure what to check.

Thanks,

Sathish
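
One quick way to verify basic reachability from the Connect host is sketched below; the host and port are taken from the trace above, and the server.properties path may differ on your install:

# Check whether anything is listening on the broker port (host/port from the trace).
nc -vz tstr400367.abc-test.com 6667
# Confirm which port and protocol the broker actually binds.
grep -E '^(listeners|port)' /etc/kafka/conf/server.properties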

@Sandeep Nemuri

I've set up port 6667 with the security protocol PLAINTEXTSASL, but Kafka Connect by default runs its producer with security.protocol = PLAINTEXT. How can I override these params for Kafka Connect? I've updated them in connect-standalone.properties, but Kafka Connect is not picking them up at startup. How should I change the producer properties for Kafka Connect?

Thanks,

Sathish

Right now I see all the params below in connect-log4j.properties:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR

Thanks,

Sathish

Yes, I've started it in DEBUG mode. Please give me some time; I'm going through the logs now.


Yes, it was a problem with the security protocol. I've added those properties and it's working now. Is there any link or doc I can use as a parameter reference?

Thanks,

Sathish

@Sandeep Nemuri

Right now I'm testing with an RDBMS source (MySQL), and Kafka Connect is failing on "connector.class". How can I find the correct connector class for an RDBMS (MySQL database)?

I've tried org.apache.kafka.connect.jdbc.JdbcSourceConnector and io.confulent.connect.jdbc.JdbcSourceConnector, and neither exists.

Thanks,

Sathish

@sathish jeganathan

I'm not sure about the MySQL connector, but the class you are using belongs to https://github.com/confluentinc/kafka-connect-jdbc, which is not shipped with HDP.
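
For reference only: if the Confluent kafka-connect-jdbc plugin were installed separately, a source config would look roughly like the sketch below. Every value is a placeholder, and the property names belong to that plugin, not to stock Kafka:

# Hypothetical jdbc-source.properties for the separately installed Confluent plugin.
name=mysql-jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://db-host:3306/testdb?user=dbuser&password=secret
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-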

It's not specific to MySQL; I'm asking about the connector class for an "RDBMS" source in general.

Thanks,

Sathish

I should have been more specific: I'm looking for the Kafka connector class for an RDBMS as a source. Can you please let me know the connector class, or point me to any doc for reference?

Thanks,

Sathish

@sathish jeganathan

I am not sure about the connector; I'd request you to ask a new question for that.