Cloudera Employee

Use Case

As a healthcare provider / public health official, I want to respond equitably to the COVID-19 pandemic as quickly as possible, and serve all the communities that are adversely impacted in the state of California.
I want to use health equity data reported by the California Department of Public Health (CDPH) to identify impacted members and accelerate the launch of outreach programs.

Design

Collect - Ingest data from https://data.chhs.ca.gov/dataset/covid-19-equity-metrics using NiFi.
Enrich - Transform the dataset using Spark and load Hive tables.
Report - Gather insights using Hue Editor.

 

 

Implementation

Prerequisites

The steps to create this data pipeline are as follows:

Note: This documentation was written against CDP Runtime version 7.1.7.

Step #1 - Set up NiFi Flow

  • Go to the NiFi user interface and upload NiFi-CDPH.xml as a template.

  • NiFi-CDPH.xml uses a PutHDFS processor to connect to an existing HDFS directory. Change the properties of this processor to use your own HDFS directory.

  • Execute the flow and ensure that the InvokeHTTP processors are able to get covid19case_rate_by_social_det.csv and covid19demographicratecumulative.csv. Verify that these files are added to your storage directory.

  • [Figure: the flow as shown in the NiFi user interface]
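The verification in the last step can also be scripted. The following is a minimal sketch, not part of the original pipeline: it assumes the hdfs command-line client is on the PATH, and the helper names hdfs_path_exists and missing_files are hypothetical. It relies on hdfs dfs -test -e, which exits with status 0 when the path exists.

```python
import subprocess
from typing import Callable, Iterable, List

# File names taken from the NiFi step above.
EXPECTED_FILES = [
    "covid19case_rate_by_social_det.csv",
    "covid19demographicratecumulative.csv",
]


def hdfs_path_exists(path: str) -> bool:
    """Return True if `hdfs dfs -test -e <path>` exits with status 0."""
    result = subprocess.run(["hdfs", "dfs", "-test", "-e", path])
    return result.returncode == 0


def missing_files(
    hdfs_dir: str,
    names: Iterable[str] = EXPECTED_FILES,
    exists: Callable[[str], bool] = hdfs_path_exists,
) -> List[str]:
    """List the expected files that are not present under hdfs_dir."""
    base = hdfs_dir.rstrip("/")
    return [name for name in names if not exists(f"{base}/{name}")]
```

Calling missing_files with the directory configured in the PutHDFS processor returns an empty list when both files have landed. The exists parameter is there only so the check can be exercised without a cluster.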

 

 

Step #2 - Set up PySpark Job

  • SSH into the cluster and place the enrich.py program in any directory. Change the fs variable in enrich.py to point to your HDFS directory.
  • Run /bin/spark-submit enrich.py and monitor the logs to confirm that the job finishes successfully. It takes approximately 4 minutes.
  • The job creates the following Hive tables:
    • cdph.data_dictionary
    • cdph.covid_rate_by_soc_det
    • cdph.covid_demo_rate_cumulative
    • member.member_profile
    • member.target_mbrs_by_income
    • member.target_mbrs_by_age_group

 

Step #3 - Identify impacted members in Hue editor

  • Open the Hue editor and explore the Hive tables created by the PySpark job.

    -- Raw Data
    select * from cdph.data_dictionary a;
    select * from cdph.covid_rate_by_soc_det a;
    select * from cdph.covid_demo_rate_cumulative a;
    select * from member.member_profile a;
  • Let's analyze this data to identify the income groups that are most impacted by COVID-19:

    select 
      a.social_det, 
      a.social_tier,
      a.priority_sort,
      avg(a.case_rate_per_100k) as avg_rate
    from cdph.covid_rate_by_soc_det a
    where a.social_det = 'income'
    group by a.social_det, a.social_tier, a.priority_sort
    order by a.priority_sort;
    a.social_det  a.social_tier  a.priority_sort  avg_rate
    income        above $120K    0.0              10.7511364396
    income        $100k - $120k  1.0              14.9906347703
    income        $80k - $100k   2.0              17.6407007434
    income        $60k - $80k    3.0              21.9569391068
    income        $40k - $60k    4.0              25.964262668
    income        below $40K     5.0              28.9420371609
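To make the aggregation concrete, here is a minimal Python sketch of what the query above computes: filter on social_det, group by tier, average the rate, and order by priority_sort. The sample rows are hypothetical values chosen for illustration, not CDPH data, and the function name avg_rate_by_tier is an assumption.

```python
from collections import defaultdict

# Hypothetical rows: (social_det, social_tier, priority_sort, case_rate_per_100k)
rows = [
    ("income", "below $40K", 5.0, 30.0),
    ("income", "above $120K", 0.0, 10.0),
    ("income", "below $40K", 5.0, 26.0),
    ("income", "above $120K", 0.0, 12.0),
    ("other_det", "tier_x", 0.0, 99.0),  # dropped by the social_det filter
]


def avg_rate_by_tier(rows, social_det="income"):
    """Mimic: where social_det = ... group by ... order by priority_sort."""
    groups = defaultdict(list)
    for det, tier, sort_key, rate in rows:
        if det == social_det:
            groups[(det, tier, sort_key)].append(rate)
    result = [
        (det, tier, sort_key, sum(rates) / len(rates))
        for (det, tier, sort_key), rates in groups.items()
    ]
    return sorted(result, key=lambda r: r[2])


for row in avg_rate_by_tier(rows):
    print(row)
```

The priority_sort column only controls the display order; the ranking of impact comes from avg_rate.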

 

  • Let's do one more exercise and identify the age groups that are most impacted by COVID-19 in the last 30 days.
    This query uses the metric_value_per_100k_delta_from_30_days_ago column, which is the difference between the most recent 30-day rate and the previous 30-day rate.

    select 
      a.demographic_set,
      a.demographic_set_category,
      a.metric,
      avg(a.metric_value_per_100k_delta_from_30_days_ago) as avg_rate
    from cdph.covid_demo_rate_cumulative a
    where
      a.demographic_set in ('age_gp4')
      and a.metric in ('cases')
      and a.demographic_set_category is not null
    group by a.demographic_set, a.demographic_set_category, a.metric
    order by avg_rate desc;
    a.demographic_set  a.demographic_set_category  a.metric  avg_rate
    age_gp4            18-49                       cases     211.566164197452
    age_gp4            50-64                       cases     159.875170602457
    age_gp4            0-17                        cases     112.383263616547
    age_gp4            65+                         cases     96.0969276083687
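As a small worked example of the delta column used in this query, the sketch below assumes the value is the most recent 30-day rate minus the previous 30-day rate, so a positive number means the rate is rising. The sign convention, the function name, and the input rates are assumptions for illustration; check the CDPH data dictionary for the authoritative definition.

```python
def delta_from_30_days_ago(recent_rate: float, previous_rate: float) -> float:
    """Most recent 30-day rate minus the previous 30-day rate (assumed sign)."""
    return recent_rate - previous_rate


# Hypothetical rates per 100k, not taken from the dataset.
rising = delta_from_30_days_ago(350.0, 150.0)
falling = delta_from_30_days_ago(90.0, 120.0)

print(rising)   # 200.0: the rate went up
print(falling)  # -30.0: the rate went down
```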

 

  • Based on the results above, below $40K is the most impacted income group by COVID-19 cases, and 18-49 is the most impacted age group by COVID-19 cases in the last 30 days. You can now use this information to filter the members who fall in these categories. Execute the following queries to get the impacted members:
    select * from member.target_mbrs_by_income a where social_tier = 'below $40K'; 
    select * from member.target_mbrs_by_age_group a where demographic_set_category = '18-49';
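If you want to derive these filters programmatically instead of reading them off the result tables, a minimal sketch could look like this. The numbers are copied from the two query results above; the helper names most_impacted and target_query are hypothetical. The query string is built from trusted constants only, so this is not a pattern for untrusted input.

```python
# (category, avg_rate) pairs copied from the query results in this article.
income_results = [
    ("above $120K", 10.7511364396),
    ("$100k - $120k", 14.9906347703),
    ("$80k - $100k", 17.6407007434),
    ("$60k - $80k", 21.9569391068),
    ("$40k - $60k", 25.964262668),
    ("below $40K", 28.9420371609),
]
age_results = [
    ("18-49", 211.566164197452),
    ("50-64", 159.875170602457),
    ("0-17", 112.383263616547),
    ("65+", 96.0969276083687),
]


def most_impacted(results):
    """Return the category with the highest avg_rate."""
    return max(results, key=lambda r: r[1])[0]


def target_query(table, column, value):
    """Build the member filter query for a trusted category value."""
    return f"select * from {table} a where {column} = '{value}';"


print(target_query("member.target_mbrs_by_income", "social_tier",
                   most_impacted(income_results)))
print(target_query("member.target_mbrs_by_age_group", "demographic_set_category",
                   most_impacted(age_results)))
```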

 

Step #4 - View Hive tables in Atlas

  • Go to the Atlas user interface, select any Hive table created in this exercise, and view its lineage, schema, audits, etc.

  • [Figure: the covid_rate_by_soc_det table in Atlas]

 