Data scientists are constantly looking to augment their data sources with additional datasets, whether to incorporate new features, refresh data with the latest records, or validate existing features. In Part 1 of this two-part article, we covered how to ingest data using REST API endpoints. In Part 2, we explore how you can bring data stored in an external file system into Cloudera Data Lake. Specifically, we will ingest a publicly available dataset from external S3 buckets into a Cloudera Data Lake.
We will use the publicly available air quality data made available by OpenAQ for this article. To know more about the OpenAQ data available for developers, you can visit their website.
The complete code for this article can be found in a Python notebook here.
There are two options for data ingestion that we consider in this two-part article: ingesting data through REST API endpoints (covered in Part 1), and ingesting data from an external file system such as an S3 bucket (covered here in Part 2).
Here is a sample of synthesised data for an air quality sensor located in the city of Chennai, India.
While many of the data features are self-explanatory, some columns are worth highlighting here:
The workflow consists of two tasks: setting up access to the external S3 bucket, and ingesting the data.
If you have already worked through Part 1 of the article, setting up access is a repeated step, and you can move straight to ingesting the data.
To ensure that Cloudera AI is able to access the S3 bucket, we need to use the IDBROKER hostname to set the AWS access keys and session token. To obtain the IDBROKER hostname, reach out to your Cloudera administrator. The code snippet below calls the Cloudera Knox endpoint on the IDBROKER host to set up the access keys.
import os
import requests
from requests_kerberos import HTTPKerberosAuth

# Request your Cloudera ADMIN to provide your IDBROKER domain name
IDBROKER_HOST_NAME = "your-idbroker-domain-name.cloudera.site"
IDBROKER_HOST_PORT = "8444"

token_url = f"https://{IDBROKER_HOST_NAME}:{IDBROKER_HOST_PORT}/gateway/dt/knoxtoken/api/v1/token"
url = f"https://{IDBROKER_HOST_NAME}:{IDBROKER_HOST_PORT}/gateway/aws-cab/cab/api/v1/credentials"

# Authenticate against the Knox token endpoint using Kerberos
r = requests.get(token_url, auth=HTTPKerberosAuth())
headers = {
    'Authorization': "Bearer " + r.json()['access_token'],
    'cache-control': "no-cache"
}

# Exchange the Knox token for temporary credentials and set up the AWS keys
response = requests.request("GET", url, headers=headers)
os.environ["AWS_ACCESS_KEY_ID"] = response.json()['Credentials']['AccessKeyId']
os.environ["AWS_SECRET_ACCESS_KEY"] = response.json()['Credentials']['SecretAccessKey']
os.environ["AWS_DEFAULT_REGION"] = 'us-east-2'
os.environ["AWS_SESSION_TOKEN"] = response.json()['Credentials']['SessionToken']

api_key = "your-openaq-api-key"
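With the environment variables in place, it is worth a quick check that the temporary credentials can actually reach the source bucket. The snippet below is a minimal sketch using boto3; SOURCE_BUCKET_NAME and SAMPLE_PREFIX are assumptions, so replace them with the actual OpenAQ archive bucket and prefix you plan to ingest from.

import boto3

# boto3 picks up AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY / AWS_SESSION_TOKEN
# from the environment variables set above
openaq_client = boto3.client("s3")

SOURCE_BUCKET_NAME = "openaq-data-archive"  # assumed public OpenAQ archive bucket; confirm in the OpenAQ docs
SAMPLE_PREFIX = "records/csv.gz/"           # assumed prefix layout; adjust to the archive structure you use

# List a handful of objects to confirm access
resp = openaq_client.list_objects_v2(Bucket=SOURCE_BUCKET_NAME, Prefix=SAMPLE_PREFIX, MaxKeys=5)
for s3_object in resp.get("Contents", []):
    print(s3_object["Key"])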
We made a few improvements to our data ingestion approach, described below:
The logic for getting the location IDs for the cities listed in the config file is given below:
def get_location_ids(city, bbox):
    """Fetches location IDs for a city using the OpenAQ API."""
    # 'headers' is expected to carry the OpenAQ API key, e.g. {"X-API-Key": api_key}
    locations_url = "https://api.openaq.org/v3/locations"
    locations_params = {"bbox": bbox, "limit": 1000}
    response = requests.get(locations_url, params=locations_params, headers=headers)
    response.raise_for_status()
    results = response.json().get("results", [])
    return [location["id"] for location in results]
def main():
    # End date in "DD/MM/YYYY HH:MM:SS +ZZZZ" format, matching the strptime pattern below
    END_DATE_STRING = os.getenv("END_DATE", "31/12/2023 23:59:59 +0530")
    NUMBER_OF_DAYS = int(os.getenv("NUMBER_OF_DAYS", 10))
    CITIES_CONFIG_FILE = os.getenv("CITIES_CONFIG_FILE", "cities_config.json")

    end_date = datetime.strptime(str(END_DATE_STRING), "%d/%m/%Y %H:%M:%S %z")
    start_date = end_date - timedelta(days=NUMBER_OF_DAYS)

    city_config = load_city_config(CITIES_CONFIG_FILE)
    for city, bbox in city_config.items():
        print(f"Fetching location metadata for city: {city}")
        location_metadata = get_location_metadata(city, bbox, API_KEY)
        ...
        ... # other logic for processing
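For reference, here is a minimal, hypothetical usage sketch of get_location_ids. The Chennai bounding box below is only an approximation in min_lon,min_lat,max_lon,max_lat form, and the X-API-Key header is how the OpenAQ API key is expected to be passed; in practice, take the bbox values from cities_config.json and use your own key.

# Illustrative only: approximate bounding box for Chennai (min_lon,min_lat,max_lon,max_lat)
chennai_bbox = "80.15,12.95,80.35,13.20"

# The OpenAQ v3 API expects the key in the X-API-Key header
headers = {"X-API-Key": api_key}

location_ids = get_location_ids("Chennai", chennai_bbox)
print(f"Found {len(location_ids)} locations for Chennai")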
Since the data is stored in the S3 bucket as gzip-compressed CSV files (.csv.gz), we need to decompress them before processing them further. Here is a brief snippet that shows how to do this:
...
..
# Gzip file ingest and extraction
if key.endswith(f"{year}{month}{day}.csv.gz"):
    print(f"Processing file: {key}")
    # Download and decompress the daily file
    obj = openaq_client.get_object(Bucket=SOURCE_BUCKET_NAME, Key=key)
    with gzip.GzipFile(fileobj=obj['Body']) as gz_file:
        daily_df = pd.read_csv(gz_file)
    # Append the daily data to the consolidated dataframe
    consolidated_df = pd.concat([consolidated_df, daily_df], ignore_index=True)
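Once consolidated_df has been assembled, the final step is to land it in the Data Lake storage that the IDBROKER credentials grant access to. The sketch below is illustrative rather than prescriptive: DATALAKE_BUCKET and target_key are placeholders, and it assumes boto3 picks up the temporary credentials that were set earlier.

import io
import boto3

# Illustrative only: write the consolidated dataframe into the Data Lake bucket.
# DATALAKE_BUCKET and target_key are placeholders for your environment.
DATALAKE_BUCKET = "your-datalake-bucket"
target_key = "air_quality/openaq/consolidated.csv"

datalake_client = boto3.client("s3")
csv_buffer = io.StringIO()
consolidated_df.to_csv(csv_buffer, index=False)
datalake_client.put_object(Bucket=DATALAKE_BUCKET, Key=target_key, Body=csv_buffer.getvalue())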
This two-part article explains the mechanics of data ingestion from a real-life data source using two approaches: REST API endpoints (Part 1) and external S3 buckets (Part 2).