Messaging API with Kafka : lineage

Contributor

Hello,

I am trying to create a Process entity through Kafka:

{
    "message": {
        "entities": [
            {
                "id": {
                    "id": "-1467290565135246000",
                    "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
                    "state": "ACTIVE",
                    "typeName": "Process",
                    "version": 0
                },
                "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
                "traitNames": [],
                "traits": {},
                "typeName": "Process",
                "values": {
                    "qualifiedName": "qn_process_cp",
                    "processor_name": "file_copy_",
                    "name": "n_process_cp",
                     "inputs": [{"guid": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208","typeName": "hdfs_path"}],
                     "outputs": [{"guid": "513cd2cd-3139-4baa-a017-16f83ca2b383","typeName": "hdfs_path"}]
                }
            }
        ],
        "type": "ENTITY_CREATE",
        "user": "admin"
    },
    "version": {
        "version": "1.0.0"
    }
}

I get this error message in the Atlas log:

2017-07-25 13:24:58,645 DEBUG - [NotificationHookConsumer thread-0:] ~ Set property __modifiedBy = "admin" to vertex[id=163872776 type=Process guid=null] (GraphHelper:413)
2017-07-25 13:24:58,645 DEBUG - [NotificationHookConsumer thread-0:] ~ <== createStructVertex(Process) (EntityGraphMapper:188)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> addProperty(vertex[id=163872776 type=Process guid=null], __superTypeNames, Referenceable) (AtlasGraphUtilsV1:132)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> addProperty(vertex[id=163872776 type=Process guid=null], __superTypeNames, Asset) (AtlasGraphUtilsV1:132)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> setProperty(vertex[id=163872776 type=Process guid=null], __guid, f789fb6e-ad0e-4702-b787-e2f5f6a61bdf) (AtlasGraphUtilsV1:141)
2017-07-25 13:24:58,646 DEBUG - [NotificationHookConsumer thread-0:] ~ Setting property __guid in vertex[id=163872776 type=Process guid=null] (AtlasGraphUtilsV1:159)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> setProperty(vertex[id=163872776 type=Process guid=f789fb6e-ad0e-4702-b787-e2f5f6a61bdf], __version, 0) (AtlasGraphUtilsV1:141)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ Setting property __version in vertex[id=163872776 type=Process guid=f789fb6e-ad0e-4702-b787-e2f5f6a61bdf] (AtlasGraphUtilsV1:159)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapAttributes(CREATE, Process) (EntityGraphMapper:211)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasTypeRegistry.getType(Process) (AtlasStructType:82)
2017-07-25 13:24:58,648 DEBUG - [NotificationHookConsumer thread-0:] ~ <== AtlasTypeRegistry.getType(Process): org.apache.atlas.type.AtlasEntityType@75b239ad (AtlasStructType:108)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapArrayValue(org.apache.atlas.repository.store.graph.v1.AttributeMutationContext@fc69927c) (EntityGraphMapper:550)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> mapObjectIdValue(org.apache.atlas.repository.store.graph.v1.AttributeMutationContext@fc5b7afb) (EntityGraphMapper:437)
2017-07-25 13:24:58,649 DEBUG - [NotificationHookConsumer thread-0:] ~ <== AtlasErrorCode.getMessage([null]) (AtlasErrorCode:125)
2017-07-25 13:24:58,650 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasErrorCode.getMessage([null]): ObjectId is not valid null (AtlasErrorCode:132)
2017-07-25 13:24:58,650 WARN  - [NotificationHookConsumer thread-0:] ~ Error handling message (NotificationHookConsumer:325)
org.apache.atlas.exception.AtlasBaseException: ObjectId is not valid null
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapObjectIdValue(EntityGraphMapper.java:455)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapCollectionElementsToVertex(EntityGraphMapper.java:644)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapArrayValue(EntityGraphMapper.java:568)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapToVertexByTypeCategory(EntityGraphMapper.java:318)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttribute(EntityGraphMapper.java:260)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttributes(EntityGraphMapper.java:221)
        at org.apache.atlas.repository.store.graph.v1.EntityGraphMapper.mapAttributesAndClassifications(EntityGraphMapper.java:127)
        at org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1.createOrUpdate(AtlasEntityStoreV1.java:230)
        at org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1.createOrUpdate(AtlasEntityStoreV1.java:246)
        at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.handleMessage(NotificationHookConsumer.java:266)
        at org.apache.atlas.notification.NotificationHookConsumer$HookConsumer.run(NotificationHookConsumer.java:235)

Atlas version: 0.8

Kafka version: 0.10.1

Any suggestions?

Thanks

Accepted Solutions

Re: Messaging API with Kafka : lineage

Expert Contributor

@Smart Data

Kafka messaging still uses v1-structured Atlas messages. You have supplied object-id references ("guid" plus "typeName") as the values of the inputs and outputs attributes. If you change them to the Id format (v1 structure), it works. We are currently migrating the Kafka messages from the v1 structure to the v2 structure.

More information on the Kafka message format supported by Atlas can be found here: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_data-governance/content/atlas_messaging_...

The following command works. Please let us know if you still see the issue.

echo '{"message":{"entities":[{"id":{"id":"-1467290565135246000","jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","state":"ACTIVE","typeName":"Process","version":0},"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","traitNames":[],"traits":{},"typeName":"Process","values":{"qualifiedName":"qn_process_cp","processor_name":"file_copy_","name":"n_process_cp","inputs":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"b385b0ba-a806-44ae-b551-1a210c0e4c8a","version":0,"typeName":"hdfs_path","state":"ACTIVE"},"typeName":"hdfs_path","values":{"name":"hdfs_path_a","qualifiedName":"hdfs_path_a","path":"hdfs_path_a"},"traitNames":[],"traits":{},"systemAttributes":{}}],"outputs":[{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference","id":{"jsonClass":"org.apache.atlas.typesystem.json.InstanceSerialization$_Id","id":"746dee24-0820-40e2-922c-759693085a0c","version":0,"typeName":"hdfs_path","state":"ACTIVE"},"typeName":"hdfs_path","values":{"name":"hdfs_path_b","qualifiedName":"hdfs_path_b","path":"hdfs_path_b"},"traitNames":[],"traits":{},"systemAttributes":{}}]}}],"type":"ENTITY_CREATE","user":"admin"},"version":{"version":"1.0.0"}}' | /usr/hdp/2.6.2.0-98/kafka/bin/kafka-console-producer.sh --broker-list localhost:6667 --topic ATLAS_HOOK
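
The difference between the two reference styles can be sketched in Python. This is a minimal sketch, not an official Atlas client: the helper names `to_v1_reference` and `build_process_message` are hypothetical, and the field layout is copied from the working command above. It also serializes the message onto a single line, since `kafka-console-producer.sh` sends each input line as a separate message.

```python
import json

ID_CLASS = "org.apache.atlas.typesystem.json.InstanceSerialization$_Id"
REF_CLASS = "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference"


def to_v1_reference(guid, type_name, values):
    """Wrap an entity reference in the v1 structure used by the working command.

    Hypothetical helper: it replaces the {"guid": ..., "typeName": ...}
    object-id form that triggered "ObjectId is not valid null".
    """
    return {
        "jsonClass": REF_CLASS,
        "id": {
            "jsonClass": ID_CLASS,
            "id": guid,
            "version": 0,
            "typeName": type_name,
            "state": "ACTIVE",
        },
        "typeName": type_name,
        "values": values,
        "traitNames": [],
        "traits": {},
        "systemAttributes": {},
    }


def build_process_message(inputs, outputs, user="admin"):
    """Return an ENTITY_CREATE message for a Process entity as one JSON line."""
    message = {
        "message": {
            "entities": [
                {
                    "id": {
                        "id": "-1467290565135246000",  # same placeholder id as in the thread
                        "jsonClass": ID_CLASS,
                        "state": "ACTIVE",
                        "typeName": "Process",
                        "version": 0,
                    },
                    "jsonClass": REF_CLASS,
                    "traitNames": [],
                    "traits": {},
                    "typeName": "Process",
                    "values": {
                        "qualifiedName": "qn_process_cp",
                        "processor_name": "file_copy_",
                        "name": "n_process_cp",
                        "inputs": inputs,
                        "outputs": outputs,
                    },
                }
            ],
            "type": "ENTITY_CREATE",
            "user": user,
        },
        "version": {"version": "1.0.0"},
    }
    # Compact separators keep the whole message on a single line.
    return json.dumps(message, separators=(",", ":"))


if __name__ == "__main__":
    attrs_a = {"name": "hdfs_path_a", "qualifiedName": "hdfs_path_a", "path": "hdfs_path_a"}
    attrs_b = {"name": "hdfs_path_b", "qualifiedName": "hdfs_path_b", "path": "hdfs_path_b"}
    src = to_v1_reference("b385b0ba-a806-44ae-b551-1a210c0e4c8a", "hdfs_path", attrs_a)
    dst = to_v1_reference("746dee24-0820-40e2-922c-759693085a0c", "hdfs_path", attrs_b)
    print(build_process_message([src], [dst]))
```

The printed line can be piped to the console producer in place of the `echo` argument. The `values` passed for each reference mirror the ones in the working command; whether Atlas needs all of them for an entity that already exists is not confirmed here.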

Re: Messaging API with Kafka : lineage

Expert Contributor

@Smart Data

This error typically occurs when an entity being created references something that should already exist.

2017-07-25 13:24:58,650 DEBUG - [NotificationHookConsumer thread-0:] ~ ==> AtlasErrorCode.getMessage([null]): ObjectId is not valid null (AtlasErrorCode:132)

I would start by checking whether the two referenced GUIDs exist in the Atlas DB before this message gets processed.

"d5c27069-d7b6-4b58-8e61-adbdc4b8d208" & "513cd2cd-3139-4baa-a017-16f83ca2b383"

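One way to run that check is through the Atlas REST API. The sketch below is an assumption-laden example rather than a confirmed recipe: it assumes the v1 endpoint `GET /api/atlas/entities/{guid}`, basic authentication, and that a missing entity yields HTTP 404; the helper names `entity_url` and `entity_exists` are hypothetical.

```python
import base64
import urllib.error
import urllib.request


def entity_url(base_url, guid):
    """Build the (assumed) v1 entity lookup URL for a GUID."""
    return f"{base_url.rstrip('/')}/api/atlas/entities/{guid}"


def entity_exists(base_url, guid, user, password):
    """Return True if the lookup answers 200, False on 404; re-raise other errors."""
    request = urllib.request.Request(entity_url(base_url, guid))
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", f"Basic {token}")
    try:
        with urllib.request.urlopen(request) as response:
            return response.status == 200
    except urllib.error.HTTPError as err:
        if err.code == 404:  # assumption: Atlas answers 404 for an unknown GUID
            return False
        raise
```

For example, `entity_exists("http://localhost:21000", "d5c27069-d7b6-4b58-8e61-adbdc4b8d208", "admin", "admin")` would report whether the first GUID resolves; the host, port, and credentials here are placeholders.
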
Re: Messaging API with Kafka : lineage

Contributor

Hi @Ashutosh Mestry

Both entities exist:

513cd2cd-3139-4baa-a017-16f83ca2b383:

{

    "requestId": "pool-2-thread-9 - 8629abee-a74c-43a4-a969-758fd20704e4",
    "definition": {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "id": {
            "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
            "id": "513cd2cd-3139-4baa-a017-16f83ca2b383",
            "version": 0,
            "typeName": "hdfs_path",
            "state": "ACTIVE"
        },
        "typeName": "hdfs_path",
        "values": {
            "name": "Data_Kafka2",
            "path": "/user/data/dept.csv",
            "posixPermissions": null,
            "createTime": "1970-01-01T00:00:00.000Z",
            "description": "Test with Kafka",
            "isSymlink": false,
            "extendedAttributes": null,
            "numberOfReplicas": 0,
            "qualifiedName": "deptData2",
            "isFile": false,
            "fileSize": 0,
            "owner": "admin",
            "modifiedTime": "1970-01-01T00:00:00.000Z",
            "clusterName": null,
            "group": null
        },
        "traitNames": [
            "Catalog.Technical.IT",
            "Retainable"
        ],
        "traits": {
            "Catalog.Technical.IT": {
                "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Struct",
                "typeName": "Catalog.Technical.IT",
                "values": {
                    "name": "Catalog.Technical.IT",
                    "acceptable_use": null,
                    "description": "technical test",
                    "available_as_tag": false,
                    "taxonomy.namespace": "atlas.taxonomy"
                }
            },
            "Retainable": {
                "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Struct",
                "typeName": "Retainable",
                "values": {
                    "retentionPeriod": 100
                }
            }
        },
        "systemAttributes": {
            "createdBy": "admin",
            "modifiedBy": "admin",
            "createdTime": "2017-06-28T07:32:37.288Z",
            "modifiedTime": "2017-06-29T12:34:30.304Z"
        }
    }

}

d5c27069-d7b6-4b58-8e61-adbdc4b8d208:

{

    "requestId": "pool-2-thread-6 - e16f006a-962d-45b5-a2b6-6914ec6ea0ab",
    "definition": {
        "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
        "id": {
            "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
            "id": "d5c27069-d7b6-4b58-8e61-adbdc4b8d208",
            "version": 0,
            "typeName": "hdfs_path",
            "state": "ACTIVE"
        },
        "typeName": "hdfs_path",
        "values": {
            "name": "Data_Kafka1",
            "path": "/user/data/dept.csv",
            "posixPermissions": null,
            "createTime": "1970-01-01T00:00:00.000Z",
            "description": "Test with Kafka",
            "isSymlink": false,
            "extendedAttributes": null,
            "numberOfReplicas": 0,
            "qualifiedName": "deptData",
            "isFile": false,
            "fileSize": 0,
            "owner": "admin",
            "modifiedTime": "1970-01-01T00:00:00.000Z",
            "clusterName": null,
            "group": null
        },
        "traitNames": [ ],
        "traits": { },
        "systemAttributes": {
            "createdBy": "admin",
            "modifiedBy": "admin",
            "createdTime": "2017-07-21T07:40:37.357Z",
            "modifiedTime": "2017-07-21T07:52:59.405Z"
        }
    }

}


Re: Messaging API with Kafka : lineage

Expert Contributor

@Smart Data, can you share more details on how you create the Process entity using Kafka?

Re: Messaging API with Kafka : lineage

Contributor

Hi @Sarath Subramanian

This is my message:

{
    "message": {
        "entities": [
            {
                "id": {
                    "id": "-1467290565135246000",
                    "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Id",
                    "state": "ACTIVE",
                    "typeName": "hdfs_path",
                    "version": 0
                },
                "jsonClass": "org.apache.atlas.typesystem.json.InstanceSerialization$_Reference",
                "traitNames": [],
                "traits": {},
                "typeName": "hdfs_path",
                "values": {
                    "createTime": "1970-01-01T00:00:00.000Z",
                    "description": "Test with Kafka",
                    "extendedAttributes": null,
                    "isSymlink": false,
                    "name": "Data_Kafka",
                    "numberOfReplicas": 0,
                    "owner": "admin",
                    "path": "/user/data/dept.csv",
                    "posixPermissions": null,
                    "qualifiedName": "deptData"
                }
            }
        ],
        "type": "ENTITY_CREATE",
        "user": "admin"
    },
    "version": {
        "version": "1.0.0"
    }
}
 

Re: Messaging API with Kafka : lineage

Expert Contributor
@Smart Data

This is still puzzling. Would it be possible for you to create the entity using the REST APIs? That way the debug cycle is quick.

I will attempt the same at my end with the entity JSON above. Let's see where that leads us.

Re: Messaging API with Kafka : lineage

Contributor

@Ashutosh Mestry

With the REST APIs it works very well.

For my use case I need to use Kafka messaging.

Re: Messaging API with Kafka : lineage

Expert Contributor

@Sarath Subramanian Thanks for the detailed investigation. @Sharmadha Sainath Thanks for your help today!

Re: Messaging API with Kafka : lineage

Contributor

@Sarath Subramanian

Excellent! It works. You are the best :)

Thanks also to @Ashutosh Mestry for helping.