Member since
09-26-2015
48
Posts
29
Kudos Received
6
Solutions
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 6411 | 10-25-2016 12:53 PM |
| | 7084 | 10-22-2016 10:22 PM |
| | 4337 | 10-22-2016 09:34 PM |
| | 4931 | 10-21-2016 09:56 PM |
| | 2142 | 07-17-2016 05:26 PM |
07-05-2022
03:49 PM
1 Kudo
There are numerous ways to build event-driven architectures in the cloud with Cloudera Data Platform (CDP). Recently I worked on a use case that required Spark inference and transformations on streaming log data in an Azure environment in near real time. The requirement was to kick off Spark transformations as soon as files land in an Azure storage container.

The preferred way to run Spark transformations for data engineering use cases in CDP is Cloudera Data Engineering (CDE), which runs Spark on Kubernetes. CDE provides much better isolation, job orchestration with Airflow, Spark version independence, faster scaling, and efficient use of cloud-native resources, and best of all it handles dependency management like a rockstar. It just makes your life easier. CDE also exposes a Jobs API for integration with your CI/CD platforms.

There are multiple patterns for streaming architectures with CDE, such as:
- Using Cloudera Data Flow (powered by Apache NiFi) to invoke CDE Spark/Airflow jobs through the Jobs API
- Using Azure cloud-native services such as Azure Event Grid to trigger CDE jobs through Azure Functions

This article discusses the latter option: how to use Azure Event Grid and Azure Functions to trigger CDE jobs. Azure Event Grid integrates natively with many other services, so many variations of this pattern are possible, but for the sake of this article we will explore how Event Grid integrates with an Azure storage container to monitor for incoming files and invokes a serverless Azure Python function, which in turn invokes a CDE Airflow/PySpark job through the CDE Jobs API.
Prerequisites:
- Azure subscription
- Some knowledge of Azure Event Grid, such as configuring an Azure storage account with Event Grid subscriptions and Azure Functions
- Some knowledge of Azure Functions (preferably with an Azure Functions development environment configured, such as Visual Studio Code)
- Azure storage account with a container to monitor for incoming files (preferably managed by CDP if you wish to read the files in the Spark job)
- Cloudera Data Platform (CDP) environment running in Azure
- Cloudera Data Engineering (CDE) service enabled and a running CDE virtual cluster
- Some knowledge of creating resources and jobs in CDE
- Airflow and PySpark code uploaded to CDE resources, and jobs (Airflow and Spark jobs) pre-created in CDE
- Azure Key Vault (to store the CDE user ID and password as secrets for authentication) created and granted the permissions needed to work with Azure Functions

Project artifacts:
- Azure function code: https://github.com/hrongali/CDE-Trigger-Azure-Functions-EventGrid/blob/master/EventGridTrigger2/__init__.py
- Other project artifacts, including the dependencies (requirements.txt) needed by the Azure function: https://github.com/hrongali/CDE-Trigger-Azure-Functions-EventGrid
- Airflow and Spark scripts uploaded into CDE, with jobs pre-created (I chose to run a simple Airflow job that triggers a simple Pi job as a PySpark job): https://github.com/hrongali/CDE-Trigger-Azure-Functions-EventGrid/tree/master/cde-code

Processing steps:
1. Create an Azure storage container/directory
2. Create CDE components - Resources, Jobs
3. Collect CDE URLs required to trigger Spark/Airflow jobs
4. Create Azure Functions app and Azure Python function
5. Create Azure Key Vault and provide access to the Azure Python function
6. Create Event Grid subscription on the storage account and configure a filter for the directory to be monitored
7. Start running the event-driven pipeline with Event Grid/Azure Functions and CDE

Create an Azure storage container/directory
Create an Azure storage account and create the container/directory that you want to monitor for incoming files.

Create CDE components - Resources, Jobs
Create Airflow and/or Spark jobs in CDE following these instructions.

Collect CDE URLs required to trigger Spark/Airflow jobs
Gather the JOBS API URL for the Airflow/Spark job and the token_url for retrieving the auth token:
- JOBS API URL
- TOKEN URL, used to extract the auth token for authentication purposes

Create Azure Functions app and Azure Python function
1. Create an Azure Function App with the Python flavor and create an Azure Python function:
- Follow the instructions here to create an Azure Function App with the Python/Linux flavor.
- Configure a local Azure Functions development environment, for example Visual Studio Code with the Azure plugins installed.
- Choose the "Azure Event Grid Trigger" template in Step#4 of the document instructions.
- An example Azure Event Grid function can be found here. Change the function to suit your environment.
- Your Azure Python function may need Python dependencies, which you have probably installed in the virtual environment of your local development setup. Running pip freeze > requirements.txt from the root folder of your local project collects all of them into a requirements.txt file.
2. Test the function locally (optional).
3. Deploy the function to the Azure Function App that was created in step 1 above. You should be able to do that from Visual Studio Code, although you might have to configure your Azure environment to work with Visual Studio Code.

Create Azure Key Vault and provide access to the Azure Python function
Cloudera Data Engineering uses JSON Web Tokens (JWT) for API authentication, and you need a user ID and password to get the token. Obtaining the token programmatically is covered in the Azure function code itself, but you should store the user ID and password in a secure location within Azure, such as Azure Key Vault, by creating a secret. Give the Azure function access to the Azure Key Vault by adding an "Access Policy". I followed this blog to make this integration work.

Create Event Grid subscription on the storage account and configure a filter for the directory to be monitored
Create an Event Grid subscription on the storage account and configure a filter to monitor only the required container/directory. This is very important; otherwise you will end up with function triggers for every storage container activity (be careful and test your filter thoroughly). To choose the correct filter for the container/directory you want to monitor, click the "Filters" tab on the same page and tick the "Enable Subject Filtering" checkbox. In my case, I wanted to monitor hrongaliazureenv/data/hrongtest, where hrongaliazureenv is my storage account, data is my container inside the storage account, and hrongtest is a directory inside the "data" container. My equivalent filter is:
/blobServices/default/containers/data/blobs/hrongtest/

Start running the event-driven pipeline with Event Grid/Azure Functions and CDE
Start uploading blobs into the hrongtest directory in your storage account and you should see Spark jobs running in CDE. In this case, I uploaded 3 blobs: upload1, upload2 and upload3. You should see logs for each function invocation in the Azure Functions monitoring tab (Azure Functions --> Monitoring). You should also see the jobs triggered by the Azure function in the CDE console.

Cloudera Data Engineering (CDE) makes it super simple to run Spark jobs at scale, and Azure cloud-native integration patterns such as Azure Event Grid open up many more possibilities. Happy building in Azure with Cloudera Data Engineering (CDE)!
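The core of the flow described above (check the blob subject, fetch a token, call the Jobs API) can be sketched roughly as follows. This is not the function from the linked repository, only a minimal illustration using the standard library. The `/jobs/<name>/run` path, the `access_token` response field, and all function names here are assumptions; verify them against your own CDE virtual cluster before relying on them.

```python
import base64
import json
import urllib.request

# Subject prefix taken from the filter used in this article.
SUBJECT_PREFIX = "/blobServices/default/containers/data/blobs/hrongtest/"


def subject_matches(subject, prefix=SUBJECT_PREFIX):
    """Mirror the Event Grid 'subject begins with' filter inside the function."""
    return subject.startswith(prefix)


def build_token_request(token_url, user, password):
    """Build a GET request to the CDE token URL using HTTP basic auth."""
    credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        token_url, headers={"Authorization": f"Basic {credentials}"}
    )


def build_run_request(jobs_api_url, job_name, token):
    """Build a POST request asking the Jobs API to run a pre-created job.

    The '/jobs/<name>/run' path is an assumption about the Jobs API layout.
    """
    url = f"{jobs_api_url.rstrip('/')}/jobs/{job_name}/run"
    return urllib.request.Request(
        url,
        data=json.dumps({}).encode(),
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


def trigger_job(subject, token_url, jobs_api_url, job_name, user, password):
    """Return the Jobs API response body, or None if the subject is filtered out."""
    if not subject_matches(subject):
        return None
    with urllib.request.urlopen(build_token_request(token_url, user, password)) as resp:
        token = json.load(resp)["access_token"]  # assumed response field
    with urllib.request.urlopen(build_run_request(jobs_api_url, job_name, token)) as resp:
        return resp.read().decode()
```

In an actual Event Grid triggered function, `subject` would come from the incoming event and the user ID and password from the Key Vault secret. Repeating the prefix check inside the function is a cheap safeguard in case the subscription filter is misconfigured.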
09-30-2017
04:54 AM
2 Kudos
Sometimes we need plain MapReduce for special custom use cases. There are many examples of processing plain text with the text input formats, but hardly any for ORC files. The Apache ORC website is a very good place to start. I had a custom requirement to read data from a Hive ORC table and do some special processing that is quite difficult to do in a Hive UDF. Below is the MapReduce code for that, using the Hadoop 2.7.3 API.

Driver Code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Main extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Main(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Read the input as ORC; pass the input directory and YARN queue through the configuration.
        conf.set("mapreduce.job.inputformat.class", "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat");
        conf.set("mapreduce.input.fileinputformat.inputdir", args[0]);
        conf.set("mapreduce.job.queuename", "platformeng");

        Job job = Job.getInstance(conf, "Read ORC Files");
        job.setJarByClass(Main.class);
        job.setMapperClass(ORCReaderMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // args[0] is the ORC input directory, args[1] is the output directory.
        MultipleInputs.addInputPath(job, new Path(args[0]), OrcNewInputFormat.class);
        FileInputFormat.setInputDirRecursive(job, true);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Map-only job: no reducers.
        job.setNumReduceTasks(0);
        // Return the status so that ToolRunner and main() decide the exit code.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper Code:
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ORCReaderMapper extends Mapper<NullWritable, OrcStruct, NullWritable, Text> {

    private StructObjectInspector inspector;
    private Text v;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Schema of the ORC rows; adjust the struct string to match your table.
        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString("struct<resource:string,evttime:date>");
        // Build the inspector once here instead of once per record.
        inspector = (StructObjectInspector) OrcStruct.createObjectInspector(typeInfo);
        v = new Text();
    }

    @Override
    protected void map(NullWritable key, OrcStruct value, Context context)
            throws IOException, InterruptedException {
        // Column values come back in schema order: resource first, then evttime.
        List<Object> columnValues = inspector.getStructFieldsDataAsList(value);
        String fileName = columnValues.get(0).toString();
        DateWritable eventDate = (DateWritable) columnValues.get(1);
        // <Your custom logic with the key and value pairs>
        v.set(fileName + " " + eventDate.toString());
        context.write(NullWritable.get(), v);
    }
}
The code is written against the mapreduce API and avoids the older mapred API. Feel free to implement the data types of your choice. If you need a reducer phase, that should be straightforward as well. Please note that split calculation is not as straightforward as with the text input formats and is mostly driven by the number of ORC files in the directory. You might want to check the parallelism of the task that generates your source files to get more mapper parallelism in the MapReduce code. Some useful documentation: https://orc.apache.org/docs/mapreduce.html
06-18-2017
11:56 AM
6 Kudos
When an object is created in the Hive metastore, we expect to see the entry in Atlas almost instantaneously. Sometimes tables are missing in Atlas, either because a core component that Atlas depends on was unavailable or because the metastore objects were created in a way that does not invoke the Atlas hook (for example, by calling the metastore API directly). This article shows how to identify the table entries missing from Atlas. Once they are identified, we can post them to Atlas. The easiest method of doing that will be explained in a later article (I will add the reference link here when the second article is written).

Step#1: Identify the Hive tables from the metastore. There are multiple ways of gathering this information. The easiest way is to run a SQL query against the "hive" database. In my example, the metastore is based on MySQL. From the command line on the Hive metastore host:
mysql -u root -e "use hive;SELECT NAME, TBL_NAME FROM DBS as a, TBLS as b where a.DB_ID=b.DB_ID;" > tables.txt
sed -i '1d' tables.txt
(The sed command removes the header line.)

Step#2: Download the attached findMissingTablesInAtlas script to your host, into your home directory or any directory of your choice. Make sure the tables.txt file with the table names is present in the same directory.

Step#2.1: Make sure your ~/.cred file contains your user ID and password separated by a comma. The user ID must have the necessary privileges to post to Atlas; if they are missing, you can grant them with a Ranger policy.
$ cat ~/.cred
<my user id>,<my password>

Step#3: Edit the script and adjust it for your environment by changing the atlasServer and clusterName values. By default the script reads tables.txt from the same directory as the Python script.

Step#4: That's it! Run the script using the command below:
python findMissingTablesInAtlas.txt

Step#5: After the script has run successfully, you should see the two files below in the same directory:
list_of_tables_exists_in_atlas
list_of_tables_not_exists_in_atlas

In the next posting, instructions for posting these tables to Atlas will be provided.
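The attached script is not reproduced here, so the following is only a rough sketch of how such a check could work, not the author's implementation. It assumes that tables.txt holds tab-separated database and table names, that Atlas identifies Hive tables by a qualifiedName of the form db.table@clusterName, and that the Atlas v2 REST endpoint for unique-attribute lookups is available; older Atlas releases expose a different API. All function names are hypothetical.

```python
import base64
import urllib.error
import urllib.parse
import urllib.request


def parse_tables(text):
    """Parse tab-separated 'database<TAB>table' lines as written by the mysql client."""
    pairs = []
    for line in text.splitlines():
        parts = line.strip().split("\t")
        if len(parts) == 2:
            pairs.append((parts[0], parts[1]))
    return pairs


def qualified_name(db, table, cluster_name):
    """Build the assumed Atlas qualifiedName for a hive_table: db.table@cluster."""
    return f"{db.lower()}.{table.lower()}@{cluster_name}"


def exists_in_atlas(atlas_server, qname, user, password):
    """Return True if Atlas knows a hive_table with this qualifiedName.

    Uses the v2 unique-attribute lookup, which is an assumption about the
    Atlas version in use; a 404 response is treated as 'not found'.
    """
    url = (
        f"{atlas_server.rstrip('/')}/api/atlas/v2/entity/uniqueAttribute/type/hive_table"
        f"?attr:qualifiedName={urllib.parse.quote(qname)}"
    )
    credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(url, headers={"Authorization": f"Basic {credentials}"})
    try:
        with urllib.request.urlopen(request) as response:
            return response.status == 200
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return False
        raise


def split_tables(pairs, cluster_name, exists):
    """Partition (db, table) pairs into qualified names found in Atlas and missing ones."""
    found, missing = [], []
    for db, table in pairs:
        qname = qualified_name(db, table, cluster_name)
        (found if exists(qname) else missing).append(qname)
    return found, missing
```

Passing the lookup in as the `exists` callable keeps the partitioning logic testable without a live Atlas server; in real use it would wrap `exists_in_atlas` with the server URL and the credentials read from ~/.cred.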
03-24-2017
11:49 PM
5 Kudos
By definition, Atlas provides scalable, core foundational services for data governance, enabling enterprises to efficiently and effectively meet their compliance requirements within their Hadoop ecosystem. However, it is a complex application, built by integrating various components of the Hadoop ecosystem. Below are the components involved in the architecture:
- Atlas
- Ranger
- Hive
- HBase
- Ambari Infra
- Kafka

The intention of this article is to provide troubleshooting tips if the install through Ambari is not functioning correctly.

Install validation and troubleshooting tips:
Make sure the Atlas Metadata Server and Atlas Metadata clients are installed from Ambari.

Install the Ranger Tagsync component if you wish to do authorization control using Atlas tags in Ranger.

Make sure the Kafka topics below are created in Kafka:
ATLAS_HOOK
ATLAS_ENTITIES

You can check this by running the following command on any of the Kafka brokers (in a Kerberized cluster you need the Kafka keytab to run this command):
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper <ZK1:2181,ZK2:2181,ZK3:2181>
This command should return the results below:
ATLAS_HOOK
ATLAS_ENTITIES
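The topic check above is easy to script. The sketch below is only an illustration: it runs the same kafka-topics.sh command and reports which of the two Atlas topics are absent from its output. The function names are hypothetical, and the script path is the HDP location quoted in this article.

```python
import subprocess

REQUIRED_TOPICS = {"ATLAS_HOOK", "ATLAS_ENTITIES"}
KAFKA_TOPICS = "/usr/hdp/current/kafka-broker/bin/kafka-topics.sh"


def missing_topics(list_output, required=REQUIRED_TOPICS):
    """Return the required topics that do not appear in 'kafka-topics.sh --list' output."""
    present = {line.strip() for line in list_output.splitlines() if line.strip()}
    return sorted(set(required) - present)


def list_topics(zookeeper_quorum):
    """Run kafka-topics.sh --list against the ZooKeeper quorum and return its stdout."""
    result = subprocess.run(
        [KAFKA_TOPICS, "--list", "--zookeeper", zookeeper_quorum],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
```

On a broker, `missing_topics(list_topics("ZK1:2181,ZK2:2181,ZK3:2181"))` would return an empty list when both topics exist.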
If the Kafka topics didn't get created, you can create them manually using the attached atlas_kafka_acl.sh script. All you need to do is update the ZooKeeper quorum in the script and run it on any Kafka broker.

Depending on whether you have Ranger in your environment and whether Kafka topic authorization is controlled by Ranger, you should see the necessary policies in the Ranger Kafka repository. If you don't find them, create policies in the Ranger Kafka repository that grant the necessary access to the ATLAS_ENTITIES and ATLAS_HOOK topics. The policies should be set up as follows, in Ranger -> Resource based policies -> <Cluster Name>_kafka:

Policy Name: atlas_hook
Topic: ATLAS_HOOK
Allow Conditions:
Select Group(public), Permissions(Publish, Create)
Select User(atlas), Permissions(Consume, Create)

Policy Name: atlas_entities
Topic: ATLAS_ENTITIES
Allow Conditions:
Select Group(atlas), Permissions(Publish, Create)
Select User(rangertagsync), Permissions(Consume, Create)

Make sure the policies get synced to Kafka.
Create some entities through beeline; the event should trigger the Atlas hook to post the event to Kafka, where it is eventually consumed by the atlas user into Atlas.

The Ranger Audit tab and the Atlas log should help in debugging any access-denied issues while the entity is being transferred to Atlas through Kafka.

If Hive CLI access is not working as expected in Kerberized clusters, you can work with Hortonworks support to resolve the issue.

Please upvote if this article is helpful.
10-27-2016
02:27 PM
@Roman Boyko This seems to be a Hive metastore issue; it looks like one of the users who accessed the metastore earlier is holding a lock on the resource. Please work with Hortonworks Support/Engineering to get this issue resolved.
10-25-2016
12:53 PM
@Pooja Kamle I don't see a default queue in your capacity scheduler. It is always recommended to have a default queue with some capacity so that any queries that don't have a queue mapped can go to the default queue. As for the current issue, please set the parameter below before running your query.
If the query is running as MR: set mapreduce.job.queuename=<queue name>
If the query is running on the Tez engine: set tez.queue.name=<queue name>
You can also pass this parameter as a hiveconf. Let me know if this helps.
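For the hiveconf route, here is a small sketch of building the command line from a script. The helper names are hypothetical, and the queue name and query in the usage note are placeholders.

```python
def queue_property(engine, queue):
    """Return the property=value pair that routes a Hive query to a YARN queue."""
    if engine == "mr":
        return f"mapreduce.job.queuename={queue}"
    if engine == "tez":
        return f"tez.queue.name={queue}"
    raise ValueError(f"unsupported execution engine: {engine}")


def hive_command(engine, queue, query):
    """Build a 'hive --hiveconf ... -e ...' argument list for subprocess.run."""
    return ["hive", "--hiveconf", queue_property(engine, queue), "-e", query]
```

For example, `hive_command("tez", "myqueue", "select 1")` produces the arguments for `hive --hiveconf tez.queue.name=myqueue -e "select 1"`.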
10-23-2016
01:42 AM
@Riaz Lala Glad that it was helpful. Can you upvote and accept the answer please? Thanks
10-23-2016
12:23 AM
@Roland Simonis - Can you please vote and accept the answer if the explanation answers your question?
10-22-2016
10:36 PM
@S. Sali I am pretty sure HDP 2.3.2.0 does not include Hive 2.x; Hive 2.x becomes GA in later releases of HDP, probably from HDP 2.6 onward. If you are okay with the solution provided, can you please upvote and accept the answer? Thanks
10-22-2016
10:22 PM
1 Kudo
@S. Sali I suspect this is related to the way the bucketed table was loaded. Did you have the necessary properties set when loading the Hive table, especially hive.enforce.bucketing = true? https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables I would recommend recreating the table with the necessary properties set. Let me know if that works.