This short example outlines how to configure Cloudera Machine Learning to integrate with the Data lake, Hive, and Iceberg tables.
Apache Iceberg has a number of qualities that make it well suited to Machine Learning. It maintains the metadata for the data layout alongside the data itself, which means there are no heavy network demands on the main catalog. The main catalog, in this case the Hive Metastore, holds only a lightweight set of references to the data on the Data lake.
Iceberg also supports snapshots and time travel, so data can be versioned and queried as it existed at a specific point in time.
Enable parallel processing in Spark
- Set up user credentials and default paths to the Data lake.
spark-defaults.conf
```
spark.executor.instances 2
spark.executor.memory 1g
spark.executor.cores 2
spark.hadoop.yarn.resourcemanager.principal christopherroyles
spark.yarn.access.hadoopFileSystems s3a://demo-aws-go02/
```
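If you want to confirm these settings were picked up, you can read them back from the running session. A minimal sanity check, not part of the original example; it assumes the `spark` session built in the next step:

```python
# Optional sanity check: read the settings back from the live SparkSession.
# Assumes the "spark" session created below.
print(spark.conf.get("spark.executor.instances"))            # expect "2"
print(spark.conf.get("spark.executor.memory"))               # expect "1g"
print(spark.conf.get("spark.yarn.access.hadoopFileSystems")) # expect the s3a:// path
```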
- Set up a Spark session
- Load the appropriate Iceberg JAR
- Add the required SQL extensions
- Configure the pluggable catalog
- Define it as Hive
- Set the paths for both the raw data on the Data lake and the table we will write to in Iceberg.
example.py
```python
# ## 1.1 Datalake to Data Warehouse
# Load the raw data in CSV format from the Datalake
# into the Data Warehouse, apply a suitable schema and snapshot
# .config("spark.jars.packages","org.apache.iceberg:iceberg-spark3-runtime:0.12.1")

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("1.1 - Ingest") \
    .config("spark.jars", "/home/cdsw/libs/iceberg-spark3-runtime-0.9.1.1.13.317211.0-9.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .getOrCreate()

dl_path = "s3a://demo-aws-go02/user/royles/flightdata.csv"
dw_db = "spark_catalog.bronzeDB"
dw_table = "flightdata"
```
- Set up the database and make sure it is placed in the correct catalog so that it benefits from the Iceberg features.
```python
# Use the Iceberg catalog
spark.sql('SHOW CURRENT NAMESPACE').show()
spark.sql('SHOW DATABASES').show()
spark.sql("CREATE DATABASE IF NOT EXISTS " + dw_db)
spark.sql("USE " + dw_db)
spark.sql('SHOW TABLES').show()
```
- Load some unstructured CSV data from the Data lake as files and infer a schema from the raw data.
- Make sure the column names are compatible with the data warehouse SQL syntax.
"""Read in raw data without a schema""" rawDF = spark.read \ .format("csv") \ .option("inferSchema","true") \ .option("header", "true") \ .option("delimiter",",") \ .option("quote", "\"") \ .option("escape", "\"") \ .load(dl_path) rawDF.printSchema() rawDF.show(2) """Need to normalise the schema""" for name in rawDF.schema.names: rawDF = rawDF.withColumnRenamed(name, name.replace(' ', '_'))
- Write the data out to an Iceberg table, overwriting anything already in the table.
- Newly inserted data will create new point-in-time versions.
"""Write the table out in iceberg+parquet format""" rawDF.write \ .mode("overwrite") \ .format("iceberg") \ .saveAsTable(dw_table) spark.sql('show tables').show() spark.sql('SELECT * FROM %s LIMIT 10'%(dw_table)).show() spark.read.format("iceberg").load('%s.%s.history'%(dw_db,dw_table)).show(20, False) spark.read.format("iceberg").load('%s.%s.snapshots'%(dw_db,dw_table)).show(20, False) spark.read.format("iceberg").load('%s.%s.files'%(dw_db,dw_table)).show(20, False) spark.stop()
- Some simple examples of selecting data at a point in time.
```python
from datetime import datetime

# current date and time
now = datetime.now()
timestamp = datetime.timestamp(now)
print("timestamp =", timestamp)

# Timestamps can be tricky. Please make sure to round your timestamp as shown below.
# Query the table as it was at a point in time
df = spark.read \
    .option("as-of-timestamp", int(timestamp * 1000)) \
    .format("iceberg") \
    .load('%s.%s' % (dw_db, dw_table))
df.show(100)
```
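Time travel can also target an exact version rather than a timestamp. A minimal sketch using the `snapshot-id` read option and the `.snapshots` metadata table shown earlier; the variable names are carried over from the example above:

```python
# Pick the oldest snapshot id from the table's metadata...
snapshots = spark.read.format("iceberg").load('%s.%s.snapshots' % (dw_db, dw_table))
first_snapshot_id = snapshots.orderBy("committed_at").first()["snapshot_id"]

# ...and read the table exactly as it was at that version.
df = spark.read \
    .option("snapshot-id", first_snapshot_id) \
    .format("iceberg") \
    .load('%s.%s' % (dw_db, dw_table))
df.show(10)
```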
This is a simple hello-world example to get you started with the Iceberg table format on Spark3.
For further reading, refer to:
- Introducing Apache Iceberg in Cloudera Data Platform
- Apache Iceberg
- Cloudera's Distribution of Spark3