Member since
02-14-2022
05-12-2023 03:21 AM
I've been testing the phoenixdb library in Python with a CDP Operational DataStore cluster. Following the example here: https://community.cloudera.com/t5/Community-Articles/Using-phoenixdb-to-connect-Cloudera-Machine-Learning-to/ta-p/310130

I can make a connection and return a single row, but the connection is very slow to establish and the query is slow to execute. I'm running phoenixdb from the Python interpreter on the COD cluster gateway host. The timestamped Python session is below.

Python 3.6.8 (default, Nov 16 2020, 16:55:22)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
>>> import phoenixdb
>>> from datetime import datetime
>>> opts = {}
>>> opts["authentication"] = "BASIC"
>>> opts["serialization"] = "PROTOBUF"
>>> opts["avatica_user"] = "workload-username"
>>> opts["avatica_password"] = "workload-password"
>>> jbdc_connection_url = "https://cod--<clusterID>-gateway0.<envID>.<custID>.cloudera.site/cod--<clusterID>/cdp-proxy-api/avatica/"
>>> print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
2023-05-12 09:54:45
>>> conn = phoenixdb.connect(jbdc_connection_url, autocommit=True, **opts)
>>> print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
2023-05-12 09:54:51
>>> sql_command = "SELECT * FROM DBNAME.TABLENAME WHERE CUSTID = ?"
>>> data = ("xxxxxxxx", )
>>> print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
2023-05-12 09:54:51
>>> cursor = conn.cursor()
>>> print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
2023-05-12 09:54:51
>>> cursor.execute(sql_command, data)
>>> print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
2023-05-12 09:55:00
>>> results_list = cursor.fetchall()
>>> print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
2023-05-12 09:55:00
>>> conn.close()
>>> print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
2023-05-12 09:55:06
>>> results_list
[['xxxxxxxx', '...', '...', ...]]

You can see that it takes 6 seconds to establish the connection to Phoenix, 9 seconds to execute the SQL command, and another 6 seconds to close the connection.
Any thoughts on why this might be the case and if I can make changes to address this? Thanks.
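For anyone trying to reproduce these numbers, the manual timestamps above only resolve to whole seconds. A minimal sketch of a sub-second version of the same measurement follows. The names `timed`, `timings` and `profile_query` are hypothetical helpers, not part of phoenixdb, and the sketch only assumes that the `connect` callable passed in behaves like `phoenixdb.connect` as it is used above.

```python
import time
from contextlib import contextmanager

# Hypothetical helper state: label -> elapsed seconds for the last run.
timings = {}


@contextmanager
def timed(label):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


def profile_query(connect, url, sql, params, **opts):
    """Time connect, execute, fetchall and close as separate phases.

    `connect` is assumed to accept the same arguments as the
    phoenixdb.connect call in the session above.
    """
    with timed("connect"):
        conn = connect(url, autocommit=True, **opts)
    try:
        cursor = conn.cursor()
        with timed("execute"):
            cursor.execute(sql, params)
        with timed("fetchall"):
            rows = cursor.fetchall()
    finally:
        # Always close the connection, and time that phase as well.
        with timed("close"):
            conn.close()
    return rows
```

With the variables from the session above, this could be called as `profile_query(phoenixdb.connect, jbdc_connection_url, sql_command, data, **opts)`, after which `timings` holds one entry per phase. It only measures where the time goes; it does not explain the delay.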
06-16-2022 07:00 AM
To provide a module with custom Python functions that are declared as UDFs, one must specify:

spark_session.sparkContext.addPyFile("/app/mount/python_utils.py")

This file should be included in a resource attached to the job. See this post for further examples: https://blog.cloudera.com/managing-python-dependencies-for-spark-workloads-in-cloudera-data-engineering/
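As a rough local analogy (no Spark involved), the sketch below shows why the import fails until the file's location is known to Python. It assumes that the relevant effect of addPyFile on the Python side is to make the shipped file importable. The module name `python_utils_demo` and the helper `try_import` are hypothetical and exist only for this illustration.

```python
import importlib
import os
import sys
import tempfile


def try_import(name):
    """Return the imported module, or the error message if it is not found."""
    try:
        return importlib.import_module(name)
    except ModuleNotFoundError as exc:
        return str(exc)


# Write a stand-in for python_utils.py into a directory Python does not search.
module_name = "python_utils_demo"  # hypothetical name, chosen to avoid clashes
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, module_name + ".py"), "w") as fh:
    fh.write("def python_func(value):\n    return str(value).upper()\n")

# The file exists on disk, but its directory is not on sys.path yet.
before = try_import(module_name)

# Making the location importable is what resolves the error.
sys.path.insert(0, workdir)
importlib.invalidate_caches()
after = try_import(module_name)

print(before)
print(after.python_func("abc"))
```

The first print reproduces the same kind of ModuleNotFoundError as in the question. In the CDE case the equivalent step is pointing addPyFile at the path where the job resource is mounted.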
04-07-2022 03:04 AM
Hi. I'm trying to reproduce a typical edge-node submission pattern for PySpark jobs using the CDE Jobs UI.

To provide a module with custom Python functions that are declared as UDFs, one can specify:

spark_session.sparkContext.addPyFile("python_utils.py")

or use the --py-files argument with spark-submit:

spark-submit --py-files python_utils.py pyspark_main.py

Pure Python functions can then be imported in this way:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def add_udf_column(df):
    # Import inside the function so the module is resolved where the function runs
    from python_utils import python_func
    python_udf = F.udf(python_func, StringType())
    df = df.withColumn("udf_column", python_udf(df["src_column"]))
    return df

Attempting something similar using the CDE Jobs UI, Spark cannot seem to find the custom module. My settings are:

Application File: pyspark_main.py
Arguments: --py-files python_utils.py
Advanced Options: Python, Egg, Zip files: Added python_utils.py

The error I'm getting is:

ModuleNotFoundError: No module named 'python_utils'

Any thoughts on how I should provide this file? Thanks.

Edit: Testing spark-submit on CDH6.2, it seems that the --py-files flag must be placed before the main script. If the flag is placed after it, the job fails with the same ModuleNotFoundError as above. From the CDE logs, it looks like the API is placing the flag after the reference to the main script.

From driver.stderr.log:

+ CMD=("$SPARK_HOME/bin/spark-submit" --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" --deploy-mode client "$@")
+ '[' -z ']'
+ exec /usr/bin/tini -s -- /opt/spark/bin/spark-submit --conf spark.driver.bindAddress=***.***.**.*** --deploy-mode client --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.deploy.PythonRunner /app/mount/pyspark_main.py '--py-files python_utils.py'
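To illustrate the ordering issue, here is a simplified sketch of how a command line of the form `spark-submit [options] <main file> [application arguments]` gets divided. The function `split_spark_submit_args` is hypothetical and is not Spark's actual parser. It assumes every option starting with `--` takes exactly one value, which holds for the options in the log above.

```python
def split_spark_submit_args(argv):
    """Split argv into (options, primary_resource, app_args).

    Simplified assumption: every leading token that starts with "--" is an
    option followed by exactly one value. The first token that is not part
    of an option pair is the main script, and everything after it is handed
    to the application untouched.
    """
    options = []
    i = 0
    while i + 1 < len(argv) and argv[i].startswith("--"):
        options.append((argv[i], argv[i + 1]))
        i += 2
    primary = argv[i] if i < len(argv) else None
    app_args = argv[i + 1:]
    return options, primary, app_args


# Flag before the main script: treated as a spark-submit option.
flag_first = split_spark_submit_args(
    ["--py-files", "python_utils.py", "pyspark_main.py"]
)

# Shape of the CDE command from the log: the flag arrives after the script,
# as one quoted token, so it ends up as an application argument.
flag_last = split_spark_submit_args(
    ["--deploy-mode", "client", "/app/mount/pyspark_main.py",
     "--py-files python_utils.py"]
)

print(flag_first)
print(flag_last)
```

In the second case the `--py-files python_utils.py` token lands in the application arguments, which would be consistent with the module never being shipped and the ModuleNotFoundError above.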