Member since
05-29-2019
9
Posts
19
Kudos Received
1
Solution
My Accepted Solutions
Title | Views | Posted |
---|---|---|
4190 | 02-13-2018 01:01 AM |
02-09-2019
11:30 PM
3 Kudos
Abstract
This article explains how to enrich the Ranger request context with additional data that can be very useful during policy evaluation.
Problem Statement
Ranger tag-based policies are a powerful way to implement data governance using the ABAC model. You can define data attributes at the tag level in Atlas and use them in policy conditions, which are evaluated by a JavaScript-style execution engine. By default, this JavaScript logic can only access certain context-level data, such as the user, group, action, IP address and tag attribute values. These are not sufficient to define the complex authorization policies seen in the enterprise, which may require additional data such as the project/use case a user is assigned to, or external data that influences policy evaluation. How do you handle these situations?
Solution
This is where the Ranger dynamic context hook comes in handy. The Ranger framework allows you to hook in your own custom context enricher, which augments the access request with external data so that you can use it in policy conditions. It is a very powerful feature when used properly.
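To make the idea concrete, here is a minimal Python sketch of what an enricher conceptually does. It is not the Ranger Java API: the request is modeled as a plain dict, and the function names are hypothetical. The only details taken from this article are the data file entries of the form username-Client_Name=value and the idea that a policy condition later compares the looked-up value with a tag attribute. Storing the value under the key username-Client_Name is an assumption.

```python
from typing import Dict, Iterable


def load_user_projects(lines: Iterable[str], context_name: str = "Client_Name") -> Dict[str, str]:
    """Parse entries of the form '<username>-<context_name>=<value>' into {username: value}."""
    suffix = "-" + context_name
    projects: Dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if not line or "=" not in line:
            continue  # skip blank or malformed lines
        key, value = line.split("=", 1)
        key = key.strip()
        if key.endswith(suffix):
            projects[key[: -len(suffix)]] = value.strip()
    return projects


def enrich(request: dict, projects: Dict[str, str], context_name: str = "Client_Name") -> dict:
    """Return a copy of the request whose context carries the user's project, if known."""
    context = dict(request.get("context", {}))
    user = request["user"]
    if user in projects:
        # Assumption: the value is stored under '<username>-<context_name>'.
        context[user + "-" + context_name] = projects[user]
    enriched = dict(request)
    enriched["context"] = context
    return enriched


def condition_allows(request: dict, tag_client_name: str, context_name: str = "Client_Name") -> bool:
    """Mimic the policy condition: allow only when the user's project matches the tag attribute."""
    value = request.get("context", {}).get(request["user"] + "-" + context_name)
    return value is not None and value == tag_client_name
```

A user with no entry in the data file simply gets no extra context, so the condition evaluates to false for that user rather than failing.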
In this article I will explain how to enrich the Hive access request with additional project/use case data for a given user from an external file, and how to use that data when defining a tag-based policy.
Step 1) Implementing Custom Enricher.
RangerContextEnricher is the interface we need to implement to provide a custom enricher. In this example, ExternalDataEnricher is a custom implementation that extracts the external data for a given user from a file and sticks it into the RangerAccessRequest. Once it is created, build the jar and copy it into the Hive lib directories /usr/hdp/3.1.0.0-78/hive/lib/ranger-hive-plugin-impl and /usr/hdp/3.1.0.0-78/ranger-hive-plugin/lib/ranger-hive-plugin-impl (depending on your environment versions).
Sample code attached: ExternalDataEnricher.java
Step 2) Updating Ranger Hive Service Definition.
For Ranger to enrich the request context, we need to update the Ranger Hive service definition with the following new context enricher. Here "Client_Name" is the use case/project data attribute that we will extract from /etc/ranger/data/userProject.txt for a given user, in the form username + "-" + "Client_Name".
"contextEnrichers": [
  {
    "itemId": 1,
    "name": "Client_Name",
    "enricher": "org.apache.ranger.plugin.contextenricher.ExternalDataEnricher",
    "enricherOptions": {
      "contextName": "Client_Name",
      "dataFile": "/etc/ranger/data/userProject.txt"
    }
  }
]
We will register the above context enricher with the Hive service using the Ranger API. First retrieve the service definition with the following GET request, updating the authentication credentials and Ranger host URI as appropriate.
curl --user admin:BadPass#1 --get "http://demo.hortonworks.com:6080/service/public/v2/api/servicedef/name/hive"
Then add the contextEnrichers section above to the retrieved definition and send it back to the Ranger API with the following PUT request.
curl -u admin:BadPass#1 -d @hive_service.json -X PUT -H 'Content-Type: application/json; charset=UTF-8' http://demo.hortonworks.com:6080/service/public/v2/api/servicedef/name/hive
Restart both the Hive and Ranger services once the above steps are done. Make sure /etc/ranger/data/userProject.txt contains an entry of the form username-Client_Name=something for every user who needs this additional data.
Step 3) Using the newly added context data in policy condition.
To access the newly added project/use case data in the "JavaScript" condition logic of a tag-based policy, use the ctx.getRequestContextAttribute() method to extract the data based on the ctx.getUser() value. Here I am comparing the tag attribute "Data.Client_Name" with the user's project/use case value from the external text file and allowing access if they match.
References
https://cwiki.apache.org/confluence/display/RANGER/Dynamic+Policy+Hooks+in+Ranger+-+Configure+and+Use#DynamicPolicyHooksinRanger-ConfigureandUse-contextenricher
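The edit to the service definition in Step 2 (retrieve the JSON, add the contextEnrichers entry, send it back) can be sketched in Python as below. This is only an illustration of the merge step: add_context_enricher is a hypothetical helper, and replacing an existing entry with the same name is my own choice so that the update can be re-run safely. The retrieval and upload are still assumed to be done with the curl commands from the article.

```python
import copy


def add_context_enricher(service_def: dict, enricher: dict) -> dict:
    """Return a copy of a Ranger service definition with the given enricher registered.

    An existing enricher with the same name is replaced, so applying the
    update twice does not create duplicates.
    """
    updated = copy.deepcopy(service_def)
    existing = updated.get("contextEnrichers") or []
    kept = [e for e in existing if e.get("name") != enricher["name"]]
    kept.append(copy.deepcopy(enricher))
    updated["contextEnrichers"] = kept
    return updated


# The enricher entry exactly as given in Step 2 of the article.
CLIENT_NAME_ENRICHER = {
    "itemId": 1,
    "name": "Client_Name",
    "enricher": "org.apache.ranger.plugin.contextenricher.ExternalDataEnricher",
    "enricherOptions": {
        "contextName": "Client_Name",
        "dataFile": "/etc/ranger/data/userProject.txt",
    },
}
```

In practice you would load the GET response with json.load, pass it through add_context_enricher, and write the result to hive_service.json for the PUT request.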
12-07-2018
04:27 AM
3 Kudos
Abstract
Adding known classification tags during data ingestion using Atlas and NiFi.
Introduction
Most of the time we already know some metadata about the data we are ingesting into a big data platform, and this metadata can play an important role in data governance and security as the data moves through the downstream pipeline. Examples are terms and conditions, country, consent agreements and the validity period of the data. These are very useful for defining GDPR-like requirements, and if we don't add these classifications during ingestion we lose the context. Apache Atlas is used as an open metadata governance tool for defining these classifications and tags. One way is to use NiFi to ingest the data, which captures the data lineage in Atlas, but you then have to add the additional classification tags manually at a later point in the process, based on the nature of the data, which is error prone.
Problem
If you do not add metadata classifications automatically as part of data ingestion, you will lose the context of the metadata.
Solution
In this article I will explain how to automate the tagging of these classifications within NiFi as we ingest the data. I will be using predefined classifications for this article, but one could use ML/AI to auto-classify based on the business taxonomy and keep the metadata ready for NiFi to use. I have the actual data and its metadata in separate directories, and the goal is to ingest this data using NiFi and add the classification tags immediately after it is ingested. In the following case I am adding the "COUNTY" classification, with the country_code attribute set to "FR" and a retention_period attribute, after the data is ingested into HDFS.
Overall NiFi flow:
Make sure to enable the NiFi + Atlas integration through the ReportLineageToAtlas reporting task.
Step 1) Ingest the data using the PutHDFS processor.
Step 2) Wait for Atlas to generate the lineage. Currently I wait for 5 minutes (set through UpdateAttribute configurations) and then trigger the rest of the process, which fetches the lineage.
Step 3) After the 5 minute delay has expired, get the entity metadata from Atlas using the REST API with the URL below. Since we ingested the data into HDFS, we use hdfs_path as the type. Once we get the response, extract the guid using EvaluateJsonPath; we need this guid to add the classifications in the next step.
http://smunigati-hdp3-1.field.hortonworks.com:21000/api/atlas/v2/search/dsl?typeName=hdfs_path&query=${whereclause:urlEncode()}
Step 4) Fetch the classification metadata file (this file is located alongside the actual data in a separate directory) and post the JSON data with the classifications to the Atlas REST API using the following URL.
http://smunigati-hdp3-1.field.hortonworks.com:21000/api/atlas/v2/entity/bulk/classification
The Atlas entity is created while the NiFi flow is waiting (remember the 5 minute delay in the flow, which gives Atlas time to create these entities; you can change this wait time depending on your environment and latency). After the 5 minute delay, NiFi fetches the guid and posts the classifications. Looking at the Atlas entity again, you can see the "COUNTY" classification with its attributes: country_code set to "FR", plus retention_period.
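Outside NiFi, Steps 3 and 4 can be sketched in a few lines of Python. This is a sketch under stated assumptions, not the flow's actual configuration: the function names are hypothetical, the where clause in the usage note is only an example (the article does not show the one it uses), and I am assuming the DSL search response carries an "entities" list whose items have a "guid", and that the bulk classification endpoint accepts a body with "classification" and "entityGuids". Check both against your Atlas version.

```python
import json
from urllib.parse import urlencode

ATLAS_BASE = "http://smunigati-hdp3-1.field.hortonworks.com:21000/api/atlas/v2"


def build_search_url(where_clause: str, type_name: str = "hdfs_path", base: str = ATLAS_BASE) -> str:
    """Build the DSL search URL; urlencode plays the role of NiFi's urlEncode()."""
    return base + "/search/dsl?" + urlencode({"typeName": type_name, "query": where_clause})


def extract_guid(search_response: dict) -> str:
    """Pull the guid of the first matching entity (the EvaluateJsonPath step)."""
    entities = search_response.get("entities") or []
    if not entities:
        # Atlas may not have created the entity yet; the flow waits 5 minutes for this reason.
        raise LookupError("no matching entity in Atlas yet")
    return entities[0]["guid"]


def build_classification_request(guid: str, classification: str, attributes: dict) -> str:
    """Build the JSON body posted to /entity/bulk/classification (assumed shape)."""
    body = {
        "classification": {"typeName": classification, "attributes": attributes},
        "entityGuids": [guid],
    }
    return json.dumps(body)
```

For example, build_search_url('name = "wikidata.csv"') produces the search URL for a hypothetical file name, and build_classification_request(guid, "COUNTY", {"country_code": "FR"}) produces the body for the classification used in this article; retention_period would be passed in the same attributes dict.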
04-16-2018
11:16 PM
13 Kudos
Abstract
This article covers end-to-end lineage capture within a big data platform.
Introduction
A visual representation of data lineage helps to track data from its origin to its destination. It explains the different processes involved in the data flow and their dependencies. Metadata management is the key input for capturing enterprise data flow and presenting data lineage from end to end, especially in the banking industry due to CCAR requirements and regulations.
On the Hortonworks platforms (HDP, Hortonworks Data Platform, and HDF, Hortonworks Data Flow), several components are involved, from data ingestion and cleansing/transformation all the way through data analytics, using Kafka, NiFi, Spark, Hive, Hive LLAP etc. to solve enterprise big data problems.
Problem Statement
Apache Atlas provides a scalable and extensible set of core foundational governance services, including lineage. Capturing lineage from end to end is a complex problem for any regulated industry and requires integration with several tools. We have seen Apache Spark used as the ELT engine for many data lake implementations, either through external tools (like Snaplogic, Trifacta etc.) or through custom Spark logic, but Spark lineage is currently not supported out of the box by Apache Atlas. The rest of the tools mentioned above are already supported; NiFi + Atlas integration was added as part of HDF 3.1.
Solution
Capture lineage with Atlas from NiFi, Spark and Hive, closing the gap mentioned above by using Spline. Spline captures and stores lineage information from internal Spark execution plans in a lightweight, unobtrusive (even if there is an issue in lineage generation, the Spark job will not fail) and easy-to-use manner.
Here is the end-to-end data flow:
NiFi (ingest two files, wikidata.csv and domain.csv, into HDFS) --> HDFS --> Spark (join wikidata.csv and domain.csv and store the result as an ORC file) --> HDFS --> Hive table (create a Hive external table on the ORC data)
Step 1) NiFi --> HDFS
Enable NiFi lineage with the help of HDF 3.1 to get the lineage for this step in Atlas.
Step 2) Spark --> HDFS
Out of the box, the NiFi lineage properties, especially qualifiedName (which has the form filepath@clustername), are not recognized as input when Spline generates the lineage. I therefore modified za.co.absa.spline.persistence.atlas.conversion.DatasetConverter to recognize the NiFi qualifiedName for inputs with the help of two new properties, "cluster.name" and "absolute.base.path".
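// Inside DatasetConverter: build the input endpoint so that its qualifiedName matches NiFi's filepath@clustername
val path = paths.map(_.path) mkString ", "
// Both values are passed in as JVM system properties (-Dcluster.name, -Dabsolute.base.path)
val clustername = System.getProperty("cluster.name")
val absolutebasepath = System.getProperty("absolute.base.path")
// Strip the absolute base path (hdfs://host:port) so only the file path remains
val upath = path.replace(absolutebasepath,"")
new EndpointDataset(name, qualifiedName, attributes, new FileEndpoint(upath, upath+"@"+ clustername), EndpointType.file, EndpointDirection.input, st)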
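As a quick check of what the modified converter produces, the same string manipulation can be reproduced in Python. This mirrors only the path handling of the Scala fragment above (join, strip the base path, append @clustername); the function name is hypothetical and the Spline classes are not involved.

```python
from typing import Iterable, Tuple


def nifi_style_endpoint(paths: Iterable[str], absolute_base_path: str, cluster_name: str) -> Tuple[str, str]:
    """Return (path, qualifiedName) the way the modified DatasetConverter builds them."""
    joined = ", ".join(paths)                          # paths.map(_.path) mkString ", "
    relative = joined.replace(absolute_base_path, "")  # drop the hdfs://host:port prefix
    return relative, relative + "@" + cluster_name     # qualifiedName = filepath@clustername
```

With the values used in the spark-shell command of this article, the input hdfs://hdp264-0.field.hortonworks.com:8020/user/nifi/data/wikidata.csv becomes /user/nifi/data/wikidata.csv@hdp264, which is the qualifiedName format NiFi reports.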
Start spark-shell with the Spline Atlas persistence factory and the two new properties:
spark-shell --master yarn \
  --driver-java-options='-Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory' \
  --files /usr/hdp/2.6.4.0-91/kafka/conf/producer.properties \
  --conf 'spark.driver.extraJavaOptions=-Datlas.kafka.bootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dbootstrap.servers=hdp264-0.field.hortonworks.com:6667 -Dspline.persistence.factory=za.co.absa.spline.persistence.atlas.AtlasPersistenceFactory -Datlas.kafka.auto.commit.enable=false -Datlas.kafka.hook.group.id=atlas -Datlas.kafka.zookeeper.connect=hdp264-0.field.hortonworks.com:2181 -Datlas.kafka.zookeeper.connection.timeout.ms=30000 -Datlas.kafka.zookeeper.session.timeout.ms=60000 -Datlas.kafka.zookeeper.sync.time.ms=20 -Dcluster.name=hdp264 -Dabsolute.base.path=hdfs://hdp264-0.field.hortonworks.com:8020'
// Enable Spline lineage tracking for this Spark session
import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()
import org.apache.spark.sql.SaveMode
// Read the page view data ingested by NiFi and keep only the larger, more viewed pages
val sourceDS = spark.read.option("header","true").option("inferSchema","true").csv("/user/nifi/data/wikidata.csv").as("source").filter($"total_response_size" > 1000).filter($"count_views" > 10)
// Read the domain code to domain name mapping
val domainMappingDS = spark.read.option("header","true").option("inferSchema","true").csv("/user/nifi/data/domain.csv").as("mapping")
// Left outer join on the domain code and keep the three output columns
val joinedDS = sourceDS.join(domainMappingDS, $"domain_code" === $"d_code", "left_outer").select($"page_title".as("page"), $"d_name".as("domain"), $"count_views")
// Write the result as ORC; this write is what Spline reports to Atlas
joinedDS.write.mode(SaveMode.Overwrite).format("orc").save("/user/nifi/sparkoutput")
Step 3) Hive external table on ORC data
0: jdbc:hive2://hdp264-0.field.hort> create external table nifi_spark_hive_lineage_end_to_end (page String, domain String, count_views int) stored as orc location '/user/nifi/sparkoutput';
References
https://absaoss.github.io/spline/
02-13-2018
01:01 AM
1 Kudo
Greg, see if you can write to a folder inside the bucket rather than writing directly into the root of the bucket.
INSERT OVERWRITE DIRECTORY 's3a://demo/testdata'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
SELECT * FROM demo_table;
02-05-2018
06:34 PM
Have you considered the Spark Livy interpreter? With it you won't be running multiple Spark drivers on the Zeppelin node, which frees up memory and CPU, and it also supports user impersonation. https://zeppelin.apache.org/docs/0.7.2/interpreter/livy.html
10-12-2017
03:42 PM
If yarn.scheduler.capacity.queue-mappings-override.enable is set to false, users have more control over which queue an application is submitted to: they can either set the queue name explicitly or fall back to the default mapping, and a user-specified queue name takes precedence. If yarn.scheduler.capacity.queue-mappings-override.enable is set to true, admins have more control, and the admin's mapping can override a user's explicit setting.
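The precedence described above can be written down as a small Python model. This is a simplified sketch, not YARN code: the mappings are reduced to a plain user-to-queue dict, resolve_queue is a hypothetical name, and treating a request for "default" as "no explicit choice" is an assumption about how the scheduler behaves.

```python
from typing import Dict, Optional

DEFAULT_QUEUE = "default"


def resolve_queue(
    user: str,
    requested_queue: Optional[str],
    user_mappings: Dict[str, str],
    override_enabled: bool,
) -> str:
    """Pick the queue for an application given the mapping and the override flag."""
    mapped = user_mappings.get(user)
    # Assumption: asking for 'default' (or nothing) counts as not choosing a queue.
    user_chose = requested_queue is not None and requested_queue != DEFAULT_QUEUE
    if mapped is not None and (override_enabled or not user_chose):
        return mapped  # the admin's mapping wins
    return requested_queue or DEFAULT_QUEUE  # the user's choice, or the default queue
```

For a user mapped to an "etl" queue who submits to "adhoc", the model returns "adhoc" with the override disabled and "etl" with it enabled.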
07-19-2016
06:49 PM
Thanks for the response. Do you know when the new JOLT transform processor is going to be released? The existing 0.6.1 and 0.7 releases do not have the new processor you are talking about, but the NIFI-361 ticket discusses it.
07-18-2016
08:54 PM
The goal is to read data from an RDBMS table and store it in HDFS in Avro format with an additional column. Let's say the source table has 5 columns; as part of the ingestion I would like to add an additional column, say "ingest_datetime", holding the current time, before NiFi stores the file in HDFS, so that HDFS ends up with an Avro file with 6 columns. Currently I am using the ExecuteSQL --> PutHDFS processors.
Labels: Apache NiFi