Introduction
Data scientists are constantly looking to augment their data sources with additional datasets, whether to incorporate new features, refresh to the latest data, or validate existing features. In this two-part article, we explore two approaches to ingesting external data into the Cloudera Data Lake. We will see how a publicly available dataset can be accessed from within Cloudera AI via a REST Catalog and via external S3 buckets.
This is Part 1 of the two-part article; Part 2 covers data ingestion from an external file system (S3).
Data Source
In this article, we use the publicly available air quality data published by OpenAQ. Visit their website to learn more about the OpenAQ data available for developers.
Prerequisites
- Access to the Cloudera AI platform
- OpenAQ API key (available for free upon registration at their website, www.openaq.org)
- We will use the OpenAQ data source to get air quality data for cities. Review the OpenAQ developer documentation here
Code Repository
The complete code for this article can be found in a Python notebook here.
3rd Party Data Ingestion Approaches
There are two approaches to data ingestion that we consider in this two-part article:
- API specification from a REST Catalog: REST APIs are a very popular way to access data endpoints. You use an API key, along with the examples in the REST Catalog, to understand how to call each API endpoint. Here is the OpenAQ REST Catalog that we will use in our code.
- Access Open Data on AWS: We will use the external Open Data on AWS to access the air quality data provided by OpenAQ. For more details on the data specification in this format, check the OpenAQ documentation here.
Sample Data
Here is a sample of synthesized data for an air quality sensor located in the city of Chennai, India.
While many of the data features are self-explanatory, there are some columns worth highlighting here (a small illustrative record follows the list):
- parameter: captures the specific air quality measure recorded (in the above case, pm25)
- value: the specific reading of that measure reported by the telemetry systems.
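To make the column semantics concrete, here is a minimal, hypothetical record in the shape described above. The field names follow the sample, the values are illustrative only, and the exact keys in the OpenAQ payload may differ:
# Hypothetical record illustrating the parameter/value columns (values are illustrative only)
sample_record = {
    "location": "Chennai - Alandur",    # assumed sensor location name
    "parameter": "pm25",                # the air quality measure captured
    "value": 41.0,                      # reading reported by the telemetry system
    "unit": "µg/m³",                    # unit for pm25 readings
    "datetime": "2025-03-01T00:00:00Z"  # timestamp of the measurement
}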
Data Ingestion Workflow
The workflow consists of two tasks:
- Setting up access so that Cloudera AI can reach the underlying data store (in our case, the S3 bucket)
- Making API requests to the endpoints for accessing the data.
Let's go through the highlights in each of these steps below:
Setting up access
To ensure that Cloudera AI can access the S3 bucket, we use the IDBROKER hostname to obtain the AWS access key, secret key, and session token. To obtain the IDBROKER hostname, reach out to your Cloudera administrator. The code snippet below first requests a Knox token from the IDBROKER host and then exchanges it for temporary AWS credentials.
import os
import requests
from requests_kerberos import HTTPKerberosAuth

# Request your Cloudera ADMIN to provide your IDBROKER domain name
IDBROKER_HOST_NAME = "your-idbroker-domain-name.cloudera.site"
IDBROKER_HOST_PORT = "8444"
token_url = f"https://{IDBROKER_HOST_NAME}:{IDBROKER_HOST_PORT}/gateway/dt/knoxtoken/api/v1/token"
url = f"https://{IDBROKER_HOST_NAME}:{IDBROKER_HOST_PORT}/gateway/aws-cab/cab/api/v1/credentials"

# Obtain a Knox token using Kerberos authentication
r = requests.get(token_url, auth=HTTPKerberosAuth())
headers = {
    'Authorization': "Bearer " + r.json()['access_token'],
    'cache-control': "no-cache"
}

# Exchange the Knox token for temporary AWS credentials and set them as environment variables
response = requests.request("GET", url, headers=headers)
credentials = response.json()['Credentials']
os.environ["AWS_ACCESS_KEY_ID"] = credentials['AccessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = credentials['SecretAccessKey']
os.environ["AWS_DEFAULT_REGION"] = 'us-east-2'
os.environ["AWS_SESSION_TOKEN"] = credentials['SessionToken']

# OpenAQ API key (free on registration at www.openaq.org)
api_key = "your-openaq-api-key"
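Before moving on, it can be useful to confirm that the temporary credentials work. The check below is not part of the original notebook; it is a minimal sketch that uses boto3 and STS to confirm which identity the credentials resolve to:
import boto3

# Optional sanity check (not part of the original notebook):
# confirm which identity the temporary credentials resolve to.
sts = boto3.client("sts")
identity = sts.get_caller_identity()
print("Authenticated as:", identity["Arn"])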
Making API requests
We set up the API requests by first determining the locations in each city where we are interested in measuring air quality. Each city is described by a bounding box (as shown below), which is one of the filter formats that the OpenAQ locations endpoint accepts. A short sketch of how these bounding boxes can drive per-city requests follows the mapping.
{
"Delhi": "76.84,28.40,77.34,28.88",
"Mumbai": "72.70,18.90,72.98,19.25",
"Bangalore": "77.45,12.85,77.85,13.15",
"Chennai": "80.15,12.90,80.35,13.15",
"Kolkata": "88.20,22.45,88.45,22.70",
"Hyderabad": "78.30,17.30,78.60,17.55",
"Pune": "73.80,18.50,73.95,18.65",
"Ahmedabad": "72.50,22.95,72.65,23.10"
}
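As a simple illustration (not the notebook's exact code), the mapping above can drive one locations request per city. The X-API-Key header and the results field are assumptions based on the OpenAQ v3 documentation, so confirm them against the current API reference:
import requests

# Sketch only: one locations request per city, using the bounding boxes above.
city_bboxes = {
    "Chennai": "80.15,12.90,80.35,13.15",
    # ... remaining cities from the mapping above
}
openaq_headers = {"X-API-Key": api_key}

for city, bbox in city_bboxes.items():
    resp = requests.get(
        "https://api.openaq.org/v3/locations",
        params={"bbox": bbox, "limit": 1000},
        headers=openaq_headers,
    )
    print(city, resp.status_code, len(resp.json().get("results", [])))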
We then use the requests Python package to obtain the locations, and a boto3 client to save the data. Below are some code snippets.
.. # prior code logic, see github for actual code here
..
# Initialize sessions
authenticated_session = boto3.Session()  # For saving the data into the S3 bucket
target_s3_client = authenticated_session.client('s3', region_name=REGION_NAME)
# Alternatively, a client can be built from explicit keys
s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
...
...
locations_url = "https://api.openaq.org/v3/locations"
locations_params = {"bbox": bbox, "limit": 1000}
locations_response = requests.get(locations_url, params=locations_params, headers=headers)
# handle_rate_limit respects the limit of 50 API calls/minute by sleeping between calls
if handle_rate_limit(locations_response):
locations_response = requests.get(locations_url, params=locations_params, headers=headers)
...
...
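The notebook's actual handle_rate_limit implementation is in the repository. As a rough idea of what such a helper can look like, here is a minimal sketch that sleeps when the API signals throttling (HTTP 429):
import time

def handle_rate_limit(response, wait_seconds=60):
    """Sketch of a throttling helper (the notebook's real implementation is in the repository).

    Returns True after sleeping if the API signalled throttling, so the caller can retry.
    """
    if response.status_code == 429:
        # Prefer the Retry-After header when the API provides one
        delay = int(response.headers.get("Retry-After", wait_seconds))
        time.sleep(delay)
        return True
    return False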
The code above captures all the locations for each city of interest. Subsequently, we need to get the telemetry data for each location for the date range we are interested in. This is done by looping over each location and, within it, over each sensor_id for that location, as shown below.
for location in locations_data:
    if fetched_sensor_count >= NUM_SENSORS_TO_FETCH:
        break
    sensors = location.get("sensors", [])
    for sensor in sensors:
        # Stop once the desired number of sensors has been fetched
        if fetched_sensor_count >= NUM_SENSORS_TO_FETCH:
            break
        sensor_id = sensor.get("id")
        print("SENSORID : ", sensor_id)
        parameter_id = sensor.get("parameter", {}).get("id")
        parameter_name = sensor.get("parameter", {}).get("name")
        # Daily aggregates for this sensor (modified API call)
        latest_measurement_url = f"https://api.openaq.org/v3/sensors/{sensor_id}/hours/daily"
        latest_measurement_params = {"limit": 1, "sort": "desc"}
        latest_measurement_response = requests.get(latest_measurement_url, params=latest_measurement_params, headers=headers)
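What happens to each response is elided above. Conceptually, the measurements are parsed and accumulated per city and per sensor so they can be written out later; the sketch below (inside the inner loop) is an inference from the save loop further down, and the results field and all_city_data layout are assumptions rather than the notebook's exact code:
# Sketch only (the actual parsing is in the notebook): inside the inner loop above,
# parse the response and accumulate measurements per city and per sensor, matching
# the all_city_data layout that the save loop below iterates over.
measurements = latest_measurement_response.json().get("results", [])
all_city_data.setdefault(city, {}).setdefault(sensor_id, []).extend(measurements)
fetched_sensor_count += 1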
Finally, all the data is saved to the S3 bucket using the boto3 client we obtained earlier. We create a separate data file for each city of interest, provided we were able to find data for that city.
# Process and save data to S3
for city, sensor_data in all_city_data.items():
    all_measurements = []
    for sensor_id, measurements in sensor_data.items():
        if measurements:
            all_measurements.extend(measurements)  # Add this sensor's measurements to the city-level list
    if all_measurements:
        df = pd.DataFrame(all_measurements)
        csv_data = df.to_csv(index=False)
        file_key = f"user/vishr/{city}_all_measurements_{NUMBER_OF_DAYS}days_daily_data.csv"
        target_s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=file_key, Body=csv_data)
        logging.info(f"Consolidated data saved to S3 at: s3://{S3_BUCKET_NAME}/{file_key}")
    else:
        logging.warning(f"No data to save for {city}.")
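To confirm the upload, one option (not shown in the notebook) is to read a saved file straight back from S3 into pandas; the bucket name and key below are the same variables used above:
import io
import pandas as pd

# Optional sanity check (not in the original notebook): read one saved CSV back from S3
obj = target_s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=file_key)
df_check = pd.read_csv(io.BytesIO(obj["Body"].read()))
print(df_check.head())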
This concludes the first approach: moving data from an external API into a Cloudera Data Lake. In the next article here, we will dive deeper into using an external file system, such as AWS S3, to ingest the data into the Cloudera Data Lake.