
How to bind a variable in Apache Spark SQL

Expert Contributor

For example:

df = HiveContext.sql("SELECT * FROM src WHERE col1 = ${VAL1}")

Thanks.

6 REPLIES


@alain TSAFACK

val VAL1 = "testcol"
val df = HiveContext.sql(s"SELECT * FROM src WHERE col1 = $VAL1")
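
(Note that interpolation just pastes the value into the query text; if col1 is a string column, the value will typically also need quotes inside the SQL itself, e.g. WHERE col1 = '$VAL1'.)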

Expert Contributor

Hello, I tried that, but I get an error:

Code with "r" as a parameter:

df=hive_context.sql(s"select c.`date`, c.blglast from qvol1_temp as c join qvol2_temp as uv on c.udl_id = uv.udl_id where uv.ric =$r and c.`date` >= '2016-06-13 00:00:00' and c.`date` <= '2016-06-17 00:00:00' and c.adj_split = False")

error:

SyntaxError: invalid syntax


Are you using PySpark or Scala?

Expert Contributor

Hello!

I'm using PySpark.
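
(For reference: the s"..." prefix in the snippet above is Scala string interpolation and is not valid Python, which would explain the SyntaxError. A minimal PySpark sketch of the same query using Python's str.format instead, assuming hive_context and the variable r are already defined as in the snippet:

# Build the query text in Python; str.format splices r into the SQL string.
# The value is quoted because uv.ric is compared against a string literal.
query = (
    "select c.`date`, c.blglast "
    "from qvol1_temp as c "
    "join qvol2_temp as uv on c.udl_id = uv.udl_id "
    "where uv.ric = '{0}' "
    "and c.`date` >= '2016-06-13 00:00:00' "
    "and c.`date` <= '2016-06-17 00:00:00' "
    "and c.adj_split = False"
).format(r)
df = hive_context.sql(query)
)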

Contributor

I think this should work for you if you are using PySpark.

VAL1 = 'SOME_STRING'
df = HiveContext.sql("SELECT * FROM src WHERE col1 = '%s'" % VAL1)
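
As far as I know, HiveContext.sql in Spark 1.x has no real bind-parameter API, so any of these approaches is plain string splicing: the value lands verbatim in the SQL text, and quoting and escaping are the caller's responsibility. A hedged sketch of one possible guard, assuming the value may itself contain single quotes (HiveQL uses C-style backslash escapes):

VAL1 = "SOME_STRING"
# Hypothetical guard: backslash-escape embedded single quotes before splicing
# the value into the SQL text, so a quote in the data cannot break the query.
safe_val = VAL1.replace("'", "\\'")
df = HiveContext.sql("SELECT * FROM src WHERE col1 = '%s'" % safe_val)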

Expert Contributor
Thank you, it works well.

But that was run locally. When I execute it on the cluster with the command:

spark-submit --master yarn-cluster --py-files hdfs:///dev/datalake/app/des/dev/script/lastloader.py --queue DES hdfs:///dev/datalake/app/des/dev/script/return.py

it generates this error in the logs:

Log Type: stdout
Log Upload Time: Thu Jun 30 09:19:20 +0200 2016
Log Length: 3254

Traceback (most recent call last):
  File "return.py", line 10, in <module>
    df = Lastloader()
  File "/DATA/fs6/hadoop/yarn/local/usercache/atsafack/appcache/application_1465374541433_9209/container_e52_1465374541433_9209_02_000001/__pyfiles__/lastloader.py", line 13, in Lastloader
    qvol1 = hive_context.table("lake_des_statarbmfvol.qvol_bbg_closes")
  File "/DATA/fs6/hadoop/yarn/local/usercache/atsafack/appcache/application_1465374541433_9209/container_e52_1465374541433_9209_02_000001/pyspark.zip/pyspark/sql/context.py", line 565, in table
  File "/DATA/fs6/hadoop/yarn/local/usercache/atsafack/appcache/application_1465374541433_9209/container_e52_1465374541433_9209_02_000001/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/DATA/fs6/hadoop/yarn/local/usercache/atsafack/appcache/application_1465374541433_9209/container_e52_1465374541433_9209_02_000001/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/DATA/fs6/hadoop/yarn/local/usercache/atsafack/appcache/application_1465374541433_9209/container_e52_1465374541433_9209_02_000001/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o53.table.
: org.apache.spark.sql.catalyst.analysis.NoSuchTableException
	at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123)
	at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:123)
	at org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:61)
	at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:406)
	at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:410)
	at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203)
	at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:203)
	at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:410)
	at org.apache.spark.sql.SQLContext.table(SQLContext.scala:739)
	at org.apache.spark.sql.SQLContext.table(SQLContext.scala:735)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)

Can you help me, please? Best regards.