Messaging API with Kafka: lineage
Labels: Apache Atlas, Apache Kafka
Created 07-25-2017 01:47 PM
Hello,
I am trying to create a Process entity through Kafka:
{ "message": { "entities": [ { "id": { "id": "-1467290565135246000", "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id", "state": "ACTIVE", "typeName": "Process", "version": 0 }, "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference", "traitNames": [], "traits": {}, "typeName": "Process", "values": { "qualifiedName": "qn_process_cp", "processor_name": "file_copy_", "name": "n_process_cp", "inputs": [{"guid": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208","typeName": "hdfs_path"}], "outputs": [{"guid": "513cd2cd-3139-4baa-a017-16f83ca2b383","typeName": "hdfs_path"}] } } ], "type": "ENTITY_CREATE", "user": "admin" }, "version": { "version": "1.0.0" } }
I get this error message in the Atlas log:
2017-07-25 13:24:58,645 DEBUG - [NotificationHookConsumer thread-0:] ~ Set property __modifiedBy = "admin" to vertex[id=163872776 type=Process guid=null] (GraphHelper:413)
2017-07-25 13:24:58,645 DEBUG - [NotificationHookConsumer thread-0:] ~ <== createStructVertex(Process) (EntityGraphMapper:188)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> addProperty(vertex[id=163872776 type=Process guid=null], __superTypeNames, Referenceable) (AtlasGraphUtilsV1:132)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> addProperty(vertex[id=163872776 type=Process guid=null], __superTypeNames, Asset) (AtlasGraphUtilsV1:132)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> setProperty(vertex[id=163872776 type=Process guid=null], __guid, f789fb6e-ad0e-4702-b787-e2f5f6a61bdf) (AtlasGraphUtilsV1:141)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ Setting property __guid in vertex[id=163872776 type=Process guid=null] (AtlasGraphUtilsV1:159)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> setProperty(vertex[id=163872776 type=Process guid=f789fb6e-ad0e-4702-b787-e2f5f6a61bdf], __version, 0) (AtlasGraphUtilsV1:141)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ Setting property __version in vertex[id=163872776 type=Process guid=f789fb6e-ad0e-4702-b787-e2f5f6a61bdf] (AtlasGraphUtilsV1:159)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapAttributes(CREATE, Process) (EntityGraphMapper:211)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasTypeRegistry.getType(Process) (AtlasStructType:82)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ <== AtlasTypeRegistry.getType(Process): org.apache.atlas.type.AtlasEntityType@75b239ad (AtlasStructType:108)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapArrayValue(org.apache.atlas.repository.store.graph.v1.AttributeMutationContext@fc69927c) (EntityGraphMapper:550)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapObjectIdValue(org.apache.atlas.repository.store.graph.v1.AttributeMutationContext@fc5b7afb) (EntityGraphMapper:437)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ <== AtlasErrorCode.getMessage([null]) (AtlasErrorCode:125)
2017-07-25 13:24:58,650 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasErrorCode.getMessage([null]): ObjectId is not valid null (AtlasErrorCode:132)
2017-07-25 13:24:58,650 WARN - [NotificationHookConsumer thread-0:] ~ Error handling message (NotificationHookConsumer:325)
org.apache.atlas.exception.AtlasBaseException: ObjectId is not valid null
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapObjectIdValue(EntityGraphMapper.java:455)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapCollectionElementsToVertex(EntityGraphMapper.java:644)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapArrayValue(EntityGraphMapper.java:568)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapToVertexByTypeCategory(EntityGraphMapper.java:318)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttribute(EntityGraphMapper.java:260)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttributes(EntityGraphMapper.java:221)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttributesAndClassifications(EntityGraphMapper.java:127)
    at org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1.createOrUpdate(AtlasEntityStoreV1.java:230)
    at org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1.createOrUpdate(AtlasEntityStoreV1.java:246)
    at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.handleMessage(NotificationHookConsumer.java:266)
    at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.run(NotificationHookConsumer.java:235)
Atlas version: 0.8
Kafka version: 0.10.1
Any suggestions?
Thanks
Created 07-27-2017 06:54 PM
Kafka messaging still uses v1-structured Atlas messages. You supplied v2-style object ID references ({"guid": ..., "typeName": ...}) for the inputs and outputs attribute values. If you change them to the v1 Id format (a _Reference wrapping an _Id, as in the command below), it works. We are currently migrating the Kafka messages from the v1 structure to the v2 structure.
More information on the format of Kafka messages supported by Atlas can be found here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_data-governance/content/atlas_messaging_...
The following command works. Please let us know if you still see the issue.
echo '{"message":{"entities":[{"id":{"id":"-1467290565135246000","jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","state":"ACTIVE","typeName":"Process","version":0},"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","traitNames":[],"traits":{},"typeName":"Process","values":{"qualifiedName":"qn_process_cp","processor_name":"file_copy_","name":"n_process_cp","inputs":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"b385b0ba-a806-44ae-b551-1a210c0e4c8a","version":0,"typeName":"hdfs_path","state":"ACTIVE"},"typeName":"hdfs_path","values":{"name":"hdfs_path_a","qualifiedName":"hdfs_path_a","path":"hdfs_path_a"},"traitNames":[],"traits":{},"systemAttributes":{}}],"outputs":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"746dee24-0820-40e2-922c-759693085a0c","version":0,"typeName":"hdfs_path","state":"ACTIVE"},"typeName":"hdfs_path","values":{"name":"hdfs_path_b","qualifiedName":"hdfs_path_b","path":"hdfs_path_b"},"traitNames":[],"traits":{},"systemAttributes":{}}]}}],"type":"ENTITY_CREATE","user":"admin"},"version":{"version":"1.0.0"}}' | /usr/hdp/2.6.2.0-98/kafka/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic ATLAS_HOOK
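The working message above can also be generated programmatically, which makes it harder to slip back into the bare {"guid": ..., "typeName": ...} shape. The Python sketch below is only an illustration, not an Atlas client API: the helper names (v1_id, v1_reference, process_create_message) are hypothetical, and the code simply reproduces the v1 _Reference/_Id structure of the command above, wrapping every inputs/outputs element in a _Reference.

```python
import json

# v1 serialization class names, copied from the working message above.
REF_CLASS = "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference"
ID_CLASS = "org.apache.atlas.typesystem.json.InstanceSerialization$_Id"


def v1_id(guid, type_name, version=0, state="ACTIVE"):
    """Build a v1 _Id object (the 'id' part of a v1 reference)."""
    return {"jsonClass": ID_CLASS, "id": guid, "version": version,
            "typeName": type_name, "state": state}


def v1_reference(guid, type_name, values=None):
    """Wrap an _Id in a v1 _Reference, mirroring the inputs/outputs above."""
    return {"jsonClass": REF_CLASS, "id": v1_id(guid, type_name),
            "typeName": type_name, "values": values or {},
            "traitNames": [], "traits": {}, "systemAttributes": {}}


def process_create_message(qualified_name, name, inputs, outputs,
                           user="admin", temp_id="-1467290565135246000",
                           extra_values=None):
    """Return a single-line v1 ENTITY_CREATE notification for a Process.

    inputs/outputs must be lists of v1 references (see v1_reference).
    temp_id is the placeholder id of the new entity, as in the thread.
    """
    values = {"qualifiedName": qualified_name, "name": name,
              "inputs": inputs, "outputs": outputs}
    values.update(extra_values or {})
    process = {"jsonClass": REF_CLASS,
               "id": v1_id(temp_id, "Process"),
               "typeName": "Process", "values": values,
               "traitNames": [], "traits": {}}
    message = {"message": {"entities": [process],
                           "type": "ENTITY_CREATE", "user": user},
               "version": {"version": "1.0.0"}}
    # Compact separators keep the whole message on one line, because
    # kafka-console-producer sends each input line as a separate record.
    return json.dumps(message, separators=(",", ":"))


if __name__ == "__main__":
    src = v1_reference("b385b0ba-a806-44ae-b551-1a210c0e4c8a", "hdfs_path",
                       {"name": "hdfs_path_a", "qualifiedName": "hdfs_path_a",
                        "path": "hdfs_path_a"})
    dst = v1_reference("746dee24-0820-40e2-922c-759693085a0c", "hdfs_path",
                       {"name": "hdfs_path_b", "qualifiedName": "hdfs_path_b",
                        "path": "hdfs_path_b"})
    print(process_create_message("qn_process_cp", "n_process_cp", [src], [dst],
                                 extra_values={"processor_name": "file_copy_"}))
```

Piping the script's output into the same kafka-console-producer.sh --broker-list localhost:6667 --topic ATLAS_HOOK command should publish a message equivalent to the echo one-liner; the JSON is kept on a single line because the console producer treats each line as one record.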
Created 07-25-2017 03:55 PM
This error typically occurs when the entity being created references another entity that is expected to exist already but cannot be resolved:
2017-07-25 13:24:58,650 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasErrorCode.getMessage([null]): ObjectId is not valid null (AtlasErrorCode:132)
I would start by checking whether the two referenced GUIDs exist in the Atlas DB before this message is processed:
"d5c27069-d7b6-4b58-8e61-adbdc4b8d208" & "513cd2cd-3139-4baa-a017-16f83ca2b383"
Created 07-26-2017 07:13 AM
Both entities exist:
513cd2cd-3139-4baa-a017-16f83ca2b383:
{ "requestId": "pool-2-thread-9 - 8629abee-a74c-43a4-a969-758fd20704e4", "definition": { "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference", "id": { "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id", "id": "513cd2cd-3139-4baa-a017-16f83ca2b383", "version": 0, "typeName": "hdfs_path", "state": "ACTIVE" }, "typeName": "hdfs_path", "values": { "name": "Data_Kafka2", "path": "/user/data/dept.csv", "posixPermissions": null, "createTime": "1970-01-01T00:00:00.000Z", "description": "Test with Kafka", "isSymlink": false, "extendedAttributes": null, "numberOfReplicas": 0, "qualifiedName": "deptData2", "isFile": false, "fileSize": 0, "owner": "admin", "modifiedTime": "1970-01-01T00:00:00.000Z", "clusterName": null, "group": null }, "traitNames": [ "Catalog.Technical.IT", "Retainable" ], "traits": { "Catalog.Technical.IT": { "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Struct", "typeName": "Catalog.Technical.IT", "values": { "name": "Catalog.Technical.IT", "acceptable_use": null, "description": "technical test", "available_as_tag": false, "taxonomy.namespace": "atlas.taxonomy" } }, "Retainable": { "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Struct", "typeName": "Retainable", "values": { "retentionPeriod": 100 } } }, "systemAttributes": { "createdBy": "admin", "modifiedBy": "admin", "createdTime": "2017-06-28T07:32:37.288Z", "modifiedTime": "2017-06-29T12:34:30.304Z" } } }
d5c27069-d7b6-4b58-8e61-adbdc4b8d208:
{ "requestId": "pool-2-thread-6 - e16f006a-962d-45b5-a2b6-6914ec6ea0ab", "definition": { "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference", "id": { "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id", "id": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208", "version": 0, "typeName": "hdfs_path", "state": "ACTIVE" }, "typeName": "hdfs_path", "values": { "name": "Data_Kafka1", "path": "/user/data/dept.csv", "posixPermissions": null, "createTime": "1970-01-01T00:00:00.000Z", "description": "Test with Kafka", "isSymlink": false, "extendedAttributes": null, "numberOfReplicas": 0, "qualifiedName": "deptData", "isFile": false, "fileSize": 0, "owner": "admin", "modifiedTime": "1970-01-01T00:00:00.000Z", "clusterName": null, "group": null }, "traitNames": [ ], "traits": { }, "systemAttributes": { "createdBy": "admin", "modifiedBy": "admin", "createdTime": "2017-07-21T07:40:37.357Z", "modifiedTime": "2017-07-21T07:52:59.405Z" } } }
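The existence check suggested above can be scripted instead of done by hand. This is a minimal sketch that assumes the Atlas v2 endpoint GET /api/atlas/v2/entity/guid/{guid}, Atlas listening on its default port 21000 at localhost, and basic auth with admin/admin; adjust all of these for your cluster. The helper names entity_url and entity_exists are hypothetical.

```python
import base64
import urllib.error
import urllib.request

# Assumption: default Atlas host/port; change for your cluster.
ATLAS_URL = "http://localhost:21000"


def entity_url(base_url, guid):
    """URL of the (assumed) v2 'get entity by GUID' endpoint."""
    return f"{base_url.rstrip('/')}/api/atlas/v2/entity/guid/{guid}"


def entity_exists(guid, base_url=ATLAS_URL, user="admin", password="admin"):
    """Return True if Atlas returns the entity, False on HTTP 404.

    Any other HTTP error (401, 500, ...) is re-raised, since it says
    nothing about whether the entity exists.
    """
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        entity_url(base_url, guid),
        headers={"Authorization": f"Basic {token}",
                 "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            return resp.status == 200
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return False
        raise
```

For example, entity_exists("d5c27069-d7b6-4b58-8e61-adbdc4b8d208") should return True when Atlas can resolve the input entity. Note that this only confirms the GUIDs resolve; in this thread both did, and the failure came from the shape of the references in the Kafka message.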
Created 07-25-2017 03:57 PM
@Smart Data, can you share more details on how you create the Process entity using Kafka?
Created 07-26-2017 06:41 AM
This is my message:
{ "message": { "entities": [ { "id": { "id": "-1467290565135246000", "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id", "state": "ACTIVE", "typeName": "hdfs_path", "version": 0 }, "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference", "traitNames": [], "traits": {}, "typeName": "hdfs_path", "values": { "createTime": "1970-01-01T00:00:00.000Z", "description": "Test with Kafka", "extendedAttributes": null, "isSymlink": false, "name": "Data_Kafka", "numberOfReplicas": 0, "owner": "admin", "path": "/user/data/dept.csv", "posixPermissions": null, "qualifiedName": "deptData" } } ], "type": "ENTITY_CREATE", "user": "admin" }, "version": { "version": "1.0.0" } }
Created 07-26-2017 04:25 PM
This is still puzzling. Would it be possible for you to create the entity using the REST APIs? That way the debug cycle is quicker.
I will attempt the same on my end with the entity JSON above. Let's see where that leads us.
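For the REST route, a v2 create call could look roughly like the sketch below. It assumes the Atlas v2 endpoint POST /api/atlas/v2/entity with an {"entity": {...}} body, the default port 21000, and admin/admin basic auth; the helper names object_id, process_entity_body and create_entity are hypothetical. In this v2 payload, inputs and outputs use the same {"guid": ..., "typeName": ...} object-ID shape as the original Kafka message, which the v2 REST API should accept even though the v1 Kafka hook does not.

```python
import base64
import json
import urllib.request

ATLAS_URL = "http://localhost:21000"  # assumption: default Atlas port


def object_id(guid, type_name):
    """v2-style object ID, the shape used for inputs/outputs in the question."""
    return {"guid": guid, "typeName": type_name}


def process_entity_body(qualified_name, name, input_guids, output_guids,
                        io_type="hdfs_path", extra_attributes=None):
    """Build an {"entity": {...}} request body for a new Process entity."""
    attributes = {"qualifiedName": qualified_name, "name": name,
                  "inputs": [object_id(g, io_type) for g in input_guids],
                  "outputs": [object_id(g, io_type) for g in output_guids]}
    attributes.update(extra_attributes or {})
    return {"entity": {"typeName": "Process", "attributes": attributes}}


def create_entity(body, base_url=ATLAS_URL, user="admin", password="admin"):
    """POST the body to the (assumed) v2 entity endpoint; return parsed JSON.

    urlopen raises HTTPError for non-2xx responses, so Atlas validation
    errors surface immediately instead of only in the hook consumer log.
    """
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        f"{base_url.rstrip('/')}/api/atlas/v2/entity",
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Basic {token}",
                 "Content-Type": "application/json"},
        method="POST")
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Calling create_entity(process_entity_body("qn_process_cp", "n_process_cp", ["d5c27069-d7b6-4b58-8e61-adbdc4b8d208"], ["513cd2cd-3139-4baa-a017-16f83ca2b383"])) would attempt the create against a live server, with any rejection reported directly in the HTTP response.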
Created 07-27-2017 07:35 AM
Created 07-27-2017 07:35 PM
@Sarath Subramanian Thanks for the detailed investigation. @Sharmadha Sainath Thanks for your help today!
Created 07-28-2017 05:37 AM
Excellent! It works. You are the best 🙂
Thanks also to @Ashutosh Mestry for helping.
