Created on 03-10-2022 06:41 PM - edited on 03-10-2022 06:48 PM by subratadas
This short example outlines how to configure Cloudera Machine Learning to integrate with the data lake, Hive, and Iceberg tables.
Apache Iceberg has a number of qualities that make it well suited to machine learning. It maintains the metadata for the data layout alongside the data itself, which means there are no heavy network demands on the main catalog. The main catalog, in this case the Hive Metastore, maintains only a lightweight set of references to the data on the data lake.
Iceberg also supports snapshots and time travel, so data can be versioned and queried as it existed at a specific point in time.
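To make the snapshot idea concrete, here is a toy, pure-Python model of metadata-driven time travel. This is an illustrative sketch only, not Iceberg's actual implementation: the `Snapshot` structure and `snapshot_as_of` helper are assumptions. Iceberg keeps an ordered log of snapshots, and a time-travel read simply picks the latest snapshot whose commit timestamp is at or before the requested time.

```python
from dataclasses import dataclass

# Toy model of an Iceberg snapshot log: each snapshot records a commit
# timestamp (epoch milliseconds) and the set of data files valid at that commit.
@dataclass
class Snapshot:
    snapshot_id: int
    committed_at_ms: int
    data_files: list

def snapshot_as_of(snapshots, as_of_ms):
    """Return the latest snapshot committed at or before as_of_ms, or None."""
    eligible = [s for s in snapshots if s.committed_at_ms <= as_of_ms]
    return max(eligible, key=lambda s: s.committed_at_ms) if eligible else None

log = [
    Snapshot(1, 1_000, ["f1.parquet"]),
    Snapshot(2, 2_000, ["f1.parquet", "f2.parquet"]),
    Snapshot(3, 3_000, ["f3.parquet"]),  # e.g. after a rewrite/compaction
]

# A read "as of" t=2500 sees snapshot 2's file list, not snapshot 3's.
print(snapshot_as_of(log, 2_500).snapshot_id)  # → 2
```

Because each snapshot carries its own file listing, no central service has to reconstruct historical state; the reader just resolves a timestamp against the log.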
Enable parallel processing and data lake access by adding the following properties to the project's spark-defaults.conf file (the principal and bucket below are examples; substitute your own):
spark.executor.instances 2
spark.executor.memory 1g
spark.executor.cores 2
spark.hadoop.yarn.resourcemanager.principal christopherroyles
spark.yarn.access.hadoopFileSystems s3a://demo-aws-go02/
#
## 1.1 Data lake to Data Warehouse
# Load the raw data in CSV format from the data lake
# into the data warehouse, apply a suitable schema, and snapshot it
#.config("spark.jars.packages","org.apache.iceberg:iceberg-spark3-runtime:0.12.1")
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("1.1 - Ingest") \
    .config("spark.jars","/home/cdsw/libs/iceberg-spark3-runtime-0.9.1.1.13.317211.0-9.jar") \
    .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type","hive") \
    .getOrCreate()
dl_path = "s3a://demo-aws-go02/user/royles/flightdata.csv"
dw_db = "spark_catalog.bronzeDB"
dw_table = "flightdata"
# Use the Iceberg Catalog
spark.sql('SHOW CURRENT NAMESPACE').show()
spark.sql('SHOW DATABASES').show()
spark.sql("CREATE DATABASE IF NOT EXISTS "+dw_db)
spark.sql("USE " + dw_db)
spark.sql('SHOW TABLES').show()
"""Read in raw data without a schema"""
rawDF = spark.read \
    .format("csv") \
    .option("inferSchema","true") \
    .option("header", "true") \
    .option("delimiter",",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .load(dl_path)
rawDF.printSchema()
rawDF.show(2)
"""Need to normalise the schema"""
for name in rawDF.schema.names:
    rawDF = rawDF.withColumnRenamed(name, name.replace(' ', '_'))
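The renaming loop above simply replaces spaces with underscores so the column names are SQL-friendly. The same normalisation can be checked in plain Python (the `normalise` helper name is an assumption for illustration):

```python
def normalise(names):
    """Replace spaces with underscores so column names are SQL-friendly."""
    return [n.replace(' ', '_') for n in names]

print(normalise(["Flight Number", "Origin Airport", "dest"]))
# → ['Flight_Number', 'Origin_Airport', 'dest']
```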
"""Write the table out in iceberg+parquet format"""
rawDF.write \
    .mode("overwrite") \
    .format("iceberg") \
    .saveAsTable(dw_table)
spark.sql('SHOW TABLES').show()
spark.sql('SELECT * FROM %s LIMIT 10'%(dw_table)).show()
spark.read.format("iceberg").load('%s.%s.history'%(dw_db,dw_table)).show(20, False)
spark.read.format("iceberg").load('%s.%s.snapshots'%(dw_db,dw_table)).show(20, False)
spark.read.format("iceberg").load('%s.%s.files'%(dw_db,dw_table)).show(20, False)
from datetime import datetime

# Current date and time; Iceberg's as-of-timestamp option expects epoch milliseconds
now = datetime.now()
timestamp = datetime.timestamp(now)
print("timestamp =", timestamp)

# Timestamps can be tricky. Please make sure to round your timestamp as shown below.
# Query the table as it existed at a point in time
df = spark.read \
    .option("as-of-timestamp", int(timestamp*1000)) \
    .format("iceberg") \
    .load('%s.%s'%(dw_db,dw_table))
df.show(100)

spark.stop()
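Since `as-of-timestamp` expects whole epoch milliseconds, the conversion above can be sanity-checked in plain Python (a minimal sketch; the `to_millis` helper name is an assumption):

```python
from datetime import datetime, timezone

def to_millis(dt):
    """Convert a datetime to integer epoch milliseconds, as Iceberg expects."""
    return int(dt.timestamp() * 1000)

# One second after the Unix epoch is exactly 1000 milliseconds.
epoch_plus_one = datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)
print(to_millis(epoch_plus_one))  # → 1000
```

Truncating with `int()` avoids passing a float, which the read option does not accept.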
This is a simple hello-world example to get you started with the Iceberg table format and Spark 3.
For further reading, refer to:
Introducing Apache Iceberg in Cloudera Data Platform
Apache Iceberg
Cloudera's Distribution of Spark3