Community Articles

Find and share helpful community-sourced technical articles.
Announcements
Celebrating as our community reaches 100,000 members! Thank you!
Labels (1)
avatar
Cloudera Employee

Incremental Updates:

A Strategy for Appending Records in Hive

As part of the Stinger.next initiative, we expect to see Hive supporting full CRUD operations (Insert, Select, Update, Delete). While some transaction support exists for Hive today, it is still evolving with regard to performance and scalability. In the meantime, customers may still need to work with the traditional options for Hive table integration: OVERWRITE or APPEND.

The OVERWRITE option requires moving the complete record set from source to Hadoop. While this approach may work for smaller data sets, it may be prohibitive with regard to scale.

The APPEND option can limit data movement to only new or updated records. As true Inserts and Updates are not yet available in Hive, we need to consider a process of preventing duplicate records as Updates are appended to the cumulative record set.

In this article, we will look at a strategy for appending Updates and Inserts from delimited and RDBMS sources to existing Hive table definitions. While there are several options within the Hadoop platform for achieving this goal, our focus will be on a process that uses standard SQL within the Hive toolset.

3663-notea.png

Hive provides multiple table definition options – External, Local and View

External Tables are the combination of Hive table definitions and HDFS managed folders and files. The table definition exists independent from the data, so that, if the table is dropped, the HDFS folders and files remain in their original state.

Local Tables are Hive tables that are directly tied to the source data. The data is physically tied to the table definition and will be deleted if the table is dropped.

Views, just as with traditional RDBMS, are stored SQL queries that support the same READ interaction as HIVE tables, yet do not store any data of their own. Instead, the data is stored and sourced from the HIVE tables referenced in the stored SQL query.

The following process outlines a workflow that leverages all of the above in four steps:

3664-picturea.png

The tables and views that will be a part of the Incremental Update Workflow are:

base_table: A HIVE Local table that initially holds all records from the source system. After the initial processing cycle, it will maintain a copy of the most up-to-date synchronized record set from the source. At the end of each processing cycle, it is overwritten by the reporting_table (as explained in the Step 4: Purge).

incremental_table: A HIVE External table that holds the incremental change records (INSERTS and UPDATES) from the source system. At the end of each processing cycle, it is cleared of content (as explained in the Step 4: Purge).

reconcile_view: A HIVE View that combines and reduces the base_table and incremental_table content to show only the most up-to-date records. It is used to populate the reporting_table (as explained in Step 3: Compact).

reporting_table: A HIVE Local table that holds the most up-to-date records for reporting purposes. It is also used to overwrite the base_table at the end of each processing run.

Step 1: Ingest

Depending on whether direct access is available to the RDBMS source system, you may opt for either a File Processing method (when no direct access is available) or RDBMS Processing (when database client access is available).

Regardless of the ingest option, the processing workflow in this article requires:

  1. One-time, initial load to move all data from source table to HIVE.
  2. On-going, “Change Only” data loads from the source table to HIVE.

Below, both File Processing and Database-direct (SQOOP) ingest will be discussed.

File Processing

For this article, we assume that a file or set of files within a folder will have a delimited format and will have been generated from a relational system (i.e. records have unique keys or identifiers).

Files will need to be moved into HDFS using standard ingest options:

  • WebHDFS – Primarily used when integrating with applications, a Web URL provides an Upload end-point into a designated HDFS folder.
  • NFS – Appears as a standard network drive and allows end-users to use standard Copy-Paste operations to move files from standard file systems into HDFS.

Once the initial set of records are moved into HDFS, subsequent scheduled events can move files containing only new Inserts and Updates.

RDBMS Processing

SQOOP is the JDBC-based utility for integrating with traditional databases. A SQOOP Import allows for the movement of data into either HDFS (a delimited format can be defined as part of the Import definition) or directly into a Hive table.

The entire source table can be moved into HDFS or Hive using the “--table” parameter.

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1

After the initial import, subsequent imports can leverage SQOOP’s native support for “Incremental Import” by using the “check-column”, “incremental” and “last-value” parameters.

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1--check-column modified_date --incremental lastmodified --last-value {last_import_date}

Alternately, you can leverage the “query” parameter, and have SQL select statements limit the import to new or changed records only.

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --target-dir /user/hive/incremental_table -m 1 --query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS’

Note: For the initial load, substitute “base_table” for “incremental_table”. For all subsequent loads, use “incremental_table”.

Step 2: Reconcile

In order to support an on-going reconciliation between current records in HIVE and new change records, two tables should be defined, base_table and incremental_table.

base_table

The example below shows DDL for the Hive table “base_table” that will include any delimited files located in HDFS under the '/user/hive/base_table' directory. This table will house the initial, complete record load from the source system. After the first processing run, it will house the on-going, most up-to-date set of records from the source system:

