Created on 12-07-2018 04:27 AM - edited 08-17-2019 05:31 AM
Adding known clarification tags during data Ingestion using Atlas and NiFi.
Most of the times we know some metadata about the data we are ingesting into bigdata platform and this metadata could play an important role in data governance and security as the data moves through downstream pipeline, metadata like terms and conditions, country, consent agreements, validity period of the data etc, these are very useful defining GDRP like requirements and if we don’t add these classification during ingestion we loose the context, as you all know Apache Atlas is used as open metadata governance tool defining these classifications and tags etc, one way is to use NiFi to ingest the data which captures the data lineage in Atlas , but you have to add additional classification tags manually based on the nature of the data at later point of time in the process which is error prone .
If you do not add metadata classifications automatically part of the data ingestion you will loose the context of metadata,.
In this article I will explain how we can automate the tagging of these classifications within NiFi as we are ingesting the data.
I will be using predefined classifications for this article but one can use ML/AI to auto classify based on the business taxonomy and keep the metadata ready for NiFi to use .
I have actual data and metadata in the following directory and goal is to ingest this data using NiFi and immediately add classifications tags once they get ingested. In the following case I am adding “COUNTY” with country_code attribute as “FR” and retention_period after data gets ingested into hdfs.
Overall NiFi Flow :
Make sure to enable Nifi + Atlas Integration through ReportLineageToAtlas Reporting Task.
Step 1 ) Ingest the data using PutHDFS processor :
Steps 2) Wait for the Atlas to generate the lineage , currently I am waiting for 5 min and triggering further process on fetching lineage.
Step 3) After 5 min delay expired, get entity metadata from Atlas using REST API with below url , since we ingested the data into HDFS we will use hdfs_path as type and once we get the response extract the guid using EvaluateJsonPath , we need this guid to add classifications in the next step.
Step 4) Fetch classification metadata file ( this metadata file is located along with actual data in separate directory ) and post the JSON data with classifications to Atlas REST API using following URL
Atlas entity got created while NiFi flows was waiting ( remember we had 5 min delay in the flow , so that Atlas will create these entities, you can change this wait time depending on your environment and latency )
After 5 minutes delay Nifi will fetch the guid and posts the classifications, you can see Atlas entity again with "COUNTY" classifications and attributes with country_code as "FR" with retention_period
Overall NiFi flow :