Member since
05-29-2019
9
Posts
19
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
4491 | 02-13-2018 01:01 AM |
04-15-2020
08:50 AM
Thank you so much for this article. Do you have a list of the JARs needed to compile this Java file into a new JAR?
12-07-2018
04:27 AM
3 Kudos
Abstract
Adding known classification tags during data ingestion using Atlas and NiFi.

Introduction
Most of the time we know some metadata about the data we are ingesting into a big data platform, and this metadata can play an important role in data governance and security as the data moves through the downstream pipeline. Metadata such as terms and conditions, country, consent agreements, and the validity period of the data is very useful for defining GDPR-like requirements, and if we don't add these classifications during ingestion we lose the context. Apache Atlas is used as an open metadata governance tool for defining these classifications and tags. One way is to use NiFi to ingest the data, which captures the data lineage in Atlas, but you then have to add the additional classification tags manually at a later point in the process, based on the nature of the data, which is error prone.

Problem
If you do not add metadata classifications automatically as part of data ingestion, you lose the context of the metadata.

Solution
In this article I will explain how we can automate the tagging of these classifications within NiFi as we ingest the data. I will be using predefined classifications for this article, but one could use ML/AI to auto-classify based on the business taxonomy and keep the metadata ready for NiFi to use. I have the actual data and metadata in the following directory, and the goal is to ingest this data using NiFi and add classification tags immediately after it is ingested. In the following case I am adding “COUNTY” with the country_code attribute set to “FR”, together with retention_period, after the data is ingested into HDFS.

Overall NiFi flow:
Make sure to enable NiFi + Atlas integration through the ReportLineageToAtlas reporting task.

Step 1) Ingest the data using the PutHDFS processor.

Step 2) Wait for Atlas to generate the lineage. Currently I wait for 5 minutes and then trigger the rest of the flow, which fetches the lineage.
UpdateAttribute configurations

Step 3) After the 5-minute delay expires, get the entity metadata from Atlas using the REST API with the URL below. Since we ingested the data into HDFS, we use hdfs_path as the type. Once we get the response, extract the guid using EvaluateJsonPath; we need this guid to add classifications in the next step.
http://smunigati-hdp3-1.field.hortonworks.com:21000/api/atlas/v2/search/dsl?typeName=hdfs_path&query=${whereclause:urlEncode()}

Step 4) Fetch the classification metadata file (this file is located alongside the actual data, in a separate directory) and post the JSON data with the classifications to the Atlas REST API using the following URL:
http://smunigati-hdp3-1.field.hortonworks.com:21000/api/atlas/v2/entity/bulk/classification

The Atlas entity was created while the NiFi flow was waiting (remember the 5-minute delay in the flow, which gives Atlas time to create these entities; you can change this wait time depending on your environment and latency). After the 5-minute delay, NiFi fetches the guid and posts the classifications. You can then see the Atlas entity again, now with the "COUNTY" classification and its attributes: country_code as "FR" and retention_period.
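Steps 3 and 4 can also be sketched outside NiFi as plain request construction. The snippet below is a minimal sketch, not the flow's actual configuration: the host `atlas-host`, the where clause, the sample search response, and the retention_period value are illustrative assumptions, while the two endpoint paths, the hdfs_path type, the "COUNTY" classification, and country_code "FR" come from the article. The payload shape (a `classification` object plus a list of `entityGuids`) is assumed to match the Atlas classification associate request; check it against your Atlas version.

```python
import json
from urllib.parse import quote_plus

ATLAS = "http://atlas-host:21000"  # hypothetical host; replace with your Atlas server


def build_search_url(where_clause: str) -> str:
    """URL-encode the where clause, comparable to ${whereclause:urlEncode()} in NiFi."""
    return (f"{ATLAS}/api/atlas/v2/search/dsl"
            f"?typeName=hdfs_path&query={quote_plus(where_clause)}")


def extract_guid(search_response: str) -> str:
    """Plays the role of EvaluateJsonPath with a path such as $.entities[0].guid."""
    return json.loads(search_response)["entities"][0]["guid"]


def build_classification_payload(guid: str, name: str, attributes: dict) -> dict:
    """Body for POST /api/atlas/v2/entity/bulk/classification (assumed shape)."""
    return {
        "classification": {"typeName": name, "attributes": attributes},
        "entityGuids": [guid],
    }


# Hypothetical where clause and search response, for illustration only.
url = build_search_url('where path = "/data/incoming/sample.csv"')
sample_response = '{"entities": [{"typeName": "hdfs_path", "guid": "1234-abcd"}]}'
guid = extract_guid(sample_response)

# retention_period value is a placeholder; the article does not state it.
payload = build_classification_payload(
    guid, "COUNTY", {"country_code": "FR", "retention_period": "365"}
)
print(url)
print(json.dumps(payload, indent=2))
```

In the NiFi flow these pieces correspond to the InvokeHTTP-style GET in Step 3, the EvaluateJsonPath extraction, and the POST body in Step 4; here the classification metadata is built inline rather than read from the metadata file.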
04-16-2018
11:16 PM
13 Kudos
Abstract
The objective of this article is to cover end-to-end lineage capture within a big data platform.

Introduction
A visual representation of data lineage helps to track data from its origin to its destination. It explains the different processes involved in the data flow and their dependencies. Metadata management is the key input to capturing enterprise data flow and presenting data lineage from end to end, especially in the banking industry due to CCAR requirements and regulations.

When it comes to the Hortonworks platforms (HDP, Hortonworks Data Platform, and HDF, Hortonworks DataFlow), several components are involved in solving enterprise big data problems, from data ingestion and cleansing/transformation all the way through data analytics, using Kafka, NiFi, Spark, Hive, Hive LLAP, etc.

Problem Statement
Apache Atlas provides a scalable and extensible set of core foundational governance services, including lineage. Capturing lineage from end to end is a complex problem to solve for any regulated industry and requires integration with several tools. We have seen Apache Spark used as an ELT engine in many data lake implementations, either through external tools (like SnapLogic, Trifacta, etc.) or through custom Spark logic. However, Spark lineage is currently not supported out of the box by Apache Atlas, whereas the rest of the tools mentioned above are already supported; NiFi + Atlas integration was added as part of HDF 3.1.

Solution
Capture lineage in Atlas from NiFi, Spark, and Hive by closing the gap mentioned above with Spline. Spline captures and stores lineage information from internal Spark execution plans in a lightweight, unobtrusive (even if there is an issue in lineage generation, the Spark job will not fail), and easy-to-use manner.
Here is the end-to-end data flow:
NiFi (ingest two files, wikidata.csv and domain.csv, into HDFS) --> HDFS --> Spark (join wikidata.csv and domain.csv and store the result as an ORC file) --> HDFS --> Hive table (create a Hive external table on the ORC data)

Step 1) NiFi --> HDFS
Enable NiFi lineage with the help of HDF 3.1 as explained here; you will get this lineage.

Step 2) Spark --> HDFS
Out of the box, the NiFi lineage properties, especially qualifiedName (which will be filepath@clustername, as you can see in the above screen), are not recognized when Spline generates the lineage for its input. I therefore modified za.co.absa.spline.persistence.atlas.conversion.DatasetConverter to recognize the NiFi qualifiedName for inputs with the help of two new properties, “cluster.name” and “absolute.base.path”.
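// Comma-separated list of the input paths read by the Spark job
val path = paths.map(_.path).mkString(", ")
// New JVM system properties, passed with -Dcluster.name and -Dabsolute.base.path
val clustername = System.getProperty("cluster.name")
val absolutebasepath = System.getProperty("absolute.base.path")
// Strip the HDFS base URI so the path matches what NiFi reports to Atlas
val upath = path.replace(absolutebasepath, "")
// Use filepath@clustername as the endpoint qualifiedName, as NiFi does
new EndpointDataset(name, qualifiedName, attributes, new FileEndpoint(upath, upath + "@" + clustername), EndpointType.file, EndpointDirection.input, st)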
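The effect of this change can be illustrated on its own. The following is a minimal Python sketch of the same string logic, not Spline code: the function name `nifi_qualified_name` is hypothetical, and the sample base path and cluster name are the ones passed to spark-shell in this step.

```python
def nifi_qualified_name(path: str, absolute_base_path: str, cluster_name: str) -> str:
    """Strip the HDFS base URI and append @cluster, giving filepath@clustername."""
    relative_path = path.replace(absolute_base_path, "")
    return f"{relative_path}@{cluster_name}"


qualified_name = nifi_qualified_name(
    "hdfs://hdp264-0.field.hortonworks.com:8020/user/nifi/data/wikidata.csv",
    "hdfs://hdp264-0.field.hortonworks.com:8020",
    "hdp264",
)
print(qualified_name)  # /user/nifi/data/wikidata.csv@hdp264
```

Because the result matches the qualifiedName that NiFi registers for the same file, Atlas can attach the Spark lineage to the existing hdfs_path entity instead of creating a separate one.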
spark-shell --master yarn \
  --driver-java-options='-Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory' \
  --files /usr/hdp/2.6.4.0-91/kafka/conf/producer.properties \
  --conf 'spark.driver.extraJavaOptions=-Datlas.kafka.bootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dbootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory -Datlas.kafka.auto.commit.enable=false -Datlas.kafka.hook.group.id=atlas -Datlas.kafka.zookeeper.connect=hdp264-0.field.hortonworks.com:2181 -Datlas.kafka.zookeeper.connection.timeout.ms=30000 -Datlas.kafka.zookeeper.session.timeout.ms=60000 -Datlas.kafka.zookeeper.sync.time.ms=20 -Dcluster.name=hdp264 -Dabsolute.base.path=hdfs://hdp264-0.field.hortonworks.com:8020'
// Enable Spline lineage tracking for this Spark session
import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()
import org.apache.spark.sql.SaveMode
// Read the page view data and keep only large, frequently viewed pages
val sourceDS = spark.read.option("header", "true").option("inferSchema", "true").csv("/user/nifi/data/wikidata.csv").as("source").filter($"total_response_size" > 1000).filter($"count_views" > 10)
// Read the mapping from domain code to domain name
val domainMappingDS = spark.read.option("header", "true").option("inferSchema", "true").csv("/user/nifi/data/domain.csv").as("mapping")
// Left outer join keeps pages whose domain_code has no mapping
val joinedDS = sourceDS.join(domainMappingDS, $"domain_code" === $"d_code", "left_outer").select($"page_title".as("page"), $"d_name".as("domain"), $"count_views")
// Write the result as ORC; the Hive external table in Step 3 points at this location
joinedDS.write.mode(SaveMode.Overwrite).format("orc").save("/user/nifi/sparkoutput")
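To make the shape of the Spark job's output concrete, here is a small plain-Python sketch of the same filter, left outer join, and projection. The sample rows are invented for illustration; only the column names and the two thresholds come from the Spark code above.

```python
# Invented sample rows; only column names and thresholds come from the Spark job.
wikidata = [
    {"domain_code": "en", "page_title": "Apache_Atlas", "count_views": 25, "total_response_size": 5000},
    {"domain_code": "fr", "page_title": "NiFi", "count_views": 5, "total_response_size": 9000},
    {"domain_code": "xx", "page_title": "Spline", "count_views": 40, "total_response_size": 2000},
]
domains = [{"d_code": "en", "d_name": "English"}, {"d_code": "fr", "d_name": "French"}]

# Same predicates as filter(total_response_size > 1000) and filter(count_views > 10).
filtered = [
    row for row in wikidata
    if row["total_response_size"] > 1000 and row["count_views"] > 10
]

# Left outer join on domain_code == d_code. The dict lookup assumes each d_code
# appears once; rows without a match keep None as their domain.
names_by_code = {d["d_code"]: d["d_name"] for d in domains}
joined = [
    {
        "page": row["page_title"],
        "domain": names_by_code.get(row["domain_code"]),
        "count_views": row["count_views"],
    }
    for row in filtered
]
print(joined)
```

The three output keys (page, domain, count_views) are the columns that end up in the ORC files and that the Hive external table in the next step reads.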

Step 3) Hive external table on ORC data
0: jdbc:hive2://hdp264-0.field.hort> create external table nifi_spark_hive_lineage_end_to_end (page String, domain String, count_views int) stored as orc location '/user/nifi/sparkoutput';

References
https://absaoss.github.io/spline/
07-19-2016
07:00 PM
It is in the 0.7.0 release, as part of the standard bundle: https://github.com/apache/nifi/blob/0.x/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java