Member since: 09-26-2015
Posts: 48
Kudos Received: 29
Solutions: 6
My Accepted Solutions
| Title | Views | Posted |
|---|---|---|
| | 5985 | 10-25-2016 12:53 PM |
| | 5885 | 10-22-2016 10:22 PM |
| | 3887 | 10-22-2016 09:34 PM |
| | 4435 | 10-21-2016 09:56 PM |
| | 1919 | 07-17-2016 05:26 PM |
07-05-2022
03:49 PM
1 Kudo
There are numerous ways of building event-driven architectures in the cloud with Cloudera Data Platform (CDP). Lately I was working on a use case that required Spark inference and transformations on streaming log data in an Azure cloud environment on a near real-time basis: kick off Spark transformations as soon as files land in an Azure storage container. The preferred way of running Spark transformations for Data Engineering use cases in CDP is Cloudera Data Engineering (CDE), which runs Spark on Kubernetes. CDE provides much better isolation, job orchestration with the help of Airflow, Spark version independence, fast scaling, efficient use of cloud-native resources and, best of all, it does dependency management like a rockstar. It just makes your life easy and simple. CDE exposes a Jobs API for integration with your CI/CD platforms.

There are multiple patterns for streaming architectures with CDE, such as leveraging:
- Cloudera Data Flow (powered by Apache NiFi) to invoke CDE Spark/Airflow jobs using the Jobs API
- Azure cloud-native frameworks such as Azure Event Grid to trigger CDE jobs using Azure Functions

In this article we will discuss the latter option: how to leverage Azure Event Grid and Azure Functions to trigger CDE jobs. Azure Event Grid integrates natively with many other services and many patterns are possible, but for the sake of this article we will explore how Event Grid monitors an Azure Storage container for incoming files and invokes an Azure Python function (serverless), which in turn triggers a Cloudera Data Engineering (CDE) Airflow/PySpark job using the CDE Jobs API.

Prerequisites:
- Azure subscription
- Some knowledge of Azure Event Grid, such as configuring an Azure storage account with Event Grid subscriptions and Azure Functions
- Some knowledge of Azure Functions (preferably with a local Azure development environment such as Visual Studio Code)
- Azure Storage account with a container to monitor for incoming files (preferably managed by CDP if you wish to read the files in the Spark job)
- Cloudera Data Platform (CDP) environment running in Azure
- Cloudera Data Engineering (CDE) service enabled and a running CDE virtual cluster
- Some knowledge of creating resources and jobs in CDE
- Airflow and PySpark code uploaded to CDE resources, and jobs (Airflow and Spark) pre-created in CDE
- Azure Key Vault (to store the CDE user ID and password as secrets for authentication) created and granted the permissions needed to work with Azure Functions

Project artifacts:
- Azure function code: https://github.com/hrongali/CDE-Trigger-Azure-Functions-EventGrid/blob/master/EventGridTrigger2/__init__.py
- Other project artifacts, including the dependencies (requirements.txt) needed by the Azure function: https://github.com/hrongali/CDE-Trigger-Azure-Functions-EventGrid
- Airflow and Spark scripts uploaded into CDE and jobs pre-created (I have chosen to run a simple Airflow job that triggers a simple Pi job as a PySpark job):
  https://github.com/hrongali/CDE-Trigger-Azure-Functions-EventGrid/tree/master/cde-code

Processing steps:
- Create an Azure storage container/directory
- Create the CDE components - resources and jobs
- Collect the CDE URLs required to trigger Spark/Airflow jobs
- Create an Azure Functions app and an Azure Python function
- Create an Azure Key Vault and provide access to the Azure Python function
- Create an Event Grid subscription on the storage account and configure a filter for the directory to be monitored
- Start running the event-driven pipeline with Event Grid, Azure Functions and CDE

Create an Azure storage container/directory

Create an Azure Storage account and create the container/directory that we want to monitor for incoming files.

Create the CDE components - resources and jobs

Create Airflow and/or Spark jobs in CDE following these instructions.

Collect the CDE URLs required to trigger Spark/Airflow jobs

Gather the URLs needed by the function:
- JOBS API URL for the Airflow/Spark job
- TOKEN URL for retrieving the AUTH token for authentication purposes

Create an Azure Functions app and an Azure Python function

1. Create an Azure Functions app with the Python flavor and create the Azure Python function:
   - Follow the instructions here to create an Azure Functions app with the Python/Linux flavor.
   - Configure a local Azure Functions development environment, for example Visual Studio Code with the Azure plugins installed. Choose the "Azure Event Grid Trigger" template in step 4 of the documented instructions. An example Azure Event Grid function can be found here; make changes to the function according to your environment. You will likely need Python dependencies to run your Azure Python function, and you may have installed them in the virtual environment of your local development setup. Run pip freeze > requirements.txt from your local project root folder to collect all the Python dependencies into the requirements.txt file.
2. Test the function locally (optional).
3. Deploy the function to the Azure Functions app created in step 1 above. You should be able to do that from Visual Studio Code; you may have to configure your Azure environment to work with Visual Studio Code.

Create an Azure Key Vault and provide access to the Azure Python function

Cloudera Data Engineering uses JSON Web Tokens (JWT) for API authentication, and you may need a user ID and password to get the token. Obtaining the token programmatically is covered in the Azure function code itself, but you might have to store the user ID and password in a secure location within Azure, such as Azure Key Vault, by creating a secret. Provide the Azure function access to the Azure Key Vault by adding an "Access Policy". I followed this blog to make this integration work. A minimal sketch of what the Event Grid-triggered function can look like is shown below.
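This sketch is illustrative rather than the exact code from the repository: the environment variable names, job name and endpoint values are placeholders, and the actual TOKEN URL and JOBS API URL come from your own CDE virtual cluster. The full working function is in the GitHub repository linked above.

import json
import logging
import os

import azure.functions as func
import requests
from requests.auth import HTTPBasicAuth


def main(event: func.EventGridEvent):
    # Event Grid delivers the blob-created event; the blob URL is in the event payload.
    blob_url = event.get_json().get("url", "")
    logging.info("New blob landed: %s", blob_url)

    # Placeholder settings -- supply these through the function app configuration,
    # with the user ID and password backed by Azure Key Vault secrets.
    token_url = os.environ["CDE_TOKEN_URL"]        # token endpoint of the CDE virtual cluster
    jobs_api_url = os.environ["CDE_JOBS_API_URL"]  # Jobs API base URL of the virtual cluster
    job_name = os.environ.get("CDE_JOB_NAME", "my-airflow-job")
    user = os.environ["CDE_USER"]
    password = os.environ["CDE_PASSWORD"]

    # Step 1: exchange the workload user ID/password for a JWT access token.
    token_resp = requests.get(token_url, auth=HTTPBasicAuth(user, password), timeout=30)
    token_resp.raise_for_status()
    access_token = token_resp.json()["access_token"]

    # Step 2: trigger the pre-created CDE job through the Jobs API.
    run_resp = requests.post(
        jobs_api_url + "/jobs/" + job_name + "/run",
        headers={"Authorization": "Bearer " + access_token,
                 "Content-Type": "application/json"},
        data=json.dumps({}),
        timeout=60,
    )
    run_resp.raise_for_status()
    logging.info("Triggered CDE job %s: %s", job_name, run_resp.text)

Key Vault secrets can be exposed to the function either through Key Vault references in the function app settings or fetched at runtime with the Azure SDK; either way, keep the credentials out of the function code.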
Create an Event Grid subscription on the storage account and configure a filter for the directory to be monitored

Create the Event Grid subscription on the storage account and configure a filter to monitor only the required container/directory. This is very important; otherwise you will end up with function triggers for every storage container activity (be careful and test your filter thoroughly). Choose the correct filter for the container/directory you want to monitor: on the same page, click the "Filters" tab and check the "Enable Subject Filtering" checkbox. In my case, I wanted to monitor hrongaliazureenv/data/hrongtest, where hrongaliazureenv is my storage account, data is the container inside my storage account, and hrongtest is a directory inside my "data" container. My equivalent filter is:

  /blobServices/default/containers/data/blobs/hrongtest/

Start running the event-driven pipeline with Event Grid, Azure Functions and CDE

Start uploading blobs into the hrongtest directory in your storage account and you should see CDE Spark jobs running in CDE. In this case, I uploaded three blobs: upload1, upload2 and upload3. You should see logs for each function invocation in the Azure Functions monitoring tab (Azure Functions --> Monitoring), and you should also see the Azure function triggering CDE jobs in the CDE console.

Cloudera Data Engineering (CDE) makes it super simple to run Spark jobs at scale, and Azure cloud-native integration patterns such as Azure Event Grid make it much more robust in terms of possibilities. Happy building in Azure with Cloudera Data Engineering (CDE)!
03-01-2020
05:45 AM
Hi, did you happen to reach the end of this? How did you add the tables to Atlas once you knew which tables were missing?
09-30-2017
04:54 AM
2 Kudos
Oftentimes we need to fall back to plain MapReduce for special custom use cases. There are many examples of how to process plain text using the text file input formats, but when it comes to processing ORC files, there aren't any. The Apache ORC website is a very good place to start. I had a custom requirement to read data from a Hive ORC table and do some special processing that is quite difficult to do in a Hive UDF. Below is the MapReduce code for that, using the latest Hadoop 2.7.3 API.

Driver Code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Main extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Main(), args);
    System.exit(res);
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // Read ORC files with the new (mapreduce) API input format.
    conf.set("mapreduce.job.inputformat.class", "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat");
    conf.set("mapreduce.input.fileinputformat.inputdir", args[0]);
    // Adjust or remove the queue name to suit your cluster.
    conf.set("mapreduce.job.queuename", "platformeng");

    Job job = Job.getInstance(conf, "Read ORC Files");
    job.setJarByClass(Main.class);
    job.setMapperClass(ORCReaderMapper.class);
    job.setMapOutputKeyClass(NullWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Map-only job: read OrcStruct records recursively from the input directory and write plain text.
    MultipleInputs.addInputPath(job, new Path(args[0]), OrcNewInputFormat.class);
    FileInputFormat.setInputDirRecursive(job, true);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setNumReduceTasks(0);

    return job.waitForCompletion(true) ? 0 : 1;
  }
}
Mapper Code:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ORCReaderMapper extends
    Mapper<NullWritable, OrcStruct, NullWritable, Text> {

  FileSystem fs;
  TypeInfo typeInfo;
  ObjectInspector inspector;
  Text v;

  @Override
  protected void map(NullWritable key, OrcStruct value,
      Mapper<NullWritable, OrcStruct, NullWritable, Text>.Context context)
      throws IOException, InterruptedException {
    // Use an object inspector built from the table schema to pull the columns out of the OrcStruct.
    inspector = value.createObjectInspector(typeInfo);
    StructObjectInspector structObjectInspector = (StructObjectInspector) inspector;
    List<Object> columnValues = structObjectInspector.getStructFieldsDataAsList(value);
    String fileName = columnValues.get(0).toString();
    DateWritable eventDate = (DateWritable) columnValues.get(1);
    // <Your custom logic with the key and value pairs>
    v.set(fileName + " " + eventDate.toString());
    context.write(NullWritable.get(), v);
  }

  @Override
  protected void setup(
      Mapper<NullWritable, OrcStruct, NullWritable, Text>.Context context)
      throws IOException, InterruptedException {
    // Schema of the ORC data being read; adjust the struct definition to match your table.
    typeInfo = TypeInfoUtils.getTypeInfoFromTypeString("struct<resource:string,evttime:date>");
    v = new Text();
    super.setup(context);
  }
}
The code is written with the newer mapreduce API; the deprecated mapred API is avoided. Feel free to use the data types of your choice, and if you need a reduce phase, that should be straightforward to add as well. Please note that split calculation is not as straightforward as with text input formats: it is mostly driven by the number of ORC files in the directory. You might want to check the parallelism of the task that generates your source files to get more mapper parallelism in the MapReduce job. Some useful documentation: https://orc.apache.org/docs/mapreduce.html
10-22-2017
09:07 AM
Thanks man, that's indeed a complex application. My problem is that Kafka is working, the topics are created, and Ranger Tagsync does not show any errors, yet the tag sync between Ranger and Atlas is still not working, meaning I don't see any tags. Do you know how I can troubleshoot Ranger Tagsync precisely? Do you know which stations (components, ports, etc.) a tag passes through from Ranger to Atlas and vice versa? I have 2.5.3 with Atlas 0.7.0. Anything would help! Thank you, Regards, Normen
10-26-2016
09:20 AM
@Hari Rongali I set mapreduce.job.queuename=<queue name> and it works. Thanks a lot for your answer.
11-30-2017
07:55 AM
On HDP 2.3 with Hive 1.2, hive.enforce.bucketing is true by default. What is the need to set it?
08-05-2016
06:16 AM
2 Kudos
Hi @Kumar Veerappan, You can find the namenode via the command line using:

  hdfs dfsadmin -report

or

  hdfs getconf -namenodes  (you can use getconf to get the secondary namenode/backup node, etc.)

From Ambari, you can go to the Services page, open the service whose component you want and click on the component link to find the component host. For example, to find the namenode, go to the HDFS service page and click on the link for the namenode. Alternatively, go to the Hosts page and search for the component (see screen-shot-2016-08-05-at-114146-am.png). Depending on the version you are using, the filter page may vary, but every version has a filter for component type that will give you the host name. Hope this helps!
04-05-2016
06:21 PM
Also take a look here about Spark security. SASL encryption is currently supported for the block transfer service when authentication (spark.authenticate) is enabled. To enable SASL encryption for an application, set spark.authenticate.enableSaslEncryption to true in the application's configuration. When using an external shuffle service, it is possible to disable unencrypted connections by setting spark.network.sasl.serverAlwaysEncrypt to true in the shuffle service's configuration. If that option is enabled, applications that are not set up to use SASL encryption will fail to connect to the shuffle service.
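For example, in a PySpark application these properties could be set through SparkConf before creating the SparkContext. This is a minimal sketch using the property names quoted above; the shuffle-service setting still belongs in the shuffle service's own configuration, not here.

from pyspark import SparkConf, SparkContext

# Enable Spark authentication and SASL encryption of the block transfer service.
# On YARN the shared secret is generated automatically; in other deployment modes
# spark.authenticate.secret must also be set.
conf = (SparkConf()
        .setAppName("sasl-encrypted-app")
        .set("spark.authenticate", "true")
        .set("spark.authenticate.enableSaslEncryption", "true"))

sc = SparkContext(conf=conf)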
01-27-2016
08:36 PM
3 Kudos
@hrongali As a rule of thumb, CPU usage on Isilon is dictated by the write workload; this is mainly due to the FEC and striping calculations. There are of course exceptions to this rule, but it is a good rule to use in these environments. The namenode service is relatively lightweight and never gets in the way. The majority of the network traffic is datanode traffic between the compute nodes and Isilon.
11-18-2015
12:47 AM
2 Kudos
@hrongali I am following this Stack Overflow link. Can you try this?

sqoop import --verbose --driver com.sybase.jdbc4.jdbc.SybDriver --connect jdbc:sybase:Tds:dbgbl-tst:8032/DATABASE=trim_bw --query "select * from trim_bw..account where \$CONDITIONS" --fields-terminated-by ',' --username hrongali -P --hive-database trim_bw --hive-table account --hive-import -m 1