As a healthcare provider / public health official, I want to respond equitably to the COVID-19 pandemic as quickly as possible, and serve all the communities that are adversely impacted in the state of California. I want to use health equity data reported by the California Department of Public Health (CDPH) to identify impacted members and accelerate the launch of outreach programs.
This documentation was written for CDP Runtime version 7.1.7.
The steps to create this data pipeline are as follows:
Step #1 - Set Up NiFi Flow
Go to the NiFi user interface and upload NiFi-CDPH.xml as a template.
NiFi-CDPH.xml uses a PutHDFS processor to connect to an existing HDFS directory. Change the properties of this processor to use your own HDFS directory.
[Figure: the flow in the NiFi user interface]
Step #2 - Set Up PySpark Job
SSH into the cluster and make the enrich.py program available in any directory. Change the fs variable in enrich.py to point to your HDFS directory.
Execute the following command:
/bin/spark-submit enrich.py
Monitor the logs to ensure the job finishes successfully. It takes approximately 4 minutes.
The following Hive tables are created by this job:
cdph.data_dictionary
cdph.covid_rate_by_soc_det
cdph.covid_demo_rate_cumulative
member.member_profile
member.target_mbrs_by_income
member.target_mbrs_by_age_group
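The submission in Step #2 can also be scripted so that a failed run is caught without reading the logs by hand. The following is a minimal sketch and not part of the original pipeline: the helper names build_submit_command and run_job are hypothetical, and it assumes /bin/spark-submit and enrich.py are reachable at the paths used in the command above.

```python
import subprocess


def build_submit_command(script="enrich.py", spark_submit="/bin/spark-submit"):
    """Return the argument list for the spark-submit call used in Step #2."""
    return [spark_submit, script]


def run_job(script="enrich.py", spark_submit="/bin/spark-submit"):
    """Run the job; raises CalledProcessError if spark-submit exits non-zero."""
    command = build_submit_command(script, spark_submit)
    # check=True turns a non-zero exit status into an exception,
    # so a failed job cannot pass unnoticed.
    subprocess.run(command, check=True)
```

Calling run_job() from the directory that holds enrich.py runs the same command as above and stops with an exception if the job fails.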
Step #3 - Identify impacted members in Hue editor
Open Hue editor and explore the Hive tables created by PySpark job.
-- Raw Data
select * from cdph.data_dictionary a;
select * from cdph.covid_rate_by_soc_det a;
select * from cdph.covid_demo_rate_cumulative a;
select * from member.member_profile a;
Let's analyze this data to identify the income groups that are most impacted by COVID-19:
select a.social_det,
       a.social_tier,
       a.priority_sort,
       avg(a.case_rate_per_100k) as avg_rate
from cdph.covid_rate_by_soc_det a
where a.social_det = 'income'
group by a.social_det, a.social_tier, a.priority_sort
order by a.priority_sort;
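| a.social_det | a.social_tier | a.priority_sort | avg_rate      |
|--------------|---------------|-----------------|---------------|
| income       | above $120K   | 0.0             | 10.7511364396 |
| income       | $100k - $120k | 1.0             | 14.9906347703 |
| income       | $80k - $100k  | 2.0             | 17.6407007434 |
| income       | $60k - $80k   | 3.0             | 21.9569391068 |
| income       | $40k - $60k   | 4.0             | 25.964262668  |
| income       | below $40K    | 5.0             | 28.9420371609 |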
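To make the reading of these results explicit, here is a small sketch in plain Python that picks the most impacted income tier from the rows above. The values are copied from the query result; the function names most_impacted and rate_ratio are hypothetical helpers introduced only for illustration.

```python
# (social_tier, priority_sort, avg_rate) rows copied from the query result above.
income_rates = [
    ("above $120K", 0.0, 10.7511364396),
    ("$100k - $120k", 1.0, 14.9906347703),
    ("$80k - $100k", 2.0, 17.6407007434),
    ("$60k - $80k", 3.0, 21.9569391068),
    ("$40k - $60k", 4.0, 25.964262668),
    ("below $40K", 5.0, 28.9420371609),
]


def most_impacted(rows):
    """Return the row with the highest average case rate per 100k."""
    return max(rows, key=lambda row: row[2])


def rate_ratio(rows):
    """Ratio of the highest to the lowest average rate in the result set."""
    rates = [row[2] for row in rows]
    return max(rates) / min(rates)


top_tier = most_impacted(income_rates)[0]
print(top_tier)  # below $40K
print(round(rate_ratio(income_rates), 2))  # 2.69
```

On these numbers the average case rate rises with every step down the income scale, and the lowest income tier has roughly 2.7 times the rate of the highest.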
Let's do one more exercise and identify the age groups that are most impacted by COVID-19 in the last 30 days. This query uses the metric_value_per_100k_delta_from_30_days_ago column, which is the difference between the most recent 30-day rate and the previous 30-day rate.
select a.demographic_set,
       a.demographic_set_category,
       a.metric,
       avg(a.metric_value_per_100k_delta_from_30_days_ago) as avg_rate
from cdph.covid_demo_rate_cumulative a
where demographic_set in ('age_gp4')
  and metric in ('cases')
  and a.demographic_set_category is not null
group by a.demographic_set, a.demographic_set_category, a.metric
order by avg_rate desc;
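| a.demographic_set | a.demographic_set_category | a.metric | avg_rate         |
|-------------------|----------------------------|----------|------------------|
| age_gp4           | 18-49                      | cases    | 211.566164197452 |
| age_gp4           | 50-64                      | cases    | 159.875170602457 |
| age_gp4           | 0-17                       | cases    | 112.383263616547 |
| age_gp4           | 65+                        | cases    | 96.0969276083687 |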
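The delta column used in this query can be illustrated with a short sketch. It assumes the definition given above (most recent 30-day rate minus previous 30-day rate); the function name delta_from_30_days_ago and the two example rates are hypothetical, while the age-group averages are copied from the query result.

```python
def delta_from_30_days_ago(recent_rate, previous_rate):
    """Most recent 30-day rate minus the previous 30-day rate (per 100k)."""
    return recent_rate - previous_rate


# Hypothetical rates, for illustration only: a positive delta means the
# rate per 100k rose compared with the previous 30-day window.
example_delta = delta_from_30_days_ago(recent_rate=350.0, previous_rate=140.0)
print(example_delta)  # 210.0

# avg_rate values copied from the query result above.
age_group_deltas = {
    "18-49": 211.566164197452,
    "50-64": 159.875170602457,
    "0-17": 112.383263616547,
    "65+": 96.0969276083687,
}

# Rank age groups from largest to smallest average delta, mirroring
# the "order by avg_rate desc" clause of the query.
ranked = sorted(age_group_deltas, key=age_group_deltas.get, reverse=True)
print(ranked[0])  # 18-49
```

Because every average delta in the result is positive, case rates rose in all four age groups over the period, with the largest rise in the 18-49 group.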
Based on the above results, below $40K is the most impacted income group in terms of COVID-19 cases, and 18-49 is the most impacted age group in terms of COVID-19 cases in the last 30 days. You can now use this information to filter members that are in these categories. Execute the following queries to get the impacted members:
select * from member.target_mbrs_by_income a where social_tier = 'below $40K';
select * from member.target_mbrs_by_age_group a where demographic_set_category = '18-49';
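If the most impacted categories change when the data is refreshed, the member queries can be generated rather than edited by hand. This is a minimal sketch under stated assumptions: build_target_query is a hypothetical helper, and it assumes category values never contain quotes or backslashes, so it rejects such input instead of attempting to escape it.

```python
def build_target_query(table, column, category):
    """Build the member-selection query for one impacted category."""
    # Assumption: categories come from the analysis results and contain no
    # quote or backslash characters; anything else is rejected outright.
    if "'" in category or "\\" in category:
        raise ValueError(f"unsupported character in category: {category!r}")
    return f"select * from {table} a where {column} = '{category}'"


income_query = build_target_query(
    "member.target_mbrs_by_income", "social_tier", "below $40K"
)
age_query = build_target_query(
    "member.target_mbrs_by_age_group", "demographic_set_category", "18-49"
)
print(income_query)
print(age_query)
```

The generated strings match the two queries above (without the trailing semicolon) and can be pasted into the Hue editor.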
Step #4 - View Hive tables in Atlas
Go to Atlas user interface, select any Hive table created in this exercise and see its lineage, schema, audits, etc.
Here's a snapshot of covid_rate_by_soc_det table in Atlas.
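The same lineage information can be requested from the Atlas REST API (v2) instead of the user interface. The sketch below only builds the request URLs. The base URL https://atlas.example.com and the cluster suffix @cm in the qualified name are assumptions that will differ per environment, and the helper names are hypothetical.

```python
from urllib.parse import quote, urlencode


def entity_lookup_url(base_url, qualified_name):
    """URL that fetches a hive_table entity (including its GUID) by qualifiedName."""
    query = urlencode({"attr:qualifiedName": qualified_name})
    return f"{base_url}/api/atlas/v2/entity/uniqueAttribute/type/hive_table?{query}"


def lineage_url(base_url, guid, direction="BOTH", depth=3):
    """URL that fetches the lineage graph for the entity with the given GUID."""
    query = urlencode({"direction": direction, "depth": depth})
    return f"{base_url}/api/atlas/v2/lineage/{quote(guid)}?{query}"


# Assumed host and cluster name; replace with the values of your deployment.
lookup = entity_lookup_url(
    "https://atlas.example.com", "cdph.covid_rate_by_soc_det@cm"
)
print(lookup)
```

The GUID returned by the first request is what the second URL expects; both calls need the authentication that your Atlas deployment requires.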