
Cloudera Data Platform provides two complementary services for data processing: Cloudera Data Engineering (CDE) for preparing and organising data, and Cloudera Machine Learning (CML), where Data Scientists consume it. This article provides a high-level example of how to call CDE from CML in Python.

## Part 0: Imports
# Requests will be used to make REST requests to the CDE service
# JSON is used to manage payloads and responses
# OS is used to read the workload credentials from environment variables
# REST calls are made over HTTPS and use HTTP basic authentication
# cde_endpoint is set to the high-level URI of the CDE cluster endpoint

import os
import requests
import json
from requests.auth import HTTPBasicAuth

cde_endpoint='cde-<cluster>.cloudera.site'
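In the calls below, <virtual_prefix> is a placeholder for the hostname prefix of the CDE virtual cluster's Jobs API endpoint (shown in the CDE UI alongside the cluster details). As a convenience, the base URL could be built once; the prefix value here is purely illustrative:

# Hypothetical virtual cluster prefix; replace with the value from the CDE UI.
virtual_prefix = 'abc1b2c3'
jobs_api = 'https://' + virtual_prefix + '.' + cde_endpoint + '/dex/api/v1'

The steps below spell out the full URL each time; substituting jobs_api works the same way.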

The first step is establishing the credentials required to call the API. This is done by calling the Knox proxy and requesting a token; the token is then passed on each subsequent request in the "Authorization" header.

## Part 1: Connect to Knox service.
# Retrieve the JWT token and parse it for the 'access_token' part.
# Pass the workload username and password, which are set as project environment variables.

JWT = requests.get('https://service.'+cde_endpoint+'/gateway/authtkn/knoxtoken/api/v1/token',
                   auth=HTTPBasicAuth(os.getenv('HADOOP_USER_NAME'), os.getenv('WLPASS')))
JWT_TOKEN = json.loads(JWT.content)['access_token']
auth_header={"Authorization": "Bearer %s" %JWT_TOKEN}
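If the workload password is wrong, the response will contain an error rather than a token, so it is worth failing fast before parsing. A minimal sketch using standard requests error handling:

# Raise a descriptive HTTPError if the credentials were rejected (e.g. HTTP 401).
JWT.raise_for_status()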

The second step is creating a resource to hold the .py file.

## Part 2: Create a resource.
# Resources provide a location to store files and dependencies.
# Files are typically staged at /app/mount

data={"name":"my-resource","type":"files"}
resource = requests.post('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/resources', json=data, headers=auth_header)
resource
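Re-running this step against a resource that already exists typically returns an error payload rather than a new resource, so a quick look at the response status helps when repeating the notebook. A small sketch:

# A 2xx status means the resource was created; anything else carries an error payload.
print(resource.status_code, resource.text)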

We can then upload a file by opening a file stream locally in the CML project and passing it over an HTTP PUT to the named resource.

## Part 3: File upload to the named resource.
# Once the resource is created, it can host several files.
# In this example we are opening a simple python file in the project folder.

with open('/home/cdsw/helloworld.py', 'rb') as f:
    upload = requests.put('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/resources/my-resource/helloworld.py',
                          files={'file': f}, headers=auth_header)
upload
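To confirm the upload landed, the resource's metadata can be fetched back. This GET counterpart to the POST in Part 2 is an assumption, so verify the endpoint against your cluster's API reference:

# Describe the named resource to confirm the file is attached (endpoint assumed).
check = requests.get('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/resources/my-resource', headers=auth_header)
check.content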

The job then needs to be defined; the definition sets the job type ("spark") and links in the relevant resources.

## Part 4: Define the Job and dependencies.
# Create a job definition: name it, mount the resource created above, and point Spark at the uploaded file.

data={"name":"my-job","type":"spark","mounts":[{"resourceName":"my-resource"}],"spark":{"file":"helloworld.py"}}
job = requests.post('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/jobs', json=data, headers=auth_header)
job
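Beyond this minimal definition, the Spark spec can also carry resource sizing. The sketch below mirrors the CDE CLI's sizing flags, but the field names are assumptions and should be checked against your cluster's API reference:

# Variant payload with explicit executor sizing (field names assumed).
data_sized = {"name": "my-job-sized", "type": "spark",
              "mounts": [{"resourceName": "my-resource"}],
              "spark": {"file": "helloworld.py", "executorMemory": "2g", "numExecutors": 2}}
sized_job = requests.post('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/jobs', json=data_sized, headers=auth_header)
sized_job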

Finally, we can run the job on demand; a sketch for scheduling it instead follows the run call below.

## Part 5: Run the named Job
run = requests.post('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/jobs/my-job/run', headers=auth_header)
run.content
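To schedule the job rather than trigger it manually, the Part 4 definition can carry a schedule block. A sketch, assuming the job spec accepts a "schedule" object with a cron expression; verify the exact field names in your cluster's API reference:

# Same shape as the Part 4 payload, plus an hourly cron schedule (fields assumed).
data_sched = {"name": "my-scheduled-job", "type": "spark",
              "mounts": [{"resourceName": "my-resource"}],
              "spark": {"file": "helloworld.py"},
              "schedule": {"enabled": True, "cronExpression": "0 * * * *"}}
scheduled = requests.post('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/jobs', json=data_sched, headers=auth_header)
scheduled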

To validate that the job was created, we can list the jobs defined on the given cluster; a sketch for checking the status of individual runs follows.

## Part 6: List all Jobs
joblist = requests.get('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/jobs', headers=auth_header)
joblist.content
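Listing jobs only confirms the definitions exist; per-run status (starting, running, succeeded, failed) lives under job runs. A sketch, assuming a "job-runs" listing endpoint analogous to the CDE CLI's run listing; confirm it in your API reference:

# Each run entry carries its own status field (endpoint name assumed).
runs = requests.get('https://<virtual_prefix>.'+cde_endpoint+'/dex/api/v1/job-runs', headers=auth_header)
runs.content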
Comments

How about submitting a CDE job from CML to a private cloud base cluster?