Can't read messages from Kafka topic through NiFi (ConsumeKafka_0_10)

Expert Contributor

But from the command line I can read messages successfully.

(screenshot: consumekafka-0-10.png)

8 REPLIES

Super Mentor

@Dmitro Vasilenko

Are you seeing any error or warn log messages produced by the ConsumeKafka processor when you run it? What is the processor's configured run strategy/schedule?

Expert Contributor

Hi!

I do not see any errors while the NiFi processor (ConsumeKafka_0_10) is running.

NiFi log:

    2017-04-11 17:25:37,726 INFO [Timer-Driven Process Thread-79] o.a.k.clients.consumer.ConsumerConfig ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [host.domain:6667, host.domain:6667]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 10000
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = user-nifi
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest
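Worth noting in that dump: max.partition.fetch.bytes = 1048576, i.e. the consumer is still at the 1 MB default fetch size, so messages larger than that will fail to be fetched. As a minimal sketch for pulling that setting out of the NiFi log to double-check (the log path here is an assumption; point it at your installation's nifi-app.log):

    # Hypothetical log path; substitute your NiFi logs directory.
    grep -oE 'max\.partition\.fetch\.bytes = [0-9]+' /opt/nifi/logs/nifi-app.log | sort -u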

Expert Contributor

About the run strategy/schedule: Concurrent Tasks = 1, Run Schedule = 0 sec

---------------

From the command line I can read messages after changing "max.message.bytes":

    /usr/hdp/2.5.3.0-37/kafka/bin/kafka-configs.sh --zookeeper host.domain:2181,host.domain:2181,host.domain.ua:2181 \
        --entity-type topics \
        --entity-name my-topic \
        --alter \
        --add-config max.message.bytes=2147483647

    [2017-04-11 16:45:13,993] ERROR
    *****************************************************************************************************
    *** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's   ***
    *** default max.message.bytes. This operation is dangerous. There are two potential side effects: ***
    ***   - Consumers will get failures if their fetch.message.max.bytes (old consumer) or           ***
    ***     max.partition.fetch.bytes (new consumer) < the value you are using                       ***
    ***   - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have  ***
    ***     a higher risk of data loss                                                               ***
    *** You should ensure both of these settings are greater than the value set here before using    ***
    *** this topic.                                                                                  ***
    *****************************************************************************************************
    - value set here: 2147483647
    - Default Broker replica.fetch.max.bytes: 1048576
    - Default Broker max.message.bytes: 1000012
    - Default Old Consumer fetch.message.max.bytes: 1048576
    - Default New Consumer max.partition.fetch.bytes: 1048576
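For reference, the topic-level override can be verified afterwards with the --describe action of the same tool (a sketch reusing the ZooKeeper quorum and topic name from above; adjust hosts as needed):

    /usr/hdp/2.5.3.0-37/kafka/bin/kafka-configs.sh \
        --zookeeper host.domain:2181,host.domain:2181,host.domain.ua:2181 \
        --entity-type topics \
        --entity-name my-topic \
        --describe

This should list max.message.bytes=2147483647 among the configs for my-topic.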

Super Mentor
@Dmitro Vasilenko

The ConsumeKafka_0_10 processor allows dynamic properties:

(screenshot: 14541-screen-shot-2017-04-11-at-110857-am.png, showing the dynamic property configuration)

You can try adding a new dynamic property to your ConsumeKafka_0_10 processor for "max.message.bytes" with a value of 2147483647 to see if that works for you.

A list of Kafka properties can be found here:

http://kafka.apache.org/documentation.html#configuration

Thanks,

Matt

Super Mentor
@Dmitro Vasilenko

The ConsumeKafka processor will only accept dynamic properties that are valid Kafka consumer configuration properties.

max.message.bytes is a server configuration property.

I believe what you are really looking for on the consumer side is:

max.partition.fetch.bytes

This property will be accepted by the ConsumeKafka processor, and you will not see the "Must be a known configuration parameter for this Kafka client" validation tooltip.
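As a quick way to confirm that this consumer-side property is what unblocks the large messages, here is a sketch using the console consumer with an override file (the file name and group.id are made up for the example; --new-consumer matches the 0.10-era console consumer):

    # Write a consumer override file (names here are hypothetical).
    cat > consumer-large.properties <<'EOF'
    group.id=test-large-messages
    max.partition.fetch.bytes=2147483647
    EOF

    # Consume with the override applied; --new-consumer selects the new consumer API.
    /usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-consumer.sh \
        --new-consumer \
        --bootstrap-server host.domain:6667 \
        --topic my-topic \
        --consumer.config consumer-large.properties \
        --from-beginning

If this reads the large messages while a run without the override does not, setting the same max.partition.fetch.bytes value as a dynamic property on ConsumeKafka_0_10 should fix the flow.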

Thanks,

Matt

Just as an FYI, I don't get pinged about new answers/comments you make unless you use the @<username> notation.

Expert Contributor

Thank you for the answer, but I get an error:
