11-19-2024
12:45 PM
Introduction

Let's be clear up front: by "tactical", I mean "not recommended for production use" 🙂 There are several better ways to achieve code re-use in Cloudera Data Engineering, including adding custom operators and libraries at the Virtual Cluster level, and @VishRajagopalan's CDE Dynamic Package Loader. However, if you just need something more "quick and dirty", then read on to learn how to load Python code from Resources in Cloudera Data Engineering.

To quote the official documentation: "A Resource in Cloudera Data Engineering (CDE) is a named collection of files used by a job or a session. Resources can include application code, configuration files, custom Docker images, and Python virtual environment specifications (requirements.txt)."

The key to leveraging Resources to provide custom code to your Airflow Jobs in Cloudera Data Engineering is knowing exactly how Resources are made available at run-time. Once you know that, the rest is just "normal" Python path manipulation.

TL;DR

Resources are mounted at /app/mount/<my_resource_name>.

There you go, that's the key bit of CDE-specific knowledge 🙂 The rest follows naturally, but I'll provide a bit more of the background context...

Background

As you might know, Cloudera Data Engineering achieves its dynamic autoscaling capability through its cloud-native architecture, which in practice means making use of containerization (as enabled by Apache YuniKorn and a bunch of other cool open source stuff). As is usual in the world of containerization, the way file/dir-like resources are made available to running containers is via "mounting". Basically, in the configuration for your container, you say, "whenever this container runs, make <thing_abc> available in the filesystem inside the container at filepath </x/y/z>".

In the case at hand, the "thing" that we want to make available to Airflow when it runs in a containerized fashion within Cloudera Data Engineering is a Resource which we have created separately. And as noted above, Resources mounted into Airflow containers in CDE are made available inside the running container's filesystem under /app/mount/. There's nothing "special" about that location—in a technical sense, it is arbitrary—but it's a natural place that's easy enough to remember, once you know that's how it works 🙂

For details on how to specify that a given Job should run with a given Resource, read the documentation Updating the Airflow file mounts using the CDE CLI [Technical Preview]. When you do so, you will note that you can override the subdir under /app/mount/ at which your Resource will be mounted, using the --airflow-file-mount-n-prefix flag. This is useful when your Resource has been created with A Very Long And Wordy Name, or if you have Resources whose names may clash when mounted into the Job container.

Example

Below, we will go through a worked example, showing how we can use the above knowledge to load arbitrary code from Resources into your Airflow Jobs in CDE. In this example the loaded code is not doing anything particularly useful—we're simply loading a variable from a module—but it shows how the functionality works, and should give you all the understanding you need to extend this to your own use-cases.

A timely warning 😬

⚠ WARNING: When you read "load arbitrary code" in the paragraph above, the hair on the back of your neck should have stood up instinctively, motivating you to splash holy water at your screen.
Any functionality allowing you to load and run arbitrary code at run-time is a potential security risk! In this case, by loading code from a Resource, you are giving the authority of the user who runs your CDE Job to also run whatever code is in said Resource. Who has access to the Resource? Could someone else have snuck in and uploaded malicious code while your back was turned!? 👺 This is one major reason why I have described the method in this article as "tactical" and "quick and dirty". As a technologist, it is incumbent upon you to always consider the security implications of your work.

So, with that warning out of the way...

Steps

1. Create your Python module

This is the code that we will upload to the CDE Resource, which will ultimately be called from your Airflow Job. For this example, we will literally just define a literal (a sketch of what this one-line module might look like appears just after step 4, below).

2. Create the Resource in CDE

Go into your CDE Virtual Cluster and create a new resource. Note that there are different "Types" of resources which can be created, and the public documentation only describes how to create a "Python Environment" type resource (because this is slightly more nuanced). In our case, we are just making a "Files" type resource, which is trivially straightforward.

Note that the Resource Name used here will form part of the actual path from which we will load our code later! I am naming it simply "my_python_module". Once the Resource is created, create a "mymodule" subdir inside the resource, and upload our one-line __init__.py within that subdir.

3. Create the Airflow Job in CDE

There are various ways to do this; in this case, I chose to use the Pipeline UI, a visual editor for Airflow DAGs built into the CDE interface. The important thing is that our DAG contains a Python Script operator. This is where the (really basic) "magic" happens: we manipulate our Python path to include the special mount location where we know our Resource files will be available. Once that's done, we can import the variable from our amazing module, and use it 🙂 Here's the code:

import sys
sys.path.append("/app/mount/my_python_module")
from mymodule import GREETING
print(GREETING)

Make sure to hit Save in the Pipeline UI editor, and wait for the success toast.

4. Configure the Airflow Job to use the Resource

Currently, as far as I can tell, this can only be done using the CDE CLI. Power-users of CDE are likely using this rather nifty tool already. For others: consider this an opportunity to level up your way of working with CDE 🙂 If you're going to be building at scale, you're going to want to use programmatic tools and APIs for automation—ideally, implementing a production-grade Continuous Integration / Continuous Deployment approach to your data engineering work—and the CDE CLI is a key part of that journey. @VishRajagopalan has a great article about how to do this with GitLab; that article focuses on working with Cloudera AI (f.k.a. Cloudera Machine Learning), but the CI/CD principles apply just as well to Cloudera Data Engineering.
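As a reference point, here is a minimal sketch of the one-line module described in step 1 (the exact GREETING text is an assumption on my part, since the original screenshot is not reproduced here):

# Contents of mymodule/__init__.py, uploaded into the "my_python_module" Resource
GREETING = "Hello from my CDE Resource!"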
Once you have your CDE CLI downloaded and auth configured, you can use it to get information about the Airflow Job you created earlier using the Pipeline UI:

$ ./cde job list
[
  {
    "name": "test-dag",
    "type": "airflow",
    "created": "2024-11-19T17:44:28Z",
    "modified": "2024-11-19T20:02:34Z",
    "retentionPolicy": "keep_indefinitely",
    "mounts": [
      {
        "resourceName": "PipelineResource-4132-1732038266789"
      }
    ],
    "airflow": {
      "dagID": "test_dag",
      "dagFile": "dag-1732038665329.py"
    },
    "schedule": {
      "enabled": false,
      "user": "rjendoubi",
      "start": "Tue, 19 Nov 2024 17:51:05 GMT",
      "catchup": true
    }
  }
]

Then, "connect" the Resource to the Job like this, following the instructions in the relevant documentation page:

./cde job update --name test-dag --airflow-file-mount-1-resource my_python_module

Note that this command uses both the Job Name and the Resource Name we defined above, in steps 3 and 2 respectively. Makes sense, right?

5. Run the Job

Finally, to sample the sweet fruits of our labors, you can go ahead and Run your Airflow Job. (Feel free to invoke this from the CDE CLI, for extra credit 😉)

Once the Job completes—which should be a matter of seconds—navigate to the relevant Job Run screen, click across to the Logs tab, and look through the log output...

Success! We see the special message defined in our (extremely basic) code module, which we uploaded into our Resource, not into the DAG itself.

Conclusion

Thus, we have demonstrated how to "tactically" load code from Resources and use it in Airflow DAGs in Cloudera Data Engineering. This is one way you can factor out code that you use across multiple DAGs and make it available in a Resource, to reduce duplication. However, once you are done experimenting and need to move into more formalized, production-grade deployments, please use one of the alternative methods mentioned at the outset of this article.
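One final practical note: if your Resource name is long, or likely to clash with another mounted Resource, recall the --airflow-file-mount-n-prefix flag mentioned in the Background section. A hedged sketch of what that might look like for this example (the "shared" prefix and the resulting mount path are my assumptions, based on the documentation referenced above):

./cde job update --name test-dag --airflow-file-mount-1-resource my_python_module --airflow-file-mount-1-prefix shared

With an override like that in place, the DAG code would need to append the corresponding path (e.g. /app/mount/shared) to sys.path instead of /app/mount/my_python_module.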
03-08-2022
03:25 AM
Update for more recent versions of the platform: unfortunately the `/gremlin` endpoint has been removed from modern Atlas APIs, but you can use the special `_NOT_CLASSIFIED` identifier in the search DSL to meet this requirement. So the query would simply be something like: hive_column isA _NOT_CLASSIFIED
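If you want to run that check programmatically rather than through the UI, here is a rough sketch using the Atlas v2 DSL search endpoint (assuming a local instance with default admin credentials; adjust host and auth for your environment):

curl -u admin:admin -G 'http://localhost:21000/api/atlas/v2/search/dsl' --data-urlencode 'query=hive_column isA _NOT_CLASSIFIED'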
12-24-2021
04:02 AM
Hopefully the OP is sorted out by now, but for anyone else who comes across this: one of the best step-by-step instructions on Atlas relationships I've found is this comment by Vinayak Marriaya on the Atlas Jira; reproducing it here, in case that link dies:

Following are the JSONs to create relationships:
1. Creating EntityDef JSON
2. Creating Entity JSON
3. Creating RelationshipDef JSON
4. Creating Relationship instance JSON

Step-1 Creating EntityDef
POST - http://localhost:21000/api/atlas/v2/types/typedefs
{
"entityDefs": [
{
"category": "ENTITY",
"name": "type_a"
},
{
"category": "ENTITY",
"name": "type_b"
}
]
}

Step-2 Creating Entity
POST - http://localhost:21000/api/atlas/v2/entity/bulk
{
"entities": [
{
"typeName": "type_a",
"guid": "-72ddefefS2UY"
},
{
"typeName": "type_b",
"guid": "-JL1HddDOfdf"
}
]
}

Step-3 Creating RelationshipDef
POST - http://localhost:21000/api/atlas/v2/types/typedefs
{
"relationshipDefs": [
{
"propagateTags": "ONE_TO_TWO",
"description": "description ASSOCIATION between type5 and type6",
"relationshipCategory": "ASSOCIATION",
"typeVersion": "1.0",
"attributeDefs": [
{
"name": "LinkInformation",
"typeName": "string",
"isOptional": true,
"cardinality": "SINGLE",
"valuesMinCount": 0,
"valuesMaxCount": 1,
"isUnique": false,
"isIndexable": false,
"includeInNotification": false,
"searchWeight": -1
}
],
"endDef2": {
"name": "type_b_rel_attribute",
"isLegacyAttribute": false,
"isContainer": false,
"cardinality": "SINGLE",
"type": "type_b",
"description": "description with name: type_2_rel_attribute"
},
"endDef1": {
"name": "type_a_rel_attribute",
"isLegacyAttribute": false,
"isContainer": false,
"cardinality": "SET",
"type": "type_a",
"description": "description with name: type_1_rel_attribute"
},
"guid": "-2fsdfjhsewl04",
"name": "association_type_a_and_type_b"
}
]
}
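[Aside: these typedef payloads can be submitted with any REST client. For example, a rough curl sketch, assuming default admin credentials and the Step-3 payload saved locally as relationship_def.json (both assumptions on my part):

curl -u admin:admin -X POST -H 'Content-Type: application/json' -d @relationship_def.json 'http://localhost:21000/api/atlas/v2/types/typedefs'

The same pattern applies to the other POST calls in these steps.]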
Step-4 Creating Relationship instance
POST - http://localhost:21000/api/atlas/v2/relationship
{
"typeName": "association_type_a_and_type_b",
"end1": {
"typeName": "type_a",
"guid": "b4dae5e8-a606-4e41-9ce3-8f35245f389e" (guid of type_a entity)
},
"propagateTags": "NONE",
"provenanceType": 0,
"propagatedClassifications": [],
"end2": {
"typeName": "type_b",
"guid": "23c2f3c1-dd74-4190-a6d1-b012c44cbb6d" (guid of type_b entity)
},
"blockedPropagatedClassifications": [],
"guid": "-bjbfdfddfeffef",
"attributes": {
"LinkInformation": "TestingInformation"
}
}

Following is the output of the above API call.

Output of type_a entity
GET - http://localhost:21000/api/atlas/v2/entity/guid/{guid of type_a entity}
{
"referredEntities": {},
"entity": {
"typeName": "type_a",
"guid": "b4dae5e8-a606-4e41-9ce3-8f35245f389e",
"isIncomplete": false,
"status": "ACTIVE",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1632121151626,
"updateTime": 1632121151626,
"version": 0,
"relationshipAttributes": {
"type_a_rel_attribute": [
{
"guid": "23c2f3c1-dd74-4190-a6d1-b012c44cbb6d",
"typeName": "type_b",
"entityStatus": "ACTIVE",
"relationshipType": "association_type_a_and_type_b",
"relationshipGuid": "ec64783a-58d7-4265-87d6-c1535ce2d9b7",
"relationshipStatus": "ACTIVE",
"relationshipAttributes": {
"typeName": "association_type_a_and_type_b",
"attributes": {
"LinkInformation": "TestingInformation"
}
}
}
]
},
"labels": []
}
}
Output of type_b entity
GET - http://localhost:21000/api/atlas/v2/entity/guid/{guid of type_b entity}
{
"referredEntities": {},
"entity": {
"typeName": "type_b",
"guid": "23c2f3c1-dd74-4190-a6d1-b012c44cbb6d",
"isIncomplete": false,
"status": "ACTIVE",
"createdBy": "admin",
"updatedBy": "admin",
"createTime": 1632121151626,
"updateTime": 1632121151626,
"version": 0,
"relationshipAttributes": {
"type_b_rel_attribute": {
"guid": "b4dae5e8-a606-4e41-9ce3-8f35245f389e",
"typeName": "type_a",
"entityStatus": "ACTIVE",
"relationshipType": "association_type_a_and_type_b",
"relationshipGuid": "ec64783a-58d7-4265-87d6-c1535ce2d9b7",
"relationshipStatus": "ACTIVE",
"relationshipAttributes": {
"typeName": "association_type_a_and_type_b",
"attributes": {
"LinkInformation": "TestingInformation"
}
}
}
},
"labels": []
}
}

As you can see, LinkInformation is specified under "attributes", and when you search for the entity by GUID as you mentioned in the Jira, you will be able to see the value of "LinkInformation".
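For completeness, a quick way to verify this yourself is to fetch the entity by GUID over REST, as in the GET calls above; a rough sketch assuming default admin credentials and the example GUID from this walkthrough:

curl -u admin:admin 'http://localhost:21000/api/atlas/v2/entity/guid/b4dae5e8-a606-4e41-9ce3-8f35245f389e'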