Created 07-25-2017 01:47 PM
Hello,
I am trying to create a Process entity via Kafka:
{
  "message": {
    "entities": [
      {
        "id": {
          "id": "-1467290565135246000",
          "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
          "state": "ACTIVE",
          "typeName": "Process",
          "version": 0
        },
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "traitNames": [],
        "traits": {},
        "typeName": "Process",
        "values": {
          "qualifiedName": "qn_process_cp",
          "processor_name": "file_copy_",
          "name": "n_process_cp",
          "inputs": [
            {"guid": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208", "typeName": "hdfs_path"}
          ],
          "outputs": [
            {"guid": "513cd2cd-3139-4baa-a017-16f83ca2b383", "typeName": "hdfs_path"}
          ]
        }
      }
    ],
    "type": "ENTITY_CREATE",
    "user": "admin"
  },
  "version": {
    "version": "1.0.0"
  }
}
I get this error message in the Atlas log:
2017-07-25 13:24:58,645 DEBUG - [NotificationHookConsumer thread-0:] ~ Set property __modifiedBy = "admin" to vertex[id=163872776 type=Process guid=null] (GraphHelper:413)
2017-07-25 13:24:58,645 DEBUG - [NotificationHookConsumer thread-0:] ~ <== createStructVertex(Process) (EntityGraphMapper:188)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> addProperty(vertex[id=163872776 type=Process guid=null], __superTypeNames, Referenceable) (AtlasGraphUtilsV1:132)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> addProperty(vertex[id=163872776 type=Process guid=null], __superTypeNames, Asset) (AtlasGraphUtilsV1:132)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> setProperty(vertex[id=163872776 type=Process guid=null], __guid, f789fb6e-ad0e-4702-b787-e2f5f6a61bdf) (AtlasGraphUtilsV1:141)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ Setting property __guid in vertex[id=163872776 type=Process guid=null] (AtlasGraphUtilsV1:159)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> setProperty(vertex[id=163872776 type=Process guid=f789fb6e-ad0e-4702-b787-e2f5f6a61bdf], __version, 0) (AtlasGraphUtilsV1:141)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ Setting property __version in vertex[id=163872776 type=Process guid=f789fb6e-ad0e-4702-b787-e2f5f6a61bdf] (AtlasGraphUtilsV1:159)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapAttributes(CREATE, Process) (EntityGraphMapper:211)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasTypeRegistry.getType(Process) (AtlasStructType:82)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ <== AtlasTypeRegistry.getType(Process): org.apache.atlas.type.AtlasEntityType@75b239ad (AtlasStructType:108)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapArrayValue(org.apache.atlas.repository.store.graph.v1.AttributeMutationContext@fc69927c) (EntityGraphMapper:550)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapObjectIdValue(org.apache.atlas.repository.store.graph.v1.AttributeMutationContext@fc5b7afb) (EntityGraphMapper:437)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ <== AtlasErrorCode.getMessage([null]) (AtlasErrorCode:125)
2017-07-25 13:24:58,650 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasErrorCode.getMessage([null]): ObjectId is not valid null (AtlasErrorCode:132)
2017-07-25 13:24:58,650 WARN - [NotificationHookConsumer thread-0:] ~ Error handling message (NotificationHookConsumer:325)
org.apache.atlas.exception.AtlasBaseException: ObjectId is not valid null
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapObjectIdValue(EntityGraphMapper.java:455)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapCollectionElementsToVertex(EntityGraphMapper.java:644)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapArrayValue(EntityGraphMapper.java:568)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapToVertexByTypeCategory(EntityGraphMapper.java:318)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttribute(EntityGraphMapper.java:260)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttributes(EntityGraphMapper.java:221)
    at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttributesAndClassifications(EntityGraphMapper.java:127)
    at org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1.createOrUpdate(AtlasEntityStoreV1.java:230)
    at org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1.createOrUpdate(AtlasEntityStoreV1.java:246)
    at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.handleMessage(NotificationHookConsumer.java:266)
    at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.run(NotificationHookConsumer.java:235)
Atlas version: 0.8
Kafka version: 0.10.1
Any suggestions?
Thanks
Created 07-27-2017 06:54 PM
Kafka messaging still uses v1-structured Atlas messages. You have supplied plain object-id references as the values of the inputs and outputs attributes. If you change them to the v1 Id/Reference structure, it works. We are currently in the process of migrating the v1 Kafka messages to the v2 structure.
More information on the format of Kafka messages supported by Atlas can be found here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_data-governance/content/atlas_messaging_...
The following command works. Please let us know if you still see the issue.
echo '{"message":{"entities":[{"id":{"id":"-1467290565135246000","jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","state":"ACTIVE","typeName":"Process","version":0},"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","traitNames":[],"traits":{},"typeName":"Process","values":{"qualifiedName":"qn_process_cp","processor_name":"file_copy_","name":"n_process_cp","inputs":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"b385b0ba-a806-44ae-b551-1a210c0e4c8a","version":0,"typeName":"hdfs_path","state":"ACTIVE"},"typeName":"hdfs_path","values":{"name":"hdfs_path_a","qualifiedName":"hdfs_path_a","path":"hdfs_path_a"},"traitNames":[],"traits":{},"systemAttributes":{}}],"outputs":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"746dee24-0820-40e2-922c-759693085a0c","version":0,"typeName":"hdfs_path","state":"ACTIVE"},"typeName":"hdfs_path","values":{"name":"hdfs_path_b","qualifiedName":"hdfs_path_b","path":"hdfs_path_b"},"traitNames":[],"traits":{},"systemAttributes":{}}]}}],"type":"ENTITY_CREATE","user":"admin"},"version":{"version":"1.0.0"}}' | /usr/hdp/2.6.2.0-98/kafka/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic ATLAS_HOOK
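If you are generating these messages programmatically, the fix amounts to wrapping each plain object-id reference in the full v1 _Reference structure shown in the command above. A minimal Python sketch (the helper name and the empty values map are my own; the jsonClass/id fields mirror the working message):

```python
import json

def to_v1_reference(ref):
    """Wrap a plain object-id reference ({"guid": ..., "typeName": ...})
    in the v1 InstanceSerialization$_Reference structure expected by the
    ATLAS_HOOK consumer for inputs/outputs."""
    return {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "id": {
            "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
            "id": ref["guid"],
            "version": 0,
            "typeName": ref["typeName"],
            "state": "ACTIVE",
        },
        "typeName": ref["typeName"],
        "values": {},
        "traitNames": [],
        "traits": {},
        "systemAttributes": {},
    }

# The reference format that failed in the original message:
failing = {"guid": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208", "typeName": "hdfs_path"}
print(json.dumps(to_v1_reference(failing), indent=2))
```

Applying this to each element of inputs and outputs before serializing the ENTITY_CREATE message yields the same shape as the working command.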
Created 07-25-2017 03:55 PM
This error typically occurs when the entity being created references something that should already exist.
2017-07-25 13:24:58,650 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasErrorCode.getMessage([null]): ObjectId is not valid null (AtlasErrorCode:132)
I would start by checking whether the two referenced guids exist in the Atlas DB before this message gets processed:
"d5c27069-d7b6-4b58-8e61-adbdc4b8d208" & "513cd2cd-3139-4baa-a017-16f83ca2b383"
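If you want to script that check, something like the following should work against the v1 REST API in Atlas 0.8 (host, port, and credentials are assumptions for a default sandbox install; adjust for your cluster):

```python
import base64
import urllib.request

ATLAS_BASE = "http://localhost:21000"  # assumption: default Atlas host/port

def entity_url(guid):
    # v1 REST endpoint for fetching an entity definition by guid
    return "%s/api/atlas/entities/%s" % (ATLAS_BASE, guid)

def fetch_entity(guid, user="admin", password="admin"):
    """Return the raw JSON definition for a guid; raises HTTPError (404) if absent."""
    req = urllib.request.Request(entity_url(guid))
    token = base64.b64encode(("%s:%s" % (user, password)).encode()).decode()
    req.add_header("Authorization", "Basic " + token)
    with urllib.request.urlopen(req) as resp:
        return resp.read().decode()

# The two guids referenced by the failing message:
for guid in ("d5c27069-d7b6-4b58-8e61-adbdc4b8d208",
             "513cd2cd-3139-4baa-a017-16f83ca2b383"):
    print(entity_url(guid))  # call fetch_entity(guid) against a live Atlas
```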
Created 07-26-2017 07:13 AM
Both entities exist:
513cd2cd-3139-4baa-a017-16f83ca2b383:
{
  "requestId": "pool-2-thread-9 - 8629abee-a74c-43a4-a969-758fd20704e4",
  "definition": {
    "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
    "id": {
      "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
      "id": "513cd2cd-3139-4baa-a017-16f83ca2b383",
      "version": 0,
      "typeName": "hdfs_path",
      "state": "ACTIVE"
    },
    "typeName": "hdfs_path",
    "values": {
      "name": "Data_Kafka2",
      "path": "/user/data/dept.csv",
      "posixPermissions": null,
      "createTime": "1970-01-01T00:00:00.000Z",
      "description": "Test with Kafka",
      "isSymlink": false,
      "extendedAttributes": null,
      "numberOfReplicas": 0,
      "qualifiedName": "deptData2",
      "isFile": false,
      "fileSize": 0,
      "owner": "admin",
      "modifiedTime": "1970-01-01T00:00:00.000Z",
      "clusterName": null,
      "group": null
    },
    "traitNames": ["Catalog.Technical.IT", "Retainable"],
    "traits": {
      "Catalog.Technical.IT": {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Struct",
        "typeName": "Catalog.Technical.IT",
        "values": {
          "name": "Catalog.Technical.IT",
          "acceptable_use": null,
          "description": "technical test",
          "available_as_tag": false,
          "taxonomy.namespace": "atlas.taxonomy"
        }
      },
      "Retainable": {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Struct",
        "typeName": "Retainable",
        "values": {"retentionPeriod": 100}
      }
    },
    "systemAttributes": {
      "createdBy": "admin",
      "modifiedBy": "admin",
      "createdTime": "2017-06-28T07:32:37.288Z",
      "modifiedTime": "2017-06-29T12:34:30.304Z"
    }
  }
}
d5c27069-d7b6-4b58-8e61-adbdc4b8d208:
{
  "requestId": "pool-2-thread-6 - e16f006a-962d-45b5-a2b6-6914ec6ea0ab",
  "definition": {
    "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
    "id": {
      "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
      "id": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208",
      "version": 0,
      "typeName": "hdfs_path",
      "state": "ACTIVE"
    },
    "typeName": "hdfs_path",
    "values": {
      "name": "Data_Kafka1",
      "path": "/user/data/dept.csv",
      "posixPermissions": null,
      "createTime": "1970-01-01T00:00:00.000Z",
      "description": "Test with Kafka",
      "isSymlink": false,
      "extendedAttributes": null,
      "numberOfReplicas": 0,
      "qualifiedName": "deptData",
      "isFile": false,
      "fileSize": 0,
      "owner": "admin",
      "modifiedTime": "1970-01-01T00:00:00.000Z",
      "clusterName": null,
      "group": null
    },
    "traitNames": [],
    "traits": {},
    "systemAttributes": {
      "createdBy": "admin",
      "modifiedBy": "admin",
      "createdTime": "2017-07-21T07:40:37.357Z",
      "modifiedTime": "2017-07-21T07:52:59.405Z"
    }
  }
}
Created 07-25-2017 03:57 PM
@Smart Data, can you share some more details on how you create the process entity using Kafka?
Created 07-26-2017 06:41 AM
This is my message:
{
  "message": {
    "entities": [
      {
        "id": {
          "id": "-1467290565135246000",
          "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
          "state": "ACTIVE",
          "typeName": "hdfs_path",
          "version": 0
        },
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "traitNames": [],
        "traits": {},
        "typeName": "hdfs_path",
        "values": {
          "createTime": "1970-01-01T00:00:00.000Z",
          "description": "Test with Kafka",
          "extendedAttributes": null,
          "isSymlink": false,
          "name": "Data_Kafka",
          "numberOfReplicas": 0,
          "owner": "admin",
          "path": "/user/data/dept.csv",
          "posixPermissions": null,
          "qualifiedName": "deptData"
        }
      }
    ],
    "type": "ENTITY_CREATE",
    "user": "admin"
  },
  "version": {
    "version": "1.0.0"
  }
}
Created 07-26-2017 04:25 PM
This is still puzzling. Would it be possible for you to create the entity using the REST APIs? That way the debug cycle is quicker.
I will attempt the same at my end with the entity JSON above. Let's see where that leads us.
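For the REST route, Atlas 0.8 also ships the v2 endpoint (POST /api/atlas/v2/entity), which does accept plain object-id references for inputs and outputs. A sketch of the payload (host, port, and credentials are assumptions; the attribute names come from the failing Kafka message):

```python
import json

# Assumed default Atlas host/port; adjust for your cluster.
ATLAS_V2_ENTITY = "http://localhost:21000/api/atlas/v2/entity"

def build_v2_process(qualified_name, name, input_guids, output_guids):
    """Build a v2 AtlasEntity payload; v2 takes {"guid", "typeName"} refs directly."""
    ref = lambda guid: {"guid": guid, "typeName": "hdfs_path"}
    return {
        "entity": {
            "typeName": "Process",
            "attributes": {
                "qualifiedName": qualified_name,
                "name": name,
                "inputs": [ref(g) for g in input_guids],
                "outputs": [ref(g) for g in output_guids],
            },
        }
    }

payload = build_v2_process(
    "qn_process_cp", "n_process_cp",
    ["d5c27069-d7b6-4b58-8e61-adbdc4b8d208"],
    ["513cd2cd-3139-4baa-a017-16f83ca2b383"],
)
print(json.dumps(payload, indent=2))
# POST it with e.g.:
#   curl -u admin:admin -H 'Content-Type: application/json' \
#        -d @payload.json http://localhost:21000/api/atlas/v2/entity
```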
Created 07-27-2017 07:35 PM
@Sarath Subramanian Thanks for the detailed investigation. @Sharmadha Sainath Thanks for your help today!
Created 07-28-2017 05:37 AM
Excellent! It works. You are the best 🙂
Thanks also to @Ashutosh Mestry for helping.