Kafka Producer on Apache Atlas via command line

Contributor

Hi,

I tried the following commands:

> $KAFKA_BROKER_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

> $KAFKA_BROKER_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

When I typed a test message like:

test message 1

I got this error:

[2017-06-23 10:17:43,467] ERROR Error when sending message to topic test with key: null, value: 14 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Do you have any suggestions?

In the meantime, I am investigating the Ranger configuration.

Thanks for your help.

Accepted solution

Super Collaborator

@Smart Data,

Can you please check whether port 9092 is correct in the broker list and whether the broker is up and running?

Also, is your cluster Kerberized?

Please note:

The issue has nothing to do with Atlas or Ranger. When Atlas starts up, it creates two Kafka topics, ATLAS_HOOK and ATLAS_ENTITIES, so the user doesn't have to create any topic.
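One way to answer the first question without any Kafka tooling is to test whether anything accepts TCP connections on the port. Below is a minimal, generic Python sketch (the helper name `broker_port_open` is our own, not part of Kafka). A successful connect only shows that some process is listening there, not that it is a Kafka broker, and it says nothing about Kerberos.

```python
import socket


def broker_port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only proves that *something* is listening on the port; it does not
    confirm that the listener is a Kafka broker.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, unreachable host, or timeout (socket.timeout is an OSError subclass)
        return False


if __name__ == "__main__":
    # 9092 is the port used in the original command; 6667 is another port to try
    for port in (9092, 6667):
        print(f"localhost:{port} reachable: {broker_port_open('localhost', port)}")
```

If the port in `--broker-list` reports unreachable, the producer cannot fetch topic metadata from it. That is consistent with the "Failed to update metadata" timeout above.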

Contributor

Thanks for your comment, @Sharmadha Sainath.

I am using the HDP Sandbox with the default configuration.

My cluster is not Kerberized.

Before this, I had tried using kafka-console-producer.sh to create an entity via the ATLAS_HOOK topic and got the same error message.

Command :

lsof -i -P -n | grep kafka

Result :

java       x      kafka  x  IPv6 x      0t0  TCP *:60141 (LISTEN)
java       x      kafka  x  IPv6 x      0t0  TCP 192.x.x.x:49648->192.x.x.x:2181 (ESTABLISHED)
java       x      kafka  x  IPv6 x      0t0  TCP 192.x.x.x:6667 (LISTEN)
java       x      kafka  x  IPv6 x      0t0  TCP 192.x.x.x:6667->192.x.x.x:32837 (ESTABLISHED)
java       x      kafka  x  IPv6 x      0t0  TCP 192.x.x.x:47712->192.x.x.x:6667 (CLOSE_WAIT)
java       x      kafka  x  IPv6 x      0t0  TCP 192.x.x.x:45976->192.x.x.x:6188 (ESTABLISHED)
java       x      kafka  x  IPv6 x      0t0  TCP 192.x.x.x:6667->192.x.x.x:47717 (ESTABLISHED)

Command :

$KAFKA_BROKER_HOME/bin/kafka-topics.sh --describe --topic ATLAS_HOOK --zookeeper localhost:2181

Result :

Topic:ATLAS_HOOK        PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: ATLAS_HOOK       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
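The `Leader: 1001` value above is the ID of the broker that leads partition 0. It is also the ID under which that broker registers itself in ZooKeeper, under /brokers/ids/. If you want to extract it in a script, here is a minimal Python sketch. It assumes the text layout shown above, which may vary between Kafka versions, and the helper `leader_ids` is hypothetical.

```python
import re
from typing import Dict

# Matches "Partition: <n>  Leader: <id>" in `kafka-topics.sh --describe` output.
# Leader is -1 when a partition currently has no leader.
_PARTITION_LEADER = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)")


def leader_ids(describe_output: str) -> Dict[int, int]:
    """Map each partition number to its leader broker ID."""
    return {int(p): int(leader) for p, leader in _PARTITION_LEADER.findall(describe_output)}


SAMPLE = """Topic:ATLAS_HOOK        PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: ATLAS_HOOK       Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
"""

if __name__ == "__main__":
    print(leader_ids(SAMPLE))  # {0: 1001}
```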


Super Collaborator
@Smart Data

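Can you please try with "--broker-list localhost:6667"?

The broker seems to be running on port 6667: in your lsof output, the Kafka process is listening on 192.x.x.x:6667, and there is no Kafka listener on 9092.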

To verify which port the Kafka broker is using, open the ZooKeeper client shell with $ZOOKEEPER_HOME/bin/zkCli.sh and read the broker's registration to get the port.

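The screenshot below was taken in the ZooKeeper client shell; note that get /brokers/ids/0 shows the port. Broker IDs differ between clusters: your describe output shows Leader: 1001, so on your cluster the path would be /brokers/ids/1001 (ls /brokers/ids lists all registered IDs).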

[Screenshot: zkCli.sh session in which get /brokers/ids/0 shows the broker port]
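The value stored at /brokers/ids/<id> is a small JSON document. Below is a minimal Python sketch that pulls the protocol, host and port out of it (the helper `broker_endpoints` is hypothetical). The sample string is illustrative only and is not output from this thread. It is an abbreviated registration modeled on the Kafka 0.10-era format, and the hostname `sandbox.hortonworks.com` is an assumption.

```python
import json
from typing import List, Tuple

# Illustrative, abbreviated registration value (assumed, not taken from this thread)
SAMPLE_REGISTRATION = (
    '{"endpoints":["PLAINTEXT://sandbox.hortonworks.com:6667"],'
    '"jmx_port":-1,"host":"sandbox.hortonworks.com","port":6667,"version":4}'
)


def broker_endpoints(registration_json: str) -> List[Tuple[str, str, int]]:
    """Return (protocol, host, port) for each endpoint in a /brokers/ids/<id> value."""
    data = json.loads(registration_json)
    endpoints = []
    for ep in data.get("endpoints", []):
        # Each endpoint string looks like "PROTOCOL://host:port"
        protocol, _, hostport = ep.partition("://")
        host, _, port = hostport.rpartition(":")
        endpoints.append((protocol, host, int(port)))
    # Older registration formats may only carry top-level host/port fields
    if not endpoints and "host" in data and "port" in data:
        endpoints.append(("PLAINTEXT", data["host"], int(data["port"])))
    return endpoints


if __name__ == "__main__":
    print(broker_endpoints(SAMPLE_REGISTRATION))
    # [('PLAINTEXT', 'sandbox.hortonworks.com', 6667)]
```

The host and port from the endpoint are what belong in `--broker-list`.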

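The best way to check which process is listening on a port is lsof -i:6667.

In your case, "kafka" in the third column of the lsof -i -P -n | grep kafka output is the kafka user, not the process itself.

Also, it is best practice to use the actual hostname instead of "localhost". The lsof output shows the broker listening on 192.x.x.x:6667 rather than on all interfaces, so a connection to localhost may not reach it.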
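To make the column layout explicit: lsof -i -P -n prints COMMAND, PID, USER, FD, TYPE, DEVICE, SIZE/OFF, NODE and NAME, followed by the connection state for TCP sockets. A small Python filter makes the point concrete. It shows that the listeners belong to a java process (first column) owned by the kafka user (third column). The helper `listening_sockets` is hypothetical and assumes that column order and no spaces inside COMMAND.

```python
from typing import List, Tuple


def listening_sockets(lsof_output: str) -> List[Tuple[str, str, str, str]]:
    """Return (command, pid, user, address) for each LISTEN line of `lsof -i -P -n` output."""
    results = []
    for line in lsof_output.splitlines():
        fields = line.split()
        # Expected columns: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME (STATE)
        if len(fields) < 10 or fields[-1] != "(LISTEN)":
            continue
        command, pid, user = fields[0], fields[1], fields[2]
        address = fields[-2]  # NAME column, e.g. "*:60141" or "192.x.x.x:6667"
        results.append((command, pid, user, address))
    return results


# A few lines from the (masked) output posted earlier in this thread
SAMPLE = """\
java       x      kafka  x  IPv6 x      0t0  TCP *:60141 (LISTEN)
java       x      kafka  x  IPv6 x      0t0  TCP 192.x.x.x:49648->192.x.x.x:2181 (ESTABLISHED)
java       x      kafka  x  IPv6 x      0t0  TCP 192.x.x.x:6667 (LISTEN)
"""

if __name__ == "__main__":
    for entry in listening_sockets(SAMPLE):
        print(entry)
    # ('java', 'x', 'kafka', '*:60141')
    # ('java', 'x', 'kafka', '192.x.x.x:6667')
```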

Contributor

Thank you so much, @Sharmadha Sainath!

Your comment was very useful!

I ran this command:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list MY_HOSTNAME:6667 --topic ATLAS_HOOK

Then, in the Kafka producer console, I sent this message:

{"version": {"version": "1.0.0"}, "message": {"entities": [{"jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference", "id": {"jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id", "id": "-1467290565135246000", "version": 0, "typeName": "hdfs_path", "state": "ACTIVE"}, "typeName": "hdfs_path", "values": {"qualifiedName": "TestKafka", "owner": "admin", "description": "Test kafka", "path": "/user/data/testkafka.csv", "posixPermissions": null, "createTime": "1970-01-01T00:00:00.000Z", "isSymlink": false, "extendedAttributes": null, "numberOfReplicas": 0, "name": "DataKFK"}, "traitNames": [], "traits": {} }], "type": "ENTITY_CREATE", "user": "admin"} }
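The console producer sends each input line as a separate message, so the whole notification must stay on one line. A single missing quote, which is easy to introduce when typing JSON by hand, also makes it invalid. Below is a minimal Python sketch that builds the same message with `json.dumps`. It assumes the Atlas v1 notification layout used above, and other Atlas versions may expect a different format. The helper name and the reuse of the temporary ID from this thread are our own choices.

```python
import json

# Class names and field layout copied from the message posted in this thread
# (Atlas v1 / typesystem JSON format); other Atlas versions may differ.
REF_CLASS = "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference"
ID_CLASS = "org.apache.atlas.typesystem.json.InstanceSerialization$_Id"


def hdfs_path_create_message(values: dict, user: str = "admin",
                             temp_id: str = "-1467290565135246000") -> str:
    """Return one ENTITY_CREATE notification for an hdfs_path entity as a single JSON line."""
    entity = {
        "jsonClass": REF_CLASS,
        "id": {"jsonClass": ID_CLASS, "id": temp_id, "version": 0,
               "typeName": "hdfs_path", "state": "ACTIVE"},
        "typeName": "hdfs_path",
        "values": values,
        "traitNames": [],
        "traits": {},
    }
    message = {
        "version": {"version": "1.0.0"},
        "message": {"entities": [entity], "type": "ENTITY_CREATE", "user": user},
    }
    # json.dumps without `indent` emits no newlines, so the result is exactly one
    # console-producer message, and every key is guaranteed to be quoted.
    return json.dumps(message)


if __name__ == "__main__":
    print(hdfs_path_create_message({
        "qualifiedName": "TestKafka",
        "owner": "admin",
        "description": "Test kafka",
        "path": "/user/data/testkafka.csv",
        "posixPermissions": None,
        "createTime": "1970-01-01T00:00:00.000Z",
        "isSymlink": False,
        "extendedAttributes": None,
        "numberOfReplicas": 0,
        "name": "DataKFK",
    }))
```

Because the console producer reads standard input, the output can be piped straight into it. For example, with the script saved under a hypothetical name: `python make_msg.py | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list MY_HOSTNAME:6667 --topic ATLAS_HOOK`.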

Atlas then published a notification for this new entity to the ATLAS_ENTITIES topic, and I could see the entity in the Atlas UI!

Have a good day !