Support Questions

Find answers, ask questions, and share your expertise

Has anyone successfully written an Atlas hook? (Kafka timeout error)

Expert Contributor

Hi

I have installed Atlas natively on Ubuntu (not as part of HDP). I want to write a hook to import data into Atlas, and I am using Berkeley DB for storage. I am facing the following error:

189115 [Thread-29-globalCount-executor[6 6]] INFO  o.a.s.d.executor - Processing received message FOR 6 TUPLE: source: count:4, stream: default, id: {}, [bertels, 370]
189115 [Thread-29-globalCount-executor[6 6]] INFO  o.a.s.d.task - Emitting: globalCount default [2635]
189115 [Thread-29-globalCount-executor[6 6]] INFO  o.a.s.d.executor - BOLT ack TASK: 6 TIME:  TUPLE: source: count:4, stream: default, id: {}, [bertels, 370]
189115 [Thread-29-globalCount-executor[6 6]] INFO  o.a.s.d.executor - Execute done TUPLE source: count:4, stream: default, id: {}, [bertels, 370] TASK: 6 DELTA:
2017-09-12 16:58:49,456 ERROR - [main:] ~ {"version":{"version":"1.0.0"},"message":{"entities":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-88302553345504","version":0,"typeName":"storm_topology","state":"ACTIVE"},"typeName":"storm_topology","values":{"name":"word-count","startTime":"2017-09-12T14:55:46.903Z","outputs":[],"id":"word-count-1-1505228146","inputs":[],"qualifiedName":"word-count","nodes":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-88302553345502","version":0,"typeName":"storm_bolt","state":"ACTIVE"},"typeName":"storm_bolt","values":{"name":"globalCount","driverClass":"org.apache.storm.testing.TestGlobalCount","conf":{"TestGlobalCount._count":"0"},"inputs":["count"]},"traitNames":[],"traits":{},"systemAttributes":{}},{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-88302553345503","version":0,"typeName":"storm_spout","state":"ACTIVE"},"typeName":"storm_spout","values":{"outputs":["count"],"name":"words","driverClass":"org.apache.storm.testing.TestWordSpout","conf":{"TestWordSpout._isDistributed":"true"}},"traitNames":[],"traits":{},"systemAttributes":{}},{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"-88302553345501","version":0,"typeName":"storm_bolt","state":"ACTIVE"},"typeName":"storm_bolt","values":{"name":"count","outputs":["globalCount"],"driverClass":"org.apache.storm.topology.BasicBoltExecutor","conf":{},"inputs":["words"]},"traitNames":[],"traits":{},"systemAttributes":{}}],"owner":"arsalan","clusterName":"primary"},"traitNames":[],"traits":{},"systemAttributes":{}}],"type":"ENTITY_CREATE","user":"arsalan"}} (FailedMessagesLogger:95)
189129 [main] ERROR o.a.a.h.AtlasHook - Failed to notify atlas for entity [[{Id='(type: storm_topology, id: <unassigned>)', traits=[], values={outputs=[], owner=arsalan, nodes=[{Id='(type: storm_bolt, id: <unassigned>)', traits=[], values={name=globalCount, conf={TestGlobalCount._count=0}, driverClass=org.apache.storm.testing.TestGlobalCount, inputs=[count]}}, {Id='(type: storm_spout, id: <unassigned>)', traits=[], values={name=words, outputs=[count], conf={TestWordSpout._isDistributed=true}, driverClass=org.apache.storm.testing.TestWordSpout}}, {Id='(type: storm_bolt, id: <unassigned>)', traits=[], values={name=count, outputs=[globalCount], conf={}, driverClass=org.apache.storm.topology.BasicBoltExecutor, inputs=[words]}}], inputs=[], qualifiedName=word-count, clusterName=primary, name=word-count, startTime=2017-09-12T14:55:46.903Z, id=word-count-1-1505228146}}]] after 3 retries. Quitting
org.apache.atlas.notification.NotificationException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at org.apache.atlas.kafka.KafkaNotification.sendInternalToProducer(KafkaNotification.java:236) ~[atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
    at org.apache.atlas.kafka.KafkaNotification.sendInternal(KafkaNotification.java:209) ~[atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
    at org.apache.atlas.notification.AbstractNotification.send(AbstractNotification.java:84) ~[atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
    at org.apache.atlas.hook.AtlasHook.notifyEntitiesInternal(AtlasHook.java:133) [atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
    at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:118) [atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
    at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:171) [atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
    at org.apache.atlas.hook.AtlasHook.notifyEntities(AtlasHook.java:105) [atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
    at org.apache.atlas.storm.hook.StormAtlasHook.notify(StormAtlasHook.java:102) [classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) [clojure-1.7.0.jar:?]
    at org.apache.storm.LocalCluster$submit_hook.invoke(LocalCluster.clj:45) [storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:52) [storm-core-1.0.0.jar:1.0.0]
    at org.apache.storm.LocalCluster.submitTopology(Unknown Source) [storm-core-1.0.0.jar:1.0.0]
    at org.apache.atlas.storm.hook.StormTestUtil.submitTopology(StormTestUtil.java:67) [test-classes/:?]
    at org.apache.atlas.storm.hook.StormAtlasHookIT.testAddEntities(StormAtlasHookIT.java:75) [test-classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:80) [testng-6.1.1.jar:?]
    at org.testng.internal.Invoker.invokeMethod(Invoker.java:673) [testng-6.1.1.jar:?]
    at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:842) [testng-6.1.1.jar:?]
    at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1166) [testng-6.1.1.jar:?]
    at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125) [testng-6.1.1.jar:?]
    at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109) [testng-6.1.1.jar:?]
    at org.testng.TestRunner.runWorkers(TestRunner.java:1178) [testng-6.1.1.jar:?]
    at org.testng.TestRunner.privateRun(TestRunner.java:757) [testng-6.1.1.jar:?]
    at org.testng.TestRunner.run(TestRunner.java:608) [testng-6.1.1.jar:?]
    at org.testng.SuiteRunner.runTest(SuiteRunner.java:334) [testng-6.1.1.jar:?]
    at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:329) [testng-6.1.1.jar:?]
    at org.testng.SuiteRunner.privateRun(SuiteRunner.java:291) [testng-6.1.1.jar:?]
    at org.testng.SuiteRunner.run(SuiteRunner.java:240) [testng-6.1.1.jar:?]
    at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52) [testng-6.1.1.jar:?]
    at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86) [testng-6.1.1.jar:?]
    at org.testng.TestNG.runSuitesSequentially(TestNG.java:1158) [testng-6.1.1.jar:?]
    at org.testng.TestNG.runSuitesLocally(TestNG.java:1083) [testng-6.1.1.jar:?]
    at org.testng.TestNG.run(TestNG.java:999) [testng-6.1.1.jar:?]
    at org.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:72) [testng-plugin.jar:?]
    at org.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:127) [testng-plugin.jar:?]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730) ~[kafka-clients-0.10.0.0.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483) ~[kafka-clients-0.10.0.0.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[kafka-clients-0.10.0.0.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[kafka-clients-0.10.0.0.jar:?]
    at org.apache.atlas.kafka.KafkaNotification.sendInternalToProducer(KafkaNotification.java:219) ~[atlas-notification-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
    ... 42 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
189137 [Thread-19-words-executor[8 8]] INFO  o.a.s.d.task - Emitting: words default [nathan]

I get the same error even when I run the test case for the built-in Storm hook. Any idea how to get past this?

1 ACCEPTED SOLUTION

Expert Contributor

Thanks for the reply.

@anaik

By "natively" I mean that I built and installed Atlas on my laptop directly from the Apache website, rather than through HDP.

@Ashutosh Mestry

I am writing a hook for Spark. Spark already has a very good notification mechanism via its event listeners, and I want to capture these events in real time to send data to Atlas. Since I want to write a hook, I am not going to focus on the REST API.
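
To give an idea, the kind of listener I have in mind is roughly the sketch below. This assumes Spark 2.x, where SparkListener is a Java-friendly abstract class; the println calls stand in for whatever Atlas notification code the hook would actually invoke.

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;

// Minimal sketch of a Spark-side hook: a listener that receives
// application lifecycle events from the Spark scheduler. A real hook
// would build Atlas entities here and send them via Atlas notifications.
public class AtlasSparkListener extends SparkListener {
    @Override
    public void onApplicationStart(SparkListenerApplicationStart event) {
        // Placeholder: create a "spark application" entity in Atlas
        System.out.println("Spark application started: " + event.appName());
    }

    @Override
    public void onApplicationEnd(SparkListenerApplicationEnd event) {
        // Placeholder: mark the application entity as finished in Atlas
        System.out.println("Spark application ended at: " + event.time());
    }
}

Such a listener can be registered through the spark.extraListeners configuration property, so it attaches automatically when the application starts.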

I thought it would be a good idea to have a look at the Storm bridge and run its test to better understand how something similar could be achieved for Spark.

I am also using IntelliJ for development. I imported the Storm hook project (located at atlas_home/addons/storm_bridge) into IntelliJ and tried running the test case it comes with, and I get the metadata timeout error from Kafka. Atlas itself starts successfully; the problem arises when the hook sends a notification to the Kafka topic.

I was able to solve the issue: it turns out I needed to add the atlas-application.properties file to the project classpath, so that the hook could find the Kafka endpoint.
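
For anyone hitting the same timeout, the notification-related entries the hook reads look roughly like the sketch below. These values follow the defaults of the standalone distribution with embedded Kafka; hosts and ports will differ per setup.

# Sketch of the notification settings a hook needs on its classpath.
# With atlas.notification.embedded=true, Atlas runs its own Kafka broker,
# and the hook's producer must be able to reach these endpoints.
atlas.notification.embedded=true
atlas.kafka.data=${sys:atlas.home}/data/kafka
atlas.kafka.zookeeper.connect=localhost:9026
atlas.kafka.bootstrap.servers=localhost:9027
atlas.rest.address=http://localhost:21000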

One last thing: I see that whenever Atlas starts, it initializes its types:

2017-09-27 00:09:26,564 INFO  - [main:] ~ ==> AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs() (AtlasTypeDefStoreInitializer:109)
2017-09-27 00:09:26,572 INFO  - [main:] ~ ==> AtlasTypeDefStoreInitializer(/usr/hdp/2.6.2.0-162/atlas/models) (AtlasTypeDefStoreInitializer:146)
2017-09-27 00:09:26,602 INFO  - [main:] ~ No type in file /usr/hdp/2.6.2.0-162/atlas/models/-worker.log.0.current (AtlasTypeDefStoreInitializer:165)
2017-09-27 00:09:26,665 INFO  - [main:] ~ No new type in file /usr/hdp/2.6.2.0-162/atlas/models/0010-base_model.json (AtlasTypeDefStoreInitializer:178)
2017-09-27 00:09:26,669 INFO  - [main:] ~ No new type in file /usr/hdp/2.6.2.0-162/atlas/models/0020-fs_model.json (AtlasTypeDefStoreInitializer:178)
2017-09-27 00:09:26,673 INFO  - [main:] ~ No new type in file /usr/hdp/2.6.2.0-162/atlas/models/0030-hive_model.json (AtlasTypeDefStoreInitializer:178)

So I can simply add my Spark type definitions at the same location and they will also be loaded automatically at startup.

In my case the location is:

/home/arsalan/Development/atlas/distro/target/apache-atlas-0.9-SNAPSHOT-bin/apache-atlas-0.9-SNAPSHOT/models/

However, I notice a difference in the structure. For example, the 1080-storm_model.json file defines its types under "entityDefs", whereas, as far as I know from the type system, types should be defined under a ClassType tag, as shown in this tutorial: Atlas Custom Type Creation (file located at: Type Definition JSON GitHub link).

Any idea why the discrepancy?


6 REPLIES

Expert Contributor

What setup steps did you follow for Atlas? There's some mention of TestNG and StormAtlasHookIT in the logs, so I'm not sure what kind of testing you're doing with Atlas.

If your Atlas instance started successfully, can you share the logs? Also, what do you mean by installing Atlas "natively" on Ubuntu?

Expert Contributor
@Arsalan Siddiqi

Thanks for reaching out.

If you could clarify a few items below, it would help:

  • What is the purpose of the hook? Hooks are one of the ways to get data into Atlas. They are used in cases where the producer of the data has a well-defined mechanism for sending notifications about its data, which Atlas then leverages. In your case, does your producer have this in place?
  • If your producer does not have a good notification mechanism in place, you could consider writing a small application that enumerates the data and then uses Atlas' REST APIs to push it into Atlas (see the sketch after this list).
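
As a rough sketch of the REST route via the V2 client that ships with Atlas: the URL, credentials, and the spark_application type below are placeholders for this example, and the type would have to exist in Atlas before the call succeeds.

import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.instance.AtlasEntity;

public class RestPushExample {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials for a local Atlas instance
        AtlasClientV2 client = new AtlasClientV2(
                new String[]{"http://localhost:21000"},
                new String[]{"admin", "admin"});

        // Hypothetical type and attribute values, purely for illustration
        AtlasEntity app = new AtlasEntity("spark_application");
        app.setAttribute("qualifiedName", "my-spark-app@primary");
        app.setAttribute("name", "my-spark-app");

        // Create (or update) the entity through the V2 REST API
        client.createEntity(new AtlasEntity.AtlasEntityWithExtInfo(app));
    }
}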

We use IntelliJ for development. There are a few setup steps needed if you want to use integrated debugging via IntelliJ. Let me know if that is the case.

Attached are logs for a successful Atlas startup.

applicationlog.zip

Expert Contributor

Your observation about the models is correct: if you create a JSON file and place it under the models directory, it will be picked up during Atlas startup. The links you're looking at for the model implementation refer to the V1 model, which is no longer used; please refer to http://atlas.apache.org/api/v2/index.html#syntax_json for the new JSON structures.
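
For illustration, a minimal V2 model file in the same style as the bundled ones might look like the sketch below; the spark_application type and its attribute are invented for the example.

{
  "enumDefs": [],
  "structDefs": [],
  "classificationDefs": [],
  "entityDefs": [
    {
      "name": "spark_application",
      "superTypes": ["Process"],
      "typeVersion": "1.0",
      "attributeDefs": [
        {
          "name": "startTime",
          "typeName": "date",
          "cardinality": "SINGLE",
          "isIndexable": false,
          "isOptional": true,
          "isUnique": false
        }
      ]
    }
  ]
}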

Let me know if you need more help on that.

Expert Contributor
@Arsalan Siddiqi

Your observation about the JSON is accurate. The JSON you see in the sample is in the old format; we now use a new format, referred to as V2.

The V2 format is easy to understand because it is a JSON representation of the Java model classes, which makes it much easier to code against than the earlier approach.
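
To illustrate that correspondence, the same definition that a model JSON file expresses can also be built from the Java model classes and registered through the V2 client. A rough sketch, with the type and attribute names invented for the example:

import java.util.Collections;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;

public class RegisterTypeExample {
    public static void main(String[] args) throws Exception {
        // The V2 JSON model files are serialized AtlasTypesDef objects,
        // so the same definition can be built in Java and registered directly.
        AtlasEntityDef sparkApp = new AtlasEntityDef("spark_application"); // invented type
        sparkApp.setSuperTypes(Collections.singleton("Process"));
        sparkApp.setAttributeDefs(Collections.singletonList(
                new AtlasAttributeDef("startTime", "date")));

        AtlasTypesDef typesDef = new AtlasTypesDef();
        typesDef.setEntityDefs(Collections.singletonList(sparkApp));

        // Placeholder endpoint and credentials for a local Atlas instance
        AtlasClientV2 client = new AtlasClientV2(
                new String[]{"http://localhost:21000"},
                new String[]{"admin", "admin"});
        client.createAtlasTypeDefs(typesDef);
    }
}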

I am attaching the atlas-application.properties file that I use for IntelliJ development: atlas-applicationproperties.zip

Hope this helps.