CREATE TABLE base_table (

id string,

field1 string,

field2 string,

field3 string,

field4 string,

field5 string,

modified_date string

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY ','

LOCATION '/user/hive/base_table';

incremental_table

The example below shows an external Hive table “incremental_table” that will include any delimited files with incremental change records, located in HDFS under the '/user/hive/incremental_append' directory:

CREATE EXTERNAL TABLE incremental_table (

id string,

field1 string,

field2 string,

field3 string,

field4 string,

field5 string,

modified_date string

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY ','

LOCATION '/user/hive/incremental_table';

reconcile_view

This view combines record sets from both the Base (base_table) and Change (incremental_table) tables and is reduced to only the most recent records for each unique “id”. This view (reconcile_view) is defined as follows:

CREATE VIEW reconcile_view AS

SELECT t2.id, t2.field1, t2.field2, t2.field3, t2.field4, t2.field5, t2.modified_date FROM

(SELECT *,ROW_NUMBER() OVER (PARTITION BY id ORDER BY modified_date DESC) rn

FROM (SELECT * FROM base_table

UNION ALL

SELECT * FROM incremental_table)

t1) t2

WHERE rn = 1;

Example:

The sample data below represents the UNION of both the base_table and incremental_table. Note, there are new updates for “id” values 1 and 2, which are found as the last two records in the table. The record for “id” 3 remains unchanged.

3660-tablea.png

The reconcile_view should only show one record for each unique “id”, based on the latest “modified_date” field value.

The resulting query from “select * from reconcile_view” shows only three records, based on both unique “id” and “modified_date”

3662-tablec.png

Step 3: Compact

The reconcile_view now contains the most up-to-date set of records and is now synchronized with changes from the RDBMS source system. For BI Reporting and Analytical tools, a reporting_table can be generated from the reconcile_view. Before creating this table, any previous instances of the table should be dropped as in the example below.

reporting_table

DROP TABLE reporting_table;

CREATE TABLE reporting_table AS

SELECT * FROM reconcile_view;

Moving the Reconciled View (reconcile_view) to a Reporting Table (reporting_table), reduces the amount of processing needed for reporting queries.

Further, the data stored in the Reporting Table (reporting_table) will also be static; unchanging until the next processing cycle. This provides consistency in reporting between processing cycles. In contrast, the Reconciled View (reconcile_view) is dynamic and will change as soon as new files (holding change records) are added to or removed from the Change table (incremental_table) folder /user/hive/incremental_table.

Step 4: Purge

To prepare for the next series of incremental records from the source, replace the Base table (base_table) with only the most up-to-date records (reporting_table). Also, delete the previously imported Change record content (incremental_table) by deleting the files located in the external table location ('/user/hive/incremental_table').

From a HIVE client:

DROP TABLE base_table;

CREATE TABLE base_table AS

SELECT * FROM reporting_table;

From a HDFS client:

hadoop fs –rm –r /user/hive/incremental_table/*

Final Thoughts:

While there are several possible approaches to supporting incremental data feeds into Hive, this example has a few key advantages:

  • By maintaining an External Table for updates only, the table contents can be refreshed by simply adding or deleting files to that folder.
  • The four steps in the processing cycle (Ingest, Reconcile, Compact and Purge) can be coordinated in a single OOZIE workflow. The OOZIE workflow can be a scheduled event that corresponds to the data freshness SLA (i.e. Daily, Weekly, Montly, etc.)
  • In addition to supporting INSERT and UPDATE synchronization, DELETES can be synchronized by adding either a DELETE_FLAG or DELETE_DATE field to the import source. Then, use this field as a filter in the Hive reporting table to hide deleted records.

Ex.

CREATE VIEW reconcile_view AS

SELECT t2.id, t2.field1, t2.field2, t2.field3, t2.field4, t2.field5, t2.modified_date, t2.delete_date FROM

(SELECT *,ROW_NUMBER() OVER (PARTITION BY id ORDER BY modified_date DESC) rn

FROM (SELECT * FROM base_table

UNION ALL

SELECT * FROM incremental_table)

t1) t2

WHERE rn = 1 and delete_date = "";


tableb.png
26,466 Views
Comments

Thanks, we are working on something similar.

I have one question/comment to the 'compact' stage. The execution flow as presented here means that table 'reporting_table' disappears for significant amount of time, before it is filled again. This could break queries running against this table. Is there a way how to make this switch (almost) seamless? It also may require to keep the older data not to break already running queries.

Thanks,

Pavel

avatar
Expert Contributor

base_table is not a exernal table.How we are loading the data into base_table is not clear during first run is not clear?could you please provide input on this?we are not using any load statement or insert into statement?

I think after using below statement.Manually we have to load the data from files present in path:- /user/hive/incremental_table/incremental_table into table base_table

sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail --connection-manager org.apache.sqoop.teradata.TeradataConnManager --username dbc --password dbc --table SOURCE_TBL --target-dir /user/hive/incremental_table -m 1
Version history
Last update:
‎08-17-2019 12:39 PM
Updated by:
Contributors