Data scientists are constantly looking to augment their data sources with additional datasets, whether to incorporate new features, refresh to the latest data, or validate existing features. In this two-part article, we explore two approaches to bringing external data into the Cloudera Data Lake: we will see how a publicly available dataset can be accessed from within Cloudera AI both through its REST API and through an external S3 bucket.
This is Part 1 of a two-part article. Part 2 covers data ingestion from an external file system (S3).
In this article, we use the publicly available air quality data published by OpenAQ. Visit their website to learn more about the OpenAQ data available to developers.
The complete code for this article can be found in a Python notebook here.
There are two options for data ingestion that we consider in this two-part article:
- ingesting the data through the dataset's REST API (covered in this article), and
- ingesting the data from an external file system such as an S3 bucket (covered in Part 2).
Here is a sample of synthesized data for an air quality sensor located in the city of Chennai, India.
While most of the data features are self-explanatory, there are some columns worth highlighting here:
The workflow consists of two tasks: fetching the air quality measurements from the OpenAQ REST API, and saving the consolidated data to the Cloudera Data Lake (an S3 bucket).
Let's go through the highlights in each of these steps below:
To ensure that Cloudera AI can access the S3 bucket, we use the IDBroker hostname to obtain temporary AWS access keys and a session token. To obtain the IDBroker hostname, reach out to your Cloudera administrator. The code snippet below calls the Cloudera Knox endpoint on the IDBroker host to set up the access keys.
import os
import requests
from requests_kerberos import HTTPKerberosAuth

# Request your Cloudera ADMIN to provide your IDBROKER domain name
IDBROKER_HOST_NAME = "your-idbroker-domain-name.cloudera.site"
IDBROKER_HOST_PORT = "8444"

token_url = f"https://{IDBROKER_HOST_NAME}:{IDBROKER_HOST_PORT}/gateway/dt/knoxtoken/api/v1/token"
url = f"https://{IDBROKER_HOST_NAME}:{IDBROKER_HOST_PORT}/gateway/aws-cab/cab/api/v1/credentials"

# Authenticate to the Knox token endpoint with Kerberos to obtain a delegation token
r = requests.get(token_url, auth=HTTPKerberosAuth())
headers = {
    'Authorization': "Bearer " + r.json()['access_token'],
    'cache-control': "no-cache"
}

# Exchange the delegation token for temporary AWS credentials and set up the AWS keys
response = requests.request("GET", url, headers=headers)
os.environ["AWS_ACCESS_KEY_ID"] = response.json()['Credentials']['AccessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = response.json()['Credentials']['SecretAccessKey']
os.environ["AWS_DEFAULT_REGION"] = 'us-east-2'
os.environ["AWS_SESSION_TOKEN"] = response.json()['Credentials']['SessionToken']

# OpenAQ API key used to authenticate requests to the OpenAQ endpoints
api_key = "your-openaq-api-key"
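With the environment variables in place, a quick sanity check confirms that the temporary credentials actually grant access to the Data Lake bucket. This is a minimal sketch and not part of the original notebook; the S3_BUCKET_NAME placeholder is an assumption and should match the bucket used later on.

import boto3

S3_BUCKET_NAME = "your-datalake-bucket"  # assumption: replace with your Data Lake bucket

# boto3 picks up the temporary AWS_* credentials exported above
s3_check = boto3.client("s3", region_name=os.environ["AWS_DEFAULT_REGION"])
resp = s3_check.list_objects_v2(Bucket=S3_BUCKET_NAME, MaxKeys=1)
print("Bucket reachable, HTTP status:", resp["ResponseMetadata"]["HTTPStatusCode"])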
We set up the API requests by first determining the locations in each city where we are interested in measuring air quality. Each city is specified with a bounding box (as shown below), which is one of the query formats that the OpenAQ endpoints accept.
{
"Delhi": "76.84,28.40,77.34,28.88",
"Mumbai": "72.70,18.90,72.98,19.25",
"Bangalore": "77.45,12.85,77.85,13.15",
"Chennai": "80.15,12.90,80.35,13.15",
"Kolkata": "88.20,22.45,88.45,22.70",
"Hyderabad": "78.30,17.30,78.60,17.55",
"Pune": "73.80,18.50,73.95,18.65",
"Ahmedabad": "72.50,22.95,72.65,23.10"
}
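The OpenAQ API key obtained earlier accompanies every request. As a minimal sketch (the city_bboxes variable name is an assumption; see the notebook for the exact code), the key goes into an X-API-Key header and the bounding boxes above drive the per-city location queries:

import requests

# OpenAQ v3 expects the API key in the X-API-Key request header
headers = {"X-API-Key": api_key}

city_bboxes = {
    "Chennai": "80.15,12.90,80.35,13.15",
    # ... remaining cities as listed above
}

for city, bbox in city_bboxes.items():
    resp = requests.get("https://api.openaq.org/v3/locations",
                        params={"bbox": bbox, "limit": 1000},
                        headers=headers)
    print(city, resp.status_code)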
We then use the requests Python package to obtain the locations, and a boto3 client to save the data. Below are some code snippets.
...  # prior code logic; see the GitHub notebook for the complete code
...

# Initialize sessions
authenticated_session = boto3.Session()  # for saving the data to the S3 bucket
target_s3_client = authenticated_session.client('s3', region_name=REGION_NAME)
s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
...
...

locations_url = "https://api.openaq.org/v3/locations"
locations_params = {"bbox": bbox, "limit": 1000}
locations_response = requests.get(locations_url, params=locations_params, headers=headers)

# Handle the OpenAQ throttling limit of 50 API calls/minute by sleeping between calls and retrying
if handle_rate_limit(locations_response):
    locations_response = requests.get(locations_url, params=locations_params, headers=headers)
...
...
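The handle_rate_limit helper used above is defined in the notebook. A minimal sketch of such a helper, assuming it returns True whenever the call should be retried, could look like this:

import time

def handle_rate_limit(response, wait_seconds=60):
    """Sleep and return True if the OpenAQ API signalled throttling (HTTP 429)."""
    if response.status_code == 429:
        # Honor the Retry-After header when present; otherwise wait a full minute
        time.sleep(int(response.headers.get("Retry-After", wait_seconds)))
        return True
    return False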
The code above captures all the locations for each city of interest. Next, we need the telemetry data for each location for the date range we are interested in. This is done by looping over each location ID and each sensor_id for that location, as shown below.
for location in locations_data:
    if fetched_sensor_count >= NUM_SENSORS_TO_FETCH:
        break
    sensors = location.get("sensors", [])
    for sensor in sensors:
        if fetched_sensor_count >= NUM_SENSORS_TO_FETCH:
            break
        sensor_id = sensor.get("id")
        print("SENSOR ID:", sensor_id)
        parameter_id = sensor.get("parameter", {}).get("id")
        parameter_name = sensor.get("parameter", {}).get("name")
        # Daily aggregates for this sensor (modified API call)
        latest_measurement_url = f"https://api.openaq.org/v3/sensors/{sensor_id}/hours/daily"
        latest_measurement_params = {"limit": 1, "sort": "desc"}
        latest_measurement_response = requests.get(latest_measurement_url, params=latest_measurement_params, headers=headers)
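For completeness, here is a hedged sketch of how the JSON response might be accumulated into the all_city_data structure that is saved in the next step; the exact response keys and bookkeeping in the notebook may differ:

# Assumption: OpenAQ v3 endpoints return their rows under a "results" key
if latest_measurement_response.status_code == 200:
    rows = latest_measurement_response.json().get("results", [])
    for row in rows:
        row["sensor_id"] = sensor_id            # tag each row with its sensor
        row["parameter_name"] = parameter_name  # and the measured parameter
    # city comes from the enclosing per-city loop shown earlier
    all_city_data.setdefault(city, {}).setdefault(sensor_id, []).extend(rows)
    fetched_sensor_count += 1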
Finally, all the data is saved to the S3 bucket using the boto3 client handle we obtained earlier. We create a separate data file for each city of interest, provided data was found for that city.
# Process and save data to S3
for city, sensor_data in all_city_data.items():
    all_measurements = []
    for sensor_id, measurements in sensor_data.items():
        if measurements:
            all_measurements.extend(measurements)  # add all measurements for this city to the list
    if all_measurements:
        df = pd.DataFrame(all_measurements)
        csv_data = df.to_csv(index=False)
        file_key = f"user/vishr/{city}_all_measurements_{NUMBER_OF_DAYS}days_daily_data.csv"
        target_s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=file_key, Body=csv_data)
        logging.info(f"Consolidated data saved to S3 at: s3://{S3_BUCKET_NAME}/{file_key}")
    else:
        logging.warning(f"No data to save for {city}.")
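As an optional check (not part of the original notebook), the freshly written per-city files can be listed back from the bucket; the user/vishr/ prefix matches the key used above:

# List the per-city CSVs that were just written
listing = target_s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix="user/vishr/")
for obj in listing.get("Contents", []):
    print(obj["Key"], obj["Size"], "bytes")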
This concludes the first approach: moving data from an external API into the Cloudera Data Lake. In the next article here, we will take a deep dive into using an external file system such as AWS S3 to ingest data into the Cloudera Data Lake.