Created 08-14-2018 10:50 AM
Hi,
I am running the code below to fetch data from SQL Server tables and load it into Hive tables.
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = (SparkSession
         .builder
         .appName("data_import")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.shuffle.service.enabled", "true")
         .config("spark.sql.parquet.writeLegacyFormat", "true")
         .enableHiveSupport()
         .getOrCreate())

df = spark.read.jdbc("jdbc:sqlserver://10.24.25.25;database=CORE_13_2_TEST;username=core;password=password;table=(select * from T_DISTRICT_TYPE_test)")
df.write.mode('append').format('orc').saveAsTable('test')
But I am getting the below error when running it:
df = spark.read.jdbc("jdbc:sqlserver://10.24.25.25;database=CORE_13_2_TEST;username=core;password=password;table=(select * from T_DISTRICT_TYPE_test)")
TypeError: jdbc() takes at least 3 arguments (2 given)
Created 08-14-2018 11:55 AM
The syntax should be spark.read.jdbc(url, table, connectionProperties).
You can also check here: https://stackoverflow.com/questions/30983982/how-to-use-jdbc-source-to-write-and-read-data-in-pyspar...
It looks like you are missing the connectionProperties, which typically include the login credentials.
jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
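Adapted to the SQL Server URL from your question, a minimal sketch would look like the following (assuming the same host, database, and credentials as above, and that the Microsoft JDBC driver is on the classpath):

url = "jdbc:sqlserver://10.24.25.25;database=CORE_13_2_TEST"
properties = {
    "user": "core",
    "password": "password",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
# second positional argument is the table name; the credentials move into properties
df = spark.read.jdbc(url, "T_DISTRICT_TYPE_test", properties=properties)

Note that user and password are passed in the properties dict rather than embedded in the URL.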
Created 08-14-2018 12:40 PM
I am still not clear on the syntax for this.
The code below works fine for a table, but not for a SQL query.
I want to load from a SELECT query with a WHERE condition, not the complete table.
If possible, could you please modify the code to support a SQL query instead of table names?
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = (SparkSession
         .builder
         .appName("data_import")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.shuffle.service.enabled", "true")
         .config("spark.sql.parquet.writeLegacyFormat", "true")
         .enableHiveSupport()
         .getOrCreate())

lst = ["T_DISTRICT_TYPE_test"]
for tbl in lst:
    df = spark.read.jdbc("jdbc:sqlserver://10.24.25.25;database=CORE_13_2_TEST;username=core;password=password", tbl)
    df.write.format("orc").save("/tmp/orc_query_output_" + tbl)
    df.write.mode('append').format('orc').saveAsTable(tbl)
Created 08-14-2018 01:26 PM
Just to be clear:
df = spark.read.jdbc("jdbc:sqlserver://10.24.25.25;database=CORE_13_2_TEST;username=core;password=password","T_DISTRICT_TYPE_test") is working, while
df = spark.read.jdbc("jdbc:sqlserver://10.24.25.25;database=CORE_13_2_TEST;username=core;password=password","SELECT * FROM T_DISTRICT_TYPE_test") is failing? Or do you change anything else in addition?
Created 08-14-2018 01:32 PM
It's working now. The format should be "(SELECT * FROM T_DISTRICT_TYPE_test) AS abc".
Without the alias it does not work, since Spark places the table argument into a FROM clause and SQL Server requires a derived table (a parenthesized subquery) to carry an alias.
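For reference, a minimal end-to-end sketch of the working pattern (assuming the same connection details as above; the alias abc is arbitrary, and the WHERE condition with the column district_id is only a placeholder for illustration):

url = "jdbc:sqlserver://10.24.25.25;database=CORE_13_2_TEST"
properties = {"user": "core", "password": "password",
              "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}

# Spark drops the table argument into a FROM clause, so a subquery must be
# parenthesized and aliased to be valid SQL Server syntax.
# district_id is a placeholder column name for illustration.
query = "(SELECT * FROM T_DISTRICT_TYPE_test WHERE district_id > 100) AS abc"

df = spark.read.jdbc(url, query, properties=properties)
df.write.mode('append').format('orc').saveAsTable('test')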