
How to create an ETL job in Spark


Re: How to create an ETL job in Spark

Contributor

So you mean I have to use a third-party tool like Pentaho, Informatica, etc. in order to create an ETL job?


Re: How to create an ETL job in Spark

Contributor

So you mean I have to use a third-party tool like Pentaho, Informatica, etc. for ETL jobs? I thought there was a feature available inside Hadoop itself for creating jobs, like the one shown at this URL, but then my understanding was wrong, right?

 

https://drive.google.com/open?id=0B-wEtRLWeFvMMGt1LWJUbURsTDA

 

 


Re: How to create an ETL job in Spark

Champion
In the open source community the closest is NiFi, but that is its own project and not part of the Apache Hadoop project. It would be your responsibility to integrate and manage it. Cloudera assumes this responsibility for you when they choose to support a project in CDH. Cloudera currently does not support NiFi but may in the future.

So no, within the Hadoop stack itself there is no way to build ETL jobs using a GUI.

Re: How to create an ETL job in Spark

Explorer

The starting point of every Spark application is the creation of a SparkSession. This is a driver process that maintains all relevant information about your Spark Application, and it is also responsible for distributing and scheduling your application across all executors. We can simply create a SparkSession in the following way:

Python

from pyspark.sql import SparkSession

def initialize_spark():
    # Get an existing SparkSession or create a new local one for the ETL job
    spark = SparkSession.builder \
        .master("local[*]") \
        .appName("simple etl job") \
        .getOrCreate()
    return spark

The getOrCreate() method will return an existing SparkSession if one has already been created; otherwise, it will create a new one. With the master option, you can specify the master URL to connect to. However, because we’re running our job locally, we specify the local[*] argument, which means that Spark will use as many worker threads as there are logical cores on your machine. We set the application name with the appName option; this name will appear in the Spark UI and in the log data.

Our next step is to read the CSV file. Reading in a CSV can be done with a DataFrameReader that is associated with our SparkSession. 
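For quick exploration, you could let Spark infer the schema. A minimal sketch, assuming spark is the session returned by initialize_spark() and the same autos.csv path used further below:

Python

from os import environ

df_inferred = spark \
    .read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(environ["HOME"] + "/data/autos.csv")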

Whether to choose schema inference or to define a schema manually depends heavily on the use case. When writing an ETL job for a production environment, it is strongly recommended to define a schema in order to prevent inaccurate data representation. Another drawback of schema inference is that it tends to make your Spark application slower, especially when working with CSV or JSON. The example below shows how to read in data with a previously defined schema:

Python

from os import environ
from pyspark.sql.types import (StructType, StructField, StringType,
                               TimestampType, LongType, ShortType, DateType)

def load_df_with_schema(spark):
    schema = StructType([
        StructField("dateCrawled", TimestampType(), True),
        StructField("name", StringType(), True),
        StructField("seller", StringType(), False),
        StructField("offerType", StringType(), True),
        StructField("price", LongType(), True),
        StructField("abtest", StringType(), True),
        StructField("vehicleType", StringType(), True),
        StructField("yearOfRegistration", StringType(), True),
        StructField("gearbox", StringType(), True),
        StructField("powerPS", ShortType(), True),
        StructField("model", StringType(), True),
        StructField("kilometer", LongType(), True),
        StructField("monthOfRegistration", StringType(), True),
        StructField("fuelType", StringType(), True),
        StructField("brand", StringType(), True),
        StructField("notRepairedDamage", StringType(), True),
        StructField("dateCreated", DateType(), True),
        StructField("nrOfPictures", ShortType(), True),
        StructField("postalCode", StringType(), True),
        StructField("lastSeen", TimestampType(), True)
    ])

    df = spark \
        .read \
        .format("csv") \
        .schema(schema) \
        .option("header", "true") \
        .load(environ["HOME"] + "/data/autos.csv")

    print("Data loaded into PySpark", "\n")
    return df

Transform

Let's have a closer look at our data and start doing more interesting things:

 

 

Sample five rows of the car dataset

As you can see, there are multiple columns containing null values. Missing data can be handled in a wide variety of ways, but discussing them is out of scope for this article, so we choose to leave the missing values as null. However, there are more strange values and columns in this dataset, so some basic transformations are needed:

The rationale for this cleaning is as follows: the columns “dateCrawled” and “lastSeen” don’t seem to be useful for any future analysis, and all the values in the column “nrOfPictures” are equal to 0, hence we decided to drop these columns.

seller
  gewerblich        3
  private      371525

offerType
  Angebot      371513
  Gesuch           12

Inspecting the columns “seller” and “offerType” gives the counts shown above. We can remove the three rows containing the value “gewerblich” and then drop the column “seller”. The same logic applies to the column “offerType”; consequently, we’re left with a cleaner, more consistent dataset.
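A minimal PySpark sketch of these transformations, assuming df is the DataFrame returned by load_df_with_schema above:

Python

from pyspark.sql.functions import col

def clean_df(df):
    # Drop columns that are not useful for analysis (nrOfPictures is always 0)
    df = df.drop("dateCrawled", "lastSeen", "nrOfPictures")
    # Remove the three "gewerblich" rows, then drop the now-uninformative columns
    df = df.where(col("seller") != "gewerblich").drop("seller")
    df = df.where(col("offerType") != "Gesuch").drop("offerType")
    return df

After these steps, the dataset looks like this: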

 

 

Last five rows of the ‘cleaned’ car dataset

Load

We have translated our raw data into analysis-ready data, so we’re ready to load it into our locally running MySQL database for further analysis. For this example, we initialized a MySQL database named “autos” with a table named “cars”.

Steps to connect to a MySQL database from Python using MySQL Connector/Python (a minimal sketch follows the steps):

1. Install MySQL Connector/Python using pip.

2. Use the mysql.connector.connect() method of MySQL Connector/Python with the required parameters to connect to MySQL.

3. Use the connection object returned by connect() to create a cursor object for performing database operations.

4. Use cursor.execute() to run SQL queries from Python.

5. Close the cursor with cursor.close() and the MySQL database connection with connection.close() after your work completes.

6. Catch any exceptions that may occur during this process.
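A minimal sketch of steps 1–3 (the host, user, and password values are placeholders for your local setup):

Python

import mysql.connector

# Placeholder credentials -- adjust to your local MySQL installation
conn = mysql.connector.connect(
    host="localhost",
    user="root",
    password="your_password",
    database="autos"
)
cur = conn.cursor()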

Now that we have created a cursor, we are able to create a table named “cars” in our “autos” database:
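A sketch of what that statement could look like; the column names follow the cleaned dataset above, and the SQL types are assumptions based on the PySpark schema:

Python

create_table_query = """
CREATE TABLE IF NOT EXISTS cars (
    name VARCHAR(255),
    price BIGINT,
    abtest VARCHAR(32),
    vehicleType VARCHAR(64),
    yearOfRegistration VARCHAR(16),
    gearbox VARCHAR(32),
    powerPS SMALLINT,
    model VARCHAR(128),
    kilometer BIGINT,
    monthOfRegistration VARCHAR(16),
    fuelType VARCHAR(32),
    brand VARCHAR(64),
    notRepairedDamage VARCHAR(32),
    dateCreated DATE,
    postalCode VARCHAR(16)
)
"""
cur.execute(create_table_query)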

After creating the table, it’s ready to be populated with our dataset. We can insert the data by providing it as a list of tuples (where each record is one tuple) to our INSERT statement:
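A sketch of what insert_query and cars_seq could look like, assuming cleaned_df is the cleaned DataFrame from the Transform step:

Python

insert_query = """
INSERT INTO cars (name, price, abtest, vehicleType, yearOfRegistration, gearbox,
                  powerPS, model, kilometer, monthOfRegistration, fuelType, brand,
                  notRepairedDamage, dateCreated, postalCode)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

# Collect the DataFrame to the driver and convert each Row into a plain tuple
cars_seq = [tuple(row) for row in cleaned_df.collect()]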

As a result, it is now possible to execute this command with our previously defined cursor:

Python

cur.executemany(insert_query, cars_seq)  # we are inserting multiple rows (from a list) into the table

Which gives us the following output in our terminal:

Printing 3 rows

 

 

 

We need to call the following code to commit our transaction to MySQL:

Python

conn.commit()

And to make sure the dataset is loaded correctly into MySQL, we can check in MySQL Workbench:
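A quick sanity check can also be run from Python with the same cursor (a minimal sketch):

Python

cur.execute("SELECT COUNT(*) FROM cars")
print("Rows loaded:", cur.fetchone()[0])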

 

 

 

Re: How to create an ETL job in Spark

Explorer

Spark is a great tool for extracting data, running transformations, and loading the results into a data store.

Spark runs computations in parallel, so execution is very fast, and clusters can be scaled up for big data. Spark's native API and spark-daria's EtlDefinition object allow for elegant definitions of ETL logic.

Extract

Assume you have a data lake of Parquet files. Here's some example code that reads from the data lake, filters the data, and then repartitions the subset.

 

import org.apache.spark.sql.functions.col

val dataLakeDF = spark.read.parquet("s3a://some-bucket/foo")
val extractDF = dataLakeDF
  .where(col("mood") === "happy")
  .repartition(10000)

Transform

We can define a custom transformation function that takes a DataFrame as an argument and returns a DataFrame to transform the extractDF. Custom transformation functions are reusable and easily testable, which makes for a high-quality codebase.
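The model() transformation used in the EtlDefinition below is not shown here, so this is a hypothetical placeholder that just adds a column, following the same curried signature:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// Hypothetical transformation -- replace the body with your real ETL logic
def model()(df: DataFrame): DataFrame = {
  df.withColumn("is_happy", lit(true))
}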

 

Load

We can use the Spark DataFrame writers to define a generic function that writes a DataFrame to a given location in S3.

import org.apache.spark.sql.{DataFrame, SaveMode}

def exampleWriter()(df: DataFrame): Unit = {
  val path = "s3a://some-bucket/extracts/bar"
  df.write.mode(SaveMode.Overwrite).parquet(path)
}

Defining ETL

Let's instantiate the EtlDefinition case class defined in spark-daria and use the process() method to execute the ETL code.

 

val etl = new EtlDefinition(
  sourceDF = extractDF,
  transform = model(),
  write = exampleWriter()
)

 

Here’s how to execute the ETL code:

etl.process()

 

Take a look at the method signatures of the EtlDefinition arguments and make sure you understand how the functions we've defined fit into this design.

 

case class EtlDefinition(
  sourceDF: DataFrame,
  transform: (DataFrame => DataFrame),
  write: (DataFrame => Unit),
  metadata: scala.collection.mutable.Map[String, Any] = scala.collection.mutable.Map[String, Any]()
) {

  def process(): Unit = {
    write(sourceDF.transform(transform))
  }

}

 

 
