Introduction

Data scientists often need to augment their data sources with additional datasets, whether to incorporate new features, refresh existing data, or validate existing features. Part 1 of this two-part article covered how to ingest data using REST API endpoints. Part 2 explores how to add data stored in an external file system to a Cloudera Data Lake. Specifically, we will ingest a publicly available dataset from external S3 buckets into a Cloudera Data Lake.

Data Source 

We will use the publicly available air quality data provided by OpenAQ for this article. To learn more about the OpenAQ data available to developers, visit the OpenAQ website.

Prerequisites

  • Access to the Cloudera AI platform
  • An OpenAQ API key (available for free on registration at www.openaq.org)
  • Familiarity with the OpenAQ developer documentation, since we use an OpenAQ dataset to get air quality data for cities

Code Repository

The complete code for this article can be found in a Python notebook here.

3rd Party Data Ingestion Approaches

There are two options for data ingestion that we consider in this two-part article:

  • API specification from a REST catalog: REST APIs are a popular way to access data. You use an API key, together with the examples in the REST catalog, to understand how to call the API endpoints. Part 1 uses the OpenAQ REST catalog in this way.
  • Open Data on AWS: We use the external Open Data on AWS registry to access the air quality data provided by OpenAQ. For details on the data specification in this format, check the OpenAQ documentation.

Sample Data 

Here is a sample of synthesised data for an air quality sensor located in the city of Chennai, India.

[Image: sample air quality records for a sensor in Chennai, India (VishRajagopalan_0-1743398669092.png)]

While most of the columns are self-explanatory, two are worth highlighting:

  • parameter: the specific air quality measure being recorded (pm25 in the sample above).
  • value: the reading recorded for that measure by the telemetry systems.
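To make the role of these two columns concrete, here is a minimal sketch that filters readings by parameter. The rows, the station name, and the exact column set are made up for illustration and only mirror the parameter and value columns described above; the real OpenAQ files may contain different columns.

```python
import csv
import io

# Hypothetical rows: column names and values are illustrative only.
SAMPLE_CSV = """location_id,location,datetime,parameter,units,value
101,Example Station,2023-12-30T10:00:00+05:30,pm25,ug/m3,42.0
101,Example Station,2023-12-30T10:00:00+05:30,pm10,ug/m3,80.0
101,Example Station,2023-12-30T11:00:00+05:30,pm25,ug/m3,38.0
"""


def values_for_parameter(csv_text, parameter):
    """Return the numeric readings recorded for one air quality parameter."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [float(row["value"]) for row in reader if row["parameter"] == parameter]


pm25_values = values_for_parameter(SAMPLE_CSV, "pm25")
print(pm25_values)
```

Because each row carries a single parameter, one sensor location typically contributes several rows per timestamp, one for each measure it reports.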

Data Ingestion Workflow

The workflow consists of two tasks:

  • Setting up access from Cloudera AI to the underlying data store (in our case, the S3 bucket).
  • Ingesting data from an external AWS file system.

If you have already worked through Part 1 of the article, setting up access is a repeated step and you can move straight to ingesting the data.

Setting up access

To ensure that Cloudera AI can access the S3 bucket, we use the IDBROKER host name to obtain the AWS access key, secret key, and session token. To obtain the IDBROKER host name, reach out to your Cloudera administrator. The code snippet below calls the Cloudera Knox endpoint on the IDBROKER host to request a token, then exchanges that token for temporary AWS credentials.

import os

import requests
from requests_kerberos import HTTPKerberosAuth

# Request your Cloudera ADMIN to provide your IDBROKER domain name
IDBROKER_HOST_NAME = "your-idbroker-domain-name.cloudera.site"
IDBROKER_HOST_PORT = "8444"

token_url = f"https://{IDBROKER_HOST_NAME}:{IDBROKER_HOST_PORT}/gateway/dt/knoxtoken/api/v1/token"
url = f"https://{IDBROKER_HOST_NAME}:{IDBROKER_HOST_PORT}/gateway/aws-cab/cab/api/v1/credentials"

# Obtain a Knox token using the current Kerberos identity
r = requests.get(token_url, auth=HTTPKerberosAuth())
r.raise_for_status()
headers = {
    "Authorization": "Bearer " + r.json()["access_token"],
    "cache-control": "no-cache",
}

# Exchange the Knox token for temporary AWS credentials and set up the AWS keys
response = requests.get(url, headers=headers)
response.raise_for_status()
credentials = response.json()["Credentials"]
os.environ["AWS_ACCESS_KEY_ID"] = credentials["AccessKeyId"]
os.environ["AWS_SECRET_ACCESS_KEY"] = credentials["SecretAccessKey"]
os.environ["AWS_SESSION_TOKEN"] = credentials["SessionToken"]
os.environ["AWS_DEFAULT_REGION"] = "us-east-2"

# OpenAQ API key, used later when looking up location metadata
API_KEY = "your-openaq-api-key"
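The credential mapping in the snippet above can be isolated into a small helper so it can be exercised without a live IDBROKER endpoint. This is only a sketch: export_aws_credentials is a hypothetical name, and the payload shape is assumed from the keys the snippet reads (Credentials, AccessKeyId, SecretAccessKey, SessionToken).

```python
import os


def export_aws_credentials(payload, region="us-east-2", environ=None):
    """Copy temporary AWS credentials from a credentials payload into environment variables.

    `payload` is assumed to look like {"Credentials": {"AccessKeyId": ..., ...}}.
    `environ` defaults to os.environ; pass a plain dict to try the helper safely.
    """
    environ = os.environ if environ is None else environ
    creds = payload["Credentials"]
    environ["AWS_ACCESS_KEY_ID"] = creds["AccessKeyId"]
    environ["AWS_SECRET_ACCESS_KEY"] = creds["SecretAccessKey"]
    environ["AWS_SESSION_TOKEN"] = creds["SessionToken"]
    environ["AWS_DEFAULT_REGION"] = region
    return environ


# Placeholder values only; real credentials come from the IDBROKER response.
fake_payload = {
    "Credentials": {
        "AccessKeyId": "example-access-key",
        "SecretAccessKey": "example-secret-key",
        "SessionToken": "example-session-token",
    }
}
target = {}
export_aws_credentials(fake_payload, environ=target)
print(sorted(target))
```

Note that all three values are needed together: these are temporary credentials, so the access key and secret key are not usable without the session token.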

Ingesting data from external S3 Filesystem

We made a few improvements to the data ingestion approach used in Part 1:

  • We use an external config file to store the bounding box coordinates of the cities whose air quality we are interested in.
  • We use an end date and a number of days to select the subset of records that we are interested in.
  • We enrich the processed data with location information, including location names and city names.
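As an illustration of the first point, here is one way such a config file could be written and loaded. This is a sketch under assumptions: the implementation of load_city_config is not shown in this article, so the version below is hypothetical, as is the file layout (a JSON object mapping each city name to a "min_lon,min_lat,max_lon,max_lat" string). The Chennai coordinates are approximate and for illustration only.

```python
import json
import os
import tempfile


def load_city_config(path):
    """Load a {city: bbox} mapping and sanity-check every bounding box string."""
    with open(path, encoding="utf-8") as fh:
        config = json.load(fh)
    for city, bbox in config.items():
        parts = [float(p) for p in bbox.split(",")]
        if len(parts) != 4:
            raise ValueError(f"{city}: expected 4 bbox values, got {len(parts)}")
        min_lon, min_lat, max_lon, max_lat = parts
        if min_lon >= max_lon or min_lat >= max_lat:
            raise ValueError(f"{city}: bbox minimums must be below maximums")
    return config


# Assumed layout: city name -> "min_lon,min_lat,max_lon,max_lat" (approximate values).
example_config = {"Chennai": "80.10,12.90,80.35,13.20"}

with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "cities_config.json")
    with open(config_path, "w", encoding="utf-8") as fh:
        json.dump(example_config, fh)
    city_config = load_city_config(config_path)

print(city_config)
```

Keeping the bounding boxes in a file means a new city can be added without touching the ingestion code.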

The logic for looking up the location IDs for each city in the config file is given below:

 

import os
from datetime import datetime, timedelta

import requests


def get_location_ids(city, bbox, api_key):
    """Fetches location IDs for a city's bounding box using the OpenAQ API."""
    locations_url = "https://api.openaq.org/v3/locations"
    locations_params = {"bbox": bbox, "limit": 1000}
    # The OpenAQ v3 API expects the key in the X-API-Key header
    headers = {"X-API-Key": api_key}
    response = requests.get(locations_url, params=locations_params, headers=headers)
    response.raise_for_status()
    results = response.json().get("results", [])
    return [location["id"] for location in results]


def main():
    # End date in DD/MM/YYYY HH:MM:SS +ZZZZ format
    END_DATE_STRING = os.getenv("END_DATE", "31/12/2023 23:59:59 +0530")
    NUMBER_OF_DAYS = int(os.getenv("NUMBER_OF_DAYS", 10))
    CITIES_CONFIG_FILE = os.getenv("CITIES_CONFIG_FILE", "cities_config.json")
    end_date = datetime.strptime(END_DATE_STRING, "%d/%m/%Y %H:%M:%S %z")
    start_date = end_date - timedelta(days=NUMBER_OF_DAYS)
    # load_city_config and get_location_metadata are not shown in this excerpt;
    # API_KEY is the OpenAQ API key from the access setup step
    city_config = load_city_config(CITIES_CONFIG_FILE)
    for city, bbox in city_config.items():
        print(f"Fetching location metadata for city: {city}")
        location_metadata = get_location_metadata(city, bbox, API_KEY)
...
... # other logic for processing
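The date window computed in main() can be expanded into one file name suffix per day, which is what the ingestion step later matches S3 keys against. The sketch below assumes only the YYYYMMDD.csv.gz suffix pattern used in this article; daily_key_suffixes is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def daily_key_suffixes(end_date_string, number_of_days):
    """Return one 'YYYYMMDD.csv.gz' suffix per day from start date to end date, inclusive."""
    end_date = datetime.strptime(end_date_string, "%d/%m/%Y %H:%M:%S %z")
    start_date = end_date - timedelta(days=number_of_days)
    suffixes = []
    current = start_date.date()
    while current <= end_date.date():
        suffixes.append(current.strftime("%Y%m%d") + ".csv.gz")
        current += timedelta(days=1)
    return suffixes


suffixes = daily_key_suffixes("31/12/2023 23:59:59 +0530", 10)
print(suffixes[0], suffixes[-1], len(suffixes))
```

Because both the start and end dates are included, a 10-day window touches 11 calendar days. Whether the start day should be included is a design choice to settle against your own requirements.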

 

Since the data is stored in the S3 bucket as gzip-compressed CSV files (.csv.gz), we need to decompress these files before processing them further. The excerpt below shows how this is done:

 

...
..
# Requires: import gzip and import pandas as pd
# Match the gzip-compressed daily file for the date being processed
if key.endswith(f"{year}{month}{day}.csv.gz"):
    print(f"Processing file: {key}")
    # Download the object and decompress it while reading
    obj = openaq_client.get_object(Bucket=SOURCE_BUCKET_NAME, Key=key)
    with gzip.GzipFile(fileobj=obj['Body']) as gz_file:
        daily_df = pd.read_csv(gz_file)
        # Append the day's records to the running result
        consolidated_df = pd.concat([consolidated_df, daily_df], ignore_index=True)
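The decompression step can be tried locally without S3 access. The sketch below uses only the Python standard library instead of pandas, and an in-memory buffer stands in for the object body returned by S3; read_gzipped_csv is a hypothetical helper name and the two data rows are made up.

```python
import csv
import gzip
import io


def read_gzipped_csv(stream):
    """Decompress a binary file-like object and parse it into a list of row dicts."""
    with gzip.GzipFile(fileobj=stream) as gz_file:
        text = io.TextIOWrapper(gz_file, encoding="utf-8", newline="")
        return list(csv.DictReader(text))


# Build a small gzip-compressed CSV in memory to stand in for the S3 object body.
raw = "parameter,value\npm25,42.0\npm10,80.0\n".encode("utf-8")
fake_body = io.BytesIO(gzip.compress(raw))

rows = read_gzipped_csv(fake_body)
print(rows)
```

When many daily files are processed with pandas, collecting the daily DataFrames in a list and calling pd.concat once at the end usually copies less data than concatenating inside the loop.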

 

Conclusion

This two-part article explains the mechanics of data ingestion from a real-life data source using two approaches:

  • In Part 1, we used the OpenAQ REST API to ingest the data.
  • In Part 2 (this article), we ingested the data from an external AWS file system.