Member since: 05-16-2016
Posts: 26
Kudos Received: 20
Solutions: 0
07-27-2020
11:04 PM
@mqadri Thank you for sharing this very useful information. I tried configuring Hive LLAP daemons with YARN node labelling in HDP 3 by following your post, but the LLAP daemons are not running on the labelled YARN nodes; instead they are running on other nodes that are not part of the node label (there were no errors in the RM or HiveServer2 Interactive logs). Does HDP 3 have a different node label setup for LLAP daemons? Could you please guide me? Thanks in advance!
12-20-2018
01:36 PM
hi Muji, Great job 🙂 Just missing a ',' after: B_df("_c1").cast(StringType).as("S_STORE_ID")
// Assign column names to the Store dataframe
val storeDF = B_df.select(
  B_df("_c0").cast(IntegerType).as("S_STORE_SK"),
  B_df("_c1").cast(StringType).as("S_STORE_ID"),
  B_df("_c5").cast(StringType).as("S_STORE_NAME")
)
06-29-2017
05:48 PM
import re
from pyspark.sql import Row
APACHE_ACCESS_LOG_PATTERN = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'  # raw string so the backslashes reach the regex engine untouched
# Returns a Row containing the parts of the Apache access log line.
def parse_apache_log_line(logline):
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        # Optionally, change this to skip malformed lines if individual records are not critical.
        # For this example, we want to ensure that the format is consistent.
        raise Exception("Invalid logline: %s" % logline)
    return Row(
        ipAddress    = match.group(1),
        clientIdentd = match.group(2),
        userId       = match.group(3),
        dateTime     = match.group(4),
        method       = match.group(5),
        endpoint     = match.group(6),
        protocol     = match.group(7),
        responseCode = int(match.group(8)),
        contentSize  = int(match.group(9)))  # int() works on both Python 2 and 3; long() is Python 2 only

log_files = "hdfs://dataset/apache_logs/"
raw_log_files = sc.textFile(log_files)
raw_log_files.count()
parsed_log_files = raw_log_files.map(parse_apache_log_line)
parsed_log_files.toDF().registerTempTable("log_data")
%scala
// HELPER FUNCTION - Register ParseDate UDF to use with queries later
def parseDate(rawDate: String): Long = {
  // Use HH (24-hour clock) and an explicit locale for the month abbreviation
  val dtParser = new java.text.SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", java.util.Locale.ENGLISH)
  val splitted = rawDate.split(" ")
  val futureDt = splitted(0)
  val offset = splitted(1).toLong        // split already returns Strings, no cast needed
  val hourOffset = (offset.toInt / 100)
  val minuteOffset = (offset - hourOffset * 100).toInt
  val totalOffset = hourOffset * 60 * 60 + minuteOffset * 60
  (dtParser.parse(futureDt).getTime() / 1000) + totalOffset
}
val example = "21/Jun/2014:10:00:00 -0730"
parseDate(example)
sqlContext.udf.register("parseDate", parseDate(_: String): Long)
%sql
-- Use the parsed logs and the parseDate helper UDF to run a SQL query
select responseCode, ipAddress,
       to_date(cast(parseDate(dateTime) as timestamp)) as date,
       count(*) as NoOfRequests,
       sum(contentSize) as TotalContentSize
from log_data
group by responseCode, ipAddress, to_date(cast(parseDate(dateTime) as timestamp))
order by count(*) desc
11-21-2016
09:37 PM
3 Kudos
The scope of this article is to explore ways to simulate a Slowly Changing Dimension (SCD Type 2) given the append-only nature of HAWQ. The idea is to capture the delta from source systems and carry a flag on each row indicating whether the row is an Append (A), Update (U) or Delete (D), so that this information is preserved in the HAWQ base tables.
In an Enterprise Data Lake you will come across scenarios where you have to capture updates and deletes from source systems. Since HAWQ is append-only, we need an alternative way to simulate this, which we discuss in this article.
For each source table we will create a corresponding base table, and then create a view on top of it for access. The base table has two additional attributes, "update_version" and "delete_flag". The "update_version" column is of type "bigserial", an auto-incrementing number that tracks which version of a row is the latest; the largest number signifies the most recent record. The "delete_flag" column indicates that the row has been deleted; older versions are still retained, so the base table holds the history of all changes made to the record in the source system. The script creates the table DDL with parquet orientation and snappy compression (chosen for a good balance of performance and compression); these settings can be modified globally or for individual tables as required.
-- Create Base Table --
DROP TABLE IF EXISTS customer_table CASCADE;
CREATE TABLE customer_table (
    update_version BIGSERIAL,
    delete_flag BOOLEAN DEFAULT FALSE,
    key VARCHAR(255),
    fieldA text,
    fieldB text,
    fieldC text
)
WITH (APPENDONLY=TRUE, ORIENTATION=PARQUET, COMPRESSTYPE=SNAPPY);
-- Create View on Base Table
CREATE OR REPLACE VIEW customer_view AS
SELECT key, fieldA, fieldB, fieldC
FROM (SELECT DISTINCT ON (key)
             key, delete_flag, fieldA, fieldB, fieldC
      FROM customer_table
      ORDER BY key, update_version DESC, delete_flag, fieldA, fieldB, fieldC) AS latest
WHERE delete_flag IS FALSE;
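To make the CDC flow concrete, here is a minimal sketch (with made-up key and field values, not from the original article): every change arriving from the source is appended as a new row, and the view resolves each key to its latest non-deleted version.
-- Initial load (A - Append): the row arrives from the source system
INSERT INTO customer_table (key, fieldA, fieldB, fieldC)
VALUES ('CUST-001', 'Acme Corp', 'Gold', 'US');            -- update_version = 1
-- Source update (U - Update): append a new version instead of updating in place
INSERT INTO customer_table (key, fieldA, fieldB, fieldC)
VALUES ('CUST-001', 'Acme Corp', 'Platinum', 'US');         -- update_version = 2
-- Source delete (D - Delete): append a tombstone row with delete_flag = TRUE
INSERT INTO customer_table (key, delete_flag, fieldA, fieldB, fieldC)
VALUES ('CUST-001', TRUE, 'Acme Corp', 'Platinum', 'US');   -- update_version = 3
-- Before the delete, the view returned version 2 (highest update_version);
-- after the tombstone it returns no row for this key.
SELECT * FROM customer_view WHERE key = 'CUST-001';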
In this article we take SQL Server as the example source system. The first step is to create the corresponding tables in HAWQ; this usually runs into hundreds and sometimes a couple of thousand tables. The helper T-SQL script below reads the source tables, converts their data types to HAWQ data types and generates the DDL, which helps migrate the table DDL from SQL Server to HAWQ.
-- The script uses a metadata table in the public schema to capture the primary key information from SQL Server; create this table before executing the view generation script
CREATE TABLE public.metadata(
    schemaName text,
    tableName text,
    key text
);
Generate the HAWQ DDL from the SQL Server tables using the T-SQL script below.
if exists (select * from sys.objects where type = 'P' AND name = 'generate_HAWQ_DDL')
drop procedure generate_HAWQ_DDL;
GO
create procedure generate_HAWQ_DDL
as
DECLARE @tblName varchar(100), @SchemaName varchar(100),@ColName varchar(200), @ColType varchar(200), @i int,@KeyCol varchar(200), @KeyFull varchar(2000)
PRINT '---Starting DDL Generation ---'
PRINT 'CREATE TABLE public.metadata(schemaname text, tablename text, key text);'
DECLARE tbl_cursor CURSOR FOR
select table_schema, table_name
from information_schema.tables
where TABLE_TYPE = 'BASE TABLE'
OPEN tbl_cursor
FETCH NEXT FROM tbl_cursor INTO @SchemaName, @tblName
WHILE @@FETCH_STATUS = 0
BEGIN
PRINT 'CREATE TABLE ' + @SchemaName + '.' + @tblName + '('
PRINT ' update_version BIGSERIAL,'
PRINT ' delete_flag BOOLEAN DEFAULT FALSE,'
DECLARE col_cursor CURSOR FOR
SELECT sub.column_name,
CASE WHEN sub.datatype = 'char' THEN 'character'
WHEN sub.datatype = 'nchar' THEN 'character'
WHEN sub.datatype = 'datetime' THEN 'timestamp'
WHEN sub.datatype = 'datetime2' THEN 'timestamp'
WHEN sub.datatype = 'datetimeoffset' THEN 'timestamptz'
WHEN sub.datatype = 'decimal' THEN 'numeric'
WHEN sub.datatype = 'float' THEN 'float8'
WHEN sub.datatype = 'real' THEN 'float8'
WHEN sub.datatype = 'int' THEN 'integer'
WHEN sub.datatype = 'bit' THEN 'boolean'
WHEN sub.datatype = 'nvarchar' THEN 'varchar'
WHEN sub.datatype = 'smalldatetime' THEN 'timestamp'
WHEN sub.datatype = 'smallmoney' THEN 'numeric'
WHEN sub.datatype = 'money' THEN 'numeric'
WHEN sub.datatype = 'sysname' THEN 'varchar'
WHEN sub.datatype = 'tinyint' THEN 'smallint'
WHEN sub.datatype = 'uniqueidentifier' THEN 'varchar(36)'
ELSE sub.datatype END + CASE WHEN sub.datatype in ('nchar', 'char', 'varchar', 'nvarchar', 'sysname')
AND sub.length <> -1 THEN '(' + cast(sub.length as varchar) + ')'
ELSE '' END as datatype
FROM (SELECT REPLACE(REPLACE(LOWER(sc.name), '\"', ''), '.', '_') column_name,
st.name as datatype,
sc.max_length as length,
sc.column_id
FROM sys.objects so
JOIN sys.columns sc ON so.object_id = sc.object_id
JOIN sys.schemas su ON so.schema_id = su.schema_id
JOIN sys.types st ON sc.system_type_id = st.system_type_id
AND st.system_type_id = st.user_type_id
WHERE so.type in ('U', 'V')
AND su.name = @SchemaName
AND so.name = @tblName) sub
WHERE sub.datatype not in ('binary', 'image', 'timestamp', 'xml', 'varbinary', 'text', 'ntext', 'sql_variant', 'hierarchyid')
ORDER BY sub.column_id
OPEN col_cursor
SET @i = 0
FETCH NEXT FROM col_cursor INTO @ColName, @ColType
WHILE @@FETCH_STATUS = 0
BEGIN
IF (@i = 0 )
PRINT @ColName + ' ' + @ColType
ELSE
PRINT ',' + @ColName + ' ' + @ColType
FETCH NEXT FROM col_cursor INTO @ColName, @ColType
SET @i= @i + 1
END
CLOSE col_cursor
DEALLOCATE col_cursor
PRINT ') '
PRINT ' WITH (APPENDONLY=TRUE, ORIENTATION=parquet, COMPRESSTYPE=snappy);'
DECLARE key_cursor CURSOR FOR
SELECT LOWER(c.name) as COLUMN_NAME
FROM sys.objects o
JOIN sys.schemas s ON o.schema_id = s.schema_id
JOIN sys.indexes i ON o.object_id = i.object_id
JOIN sys.index_columns ic ON o.object_id = ic.object_id AND i.index_id = ic.index_id
JOIN sys.columns c ON o.object_id = c.object_id AND ic.column_id = c.column_id
WHERE o.name = @tblName
AND s.name = @SchemaName
AND i.is_primary_key = 1
ORDER BY ic.key_ordinal
OPEN key_cursor
FETCH NEXT FROM key_cursor INTO @KeyCol
SET @i = 0;
SET @KeyFull = '';
WHILE @@FETCH_STATUS = 0
BEGIN
if (@i = 0)
SET @KeyFull = @KeyCol
else
SET @KeyFull = @KeyFull + ',' + @KeyCol
SET @i = @i + 1
FETCH NEXT FROM key_cursor INTO @KeyCol
END
PRINT ' INSERT INTO public.metadata (schemaName, tableName, key) VALUES(''' + @SchemaName +
''',''' + @tblName + ''','''+ @KeyFull + ''');'
CLOSE key_cursor
DEALLOCATE key_cursor
FETCH NEXT FROM tbl_cursor INTO @SchemaName, @tblName
END
CLOSE tbl_cursor
DEALLOCATE tbl_cursor
GO
-- Execute the Procedure ---
EXEC generate_HAWQ_DDL;
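To make the output concrete, assume a hypothetical SQL Server table dbo.customer (customer_id int primary key, name varchar(100)); this is an illustrative example, not from the original post. The procedure prints HAWQ DDL along these lines (formatting tidied):
CREATE TABLE dbo.customer(
    update_version BIGSERIAL,
    delete_flag BOOLEAN DEFAULT FALSE,
    customer_id integer,
    name varchar(100)
)
WITH (APPENDONLY=TRUE, ORIENTATION=parquet, COMPRESSTYPE=snappy);
INSERT INTO public.metadata (schemaName, tableName, key) VALUES('dbo','customer','customer_id');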
Once the base tables have all been created in HAWQ, and the source data is being loaded, we need to make sure we set the delete flag to true or false based on the source CDC operation, and let the update_version bigserial auto-increment. Don't forget to analyze the base tables once the data is populated, and again over time whenever more than roughly 10% of a table has changed. Below is the helper script which generates the views on top of the base tables; execute it on HAWQ using either psql or pgAdmin. It produces a script with the view DDLs; execute that output script to create all the views. We have added a suffix "_table" to all base tables so that the views can correspond to the source system table names and changes to existing SQL queries are minimized.
Helper PL/pgSQL script to generate views on top of the base tables:
create or replace function gen_meta_cdc4()
returns void as
$body$
declare
SQL text;
kv RECORD;
cv RECORD;
col_list text;
i int;
mk text;
begin
FOR kv IN select * from information_schema.tables WHERE table_schema NOT IN ('pg_catalog','information_schema','hawq_toolkit') AND table_type = 'BASE TABLE' LOOP
col_list := '';
FOR cv IN SELECT column_name
          FROM information_schema.columns x
          WHERE x.table_schema = kv.table_schema AND x.table_name = kv.table_name
          ORDER BY x.ordinal_position LOOP
col_list := col_list || ',' || cv.column_name;
END LOOP;
--RAISE NOTICE 'Column List:% for Table:% ', kv.table_name, col_list ;
SELECT key into mk FROM public.metadata WHERE tablename = kv.table_name AND schemaname = kv.table_schema ;
-- order by update_version DESC so the generated view picks the most recent version of each key
RAISE NOTICE 'CREATE OR REPLACE VIEW %.%_view AS SELECT % FROM (SELECT DISTINCT ON (%) % FROM %.% ORDER BY %, update_version DESC) AS latest WHERE delete_flag IS FALSE ;', kv.table_schema, kv.table_name, substring(col_list, 2), mk, substring(col_list, 2), kv.table_schema, kv.table_name, mk;
END LOOP;
end
$body$
LANGUAGE 'plpgsql';
-- SELECT gen_meta_cdc4();
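As a concrete illustration, assuming the customer_table example from earlier lives in the public schema and a row ('public','customer_table','key') has been inserted into public.metadata, the NOTICE output for that table would look roughly like this; run the emitted statements to create the views:
CREATE OR REPLACE VIEW public.customer_table_view AS
SELECT update_version,delete_flag,key,fielda,fieldb,fieldc
FROM (SELECT DISTINCT ON (key) update_version,delete_flag,key,fielda,fieldb,fieldc
      FROM public.customer_table
      ORDER BY key, update_version DESC) AS latest
WHERE delete_flag IS FALSE;
Note that the generated views also expose the update_version and delete_flag columns, since the column list is taken directly from information_schema.columns.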
Credits to Antonio Petrole for the view generation script and to Micheal Andre Pearce for the design.
08-26-2016
06:52 PM
1 Kudo
What came up in a customer setting was the suggestion not to use multitenancy at all. Instead, set up one shared realm so users can authenticate on all clusters, and assign access privileges using Ranger. In older HDP versions this created the problem of sharing the service principals and technical user accounts across clusters; I believe this has largely been solved in HDP 2.3.