Messaging API with Kafka: lineage

Contributor

Hello,

I am trying to create a Process entity through Kafka messaging:

{
    "message": {
        "entities": [
            {
                "id": {
                    "id": "-1467290565135246000",
                    "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
                    "state": "ACTIVE",
                    "typeName": "Process",
                    "version": 0
                },
                "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
                "traitNames": [],
                "traits": {},
                "typeName": "Process",
                "values": {
                    "qualifiedName": "qn_process_cp",
                    "processor_name": "file_copy_",
                    "name": "n_process_cp",
                     "inputs": [{"guid": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208","typeName": "hdfs_path"}],
                     "outputs": [{"guid": "513cd2cd-3139-4baa-a017-16f83ca2b383","typeName": "hdfs_path"}]
                }
            }
        ],
        "type": "ENTITY_CREATE",
        "user": "admin"
    },
    "version": {
        "version": "1.0.0"
    }
}

I get this error in the Atlas log:

2017-07-25 13:24:58,645 DEBUG - [NotificationHookConsumer thread-0:] ~ Set property __modifiedBy = "admin" to vertex[id=163872776 type=Process guid=null] (GraphHelper:413)
2017-07-25 13:24:58,645 DEBUG - [NotificationHookConsumer thread-0:] ~ <== createStructVertex(Process) (EntityGraphMapper:188)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> addProperty(vertex[id=163872776 type=Process guid=null], __superTypeNames, Referenceable) (AtlasGraphUtilsV1:132)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> addProperty(vertex[id=163872776 type=Process guid=null], __superTypeNames, Asset) (AtlasGraphUtilsV1:132)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> setProperty(vertex[id=163872776 type=Process guid=null], __guid, f789fb6e-ad0e-4702-b787-e2f5f6a61bdf) (AtlasGraphUtilsV1:141)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ Setting property __guid in vertex[id=163872776 type=Process guid=null] (AtlasGraphUtilsV1:159)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> setProperty(vertex[id=163872776 type=Process guid=f789fb6e-ad0e-4702-b787-e2f5f6a61bdf], __version, 0) (AtlasGraphUtilsV1:141)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ Setting property __version in vertex[id=163872776 type=Process guid=f789fb6e-ad0e-4702-b787-e2f5f6a61bdf] (AtlasGraphUtilsV1:159)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapAttributes(CREATE, Process) (EntityGraphMapper:211)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasTypeRegistry.getType(Process) (AtlasStructType:82)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ <== AtlasTypeRegistry.getType(Process): org.apache.atlas.type.AtlasEntityType@75b239ad (AtlasStructType:108)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapArrayValue(org.apache.atlas.repository.store.graph.v1.AttributeMutationContext@fc69927c) (EntityGraphMapper:550)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapObjectIdValue(org.apache.atlas.repository.store.graph.v1.AttributeMutationContext@fc5b7afb) (EntityGraphMapper:437)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ <== AtlasErrorCode.getMessage([null]) (AtlasErrorCode:125)
2017-07-25 13:24:58,650 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasErrorCode.getMessage([null]): ObjectId is not valid null (AtlasErrorCode:132)
2017-07-25 13:24:58,650 WARN  - [NotificationHookConsumer thread-0:] ~ Error handling message (NotificationHookConsumer:325)
org.apache.atlas.exception.AtlasBaseException: ObjectId is not valid null
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapObjectIdValue(EntityGraphMapper.java:455)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapCollectionElementsToVertex(EntityGraphMapper.java:644)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapArrayValue(EntityGraphMapper.java:568)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapToVertexByTypeCategory(EntityGraphMapper.java:318)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttribute(EntityGraphMapper.java:260)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttributes(EntityGraphMapper.java:221)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttributesAndClassifications(EntityGraphMapper.java:127)
        at org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1.createOrUpdate(AtlasEntityStoreV1.java:230)
        at org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1.createOrUpdate(AtlasEntityStoreV1.java:246)
        at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.handleMessage(NotificationHookConsumer.java:266)
        at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.run(NotificationHookConsumer.java:235)

Atlas version: 0.8

Kafka version: 0.10.1

Any suggestions?

Thanks

1 ACCEPTED SOLUTION

Expert Contributor

@Smart Data

Kafka messaging still uses v1-structured Atlas messages. You supplied object ID references (guid plus typeName) as the values of the inputs and outputs attributes. If you change them to the Id format (v1 structure), it works. We are currently migrating the v1 Kafka messages to the v2 structure.

More information on the Kafka message format supported by Atlas can be found here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_data-governance/content/atlas_messaging_...

The following command works. Please let us know if you still see the issue.

echo '{"message":{"entities":[{"id":{"id":"-1467290565135246000","jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","state":"ACTIVE","typeName":"Process","version":0},"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","traitNames":[],"traits":{},"typeName":"Process","values":{"qualifiedName":"qn_process_cp","processor_name":"file_copy_","name":"n_process_cp","inputs":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"b385b0ba-a806-44ae-b551-1a210c0e4c8a","version":0,"typeName":"hdfs_path","state":"ACTIVE"},"typeName":"hdfs_path","values":{"name":"hdfs_path_a","qualifiedName":"hdfs_path_a","path":"hdfs_path_a"},"traitNames":[],"traits":{},"systemAttributes":{}}],"outputs":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"746dee24-0820-40e2-922c-759693085a0c","version":0,"typeName":"hdfs_path","state":"ACTIVE"},"typeName":"hdfs_path","values":{"name":"hdfs_path_b","qualifiedName":"hdfs_path_b","path":"hdfs_path_b"},"traitNames":[],"traits":{},"systemAttributes":{}}]}}],"type":"ENTITY_CREATE","user":"admin"},"version":{"version":"1.0.0"}}' | /usr/hdp/2.6.2.0-98/kafka/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic ATLAS_HOOK
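For anyone generating these messages from a script, here is a minimal sketch of how the inputs and outputs could be built in the v1 structure used by the command above. The helper names (`v1_id`, `v1_reference`, `build_process_message`) are hypothetical and not part of any Atlas library; the field layout is copied from the working message, and the GUIDs and attribute values are the ones from that command.

```python
import json

# jsonClass values copied from the working message above.
ID_CLASS = "org.apache.atlas.typesystem.json.InstanceSerialization$_Id"
REF_CLASS = "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference"


def v1_id(guid, type_name):
    """Build the v1 Id block (hypothetical helper)."""
    return {
        "jsonClass": ID_CLASS,
        "id": guid,
        "version": 0,
        "typeName": type_name,
        "state": "ACTIVE",
    }


def v1_reference(guid, type_name, values):
    """Wrap an Id block in a v1 Reference, as the working command does."""
    return {
        "jsonClass": REF_CLASS,
        "id": v1_id(guid, type_name),
        "typeName": type_name,
        "values": values,
        "traitNames": [],
        "traits": {},
        "systemAttributes": {},
    }


def build_process_message(inputs, outputs, user="admin"):
    """Assemble an ENTITY_CREATE message for the Process entity in this thread."""
    process = {
        "id": v1_id("-1467290565135246000", "Process"),
        "jsonClass": REF_CLASS,
        "traitNames": [],
        "traits": {},
        "typeName": "Process",
        "values": {
            "qualifiedName": "qn_process_cp",
            "processor_name": "file_copy_",
            "name": "n_process_cp",
            "inputs": inputs,
            "outputs": outputs,
        },
    }
    return {
        "message": {"entities": [process], "type": "ENTITY_CREATE", "user": user},
        "version": {"version": "1.0.0"},
    }


if __name__ == "__main__":
    source = v1_reference(
        "b385b0ba-a806-44ae-b551-1a210c0e4c8a", "hdfs_path",
        {"name": "hdfs_path_a", "qualifiedName": "hdfs_path_a", "path": "hdfs_path_a"},
    )
    target = v1_reference(
        "746dee24-0820-40e2-922c-759693085a0c", "hdfs_path",
        {"name": "hdfs_path_b", "qualifiedName": "hdfs_path_b", "path": "hdfs_path_b"},
    )
    # One JSON document per line, ready to pipe into the console producer.
    print(json.dumps(build_process_message([source], [target])))
```

The printed line can be piped into kafka-console-producer.sh on the ATLAS_HOOK topic in the same way as the echo command above.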


9 REPLIES

Expert Contributor

@Smart Data

This error typically occurs when the entity being created references something that should already exist.

2017-07-25 13:24:58,650 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasErrorCode.getMessage([null]): ObjectId is not valid null (AtlasErrorCode:132)

I would start by checking whether the two GUIDs referenced exist in the Atlas DB before this message gets processed:

"d5c27069-d7b6-4b58-8e61-adbdc4b8d208" and "513cd2cd-3139-4baa-a017-16f83ca2b383"

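A quick way to run that check from a script is sketched below. It assumes the v1 REST endpoint `/api/atlas/entities/{guid}` and a response containing a `definition` object with an `id` block, which is the shape the poster shares in this thread. The base URL, the optional auth header, and the helper names are assumptions to adapt to your cluster.

```python
import json
import urllib.request


def entity_url(base_url, guid):
    """Build the lookup URL (endpoint path is an assumption for Atlas 0.8)."""
    return f"{base_url.rstrip('/')}/api/atlas/entities/{guid}"


def summarize_definition(response):
    """Pull (guid, typeName, state) out of a v1 entity response."""
    ident = response["definition"]["id"]
    return ident["id"], ident["typeName"], ident["state"]


def fetch_entity(base_url, guid, auth_header=None):
    """Fetch one entity; raises urllib.error.HTTPError if the lookup fails."""
    request = urllib.request.Request(entity_url(base_url, guid))
    if auth_header:
        # For example a Basic auth value; how you authenticate depends on your setup.
        request.add_header("Authorization", auth_header)
    with urllib.request.urlopen(request) as resp:
        return json.load(resp)
```

Calling `summarize_definition(fetch_entity("http://localhost:21000", guid))` for each GUID (the host and port here are placeholders) shows whether the entity resolves and whether its state is ACTIVE.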
Contributor

Hi @Ashutosh Mestry

Both entities exist:

513cd2cd-3139-4baa-a017-16f83ca2b383:

{

    "requestId": "pool-2-thread-9 - 8629abee-a74c-43a4-a969-758fd20704e4",
    "definition": {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "id": {
            "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
            "id": "513cd2cd-3139-4baa-a017-16f83ca2b383",
            "version": 0,
            "typeName": "hdfs_path",
            "state": "ACTIVE"
        },
        "typeName": "hdfs_path",
        "values": {
            "name": "Data_Kafka2",
            "path": "/user/data/dept.csv",
            "posixPermissions": null,
            "createTime": "1970-01-01T00:00:00.000Z",
            "description": "Test with Kafka",
            "isSymlink": false,
            "extendedAttributes": null,
            "numberOfReplicas": 0,
            "qualifiedName": "deptData2",
            "isFile": false,
            "fileSize": 0,
            "owner": "admin",
            "modifiedTime": "1970-01-01T00:00:00.000Z",
            "clusterName": null,
            "group": null
        },
        "traitNames": [
            "Catalog.Technical.IT",
            "Retainable"
        ],
        "traits": {
            "Catalog.Technical.IT": {
                "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Struct",
                "typeName": "Catalog.Technical.IT",
                "values": {
                    "name": "Catalog.Technical.IT",
                    "acceptable_use": null,
                    "description": "technical test",
                    "available_as_tag": false,
                    "taxonomy.namespace": "atlas.taxonomy"
                }
            },
            "Retainable": {
                "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Struct",
                "typeName": "Retainable",
                "values": {
                    "retentionPeriod": 100
                }
            }
        },
        "systemAttributes": {
            "createdBy": "admin",
            "modifiedBy": "admin",
            "createdTime": "2017-06-28T07:32:37.288Z",
            "modifiedTime": "2017-06-29T12:34:30.304Z"
        }
    }

}

d5c27069-d7b6-4b58-8e61-adbdc4b8d208:

{

    "requestId": "pool-2-thread-6 - e16f006a-962d-45b5-a2b6-6914ec6ea0ab",
    "definition": {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "id": {
            "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
            "id": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208",
            "version": 0,
            "typeName": "hdfs_path",
            "state": "ACTIVE"
        },
        "typeName": "hdfs_path",
        "values": {
            "name": "Data_Kafka1",
            "path": "/user/data/dept.csv",
            "posixPermissions": null,
            "createTime": "1970-01-01T00:00:00.000Z",
            "description": "Test with Kafka",
            "isSymlink": false,
            "extendedAttributes": null,
            "numberOfReplicas": 0,
            "qualifiedName": "deptData",
            "isFile": false,
            "fileSize": 0,
            "owner": "admin",
            "modifiedTime": "1970-01-01T00:00:00.000Z",
            "clusterName": null,
            "group": null
        },
        "traitNames": [ ],
        "traits": { },
        "systemAttributes": {
            "createdBy": "admin",
            "modifiedBy": "admin",
            "createdTime": "2017-07-21T07:40:37.357Z",
            "modifiedTime": "2017-07-21T07:52:59.405Z"
        }
    }

}


Expert Contributor

@Smart Data, can you share more details on how you create the Process entity using Kafka?

Contributor

Hi @Sarath Subramanian

This is my message:

{
    "message": {
        "entities": [
            {
                "id": {
                    "id": "-1467290565135246000",
                    "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
                    "state": "ACTIVE",
                    "typeName": "hdfs_path",
                    "version": 0
                },
                "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
                "traitNames": [],
                "traits": {},
                "typeName": "hdfs_path",
                "values": {
                    "createTime": "1970-01-01T00:00:00.000Z",
                    "description": "Test with Kafka",
                    "extendedAttributes": null,
                    "isSymlink": false,
                    "name": "Data_Kafka",
                    "numberOfReplicas": 0,
                    "owner": "admin",
                    "path": "/user/data/dept.csv",
                    "posixPermissions": null,
                    "qualifiedName": "deptData"
                }
            }
        ],
        "type": "ENTITY_CREATE",
        "user": "admin"
    },
    "version": {
        "version": "1.0.0"
    }
}
 

Expert Contributor
@Smart Data

This is still puzzling. Would it be possible for you to create the entity using the REST APIs? That way the debug cycle is quick.

I will attempt the same at my end with the entities JSON above. Let's see where that leads us.

Contributor

@Ashutosh Mestry

With the REST APIs it works very well.

For my use case I need to use Kafka messaging.

Expert Contributor

@Sarath Subramanian Thanks for the detailed investigation. @Sharmadha Sainath Thanks for your help today!

Contributor

@Sarath Subramanian

Excellent! It works. You are the best 🙂

Thanks also to @Ashutosh Mestry for helping.