Member since: 09-23-2015
Posts: 19
Kudos Received: 5
Solutions: 0
02-29-2024
10:24 PM
How about submitting a CDE job from CML to a Private Cloud Base cluster?
10-17-2023
10:16 PM
Install
Install Rust using rustup (see the official Install Rust page).
The installer is interactive, so it needs to run in a terminal:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
Hello World
Rust Hello World: https://doc.rust-lang.org/book/ch01-02-hello-world.html
Reload your session and terminal to update the PATH environment variable.
Run rustc main.rs
main.rs
…
Why Rust?
Original post from Stack Overflow: What is Rust and why is it so popular?
Rust has been Stack Overflow’s most loved language for four years in a row, indicating that many of those who have had the opportunity to use Rust have fallen in love with it. However, the roughly 97% of survey respondents who haven’t used Rust may wonder, “What’s the deal with Rust?”
Rust’s static typing does its best to get out of the programmer’s way while encouraging long-term maintainability.
Rust gives you the choice of storing data on the stack or the heap and determines at compile time when memory is no longer needed and can be cleaned up.
The biggest benefit Rust can provide compared to these languages is the borrow checker.
The Rust experience is larger than a language specification and a compiler; many aspects of creating and maintaining production-quality software are treated as first-class citizens.
10-04-2023
02:08 PM
Possibly. Adding an additional CPU may have license cost implications. A node (such as your CM host) can have up to 16 CCUs. A CCU is the total number of CPU cores / 6, so if the cores reported across all installed CPUs in a node total 96 or fewer, no core-based variable cost would be incurred. Note: Generally, you take the total cores/memory across the whole cluster as part of the calculation, so this provides some flexibility when adding a CPU to a single server.
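As a rough illustrative sketch of that arithmetic only (not an official licensing calculation; the 6-cores-per-CCU ratio and the 16-CCU node allowance come from the reply above, and rounding up is an assumption):
import math

CORES_PER_CCU = 6    # ratio quoted above: CCU = total CPU cores / 6
MAX_NODE_CCUS = 16   # per-node allowance quoted above (16 CCUs = 96 cores)

def node_ccus(total_cores):
    """CCUs for a single node, assuming the result is rounded up."""
    return math.ceil(total_cores / CORES_PER_CCU)

# A node reporting 96 cores across all installed CPUs stays within the allowance.
print(node_ccus(96) <= MAX_NODE_CCUS)   # True
# Adding another CPU so the node reports 112 cores would exceed it.
print(node_ccus(112) <= MAX_NODE_CCUS)  # False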
10-03-2023
08:12 AM
@Kiranq, What I found is that JoltTransformRecord expects only a single record to work with, hence the name. When I tried to pass an array I got the error "...error transforming the first record"; however, if I pass just one JSON record it works. If you have an array of JSON/CSV and you are looking to split and process each record individually, then I would suggest splitting the records before the JoltTransformRecord. If you don't want to split the array, then I recommend using JoltTransformJSON first and then a ConvertRecord processor to convert to CSV.
05-06-2022
09:32 AM
Great article! With the introduction of the OpDB experience (aka COD), there are some worthwhile updates to make here. Namely, there is a direct download link to the Phoenix Thin client, and the Phoenix Thin URL is provided for you (no need to determine the Knox host). With that comes a complication on the DBeaver side. The easiest thing to do is to blank out the URL Template, which allows you to paste the Thin URL provided by COD directly, making sure to update your workload password. Alternatively, you can parameterize the URL and secure the password by unchecking the "No Authentication" box and changing the URL template to look like this:
jdbc:phoenix:thin:url={host};avatica_user={user};avatica_password={password}
The complication is that you would then need to copy only the portion of the URL up to the user section to get the host.
03-28-2022
09:34 PM
1 Kudo
In August 2021, we released CDP Private Cloud Base 7.1.7 LTS. This article outlines what that means. For each software release, we provide a version, a General Availability (GA) date, and an End of Service (EoS) date; these are all published and described here. CDP Private Cloud Base 7.1.7 LTS has a GA date of August 2021 and an EoS date of August 2025, providing a 4-year support window. We then offer two paths.
Service Pack Train
The Service Pack train is for customers who value a stable platform, serve a lot of users or use cases, and want low risk and low cost in maintenance and operations. Rather than taking the new feature release, customers can instead take regular Cumulative Hotfixes (CHFs)*. These hotfixes are released proactively to address break-fix issues and CVEs. At certain pre-planned release dates, the hotfixes are rolled up into a Service Pack (e.g. SP1)**.
* Cumulative Hotfix means each hotfix includes those that came before, up to the initial release or the prior service pack, if any.
** It is important to note that service packs need to be installed in order (for example, SP1 then SP2). Service pack releases will not change configuration, deprecate APIs, change schemas, or break backward compatibility.
Feature Release Train
The Feature Release train is for customers who want the latest features and capabilities available from Cloudera. There may be some additional overhead in operations and systems testing. From 7.1.7, you will have the option of upgrading to 7.1.8 when it is released. 7.1.8 will include new features, and these features may require application re-testing. However, it does mean you get the latest features and innovations in the platform sooner.
Moving between trains
If you would like to move between the trains, you can do so when a release is marked LTS. Both the Support and Feature trains can be upgraded via hotfixes. Generally, upgrading from the Support train to the Feature train will require an upgrade-compatible base and associated hotfixes to be in place. Detailed information on supported upgrade paths and versions is maintained online: CDP Private Cloud Base Upgrade and CDP In-Place Upgrade Paths.
The main objective is to offer flexibility in managing upgrades and reduce overall customer costs in operations and system testing, while reducing the number of platform variations we maintain going forward.
Update - CDP Base 7.1.7 Service Pack 1 was released on 31st March 2022. It is described as follows: "The LTS release provides a stable version of the platform to be supported for 4 years. There will be periodic cumulative hotfixes released to address any vulnerabilities or bug fixes to ensure this version remains secure; however, there won’t be any major changes or new features that require significant testing. Customers looking to utilize the latest feature set and innovations that Cloudera releases on CDP should consider upgrading to our regular Feature Release cycle. Feature Releases are released more frequently and are typically supported for 18 months. Service Pack 1 contains all the hotfixes from previously released cumulative hotfixes as well as additional CVE, security updates, critical bug fixes, and minor certifications. This addresses 45 CVE, Security and Critical Bug Fixes."
03-10-2022
06:41 PM
1 Kudo
This short example outlines how to configure Cloudera Machine Learning to integrate with the Data lake, Hive, and Iceberg tables. Apache Iceberg has a number of qualities that make it very suitable for Machine Learning. It maintains the metadata for the data layout alongside the data, which means there are no heavy network demands on the main catalog. The main catalog, in this case the Hive Metastore, maintains a lightweight set of references to the data on the Data lake.
Iceberg also supports snapshots and time-travel so that data can be versioned and queried at a specific point in time.
Enable parallel processing in Spark
Set up user credentials and default paths to the Data lake in spark-defaults.conf:
spark.executor.instances 2
spark.executor.memory 1g
spark.executor.cores 2
spark.hadoop.yarn.resourcemanager.principal christopherroyles
spark.yarn.access.hadoopFileSystems s3a://demo-aws-go02/
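# Replace the principal and the s3a:// path above with your own CDP workload user and Data lake bucket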
Set up a Spark session
Load the appropriate Iceberg JAR
Add the required SQL extensions
Configure the pluggable catalog
Define it as Hive
Set the paths for both the raw data on the Data lake and the table we will write to in Iceberg.
example.py
#
## 1.1 Datalake to Data Warehouse
# Load the Raw data in CSV format from the Datalake
# into the Data Warehouse, apply a suitable schema and snapshot
#.config("spark.jars.packages","org.apache.iceberg:iceberg-spark3-runtime:0.12.1")
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("1.1 - Ingest") \
.config("spark.jars","/home/cdsw/libs/iceberg-spark3-runtime-0.9.1.1.13.317211.0-9.jar") \
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog") \
.config("spark.sql.catalog.spark_catalog.type","hive") \
.getOrCreate()
dl_path = "s3a://demo-aws-go02/user/royles/flightdata.csv"
dw_db = "spark_catalog.bronzeDB"
dw_table = "flightdata"
Set up the database and make sure it is placed into the correct catalog to benefit from the Iceberg features.
# Use the Iceberg Catalog
spark.sql('SHOW CURRENT NAMESPACE').show()
spark.sql('SHOW DATABASES').show()
spark.sql("CREATE DATABASE IF NOT EXISTS "+dw_db)
spark.sql("USE "+dw_db")
spark.sql('SHOW TABLES').show()
Load some unstructured CSV data from the Data lake as files and infer a schema from the raw data.
Make sure the column names are compatible with the data warehouse SQL syntax.
"""Read in raw data without a schema"""
rawDF = spark.read \
.format("csv") \
.option("inferSchema","true") \
.option("header", "true") \
.option("delimiter",",") \
.option("quote", "\"") \
.option("escape", "\"") \
.load(dl_path)
rawDF.printSchema()
rawDF.show(2)
"""Need to normalise the schema"""
for name in rawDF.schema.names:
    rawDF = rawDF.withColumnRenamed(name, name.replace(' ', '_'))
Write the data out to an Iceberg table, overwriting anything already in the table. New data inserted will create new point-in-time versions.
"""Write the table out in iceberg+parquet format"""
rawDF.write \
.mode("overwrite") \
.format("iceberg") \
.saveAsTable(dw_table)
spark.sql('show tables').show()
spark.sql('SELECT * FROM %s LIMIT 10'%(dw_table)).show()
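# Inspect Iceberg's metadata tables for the new table: commit history, snapshots, and data files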
spark.read.format("iceberg").load('%s.%s.history'%(dw_db,dw_table)).show(20, False)
spark.read.format("iceberg").load('%s.%s.snapshots'%(dw_db,dw_table)).show(20, False)
spark.read.format("iceberg").load('%s.%s.files'%(dw_db,dw_table)).show(20, False)
spark.stop()
Some simple examples of selecting data at a point in time follow; note that they use the active Spark session, so run them before calling spark.stop() above (or start a new session first).
from datetime import datetime
# current date and time
now = datetime.now()
timestamp = datetime.timestamp(now)
print("timestamp =", timestamp)
# Timestamps can be tricky. Please make sure to round your timestamp as shown below.
# Query using a point in time
df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load('%s.%s'%(dw_db,dw_table))
df.show(100)
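As a further sketch, assuming the snapshot-id read option from Iceberg's Spark integration, you can also pin a read to one of the snapshot IDs listed in the snapshots metadata table queried above:
# Pick a snapshot ID from the table's snapshots metadata and read the data as of that snapshot
snapshots_df = spark.read.format("iceberg").load('%s.%s.snapshots'%(dw_db,dw_table))
snapshot_id = snapshots_df.select("snapshot_id").first()[0]
df_snap = spark.read.option("snapshot-id", snapshot_id).format("iceberg").load('%s.%s'%(dw_db,dw_table))
df_snap.show(10)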
This is a simple hello-world example to get you started using Iceberg table formats with Spark3. For further reading, refer to:
Introducing Apache Iceberg in Cloudera Data Platform
Apache Iceberg
Cloudera's Distribution of Spark3
02-09-2021
09:32 PM
1 Kudo
Cloudera Machine Learning provides a number of methods of connecting to other CDP services and experiences, such as Cloudera Data Warehouse. In this post, we will connect using Python and the Impyla library, as well as the embedded Cloudera Data Visualization.
Using Impyla
Within Cloudera Machine Learning, create a new project and set the language to Python 3.6. The connection details are available from the Data Warehouse console by copying the JDBC connection details, which will look like this:
jdbc:impala://coordinator-aws-2-impala-prod.env-j2ln9x.dw.ylcu-atmi.cloudera.site:443/default;AuthMech=3;transportMode=http;httpPath=cliservice;ssl=1;UID=<workload username>;PWD=<workload password>
Use the following Python code to install Impyla and configure a connection:
!pip3 install impyla==0.16a3
USERNAME='<workload username>'
IMPALA_HOST='coordinator-aws-2-impala-prod.env-j2ln9x.dw.ylcu-atmi.cloudera.site'
IMPALA_PORT='443'
import os
from impala.dbapi import connect
conn = connect(host=IMPALA_HOST,
port=IMPALA_PORT,
auth_mechanism='LDAP',
user=USERNAME,
password=os.environ['PASS'],
use_http_transport=True,
http_path='/cliservice',
use_ssl=True)
cursor = conn.cursor()
cursor.execute('show databases')
for row in cursor:
    print(row)
Note: The PASS variable is an environment variable set in the Project Settings under the Advanced tab. This does not protect your password, but it mitigates the risk of the password being copied into a version control service.
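Optionally, as a minimal sketch assuming the pandas package is also installed in the project, impyla can return a result set as a DataFrame:
# Re-run the query and fetch the results into a pandas DataFrame using impyla's helper
from impala.util import as_pandas
cursor.execute('show databases')
databases_df = as_pandas(cursor)
print(databases_df.head())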
Using Visual Applications
Create a Cloudera Data Visualization App by following the instructions at Accessing Data Visualization in CML.
Log out as your default user and log back into Cloudera Data Visualization using the local admin user account. Note: You can raise a support request if you don't have access to this.
Add a new connection under Basic settings using the following parameters.
Connection Name: Name your Connection
Hostname or IP Address: Use the hostname from the JDBC string
Port #: Use the SSL port of 443
Username: CDP Workload Username
Password: CDP Workload Password
Under Advanced Settings, set the following parameters.
Connection Type: HTTP
HTTP path: /cliservice
Socket Type: SSL
Test the connection.
02-09-2021
01:58 AM
Cloudera Machine Learning provides support for Python3. It is very straightforward to connect a session with an operational database.
Provision an Operational Database
Log into a CDP instance
Select Operational Database
Select Create Database
Choose the Cloud environment
Provide a unique name for the database
Click Create Database
Once the database has started, make a copy of the Phoenix (Thin) JDBC URL. This will be used as the connection string.
Create a Machine Learning Project
Within your Cloudera Machine Learning (CML) workspace, create a new project.
Provide a name, and choose a blank initial setup. Create a session and install phoenixdb using the command:
!pip3 install phoenixdb
Create a new Python file and paste the following code into the notebook. Import the required dependencies:
import phoenixdb
import io
import json
Set up the parameters required to establish the connection with ODB. Refer to the Thin client details.
opts = {}
opts['authentication'] = 'BASIC'
opts['serialization'] = 'PROTOBUF'
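# avatica_user / avatica_password: your CDP workload credentials (placeholders shown below)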
opts['avatica_user'] = 'xxxxxxxx'
opts['avatica_password'] = 'xxxxxxxx'
database_url = 'https://<the jdbc url copied from the ODB console>/'
TABLENAME = "us_population"
conn = phoenixdb.connect(database_url, autocommit=True,**opts)
For the URL, remove everything before the https and remove the parameters at the end, while retaining any path details.
Example:
https://<server>/<instance name>/cdp-proxy-api/avatica/
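If you prefer to derive the connection URL in code, here is a minimal sketch; the JDBC string below is a made-up example following the pattern above, not a real endpoint:
# Hypothetical Phoenix (Thin) JDBC URL copied from the ODB console
jdbc_url = 'jdbc:phoenix:thin:url=https://<server>/<instance name>/cdp-proxy-api/avatica/;serialization=PROTOBUF;authentication=BASIC'
# Keep everything from 'https' onwards and drop the trailing parameters
database_url = jdbc_url[jdbc_url.index('https'):].split(';')[0]
print(database_url)  # https://<server>/<instance name>/cdp-proxy-api/avatica/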
Create the table into which to insert the data:
curs = conn.cursor()
query = """
CREATE TABLE IF NOT EXISTS """+TABLENAME+""" (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city))
"""
curs.execute(query)
Bulk insert a set of data, using nested arrays for each record and executing multiple upserts:
sql = "upsert into " + TABLENAME + \
" (state ,city, population) \
values (?,?,?)"
data =[['NY','New York',8143197],
['CA','Los Angeles',3844829],
['IL','Chicago',2842518],
['TX','Houston',2016582],
['PA','Philadelphia',1463281],
['AZ','Phoenix',1461575],
['TX','San Antonio',1256509],
['CA','San Diego',1255540],
['TX','Dallas',1213825],
['CA','San Jose',912332]]
results = curs.executemany(sql,data)
Finally, run a query that returns an aggregated group-by, with the results returned as dictionary objects:
curs = conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor)
query = """SELECT state as "State",count(city) as "City Count",sum(population) as "Population Sum"
FROM us_population
GROUP BY state
ORDER BY sum(population) DESC"""
curs.execute(query)
print(curs.fetchall())
When the above is run in a session, it will return the following results.
[{'State': 'NY', 'City Count': 1, 'Population Sum': 8143197}, {'State': 'CA', 'City Count': 3, 'Population Sum': 6012701}, {'State': 'TX', 'City Count': 3, 'Population Sum': 4486916}, {'State': 'IL', 'City Count': 1, 'Population Sum': 2842518}, {'State': 'PA', 'City Count': 1, 'Population Sum': 1463281}, {'State': 'AZ', 'City Count': 1, 'Population Sum': 1461575}]
This example is based on the post: Phoenix in 15 minutes or less
06-13-2020
07:11 AM
Hi Royles, I am using HiveQL for creating the table and altering the table to add new columns. All of the operations such as MSCK REPAIR TABLE and adding partitions to the table are done from HiveQL; we only read the table from Spark SQL. After reading your reply, I tried creating the external table, running MSCK REPAIR, and altering the table to add new columns entirely from Spark SQL, and got the results below.
1. No results from Spark when reading data from the table.
2. No results from the Hive shell when reading the table.
3. Looking at the tblproperties, the Parquet schema does not match, so there are no results from HiveQL or from Spark.
The only solution I am following so far (for adding new columns to external tables) is:
1. Drop and recreate the table using HiveQL from the Hive shell with all columns (old + new).
2. Manually add the latest partition, which has data for all of the new columns added since the table was originally created.
3. Query the table from Spark, then check that the tblproperties and Parquet schema are reflected and mapped to the Hive columns.
4. If the schemas do not match (for example, testData in the Parquet schema appears as testdata in the Hive tblproperties), we get null values from Spark.
5. If both schemas match, we can see results from Spark.
6. Then run MSCK REPAIR, which gives me results in both Spark 2.2 and 2.3.
But I feel there must be some other way of adding new columns instead of dropping the table and recreating it.