Support Questions
Find answers, ask questions, and share your expertise

Apache Atlas Entity Notification Error (kafka)

Apache Atlas Entity Notification Error (kafka)

Expert Contributor

Hi

I have installed Apache Atlas naively on my laptop in Ubuntu 16.04. I have used the embedded Berkeley DB and elastic option. Atlas is up and running on localhost:21000

Types are created, but entity creation fails when done via Kafka notification. I ran the Storm integration test provided with the storm addon project. I always get the following error.

189115 [Thread-29-globalCount-executor[6 6]] INFO  o.a.s.d.executor - Processing received message FOR 6 TUPLE: source: count:4, stream: default, id: {}, [bertels, 370]
189115 [Thread-29-globalCount-executor[6 6]] INFO  o.a.s.d.task - Emitting: globalCount default [2635]
189115 [Thread-29-globalCount-executor[6 6]] INFO  o.a.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: count:4, stream: default, id: {}, [bertels, 370]
189115 [Thread-29-globalCount-executor[6 6]] INFO  o.a.s.d.executor - Execute done TUPLE source: count:4, stream: default, id: {}, [bertels, 370] TASK: 6 DELTA: 
2017-09-12 16:58:49,456 ERROR - [main:] ~ {"version":{"version":"1.0.0"},"message":{"entities":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-88302553345504","version":0,"typeName":"storm_topology","state":"ACTIVE"},"typeName":"storm_topology","values":{"name":"word-count","startTime":"2017-09-12T14:55:46.903Z","outputs":[],"id":"word-count-1-1505228146","inputs":[],"qualifiedName":"word-count","nodes":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-88302553345502","version":0,"typeName":"storm_bolt","state":"ACTIVE"},"typeName":"storm_bolt","values":{"name":"globalCount","driverClass":"org.apache.storm.testing.TestGlobalCount","conf":{"TestGlobalCount._count":"0"},"inputs":["count"]},"traitNames":[],"traits":{},"systemAttributes":{}},{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-88302553345503","version":0,"typeName":"storm_spout","state":"ACTIVE"},"typeName":"storm_spout","values":{"outputs":["count"],"name":"words","driverClass":"org.apache.storm.testing.TestWordSpout","conf":{"TestWordSpout._isDistributed":"true"}},"traitNames":[],"traits":{},"systemAttributes":{}},{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-88302553345501","version":0,"typeName":"storm_bolt","state":"ACTIVE"},"typeName":"storm_bolt","values":{"name":"count","outputs":["globalCount"],"driverClass":"org.apache.storm.topology.BasicBoltExecutor","conf":{},"inputs":["words"]},"traitNames":[],"traits":{},"systemAttributes":{}}],"owner":"arsalan","clusterName":"primary"},"traitNames":[],"traits":{},"systemAttributes":{}}],"type":"ENTITY_CREATE","user":"arsalan"}} (FailedMessagesLogger:95)
189129 [main] ERROR o.a.a.h.AtlasHook - Failed to notify atlas for entity [[{Id='(type: storm_topology, id: <unassigned>)', traits=[], values={outputs=[], owner=arsalan, nodes=[{Id='(type: storm_bolt, id: <unassigned>)', traits=[], values={name=globalCount, conf={TestGlobalCount._count=0}, driverClass=org.apache.storm.testing.TestGlobalCount, inputs=[count]}}, {Id='(type: storm_spout, id: <unassigned>)', traits=[], values={name=words, outputs=[count], conf={TestWordSpout._isDistributed=true}, driverClass=org.apache.storm.testing.TestWordSpout}}, {Id='(type: storm_bolt, id: <unassigned>)', traits=[], values={name=count, outputs=[globalCount], conf={}, driverClass=org.apache.storm.topology.BasicBoltExecutor, inputs=[words]}}], inputs=[], qualifiedName=word-count, clusterName=primary, name=word-count, startTime=2017-09-12T14:55:46.903Z, id=word-count-1-1505228146}}]] after 3 retries. Quitting
org.apache.atlas.notification.NotificationException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at org.apache.atlas.kafka.KafkaNotification.sendInternalToProducer(KafkaNotification.java:236) ~[atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.atlas.kafka.KafkaNotification.sendInternal(KafkaNotification.java:209) ~[atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.atlas.notification.AbstractNotification.send(AbstractNotification.java:84) ~[atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.atlas.hook.AtlasHook.notifyEntitiesInternal(AtlasHook.java:133) [atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:118) [atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:171) [atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:105) [atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.atlas.storm.hook.StormAtlasHook.notify(StormAtlasHook.java:102) [classes/:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
	at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.7.0.jar:?]
	at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) [clojure-1.7.0.jar:?]
	at org.apache.storm.LocalCluster$submit_hook.invoke(LocalCluster.clj:45) [storm-core-1.0.0.jar:1.0.0]
	at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:52) [storm-core-1.0.0.jar:1.0.0]
	at org.apache.storm.LocalCluster.submitTopology(Unknown Source) [storm-core-1.0.0.jar:1.0.0]
	at org.apache.atlas.storm.hook.StormTestUtil.submitTopology(StormTestUtil.java:67) [test-classes/:?]
	at org.apache.atlas.storm.hook.StormAtlasHookIT.testAddEntities(StormAtlasHookIT.java:75) [test-classes/:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:80) [testng-6.1.1.jar:?]
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:673) [testng-6.1.1.jar:?]
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:842) [testng-6.1.1.jar:?]
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1166) [testng-6.1.1.jar:?]
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125) [testng-6.1.1.jar:?]
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109) [testng-6.1.1.jar:?]
	at org.testng.TestRunner.runWorkers(TestRunner.java:1178) [testng-6.1.1.jar:?]
	at org.testng.TestRunner.privateRun(TestRunner.java:757) [testng-6.1.1.jar:?]
	at org.testng.TestRunner.run(TestRunner.java:608) [testng-6.1.1.jar:?]
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:334) [testng-6.1.1.jar:?]
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:329) [testng-6.1.1.jar:?]
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:291) [testng-6.1.1.jar:?]
	at org.testng.SuiteRunner.run(SuiteRunner.java:240) [testng-6.1.1.jar:?]
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) [testng-6.1.1.jar:?]
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86) [testng-6.1.1.jar:?]
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1158) [testng-6.1.1.jar:?]
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1083) [testng-6.1.1.jar:?]
	at org.testng.TestNG.run(TestNG.java:999) [testng-6.1.1.jar:?]
	at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72) [testng-plugin.jar:?]
	at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:127) [testng-plugin.jar:?]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730) ~[kafka-clients-0.10.0.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483) ~[kafka-clients-0.10.0.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[kafka-clients-0.10.0.0.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[kafka-clients-0.10.0.0.jar:?]
	at org.apache.atlas.kafka.KafkaNotification.sendInternalToProducer(KafkaNotification.java:219) ~[atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	... 42 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
189137 [Thread-19-words-executor[8 8]] INFO  o.a.s.d.task - Emitting: words default [nathan]

I tried to write my own addon but I am stuck because of this error. Any suggestions ?