Created on 09-30-2022 04:16 PM - edited 11-01-2022 10:53 PM
As a healthcare provider / public health official, I want to respond equitably to the COVID-19 pandemic as quickly as possible, and serve all the communities that are adversely impacted in the state of California.
I want to use health equity data reported by the California Department of Public Health (CDPH) to identify impacted members and accelerate the launch of outreach programs.
Collect - Ingest data from https://data.chhs.ca.gov/dataset/covid-19-equity-metrics using NiFi.
Enrich - Transform the dataset using Spark and load Hive tables.
Report - Gather insights using Hue Editor.
The steps to create this data pipeline are as follows:
Please note that this documentation was written against CDP Runtime version 7.1.7.
Go to the NiFi user interface and upload NiFi-CDPH.xml as a template.
NiFi-CDPH.xml uses a PutHDFS processor to write to an existing HDFS directory. Change the properties of this processor so that it points to your own HDFS directory.
Execute the flow and ensure that the InvokeHTTP processors are able to get covid19case_rate_by_social_det.csv and covid19demographicratecumulative.csv. Verify that both files have been added to your storage directory.
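The verification step can be scripted. The following is a minimal sketch, not part of the original flow: it assumes you have already collected the file paths in your storage directory (for example, from the path column of `hdfs dfs -ls` on your directory). The `missing_files` helper and the `/tmp/cdph` path are hypothetical names used only for illustration.

```python
from pathlib import PurePosixPath

# The two files the InvokeHTTP processors are expected to fetch.
EXPECTED_FILES = (
    "covid19case_rate_by_social_det.csv",
    "covid19demographicratecumulative.csv",
)


def missing_files(listed_paths, expected=EXPECTED_FILES):
    """Return the expected file names that do not appear in listed_paths."""
    present = {PurePosixPath(path).name for path in listed_paths}
    return [name for name in expected if name not in present]


# Hypothetical listing: only one of the two files has landed so far.
listing = ["/tmp/cdph/covid19case_rate_by_social_det.csv"]
print(missing_files(listing))
```

An empty list means both files are present and the flow can be considered complete.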
For reference, here's a picture of the flow in the NiFi user interface:
Open the Hue editor and explore the Hive tables created by the PySpark job.
-- Raw Data
select * from cdph.data_dictionary a;
select * from cdph.covid_rate_by_soc_det a;
select * from cdph.covid_demo_rate_cumulative a;
select * from member.member_profile a;
Let's analyze this data to identify the income groups that are most impacted by COVID-19:
select
a.social_det,
a.social_tier,
a.priority_sort,
avg(a.case_rate_per_100k) as avg_rate
from cdph.covid_rate_by_soc_det a
where a.social_det = 'income'
group by a.social_det, a.social_tier, a.priority_sort
order by a.priority_sort;
| a.social_det | a.social_tier | a.priority_sort | avg_rate      |
| income       | above $120K   | 0.0             | 10.7511364396 |
| income       | $100k - $120k | 1.0             | 14.9906347703 |
| income       | $80k - $100k  | 2.0             | 17.6407007434 |
| income       | $60k - $80k   | 3.0             | 21.9569391068 |
| income       | $40k - $60k   | 4.0             | 25.964262668  |
| income       | below $40K    | 5.0             | 28.9420371609 |
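To see how the income query aggregates its rows, here is a minimal sketch that runs the same select against an in-memory SQLite table instead of Hive. The rows are made-up toy values, not CDPH data, and the table is created without the `cdph` schema prefix because SQLite is only standing in for Hive here. The `avg_rate_by_tier` helper is a hypothetical name.

```python
import sqlite3

# Toy rows with made-up rates, used only to show how the query aggregates.
TOY_ROWS = [
    ("income", "above $120K", 0.0, 8.0),
    ("income", "above $120K", 0.0, 12.0),
    ("income", "below $40K", 5.0, 20.0),
    ("income", "below $40K", 5.0, 30.0),
    ("other", "n/a", 0.0, 99.0),  # dropped by the where clause
]

QUERY = """
select a.social_det, a.social_tier, a.priority_sort,
       avg(a.case_rate_per_100k) as avg_rate
from covid_rate_by_soc_det a
where a.social_det = ?
group by a.social_det, a.social_tier, a.priority_sort
order by a.priority_sort
"""


def avg_rate_by_tier(rows, social_det="income"):
    """Load rows into an in-memory table and return the per-tier averages."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "create table covid_rate_by_soc_det ("
            "social_det text, social_tier text, "
            "priority_sort real, case_rate_per_100k real)"
        )
        conn.executemany(
            "insert into covid_rate_by_soc_det values (?, ?, ?, ?)", rows
        )
        return conn.execute(QUERY, (social_det,)).fetchall()
    finally:
        conn.close()


for row in avg_rate_by_tier(TOY_ROWS):
    print(row)
```

Each tier collapses to a single row holding the mean of its rates, and `priority_sort` keeps the tiers in income order rather than alphabetical order.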
Let's do one more exercise and identify the age groups that have been most impacted by COVID-19 in the last 30 days.
This query uses the metric_value_per_100k_delta_from_30_days_ago column, which is the difference between the most recent 30-day rate and the previous 30-day rate.
select
a.demographic_set,
a.demographic_set_category,
a.metric,
avg(a.metric_value_per_100k_delta_from_30_days_ago) as avg_rate
from cdph.covid_demo_rate_cumulative a
where
a.demographic_set in ('age_gp4')
and a.metric in ('cases')
and a.demographic_set_category is not null
group by a.demographic_set, a.demographic_set_category, a.metric
order by avg_rate desc;
| a.demographic_set | a.demographic_set_category | a.metric | avg_rate         |
| age_gp4           | 18-49                      | cases    | 211.566164197452 |
| age_gp4           | 50-64                      | cases    | 159.875170602457 |
| age_gp4           | 0-17                       | cases    | 112.383263616547 |
| age_gp4           | 65+                        | cases    | 96.0969276083687 |
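As a small worked example of the delta column and of reading the result, the sketch below computes a delta from two made-up rates and then picks the age group with the largest average delta. The `avg_rate` values are copied from the query result above. The function names are hypothetical, and the two input rates in the first call are illustrative only.

```python
def delta_from_30_days_ago(recent_rate, previous_rate):
    """Most recent 30-day rate minus the previous 30-day rate (per 100k)."""
    return recent_rate - previous_rate


# avg_rate values copied from the age-group query result.
AVG_DELTA_BY_AGE_GROUP = {
    "18-49": 211.566164197452,
    "50-64": 159.875170602457,
    "0-17": 112.383263616547,
    "65+": 96.0969276083687,
}


def most_impacted(avg_delta_by_group):
    """Return the group whose average delta is largest."""
    return max(avg_delta_by_group, key=avg_delta_by_group.get)


# Made-up rates: a positive delta means the rate rose over the last 30 days.
print(delta_from_30_days_ago(250.0, 100.0))
print(most_impacted(AVG_DELTA_BY_AGE_GROUP))
```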
Use these results to look up the members in the most impacted income group and age group:
select * from member.target_mbrs_by_income a where a.social_tier = 'below $40K';
select * from member.target_mbrs_by_age_group a where a.demographic_set_category = '18-49';
Go to the Atlas user interface, select any Hive table created in this exercise, and view its lineage, schema, audits, etc.
Here's a snapshot of the covid_rate_by_soc_det table in Atlas.