Adding known classification tags during data ingestion using Atlas and NiFi.
Most of the time we already know some metadata about the data we are ingesting into a big data platform, and this metadata can play an important role in data governance and security as the data moves through downstream pipelines. Metadata such as terms and conditions, country, consent agreements, and the validity period of the data is very useful for meeting GDPR-like requirements, and if we do not add these classifications during ingestion we lose that context. Apache Atlas is the open metadata governance tool used to define these classifications and tags. One approach is to ingest the data with NiFi, which captures the data lineage in Atlas, but the additional classification tags then have to be added manually, based on the nature of the data, at a later point in the process, which is error prone. If the classifications are not added automatically as part of ingestion, the metadata context is lost.
In this article I will explain how we can automate the tagging of these classifications within NiFi as the data is being ingested. I will be using predefined classifications, but one could use ML/AI to auto-classify based on the business taxonomy and keep that metadata ready for NiFi to use.
I have the actual data and its metadata in the following directory, and the goal is to ingest this data using NiFi and add the classification tags immediately after ingestion. In this case I am adding the “COUNTY” classification with a country_code attribute of “FR” and a retention_period once the data lands in HDFS.
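The article does not reproduce the metadata file itself, but as an illustration it could be a small JSON file kept next to the data, already in the shape the Atlas classifications endpoint expects (an array of classification objects). The snippet below is a sketch of that assumption; the file path, attribute values, and the existence of a "COUNTY" classification type in Atlas are all hypothetical.

```python
import json

# Hypothetical classification metadata that travels with the data.
# It uses the shape Atlas expects when POSTing classifications:
# an array of {"typeName": ..., "attributes": {...}} objects.
classifications = [
    {
        "typeName": "COUNTY",
        "attributes": {
            "country_code": "FR",
            "retention_period": "7 years"   # illustrative value
        }
    }
]

# Write it alongside the data so the NiFi flow can fetch it later
# (directory layout is an assumption).
with open("/data/incoming/metadata/classifications.json", "w") as f:
    json.dump(classifications, f, indent=2)
```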
Overall NiFi Flow:
Make sure to enable NiFi + Atlas integration through the ReportLineageToAtlas reporting task.
Step 1) Ingest the data using the PutHDFS processor:
Step 2) Wait for Atlas to generate the lineage. Currently I wait for 5 minutes and then trigger the rest of the flow that fetches the lineage.
Step 3) After the 5-minute delay expires, get the entity metadata from Atlas using the REST API with the URL below. Since we ingested the data into HDFS, we use hdfs_path as the type; once we get the response, we extract the guid using EvaluateJsonPath. We need this guid to add the classifications in the next step.
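In the flow this lookup is done with InvokeHTTP followed by EvaluateJsonPath. As a rough equivalent outside NiFi, something like the sketch below could find the hdfs_path entity and pull out its guid using Atlas's basic search endpoint. The Atlas host, credentials, file name, and the choice of the basic-search URL are assumptions, since the exact URL used in the flow is not shown here.

```python
import requests

ATLAS = "http://atlas-host:21000"   # assumed Atlas endpoint
AUTH = ("admin", "admin")           # assumed credentials

def lookup_hdfs_entity_guid(file_name):
    """Search Atlas for the hdfs_path entity created for the ingested
    file and return its guid."""
    resp = requests.get(
        f"{ATLAS}/api/atlas/v2/search/basic",
        params={"typeName": "hdfs_path", "query": file_name},
        auth=AUTH,
    )
    resp.raise_for_status()
    entities = resp.json().get("entities", [])
    if not entities:
        raise RuntimeError(f"No hdfs_path entity found yet for {file_name}")
    # Equivalent of an EvaluateJsonPath expression like $.entities[0].guid
    return entities[0]["guid"]

guid = lookup_hdfs_entity_guid("customer_data.csv")   # hypothetical file name
print(guid)
```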
Step 4) Fetch the classification metadata file (this metadata file sits alongside the actual data, in a separate directory) and post the JSON classifications to the Atlas REST API using the following URL.
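In the flow this is an InvokeHTTP POST. As a sketch of the equivalent call, assuming the metadata file has the shape shown earlier and that the "COUNTY" classification type already exists in Atlas, the v2 classifications endpoint can be used like this (host, credentials, paths, and the guid value are placeholders):

```python
import json
import requests

ATLAS = "http://atlas-host:21000"   # assumed Atlas endpoint
AUTH = ("admin", "admin")           # assumed credentials

def add_classifications(guid, metadata_file):
    """POST the classification array from the metadata file onto the
    Atlas entity identified by guid."""
    with open(metadata_file) as f:
        classifications = json.load(f)   # e.g. [{"typeName": "COUNTY", ...}]
    resp = requests.post(
        f"{ATLAS}/api/atlas/v2/entity/guid/{guid}/classifications",
        json=classifications,
        auth=AUTH,
    )
    resp.raise_for_status()              # Atlas returns 204 No Content on success

# guid would come from the Step 3 lookup; shown here as a placeholder.
add_classifications("guid-from-step-3", "/data/incoming/metadata/classifications.json")
```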
The Atlas entity got created while the NiFi flow was waiting (remember we had a 5-minute delay in the flow so that Atlas could create these entities; you can change this wait time depending on your environment and latency).
After the 5-minute delay, NiFi fetches the guid and posts the classifications. You can now see the Atlas entity again, this time with the "COUNTY" classification and its attributes: country_code set to "FR" along with the retention_period.