This short example outlines how to configure Cloudera Machine Learning to integrate with the Data lake, Hive, and Iceberg tables.
Apache Iceberg has a number of qualities that make it well suited to Machine Learning. It maintains the metadata describing the data layout alongside the data itself, so there are no heavy network demands on the main catalog. The main catalog, in this case the Hive Metastore, maintains only a lightweight set of references to the data on the Data lake.
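Because the heavy metadata lives next to the data on the Data lake, it can be inspected directly from Spark through Iceberg's metadata tables. As a minimal sketch, assuming the SparkSession and the spark_catalog.bronzeDB.flightdata table created in the walkthrough below, the files metadata table lists exactly which data files back the table:

    # Sketch only: assumes the SparkSession and table from the walkthrough below.
    spark.read.format("iceberg") \
      .load("spark_catalog.bronzeDB.flightdata.files") \
      .select("file_path", "record_count", "file_size_in_bytes") \
      .show(truncate=False)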

 

Iceberg also supports snapshots and time travel, so that data can be versioned and queried at a specific point in time.
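For example, as a rough sketch (again assuming the SparkSession and table built in the walkthrough below; the snapshot ID is a placeholder), the snapshots recorded for a table can be listed and a specific one read back:

    # Sketch only: list the snapshots recorded for the table...
    spark.read.format("iceberg") \
      .load("spark_catalog.bronzeDB.flightdata.snapshots") \
      .select("committed_at", "snapshot_id", "operation") \
      .show(truncate=False)

    # ...then read the table as it was at one of those snapshots.
    old_df = spark.read \
      .option("snapshot-id", 1234567890123456789) \
      .format("iceberg") \
      .load("spark_catalog.bronzeDB.flightdata")  # placeholder snapshot ID
    old_df.show(5)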

 
Enable parallel processing in Spark

 

  1. Set up user credentials and default paths to the Data lake (see the configuration check sketched after these steps).
    spark-defaults.conf
    spark.executor.instances  2
    spark.executor.memory  1g
    spark.executor.cores  2
    
    spark.hadoop.yarn.resourcemanager.principal  christopherroyles
    spark.yarn.access.hadoopFileSystems     s3a://demo-aws-go02/
  2. Set up a Spark session.
  3. Load the appropriate Iceberg JAR.
  4. Add the required SQL extensions.
  5. Configure the pluggable catalog.
  6. Define the catalog type as Hive.
  7. Set the paths for both the raw data on the Data lake and the Iceberg table we will write to.
    example.py
    #
    ## 1.1 Datalake to Data Warehouse 
    #  Load the Raw data in CSV format from the Datalake
    #  into the Data Warehouse, apply a suitable schema and snapshot
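    #  Alternatively, the Iceberg runtime can be pulled from Maven instead of a local JAR: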
    #.config("spark.jars.packages","org.apache.iceberg:iceberg-spark3-runtime:0.12.1")
    
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
      .appName("1.1 - Ingest") \
      .config("spark.jars","/home/cdsw/libs/iceberg-spark3-runtime-0.9.1.1.13.317211.0-9.jar") \
      .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
      .config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog") \
      .config("spark.sql.catalog.spark_catalog.type","hive") \
      .getOrCreate()
      
    dl_path = "s3a://demo-aws-go02/user/royles/flightdata.csv"
    dw_db = "spark_catalog.bronzeDB"
    dw_table = "flightdata"
  8. Set up the database and make sure it is placed in the correct catalog to benefit from the Iceberg features.
    # Use the Iceberg Catalog
    spark.sql('SHOW CURRENT NAMESPACE').show()
    spark.sql('SHOW DATABASES').show()
    
    spark.sql("CREATE DATABASE IF NOT EXISTS "+dw_db)
    spark.sql("USE " + dw_db)
    spark.sql('SHOW TABLES').show()
  9. Load some unstructured CSV data from the Data lake as files and infer a schema from the raw data.
  10. Make sure the column names are compatible with the data warehouse SQL syntax.
    """Read in raw data without a schema"""
    rawDF = spark.read \
      .format("csv") \
      .option("inferSchema","true") \
      .option("header", "true") \
      .option("delimiter",",") \
      .option("quote", "\"") \
      .option("escape", "\"") \
      .load(dl_path)
    
    rawDF.printSchema()
    rawDF.show(2)
    
    """Need to normalise the schema"""
    for name in rawDF.schema.names:
      rawDF = rawDF.withColumnRenamed(name, name.replace(' ', '_'))
  11. Write the DataFrame out as an Iceberg table, overwriting anything already in the table.
  12. New data inserted later will create new point-in-time versions (see the append sketch after these steps).
    """Write the table out in iceberg+parquet format"""
    rawDF.write \
      .mode("overwrite") \
      .format("iceberg") \
      .saveAsTable(dw_table)
      
    spark.sql('show tables').show()
    spark.sql('SELECT * FROM %s  LIMIT 10'%(dw_table)).show()
    
    spark.read.format("iceberg").load('%s.%s.history'%(dw_db,dw_table)).show(20, False)
    spark.read.format("iceberg").load('%s.%s.snapshots'%(dw_db,dw_table)).show(20, False)
    spark.read.format("iceberg").load('%s.%s.files'%(dw_db,dw_table)).show(20, False)
    
  13. Some simple examples of selecting data at a point in time.
    from datetime import datetime
    # current date and time
    now = datetime.now()
    
    timestamp = datetime.timestamp(now)
    print("timestamp =", timestamp)
    
    # Timestamps can be tricky: the as-of-timestamp option expects epoch
    # milliseconds, so convert the value to an integer as shown below.
    # Query using a point in time
    
    df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load('%s.%s'%(dw_db,dw_table))
    df.show(100)

    spark.stop()
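As noted in step 12, inserting new data creates a new point-in-time version of the table. The following is a rough sketch of what that looks like, reusing rawDF, dw_db, and dw_table from the steps above; run it in the same session before the final spark.stop(), or in a fresh session configured the same way:

    # Sketch only: append a second batch to the same Iceberg table. In practice
    # this would be new data; the same DataFrame is reused purely to illustrate.
    rawDF.write \
      .mode("append") \
      .format("iceberg") \
      .saveAsTable(dw_table)

    # The history and snapshots metadata tables now show an additional entry, and
    # either version can be queried with the snapshot-id or as-of-timestamp option.
    spark.read.format("iceberg").load('%s.%s.history'%(dw_db,dw_table)).show(20, False)
    spark.read.format("iceberg").load('%s.%s.snapshots'%(dw_db,dw_table)).show(20, False)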
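And, as referenced in step 1, the executor settings picked up from spark-defaults.conf can be confirmed from inside the session. A minimal sketch, assuming the same active SparkSession:

    # Sketch only: print the parallel-processing settings the session is using
    # (values come back as strings, or None if a key was not set).
    conf = spark.sparkContext.getConf()
    for key in ("spark.executor.instances",
                "spark.executor.memory",
                "spark.executor.cores"):
        print(key, "=", conf.get(key, None))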

This is a simple hello-world example to get you started using Iceberg table formats with Spark3.

For further reading, refer to:
Introducing Apache Iceberg in Cloudera Data Platform
Apache Iceberg
Cloudera's Distribution of Spark3
