Abstract

The objective of this article is to cover end-to-end lineage capture within a big data platform.

Introduction

A visual representation of data lineage helps track data from its origin to its destination. It explains the different processes involved in the data flow and their dependencies. Metadata management is the key input for capturing enterprise data flow and presenting data lineage from end to end, which is especially important in the banking industry due to CCAR requirements and regulations.

When it comes to the Hortonworks platforms (HDP - Hortonworks Data Platform, HDF - Hortonworks DataFlow), several components are involved, beginning with data ingestion and cleansing/transformation all the way through data analytics, using Kafka, NiFi, Spark, Hive, Hive LLAP, etc., to solve enterprise big data problems.

Problem Statement

Apache Atlas provides a scalable and extensible set of core foundational governance services, including lineage. Capturing lineage end to end is a complex problem to solve for any regulated industry and requires integration with several tools. We have seen Apache Spark used as the ELT engine in many data lake implementations, either through external tools (such as SnapLogic, Trifacta, etc.) or custom Spark logic, but Spark lineage is currently not supported out of the box by Apache Atlas. The rest of the tools mentioned above are already covered; NiFi + Atlas integration was added as part of HDF 3.1.

Solution

Capture lineage in Atlas from NiFi, Spark, and Hive, closing the gap mentioned above by using Spline. Spline captures and stores lineage information from internal Spark execution plans in a lightweight, unobtrusive (even if there is an issue in lineage generation, the Spark job will not fail), and easy-to-use manner.
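For illustration only (this is not Spline's actual implementation), the non-fatal behaviour can be pictured as a Spark QueryExecutionListener that wraps lineage capture in a Try, so an error is only logged and never propagated to the job. NonFatalLineageListener and its capture function are hypothetical names used for this sketch:

import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical listener: capture lineage from the execution plan, but never let
// a lineage failure fail the Spark job itself.
class NonFatalLineageListener(capture: QueryExecution => Unit) extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    Try(capture(qe)) match {
      case Success(_) => () // lineage stored
      case Failure(e) => println(s"Lineage capture failed, job continues: ${e.getMessage}")
    }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

// Registration in a Spark session: spark.listenerManager.register(new NonFatalLineageListener(qe => ()))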

Here is the end-to-end data flow:

NiFi (ingest two files, wikidata.csv and domain.csv, into HDFS) --> HDFS --> Spark (join wikidata.csv and domain.csv and store the result as an ORC file) --> HDFS --> Hive table (create a Hive external table on the ORC data)

Step 1: NiFi --> HDFS

Enable NiFi lineage with the help of HDF 3.1 as explained here; you will get the lineage shown below.

(Screenshots: NiFi flow and the resulting NiFi --> HDFS lineage in Atlas)

Step 2: Spark --> HDFS

Out of the box, the NiFi lineage properties, especially qualifiedName (the qualifiedName is filepath@clustername, as you can see in the screens above), are not recognized by Spline when it generates the lineage for the input. So za.co.absa.spline.persistence.atlas.conversion.DatasetConverter was modified to recognize the NiFi qualifiedName for the input with the help of two new properties, "cluster.name" and "absolute.base.path".

// Modified snippet in DatasetConverter: read the two new system properties and
// build the input endpoint so its qualifiedName (path@clustername) matches the
// HDFS path entity created by the NiFi-Atlas integration.
val path = paths.map(_.path) mkString ", "
val clustername = System.getProperty("cluster.name")
val absolutebasepath = System.getProperty("absolute.base.path")
// Strip the absolute base path (e.g. hdfs://host:8020) so only the relative HDFS path remains
val upath = path.replace(absolutebasepath, "")
new EndpointDataset(name, qualifiedName, attributes, new FileEndpoint(upath, upath + "@" + clustername), EndpointType.file, EndpointDirection.input, st)
Launch spark-shell with the Spline Atlas persistence factory, the Atlas Kafka hook settings, and the two new properties (cluster.name and absolute.base.path):

spark-shell --master yarn \
  --driver-java-options='-Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory' \
  --files /usr/hdp/2.6.4.0-91/kafka/conf/producer.properties \
  --conf 'spark.driver.extraJavaOptions=-Datlas.kafka.bootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dbootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory -Datlas.kafka.auto.commit.enable=false -Datlas.kafka.hook.group.id=atlas -Datlas.kafka.zookeeper.connect=hdp264-0.field.hortonworks.com:2181 -Datlas.kafka.zookeeper.connection.timeout.ms=30000 -Datlas.kafka.zookeeper.session.timeout.ms=60000 -Datlas.kafka.zookeeper.sync.time.ms=20 -Dcluster.name=hdp264 -Dabsolute.base.path=hdfs://hdp264-0.field.hortonworks.com:8020'
Then, inside spark-shell:

// Enable Spline lineage tracking for this session
import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()
import org.apache.spark.sql.SaveMode
// Read the two CSV files ingested by NiFi and filter the wiki data
val sourceDS = spark.read.option("header", "true").option("inferSchema", "true").csv("/user/nifi/data/wikidata.csv").as("source").filter($"total_response_size" > 1000).filter($"count_views" > 10)
val domainMappingDS = spark.read.option("header", "true").option("inferSchema", "true").csv("/user/nifi/data/domain.csv").as("mapping")
// Join on the domain code and keep page, domain, and count_views
val joinedDS = sourceDS.join(domainMappingDS, $"domain_code" === $"d_code", "left_outer").select($"page_title".as("page"), $"d_name".as("domain"), $"count_views")
// Write the result as ORC; Spline captures the execution plan and sends the lineage to Atlas
joinedDS.write.mode(SaveMode.Overwrite).format("orc").save("/user/nifi/sparkoutput")
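Before moving on to Hive, you can optionally sanity-check the ORC output the Spark job just wrote. A minimal sketch, using the same save location as above:

// Read the ORC output back and confirm the expected columns: page, domain, count_views
val result = spark.read.orc("/user/nifi/sparkoutput")
result.printSchema()
result.show(5, false)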

(Screenshot: Spark job lineage captured by Spline in Atlas)

Step 3: Hive external table on ORC data

0: jdbc:hive2://hdp264-0.field.hort> create external table nifi_spark_hive_lineage_end_to_end (page String, domain String, count_views int) stored as orc location '/user/nifi/sparkoutput';
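Once the table exists, you can quickly verify the data. A minimal sketch, assuming the same spark-shell session was started with Hive support; with the Atlas Hive hook enabled, this table is also linked in Atlas to the ORC directory written by the Spark job, completing the end-to-end lineage:

// Query the external table created above (requires spark-shell with Hive support)
spark.sql("select page, domain, count_views from nifi_spark_hive_lineage_end_to_end limit 5").show()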

(Screenshots: end-to-end NiFi --> Spark --> Hive lineage in Atlas)

References

https://absaoss.github.io/spline/
