
Introduction:

When I think about leading open source cloud computing platforms, OpenStack comes into the picture. OpenStack controls large pools of compute, storage, and networking resources throughout a datacenter, and it has become very popular in the technology market. It introduces new services and features in every release cycle to address critical IT requirements. A very important requirement for any IT organization is a robust platform for performing data analytics on large data sets. Big data is the latest buzzword in the IT industry. This article focuses on how OpenStack plays a key role in addressing big data use cases.

Big Data on OpenStack!

Nowadays data is generated everywhere and its volume is growing exponentially. Web servers, application servers and database servers generate data in the form of user information, log files and system state information. Apart from this, a huge volume of data is generated by IoT devices such as sensors, vehicles and industrial devices. Data generated by scientific simulation models is another example of a big data source. It is difficult to store this data and perform analytics on it with traditional software tools. Hadoop can address the issue.

Let me share my use case with you! I have a bulk amount of data stored in an RDBMS environment. It does not perform well as the data grows, and I cannot imagine its performance when the data grows even bigger. I am not comfortable adopting the NoSQL culture at this stage. I need to store and process a bulk amount of data in a cost-effective way. Should I rely on high-end servers in a non-virtualized environment? My requirement is to scale the cluster at any time, with a good dashboard to manage all of its components. So I planned to set up a Hadoop cluster on top of OpenStack and create my ETL job environment there.

Hadoop is an industry-standard framework for storing and analyzing large data sets, with its fault-tolerant Hadoop Distributed File System and its MapReduce implementation. Scalability is a very common problem in a typical Hadoop cluster. OpenStack has introduced a project called Sahara – Data Processing as a Service. OpenStack Sahara aims to provision and manage data processing frameworks such as Hadoop MapReduce, Spark and Storm in a cluster topology. This project is similar to the data analytics platform provided by the Amazon Elastic MapReduce (EMR) service. Sahara deploys a cluster in a few minutes. Besides, OpenStack Sahara can scale the cluster by adding or removing worker nodes based on demand.

Benefits of managing a Hadoop cluster with OpenStack Sahara:

  • Clusters can be provisioned quickly and are easy to configure.
  • Like other OpenStack services, the Sahara service can be managed through a powerful REST API, the CLI and the Horizon dashboard.
  • Plugins are available to support multiple Hadoop vendors and frameworks, such as Vanilla (Apache Hadoop), HDP (Ambari), CDH (Cloudera), MapR, Spark and Storm.
  • Cluster size can be scaled up and down based on demand.
  • It can be integrated with OpenStack Swift to store the data processed by Hadoop and Spark.
  • Cluster monitoring can be made simple.
  • Apart from cluster provisioning, Sahara can be used as “Analytics as a Service” for ad-hoc or bursty analytic workloads. Here, we select the framework and load the job data. The cluster behaves as a transient cluster and is terminated automatically after job completion.
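
To make the scaling benefit more concrete, here is a minimal sketch of the request body that a client might send to Sahara when resizing a cluster. The field names (`resize_node_groups`, `add_node_groups`, `node_group_template_id`) follow my reading of the Sahara REST API and should be checked against the API reference for your release; the node group name `worker` and the template ID are hypothetical.

```python
import json


def build_scale_request(worker_group, new_count, extra_groups=None):
    """Build the JSON body for a Sahara cluster-scale request.

    worker_group: name of an existing node group to resize.
    new_count:    desired number of instances in that group.
    extra_groups: optional list of (name, template_id, count) tuples
                  describing new node groups to add to the cluster.
    """
    if new_count < 0:
        raise ValueError("new_count must be non-negative")
    # Field names are assumptions based on the Sahara REST API docs.
    body = {"resize_node_groups": [{"name": worker_group, "count": new_count}]}
    if extra_groups:
        body["add_node_groups"] = [
            {"name": name, "node_group_template_id": template_id, "count": count}
            for name, template_id, count in extra_groups
        ]
    return body


if __name__ == "__main__":
    # "worker" is a hypothetical node group name.
    print(json.dumps(build_scale_request("worker", 5), indent=2))
```

The function only builds the payload; sending it (with a Keystone token) is left to whichever HTTP client or OpenStack SDK you already use.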

Architecture:

OpenStack Sahara is architected to leverage the core services and other fully managed services of OpenStack. This makes Sahara more reliable and able to manage the Hadoop cluster efficiently. We can optionally use services such as Trove and Swift in our deployment. Let us look into the internals of the Sahara service.

[Figure: Sahara architecture (64839-sahara.png)]

  • The Sahara service has an API server which responds to HTTP requests from the end user and interacts with other OpenStack services to perform its functions.
  • Keystone (Identity as a Service) – authenticates users and provides security tokens that are used to work with OpenStack, limiting a user’s abilities in Sahara to their OpenStack privileges.
  • Heat (Orchestration as a Service) – used to provision and orchestrate the deployment of data processing clusters.
  • Glance (Virtual Machine Image as a Service) – stores VM images with an operating system and pre-installed Hadoop/Spark software packages used to create a data processing cluster.
  • Nova (Compute as a Service) – provisions virtual machines for data processing clusters.
  • Ironic (Bare Metal as a Service) – provisions bare metal nodes for data processing clusters.
  • Neutron (Networking as a Service) – provides networking services, from basic to advanced topologies, for accessing the data processing clusters.
  • Cinder (Block Storage) – provides persistent storage media for cluster nodes.
  • Swift (Object Storage) – provides reliable storage for job binaries and the data processed by Hadoop/Spark.
  • Designate (DNS as a Service) – provides a hosted zone to keep DNS records of the cluster instances. Hadoop services communicate with the cluster instances by their hostnames.
  • Ceilometer (Telemetry as a Service) – collects and stores metrics about the cluster for metering and monitoring purposes.
  • Manila (File Share as a Service) – can be used to store job binaries and data created by the job.
  • Barbican (Key Management Service) – securely stores sensitive data such as passwords and private keys.
  • Trove (Database as a Service) – provides database instances for the Hive metastore and for storing the state of the Hadoop services and other management services.

How to set up a Sahara cluster?

The OpenStack team provides clear documentation for setting up the Sahara service. Please follow the steps given in the installation guide to deploy Sahara in your environment. There are several ways to deploy the Sahara service. For experimenting with Sahara, Kolla is a preferable choice.

You can also manage the Sahara project through the Horizon dashboard.

https://docs.openstack.org/sahara/latest/install/index.html

ETL (Extract, Transform and Load) or ELT (Extract, Load and Transform) with a Sahara cluster:

There are numerous ETL tools on the market, and a traditional data warehouse has its own benefits and limitations; for example, it might be in a different location from your data source. The reason I am targeting Hadoop is that it is an ideal platform for running ETL jobs. Your datastore holds a variety of data, including structured, semi-structured and unstructured data. The Hadoop ecosystem has tools to ingest data from different sources, including databases, files and other data streams, and to store it in the centralized Hadoop Distributed File System (HDFS). As the data grows rapidly, the Hadoop cluster can be scaled by leveraging OpenStack Sahara.

[Figure: ETL with a Sahara cluster (64840-etl.png)]

Apache Hive is the data warehouse project built on top of the Hadoop ecosystem. It is a proven tool for ETL analysis. Once the data is extracted from the data sources with tools such as Sqoop, Flume or Kafka, it should be cleansed and transformed by Hive or Pig scripts, which run as MapReduce jobs. Later we can load the data into the table we created during the transformation phase and analyse it. The results can be generated as a report and visualized with any BI tool.

Another advantage of Hive is that it is an interactive query engine that can be accessed via the Hive Query Language, which resembles SQL. So a database engineer can execute jobs in the Hadoop ecosystem without prior knowledge of Java and MapReduce concepts. The Hive query execution engine parses the Hive query and converts it into a sequence of MapReduce/Spark jobs that run on the cluster. Hive can be accessed through JDBC/ODBC drivers and Thrift clients.

Oozie is a workflow engine available in the Hadoop ecosystem. A workflow is nothing but a set of tasks that need to be executed as a sequence in a distributed environment. Oozie helps us build anything from a simple workflow to cascading multiple workflows and coordinated jobs. Although it is ideal for creating workflows for complex ETL jobs, it does not have modules to support every Hadoop-related action. A large organization might have its own workflow engine to execute its tasks, and we can use any workflow engine to carry out our ETL job. OpenStack Mistral – Workflow as a Service is the best example. Apache Oozie resembles OpenStack Mistral in some aspects: it acts as a job scheduler and can be triggered at regular intervals of time.

Let us look at a typical ETL job process with Hadoop. An application stores its data in a MySQL server. The data stored in the DB server should be analysed at minimum cost and in minimum time.

Extract Phase:

The very first step is to extract data from MySQL and store it in HDFS. Apache Sqoop can be used to export/import data from a structured data source such as an RDBMS data store. If the data to be extracted is semi-structured or unstructured, we can use Apache Flume to ingest it from data sources such as web server logs, Twitter data streams or sensor data.

sqoop import --connect jdbc:mysql://<database host>/<database name> --driver com.mysql.jdbc.Driver --table <table name> --username <database user> --password <database password> --num-mappers 3 --target-dir hdfs://hadoop_sahara_local/user/rkkrishnaa/database
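
When the import is launched from a script rather than typed by hand, building the argument list programmatically avoids the typographic dashes and missing spaces that creep in when commands are copied from formatted text. The sketch below is one way to do that; the function name `build_sqoop_import` and the sample host, database and path values are hypothetical, while the flags are the same ones used in the command above.

```python
import shlex


def build_sqoop_import(host, database, table, user, password, target_dir,
                       num_mappers=3, port=3306):
    """Return the argument list for a `sqoop import` from MySQL into HDFS."""
    return [
        "sqoop", "import",
        "--connect", f"jdbc:mysql://{host}:{port}/{database}",
        "--driver", "com.mysql.jdbc.Driver",
        "--table", table,
        "--username", user,
        "--password", password,
        "--num-mappers", str(num_mappers),
        "--target-dir", target_dir,
    ]


if __name__ == "__main__":
    # All values below are placeholders for illustration only.
    argv = build_sqoop_import(
        "db.example.com", "sales", "orders", "etl", "secret",
        "hdfs://hadoop_sahara_local/user/rkkrishnaa/database",
    )
    # Print a shell-safe version of the command for review.
    print(" ".join(shlex.quote(arg) for arg in argv))
```

The returned list can be passed directly to `subprocess.run(argv, check=True)` on a node where the Sqoop client is installed.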

Transform phase:

The data extracted in the previous phase is not in a proper format; it is just raw data. It should be cleansed by applying proper filters and by aggregating data from multiple tables. This is our staging and intermediate area for storing data in HDFS.

Now we should design a Hive schema for each table and create a database to transform the data stored in the staging area. Typically, your data is in CSV format and the fields of each record are delimited by a comma (,).

We don’t need to inspect the HDFS data to know how it is stored. Since we imported it from MySQL, we are aware of the schema. It is almost compatible with Hive, except for some data types.

Once the database is modelled, we can load the extracted data for cleaning. The data in the table is still de-normalized, so aggregate the required columns from the different tables.

Open the hive or beeline shell, or, if you have HUE (Hadoop User Experience) installed, open the Hive editor, and run the commands given below.
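
Since the MySQL schema is known, the Hive column list can be derived from it rather than written by hand. The following is a rough sketch of such a translation. The mapping table is illustrative and deliberately conservative (date and time types are kept as `STRING`, which is an assumption on my part, not a rule); extend it for the column types you actually use. The helper names `hive_type` and `hive_columns` are hypothetical.

```python
import re

# Illustrative mapping only; anything not listed falls back to STRING.
MYSQL_TO_HIVE = {
    "tinyint": "TINYINT",
    "smallint": "SMALLINT",
    "int": "INT",
    "bigint": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "char": "STRING",
    "varchar": "STRING",
    "text": "STRING",
    "date": "STRING",
    "datetime": "STRING",
    "timestamp": "STRING",
}


def hive_type(mysql_type):
    """Translate a MySQL column type such as 'varchar(255)' to a Hive type."""
    match = re.match(r"[a-z]+", mysql_type.strip().lower())
    if match is None:
        raise ValueError(f"unrecognised column type: {mysql_type!r}")
    name = match.group(0)
    if name == "decimal":
        # Preserve precision and scale, e.g. DECIMAL(10,2).
        args = re.search(r"\(.*\)", mysql_type)
        return "DECIMAL" + (args.group(0).replace(" ", "") if args else "")
    return MYSQL_TO_HIVE.get(name, "STRING")


def hive_columns(columns):
    """Render (name, mysql_type) pairs as the body of a CREATE TABLE."""
    return ",\n".join(f"  {name} {hive_type(kind)}" for name, kind in columns)


if __name__ == "__main__":
    print(hive_columns([("id", "int(11)"), ("name", "varchar(50)"),
                        ("salary", "decimal(10, 2)")]))
```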

CREATE DATABASE <database name>;
USE <database name>;
CREATE TABLE <table name>
(variable1 DATATYPE,
variable2 DATATYPE,
variable3 DATATYPE,
-- ... remaining columns ...
variablen DATATYPE
)
PARTITIONED BY (Year INT, Month INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
-- Move the extracted file into one partition of the table
LOAD DATA INPATH 'hdfs://hadoop_sahara_local/user/rkkrishnaa/database/files.csv' INTO TABLE <table name> PARTITION (Year=<year>, Month=<month>);

Similarly, we can aggregate the data from multiple tables with the ‘OVERWRITE INTO TABLE’ statement.

Hive supports table partitioning, which improves query performance by distributing the execution load horizontally. We prefer to partition by the columns that store the year and month. Note that a partitioned table sometimes creates more tasks in a MapReduce job.
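
To illustrate what partitioning by year and month means on disk, here is a small sketch that groups records by the partition directory Hive would place them in. Hive stores each partition under a `column=value` subdirectory of the table location; the table directory used in the example is hypothetical.

```python
from collections import defaultdict
from datetime import date


def partition_path(table_dir, day):
    """Return the Hive-style partition directory for a given date."""
    return f"{table_dir}/year={day.year}/month={day.month}"


def group_by_partition(table_dir, rows):
    """Group (date, record) pairs by the partition directory they belong to."""
    partitions = defaultdict(list)
    for day, record in rows:
        partitions[partition_path(table_dir, day)].append(record)
    return dict(partitions)


if __name__ == "__main__":
    # Hypothetical table location and sample records.
    rows = [
        (date(2017, 3, 9), "order-1"),
        (date(2017, 3, 20), "order-2"),
        (date(2017, 4, 1), "order-3"),
    ]
    for path, records in group_by_partition("/warehouse/orders", rows).items():
        print(path, records)
```

A query that filters on year and month only has to read the matching directories, which is where the performance gain comes from; many small partitions, on the other hand, mean many small files and more tasks.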

Load Phase:

We have covered the transform phase above. It is time to load the transformed data into a data warehouse directory in HDFS, which is the final state of the data. Here we can apply our SQL queries to get the appropriate results.

All DML commands can be used to analyse the warehouse data based on the use case.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML

Results can be downloaded as a CSV file, table or chart for analysis. They can also be integrated with popular BI tools such as Talend Open Studio, Tableau, etc.

What Next?

We had a brief look at the benefits of OpenStack Sahara and a typical example of an ETL task in the Hadoop ecosystem. It is time to automate the ETL job with the Oozie workflow engine.

If you have experience with Mistral, you can stick with it. Most Hadoop users are acquainted with Apache Oozie, so I take Oozie as the example.

Step 1: Create the job.properties file

nameNode=hdfs://hadoop_sahara_local:8020
jobTracker=<rm-host>:8050
queueName=default
examplesRoot=examples
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}
oozie.libpath=/user/root
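
The `${...}` references in job.properties are expanded when the job is submitted. To check what a property such as `oozie.wf.application.path` will resolve to before submitting, a quick local expansion can help. The sketch below is a simplified stand-in for that substitution, not Oozie's own implementation; it assumes `user.name` is supplied from outside the file (Oozie takes it from the submitting user), and the function names are hypothetical.

```python
import re

_VAR = re.compile(r"\$\{([^}]+)\}")


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def resolve(props, extra=None, max_depth=10):
    """Expand ${name} references using props plus externally supplied values."""
    known = dict(extra or {})
    known.update(props)
    resolved = dict(props)
    for _ in range(max_depth):
        changed = False
        for key, value in list(resolved.items()):
            # Unknown names are left untouched so they remain visible.
            new = _VAR.sub(lambda m: known.get(m.group(1), m.group(0)), value)
            if new != value:
                resolved[key] = new
                known[key] = new
                changed = True
        if not changed:
            break
    return resolved


if __name__ == "__main__":
    text = (
        "nameNode=hdfs://hadoop_sahara_local:8020\n"
        "oozie.wf.application.path=${nameNode}/user/${user.name}\n"
    )
    print(resolve(parse_properties(text), {"user.name": "rkkrishnaa"}))
```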

Step 2: Create the workflow.xml file to define the tasks that extract data from the MySQL data store into HDFS, then transform and load it

<workflow-app xmlns="uri:oozie:workflow:0.2" name="my-etl-job">
    <!-- Replace the <...> placeholders, and pass `date` as a job property at submission. -->
    <start to="extract"/>

    <!-- Extract: import the MySQL table into HDFS with Sqoop -->
    <action name="extract">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:mysql://<database host>:3306/<database name> --username <database user> --password <database password> --table <table name> --driver com.mysql.jdbc.Driver --target-dir hdfs://hadoop_sahara_local/user/rkkrishnaa/database${date} -m 3</command>
        </sqoop>
        <ok to="transform"/>
        <error to="fail"/>
    </action>

    <!-- Transform: load the extracted file into the staging table -->
    <action name="transform">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>oozie.hive.defaults</name>
                    <value>/user/rkkrishnaa/oozie/hive-site.xml</value>
                </property>
            </configuration>
            <script>transform.q</script>
            <param>date=${date}</param>
        </hive>
        <ok to="load"/>
        <error to="fail"/>
    </action>

    <!-- Load: query the warehouse table -->
    <action name="load">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>oozie.hive.defaults</name>
                    <value>/user/rkkrishnaa/oozie/hive-site.xml</value>
                </property>
            </configuration>
            <script>load.q</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>ETL workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name="end"/>
</workflow-app>
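
A workflow chains its actions through `ok` and `error` transitions, and a misspelled target or an unclosed `<action>` is easy to overlook. Before uploading a workflow to HDFS, a quick local check can catch both. The sketch below is a simplified checker written for this article, not an Oozie tool: it only understands `start`, `action`, `kill` and `end` nodes, and the embedded sample workflow is a stripped-down, hypothetical one with a deliberate mistake.

```python
import xml.etree.ElementTree as ET

SAMPLE = """
<workflow-app xmlns="uri:oozie:workflow:0.2" name="my-etl-job">
  <start to="extract"/>
  <action name="extract">
    <ok to="transform"/>
    <error to="fail"/>
  </action>
  <action name="transform">
    <ok to="load"/>
    <error to="fail"/>
  </action>
  <kill name="fail"><message>failed</message></kill>
  <end name="end"/>
</workflow-app>
"""


def local(tag):
    """Strip the XML namespace from a tag name."""
    return tag.rsplit("}", 1)[-1]


def check_transitions(workflow_xml):
    """Return (source, target) pairs whose target node is not defined.

    Parsing fails with xml.etree.ElementTree.ParseError if the
    document is not well-formed (for example an unclosed <action>).
    """
    root = ET.fromstring(workflow_xml)
    nodes = set()
    transitions = []
    for child in root:
        kind = local(child.tag)
        if kind == "start":
            transitions.append(("start", child.get("to")))
        elif kind in ("action", "kill", "end"):
            name = child.get("name")
            nodes.add(name)
            for sub in child:
                if local(sub.tag) in ("ok", "error"):
                    transitions.append((name, sub.get("to")))
    return [(src, dst) for src, dst in transitions if dst not in nodes]


if __name__ == "__main__":
    # The sample never defines a "load" action, so one dangling
    # transition is reported.
    print(check_transitions(SAMPLE))
```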

Step 3: Create the file transform.q

LOAD DATA INPATH 'hdfs://hadoop_sahara_local/user/rkkrishnaa/database${date}/files.csv' INTO TABLE <table name> PARTITION (Year=<year>, Month=<month>);

Step 4: Create the file load.q

SELECT name, age, department FROM sales_department WHERE experience > 5;

The above workflow job performs the ETL operation. We can customize the workflow job based on our use case, and there are many mechanisms for handling task failures in the workflow. The email action helps to track the status of the workflow tasks. This ETL job can be executed on a daily or weekly basis at any time. Organizations usually schedule these kinds of long-running tasks during weekends or at night.

To learn more about workflow actions: https://oozie.apache.org/docs/3.3.1/index.html

Conclusion:

OpenStack has integrated a very large Hadoop ecosystem into its universe. Many cloud providers offer a Hadoop service within a few clicks on their cloud management portal. OpenStack Sahara demonstrates the capability and robustness of the OpenStack infrastructure. According to the OpenStack user survey, very few organizations have adopted the Sahara service in their private cloud. Sahara supports most of the Hadoop vendor plugins needed to operate Hadoop workloads effectively. Execute your ETL workflow with OpenStack Sahara.

Enjoy Learning!
|| Tamasoma Jyodhirgamaya ||

References:
[1] https://www.openstack.org/software/sample-configs#big-data
[2] https://docs.openstack.org/sahara/latest/install/index.html
[3] https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML
[4] https://oozie.apache.org/docs/3.3.1/index.html
