Created on 04-16-2018 11:16 PM - edited 08-17-2019 07:42 AM
This article objective is to cover the end to end lineage capture with in bigdata platform
A visual representation of data lineage helps to track data from its origin to its destination. It explains the different processes involved in the data flow and their dependencies. Metadata management is the key input to capturing enterprise dataflow and presenting data lineage from end to end and especially in banking industry due to CCAR requirements and regulations.
When it comes to Hortonworks platforms ( HDP Hortonworks Data Platform , Hortonworks Data Flow) there are several components involved beginning from data ingestion, cleansing/transformation all the way through data analytics using Kaka, Nifi, Spark, Hive, Hive LLAP etc in solving enterprise bigdata problems.
Apache Atlas provides a scalable and extensible set of core foundational governance services including lineage. Capturing lineage from end to end is complex problem to solve for any regulated industry and requires integration with several tools and we have seen Apache Spark been used as ELT engine for many data lake implementations either using external tools ( like Snaplogic, Trifacta etc..) or custom spark logic, but Spark lineage is not supported currently out of the box from Apache Atlas and rest of the above mentioned tools are already supported supports Nifi + Atlas integration is added part of HDP 3.1
Capturing lineage with Atlas from Nifi, Spark and Hive by solving the gap mentioned above using Spline. Spline captures and stores lineage information from internal Spark execution plans in a lightweight, unobtrusive (even if there is an issue in lineage generation , spark job will not fail ) and easy to use manner.
Here is the end to end data flow.
Nifi (Ingest two files wikidata.csv, domain.csv into HDFS) --> HDFS --> Spark (Join the wikidata.csv and domain.csv and store result as ORC file ) --> HDFS --> Hive Table ( Create Hive external table on ORC data )
Enable Nifi lineage with the help of HDF 3.1 as explained here, you will get this lineage.
Out of the box Nifi lineage properties especially qualifedName ( qualifiedName will be filepath@culstername, you can see in the above screen) is not recognized when Spline generates the lineage as input, so modified za.co.absa.spline.persistence.atlas.conversion.DatasetConverter to recognize Nifi qualifiedName for input with the help of two new properties “cluster.name” and “absolute.base.path”.
val path = paths.map(_.path) mkString ", " val clustername = System.getProperty("cluster.name") val absolutebasepath = System.getProperty("absolute.base.path") val upath = path.replace(absolutebasepath,"") new EndpointDataset(name, qualifiedName, attributes, new FileEndpoint(upath, upath+"@"+ clustername), EndpointType.file, EndpointDirection.input, st)
spark-shell --master yarn --driver-java-options='-Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory' --files /usr/hdp/184.108.40.206-91/kafka/conf/producer.properties --conf 'spark.driver.extraJavaOptions=-Datlas.kafka.bootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dbootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory -Datlas.kafka.auto.commit.enable=false -Datlas.kafka.hook.group.id=atlas -Datlas.kafka.zookeeper.connect=hdp264-0.field.hortonworks.com:2181 -Datlas.kafka.zookeeper.connection.timeout.ms=30000 -Datlas.kafka.zookeeper.session.timeout.ms=60000 -Datlas.kafka.zookeeper.sync.time.ms=20 -Dcluster.name=hdp264 -Dabsolute.base.path=hdfs://hdp264-0.field.hortonworks.com:8020' import za.co.absa.spline.core.SparkLineageInitializer._ spark.enableLineageTracking() import org.apache.spark.sql.SaveMode val sourceDS = spark.read.option("header","true").option("inferSchema","true").csv("/user/nifi/data/wikidata.csv").as("source").filter($"total_response_size" > 1000).filter($"count_views" > 10) val domainMappingDS =spark.read.option("header","true").option("inferSchema","true").csv("/user/nifi/data/domain.csv").as("mapping") val joinedDS = sourceDS.join(domainMappingDS, $"domain_code" ===$"d_code","left_outer").select($"page_title".as("page"),$"d_name".as("domain"), $"count_views") joinedDS.write.mode(SaveMode.Overwrite).format("orc").save("/user/nifi/sparkoutput")
create external table nifi_spark_hive_lineage_end_to_end ( page String, domin String, count_views int) stored as orc location '/user/nifi/sparkoutput';