Created on 11-21-2016 09:37 PM
This article explores ways to simulate a Slowly Changing Dimension (SCD Type 2) in HAWQ. In an enterprise data lake you will come across scenarios where you have to capture updates and deletes from source systems. Because HAWQ is append-only, we need an alternative way to simulate them. The idea is to capture the delta from the source systems together with a flag on each row that shows whether the row is an append (A), an update (U) or a delete (D), and to record that information in the base tables in HAWQ.

For each source table, we create a corresponding base table and then create a view on top of it for access. The base table has two additional attributes, “update_version” and “delete_flag”. The “update_version” column is of type “bigserial”, an auto-incrementing number that tracks which version of a row is the latest: the largest number signifies the most recent record. The “delete_flag” column marks a row as deleted. Older versions are still retained, so the base table holds the history of all changes made to the record in the source system.

The scripts below create the table DDL with Parquet orientation and Snappy compression, chosen for a good balance of performance and compression. These settings can be changed globally or for individual tables as required.
-- Create Base Table
-- DROP TABLE IF EXISTS customer_table CASCADE;
CREATE TABLE customer_table (
    update_version BIGSERIAL,
    delete_flag    BOOLEAN DEFAULT FALSE,
    key            VARCHAR(255),
    fieldA         text,
    fieldB         text,
    fieldC         text
)
WITH (APPENDONLY=TRUE, ORIENTATION=PARQUET, COMPRESSTYPE=SNAPPY);

-- Create View on Base Table
-- For each key, keep only the row with the highest update_version,
-- then hide keys whose latest version is a delete.
CREATE OR REPLACE VIEW customer_view AS
SELECT key, fieldA, fieldB, fieldC
FROM (
    SELECT DISTINCT ON (key) key, delete_flag, fieldA, fieldB, fieldC
    FROM customer_table
    ORDER BY key, update_version DESC, delete_flag, fieldA, fieldB, fieldC
) AS latest
WHERE delete_flag IS FALSE;
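To make the behaviour of the view concrete, here is a minimal Python sketch of the same selection logic: keep the highest “update_version” per key, then drop keys whose latest version is flagged as deleted. It is an illustration only, not HAWQ code. The `Row` type, the `current_rows` function and the sample data are hypothetical names introduced for this example.

```python
from typing import Dict, List, NamedTuple


class Row(NamedTuple):
    update_version: int  # stands in for the BIGSERIAL column
    delete_flag: bool
    key: str
    fieldA: str


def current_rows(history: List[Row]) -> List[Row]:
    """Return the newest version of each key, hiding keys whose newest version is a delete."""
    latest: Dict[str, Row] = {}
    for row in history:
        seen = latest.get(row.key)
        # A larger update_version means a more recent record.
        if seen is None or row.update_version > seen.update_version:
            latest[row.key] = row
    # Equivalent of the outer "WHERE delete_flag IS FALSE" filter.
    return sorted(
        (r for r in latest.values() if not r.delete_flag),
        key=lambda r: r.key,
    )


# Hypothetical history as it would accumulate in the append-only base table.
history = [
    Row(1, False, "c1", "Alice"),     # A: initial append of c1
    Row(2, False, "c2", "Bob"),       # A: initial append of c2
    Row(3, False, "c1", "Alice B."),  # U: update of c1
    Row(4, True, "c2", "Bob"),        # D: delete of c2
]

for row in current_rows(history):
    print(row.key, row.fieldA)
```

With this sample history only the updated version of `c1` is visible, while all four rows remain in the history, which is the SCD Type 2 behaviour described above.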
In this article we take SQL Server as the example source system. The first step is to create the tables in HAWQ from their SQL Server definitions; this usually runs into hundreds, and sometimes a couple of thousand, tables. The helper T-SQL script below reads the source tables, converts the SQL Server data types to HAWQ data types and generates the DDL, which helps migrate the table definitions from SQL Server to HAWQ.
-- The script uses a metadata table in the public schema to capture the primary key
-- information from SQL Server. Create this table before executing the view generation script.
CREATE TABLE public.metadata(
    schemaName text,
    tableName  text,
    key        text
);
Generate HAWQ DDL from SQL Server Tables using T-SQL Script.
if exists (select * from sys.objects where type = 'P' AND name = 'generate_HAWQ_DDL')
    drop procedure generate_HAWQ_DDL;
GO

create procedure generate_HAWQ_DDL
as
DECLARE @tblName varchar(100), @SchemaName varchar(100), @ColName varchar(200),
        @ColType varchar(200), @i int, @KeyCol varchar(200), @KeyFull varchar(2000)

PRINT '---Starting DDL Generation ---'
PRINT 'CREATE TABLE public.metadata(schemaname text, tablename text, key text);'

-- Outer cursor: one iteration per source table
DECLARE tbl_cursor CURSOR FOR
    select table_schema, table_name
    from information_schema.tables
    where TABLE_TYPE = 'BASE TABLE'

OPEN tbl_cursor
FETCH NEXT FROM tbl_cursor INTO @SchemaName, @tblName

WHILE @@FETCH_STATUS = 0
BEGIN
    PRINT 'CREATE TABLE ' + @SchemaName + '.' + @tblName + '('
    PRINT ' update_version BIGSERIAL,'
    PRINT ' delete_flag BOOLEAN DEFAULT FALSE,'

    -- Column cursor: map each SQL Server type to its HAWQ equivalent.
    -- sys.columns.max_length is in bytes, so nchar/nvarchar lengths are
    -- emitted at twice the character count.
    DECLARE col_cursor CURSOR FOR
        SELECT sub.column_name,
               CASE
                   WHEN sub.datatype = 'char' THEN 'character'
                   WHEN sub.datatype = 'nchar' THEN 'character'
                   WHEN sub.datatype = 'datetime' THEN 'timestamp'
                   WHEN sub.datatype = 'datetime2' THEN 'timestamp'
                   WHEN sub.datatype = 'datetimeoffset' THEN 'timestamptz'
                   WHEN sub.datatype = 'decimal' THEN 'numeric'
                   WHEN sub.datatype = 'float' THEN 'float8'
                   WHEN sub.datatype = 'real' THEN 'float8'
                   WHEN sub.datatype = 'int' THEN 'integer'
                   WHEN sub.datatype = 'bit' THEN 'boolean'
                   WHEN sub.datatype = 'nvarchar' THEN 'varchar'
                   WHEN sub.datatype = 'smalldatetime' THEN 'timestamp'
                   WHEN sub.datatype = 'smallmoney' THEN 'numeric'
                   WHEN sub.datatype = 'money' THEN 'numeric'
                   WHEN sub.datatype = 'sysname' THEN 'varchar'
                   WHEN sub.datatype = 'tinyint' THEN 'smallint'
                   WHEN sub.datatype = 'uniqueidentifier' THEN 'varchar(36)'
                   ELSE sub.datatype
               END
               + CASE
                     WHEN sub.datatype in ('nchar', 'char', 'varchar', 'nvarchar', 'sysname')
                          AND sub.length <> -1
                     THEN '(' + cast(sub.length as varchar) + ')'
                     ELSE ''
                 END as datatype
        FROM (SELECT REPLACE(REPLACE(LOWER(sc.name), '"', ''), '.', '_') column_name,
                     st.name as datatype,
                     sc.max_length as length,
                     sc.column_id
              FROM sys.objects so
              JOIN sys.columns sc ON so.object_id = sc.object_id
              JOIN sys.schemas su ON so.schema_id = su.schema_id
              JOIN sys.types st ON sc.system_type_id = st.system_type_id
                               AND st.system_type_id = st.user_type_id
              WHERE so.type in ('U', 'V')
                AND su.name = @SchemaName
                AND so.name = @tblName) sub
        -- Types without a direct mapping are skipped
        WHERE sub.datatype not in ('binary', 'image', 'timestamp', 'xml', 'varbinary',
                                   'text', 'ntext', 'sql_variant', 'hierarchyid')
        ORDER BY sub.column_id

    OPEN col_cursor
    SET @i = 0
    FETCH NEXT FROM col_cursor INTO @ColName, @ColType

    WHILE @@FETCH_STATUS = 0
    BEGIN
        IF (@i = 0)
            PRINT @ColName + ' ' + @ColType
        ELSE
            PRINT ',' + @ColName + ' ' + @ColType
        FETCH NEXT FROM col_cursor INTO @ColName, @ColType
        SET @i = @i + 1
    END

    CLOSE col_cursor
    DEALLOCATE col_cursor

    PRINT ') '
    PRINT ' WITH (APPENDONLY=TRUE, ORIENTATION=parquet, COMPRESSTYPE=snappy);'

    -- Key cursor: collect the primary key columns in key order
    DECLARE key_cursor CURSOR FOR
        SELECT LOWER(c.name) as COLUMN_NAME
        FROM sys.objects o
        JOIN sys.schemas s ON o.schema_id = s.schema_id
        JOIN sys.indexes i ON o.object_id = i.object_id
        JOIN sys.index_columns ic ON o.object_id = ic.object_id AND i.index_id = ic.index_id
        JOIN sys.columns c ON o.object_id = c.object_id AND ic.column_id = c.column_id
        WHERE o.name = @tblName
          AND s.name = @SchemaName
          AND i.is_primary_key = 1
        ORDER BY ic.key_ordinal

    OPEN key_cursor
    FETCH NEXT FROM key_cursor INTO @KeyCol
    SET @i = 0;
    SET @KeyFull = '';

    WHILE @@FETCH_STATUS = 0
    BEGIN
        if (@i = 0)
            SET @KeyFull = @KeyCol
        else
            SET @KeyFull = @KeyFull + ',' + @KeyCol
        SET @i = @i + 1
        FETCH NEXT FROM key_cursor INTO @KeyCol
    END

    PRINT ' INSERT INTO public.metadata (schemaName, tableName, key) VALUES(''' + @SchemaName + ''',''' + @tblName + ''','''+ @KeyFull + ''');'

    CLOSE key_cursor
    DEALLOCATE key_cursor

    FETCH NEXT FROM tbl_cursor INTO @SchemaName, @tblName
END

CLOSE tbl_cursor
DEALLOCATE tbl_cursor
GO

-- Execute the procedure
EXEC generate_HAWQ_DDL;
Once all the base tables have been created in HAWQ and the source data is being inserted, make sure the delete flag is set to true or false according to the source CDC operation, and let the update_version bigserial column auto-increment.
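As a rough sketch of that loading rule, the Python snippet below turns one CDC record into a parameterized INSERT that sets “delete_flag” from the operation code and leaves “update_version” out of the column list so that the bigserial default assigns it. The `CDC_OP_TO_DELETE_FLAG` mapping, the `build_insert` function and the `%s` placeholder style are assumptions made for illustration; adapt them to whatever loader and driver you actually use.

```python
from typing import Dict, List, Sequence, Tuple

# Assumed mapping from the source CDC operation code to delete_flag:
# appends and updates are live rows, deletes are tombstones.
CDC_OP_TO_DELETE_FLAG: Dict[str, bool] = {"A": False, "U": False, "D": True}


def build_insert(
    table: str,
    columns: Sequence[str],
    op: str,
    values: Sequence[object],
) -> Tuple[str, List[object]]:
    """Build an INSERT statement and its parameters for one CDC record.

    update_version is deliberately omitted so the BIGSERIAL default applies.
    Table and column names are interpolated as-is, so they must come from
    trusted metadata, never from user input.
    """
    if op not in CDC_OP_TO_DELETE_FLAG:
        raise ValueError(f"unknown CDC operation: {op!r}")
    if len(columns) != len(values):
        raise ValueError("columns and values must have the same length")

    all_columns = ["delete_flag", *columns]
    placeholders = ", ".join(["%s"] * len(all_columns))
    sql = f"INSERT INTO {table} ({', '.join(all_columns)}) VALUES ({placeholders})"
    return sql, [CDC_OP_TO_DELETE_FLAG[op], *values]


# A delete in the source becomes one more appended row with delete_flag = True.
sql, params = build_insert("customer_table", ["key", "fieldA"], "D", ["c2", "Bob"])
print(sql)
print(params)
```

Every operation, including a delete, results in an append to the base table, which is what keeps the full history available.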
Don’t forget to run ANALYZE on the base tables once the data is populated, and again over time whenever more than 10% of a table has changed.
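The 10% rule can be expressed as a small check. The sketch below is one possible reading of it: because the base tables are append-only, it treats the growth in row count since the last ANALYZE as the amount of change. The function names and the idea of tracking the row count at the last ANALYZE are assumptions for illustration.

```python
def needs_analyze(rows_at_last_analyze: int, rows_now: int, threshold_percent: int = 10) -> bool:
    """Return True when more than threshold_percent of the table has changed."""
    if rows_at_last_analyze < 0 or rows_now < 0:
        raise ValueError("row counts must be non-negative")
    if rows_at_last_analyze == 0:
        # Never analyzed with data: analyze as soon as rows exist.
        return rows_now > 0
    changed = abs(rows_now - rows_at_last_analyze)
    # Integer arithmetic avoids floating-point edge cases at exactly 10%.
    return changed * 100 > rows_at_last_analyze * threshold_percent


def analyze_statement(table: str) -> str:
    """Return the ANALYZE statement for a table name taken from trusted metadata."""
    return f"ANALYZE {table};"


if needs_analyze(rows_at_last_analyze=1000, rows_now=1150):
    print(analyze_statement("customer_table"))
```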
Below is the helper script that generates the views on top of the base tables. Execute it on HAWQ using either psql or pgAdmin; it produces a script containing the view DDL, and executing that output script creates all the views. We’ve added a suffix “_table” to all base tables so that the views correspond to the source system table names, which minimizes changes to existing SQL queries.
Helper PL/PGSQL Script to Generate Views on top of Base Tables
create or replace function gen_meta_cdc4() returns void as
$body$
declare
    kv RECORD;
    cv RECORD;
    col_list text;
    mk text;
begin
    FOR kv IN
        SELECT table_schema, table_name
        FROM information_schema.tables
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema', 'hawq_toolkit')
          AND table_type = 'BASE TABLE'
    LOOP
        -- Build the list of business columns, leaving out the two tracking columns
        col_list := '';
        FOR cv IN
            SELECT column_name
            FROM information_schema.columns x
            WHERE x.table_schema = kv.table_schema
              AND x.table_name = kv.table_name
              AND x.column_name NOT IN ('update_version', 'delete_flag')
            ORDER BY x.ordinal_position
        LOOP
            col_list := col_list || ',' || cv.column_name;
        END LOOP;
        --RAISE NOTICE 'Column List:% for Table:% ', col_list, kv.table_name;

        -- Reset first so a table with no metadata row does not reuse the previous key
        mk := NULL;
        SELECT key INTO mk
        FROM public.metadata
        WHERE tablename = kv.table_name
          AND schemaname = kv.table_schema;

        -- Skip tables with no registered key (for example public.metadata itself).
        -- update_version DESC makes DISTINCT ON keep the latest version of each key.
        IF mk IS NOT NULL AND mk <> '' THEN
            RAISE NOTICE 'CREATE OR REPLACE VIEW %.%_view AS SELECT % FROM (SELECT DISTINCT ON (%) %, delete_flag FROM %.% ORDER BY %, update_version DESC) AS latest WHERE delete_flag IS FALSE;',
                kv.table_schema, kv.table_name, substring(col_list, 2),
                mk, substring(col_list, 2),
                kv.table_schema, kv.table_name, mk;
        END IF;
    END LOOP;
end
$body$
LANGUAGE 'plpgsql';

-- SELECT gen_meta_cdc4();
- Credits to Antonio Petrole, for the view generation script and Micheal Andre Pearce on the design