Created 04-09-2017 08:54 PM
But I can read the messages successfully from the command line. (Attachment: consumekafka-0-10.png)
Created 04-10-2017 12:40 PM
Are you seeing an ERROR or WARN log message produced by the ConsumeKafka processor when you run it? What is the processor's configured run strategy/schedule?
Created 04-11-2017 02:35 PM
Hi!
I do not see any errors while the NiFi processor (ConsumeKafka_0_10) is running.
NiFi log:
2017-04-11 17:25:37,726 INFO [Timer-Driven Process Thread-79] o.a.k.clients.consumer.ConsumerConfig ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [host.domain:6667, host.domain:6667]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 10000
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = user-nifi
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
Created 04-11-2017 02:38 PM
About "strategy/schedule": Concurrent Tasks = 1, Run Schedule = 0 sec
---------------
From the command line I can read messages after changing "max.message.bytes":

/usr/hdp/2.5.3.0-37/kafka/bin/kafka-configs.sh --zookeeper host.domain:2181,host.domain:2181,host.domain.ua:2181 \
    --entity-type topics \
    --entity-name my-topic \
    --alter \
    --add-config max.message.bytes=2147483647
[2017-04-11 16:45:13,993] ERROR
*****************************************************************************************************
*** WARNING: you are creating a topic where the max.message.bytes is greater than the broker's    ***
*** default max.message.bytes. This operation is dangerous. There are two potential side effects: ***
*** - Consumers will get failures if their fetch.message.max.bytes (old consumer) or              ***
***   max.partition.fetch.bytes (new consumer) < the value you are using                          ***
*** - Producer requests larger than replica.fetch.max.bytes will not replicate and hence have     ***
***   a higher risk of data loss                                                                  ***
*** You should ensure both of these settings are greater than the value set here before using     ***
*** this topic.                                                                                   ***
*****************************************************************************************************
- value set here: 2147483647
- Default Broker replica.fetch.max.bytes: 1048576
- Default Broker max.message.bytes: 1000012
- Default Old Consumer fetch.message.max.bytes: 1048576
- Default New Consumer max.partition.fetch.bytes: 1048576
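The first side effect in that warning is essentially a size comparison between the topic limit and the consumer's fetch limit. A minimal Python sketch of the check, using only the numbers printed above; the function name `consumer_can_fetch` is hypothetical, and this simplifies what the Kafka client actually does when it meets an oversized message:

```python
def consumer_can_fetch(max_partition_fetch_bytes: int,
                       topic_max_message_bytes: int) -> bool:
    """Return True if the consumer's fetch limit covers the largest
    message the topic is allowed to hold (simplified, illustrative check)."""
    return max_partition_fetch_bytes >= topic_max_message_bytes


# Values taken from the kafka-configs.sh output above.
TOPIC_MAX_MESSAGE_BYTES = 2147483647   # "value set here"
DEFAULT_NEW_CONSUMER_FETCH = 1048576   # default max.partition.fetch.bytes

# The default consumer limit is smaller than the topic limit.
print(consumer_can_fetch(DEFAULT_NEW_CONSUMER_FETCH, TOPIC_MAX_MESSAGE_BYTES))

# Raising the consumer limit to match the topic limit satisfies the check.
print(consumer_can_fetch(2147483647, TOPIC_MAX_MESSAGE_BYTES))
```

The NiFi log earlier in the thread shows max.partition.fetch.bytes = 1048576, so the processor's consumer is running with the default limit.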
Created on 04-11-2017 05:21 PM - edited 08-18-2019 12:01 AM
The ConsumeKafka_0_10 processor allows dynamic properties:
You can try adding a new dynamic property to your ConsumeKafka_0_10 processor for "max.message.bytes" with a value of 2147483647 to see if that works for you.
List of Kafka properties can be found here:
http://kafka.apache.org/documentation.html#configuration
Thanks,
Matt
Created 04-25-2017 06:17 PM
The ConsumeKafka processor only accepts dynamic properties that are valid Kafka consumer properties.
max.message.bytes is a server configuration property.
I believe what you are really looking for on the consumer side is:
max.partition.fetch.bytes
This property will be accepted by the ConsumeKafka processor, and you will not see the "Must be a known configuration parameter for this Kafka client" invalid tooltip notification.
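To illustrate the distinction, here is a rough Python sketch of a name check that accepts consumer settings and rejects everything else. It is not NiFi's actual validation code: the helper `is_consumer_property` is hypothetical, and the set below is only a small sample of consumer property names copied from the ConsumerConfig log earlier in this thread.

```python
# A few consumer property names, copied from the ConsumerConfig log above.
# This is an illustrative sample, not the full list the Kafka client knows.
KNOWN_CONSUMER_PROPERTIES = {
    "max.partition.fetch.bytes",
    "max.poll.records",
    "fetch.min.bytes",
    "fetch.max.wait.ms",
    "auto.offset.reset",
    "group.id",
}


def is_consumer_property(name: str) -> bool:
    """Hypothetical check: is this name one of the sampled consumer settings?"""
    return name in KNOWN_CONSUMER_PROPERTIES


for prop in ("max.message.bytes", "max.partition.fetch.bytes"):
    verdict = "accepted" if is_consumer_property(prop) else "rejected"
    print(prop, "->", verdict)
```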
Thanks,
Matt
Just as an FYI, I don't get pinged about any new answers/comments you make without the @<username> notation.
Created 04-12-2017 07:54 AM
Thank you for the answer, but I get an error:
Created 04-12-2017 07:55 AM
Created 04-12-2017 07:56 AM