Member since: 09-15-2016
Posts: 19
Kudos Received: 4
Solutions: 1
My Accepted Solutions
Title | Views | Posted |
---|---|---|
5627 | 03-08-2017 06:03 PM |
03-08-2017
10:23 PM
It works now; it was a problem with my IDE. Thanks.
03-08-2017
06:03 PM
I am not talking about broadcast variables; I am talking about the broadcast hint in a join: join(broadcast(right), ...)
The 'broadcast' here is a function defined specifically for DataFrames:
public static org.apache.spark.sql.DataFrame broadcast(org.apache.spark.sql.DataFrame dataFrame) { /* compiled code */ }
It is different from the broadcast variable explained in your link, which has to be created through a Spark context, as in sc.broadcast(...)
03-08-2017
05:33 PM
I noticed that in Spark 1.6 we can write join(broadcast(right), ...) in Java, but it looks like the broadcast function is not available in Spark 2.1.0.
Labels:
- Apache HBase
11-18-2016
05:11 PM
1 Kudo
Is there a way to broadcast a DataFrame/RDD without calling collect first? I am thinking this could avoid copying the data to the driver first.
I did notice that there is a broadcast function that is used for broadcast joins on DataFrames:
public static DataFrame broadcast(DataFrame df)
Its documentation says: "Marks a DataFrame as small enough for use in broadcast joins. The following example marks the right DataFrame for broadcast hash join using joinKey."
// left and right are DataFrames
left.join(broadcast(right), "joinKey")
It seems that Spark decides automatically when broadcasting is needed once it finds that a join is required. What I am wondering is whether the broadcast function above still works if I use it in some other, more general context, i.e., does the broadcast still occur? The other question is whether the partition concept still exists for the DataFrame after broadcasting, e.g., can I still apply functions like mapPartitions to it? Thanks
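To make concrete what the hint asks for, here is a minimal plain-Java sketch of the idea behind a broadcast hash join. It is not Spark code and not how Spark implements it internally; the class name, method name, and sample rows are all made up for illustration. The small side is held as one in-memory hash map that every partition of the large side can probe, while the large side stays partitioned and is processed one partition at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: plain Java collections stand in for DataFrames.
public class BroadcastJoinSketch {

    // Joins ONE partition of the large side against a full copy of the small side.
    // Each row of the large side is {joinKey, value}.
    static List<String> joinPartition(List<String[]> partition, Map<String, String> smallSide) {
        List<String> out = new ArrayList<>();
        for (String[] row : partition) {
            String match = smallSide.get(row[0]); // hash lookup, no data movement
            if (match != null) {                  // inner-join semantics
                out.add(row[0] + "," + row[1] + "," + match);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // "Broadcast" side: small enough to be copied to every worker.
        Map<String, String> small = new HashMap<>();
        small.put("k1", "Raleigh");
        small.put("k2", "Durham");

        // Large side: still split into partitions (hypothetical sample data).
        List<List<String[]>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList(new String[] {"k1", "a"}, new String[] {"k3", "c"}));
        partitions.add(Arrays.asList(new String[] {"k2", "b"}));

        // Each partition is joined independently against the same small map.
        for (List<String[]> partition : partitions) {
            System.out.println(joinPartition(partition, small));
        }
    }
}
```

In this picture the hint only matters at the point where a join is planned. Outside a join there is nothing for it to influence, and the DataFrame it wraps is still an ordinary partitioned DataFrame, so partition-level operations should remain available. That reading is an interpretation of the documentation quoted above, not something confirmed in this thread.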
Labels:
- Apache Spark
11-08-2016
10:07 PM
It is very clear, thanks
11-07-2016
05:08 PM
Thank you for the reply. After changing the DDL to
create table ACME.ENDPOINT_STATUS
the phoenix-spark plugin seems to be working.
I also found some weird behavior. If I run both of the following:
create table ACME.ENDPOINT_STATUS
create table "ACME:ENDPOINT_STATUS"
both tables show up in Phoenix. The first one shows up with schema ACME and table name ENDPOINT_STATUS, but the latter shows up with no schema and table name ACME:ENDPOINT_STATUS. However, in HBase I only see one table, ACME:ENDPOINT_STATUS. In addition, an upsert into the table ACME.ENDPOINT_STATUS shows up in the other table, and the other way around.
Namespace mapping was introduced in Phoenix 4.8. Did Hortonworks backport the feature into Phoenix 4.7 on HDP 2.5?
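The behavior described above can be modeled with a small plain-Java sketch. It is only a toy model of what was observed in this post, not Phoenix's actual name resolution: the class and method names are hypothetical, and it handles just the two forms used here (an unquoted SCHEMA.TABLE name and a single fully quoted identifier). It assumes that, with namespace mapping enabled, a schema-qualified table is stored in HBase as SCHEMA:TABLE.

```java
import java.util.Locale;

// Toy model of the two CREATE TABLE forms from the post; not Phoenix code.
public class PhoenixNameSketch {

    // Returns {schema, table}; schema is null when the name has none.
    static String[] parse(String sqlName) {
        if (sqlName.length() >= 2 && sqlName.startsWith("\"") && sqlName.endsWith("\"")) {
            // One quoted identifier: taken literally, so the ':' is part of the table name.
            return new String[] {null, sqlName.substring(1, sqlName.length() - 1)};
        }
        // Unquoted identifiers are upper-cased; '.' separates schema and table.
        String upper = sqlName.toUpperCase(Locale.ROOT);
        int dot = upper.indexOf('.');
        if (dot < 0) {
            return new String[] {null, upper};
        }
        return new String[] {upper.substring(0, dot), upper.substring(dot + 1)};
    }

    // Physical HBase table name, assuming namespace mapping is enabled.
    static String physicalName(String[] parsed) {
        return parsed[0] == null ? parsed[1] : parsed[0] + ":" + parsed[1];
    }

    public static void main(String[] args) {
        String[] a = parse("ACME.ENDPOINT_STATUS");
        String[] b = parse("\"ACME:ENDPOINT_STATUS\"");
        System.out.println("schema=" + a[0] + " table=" + a[1] + " hbase=" + physicalName(a));
        System.out.println("schema=" + b[0] + " table=" + b[1] + " hbase=" + physicalName(b));
    }
}
```

Under these assumptions the two logical tables differ in Phoenix but resolve to the same physical HBase table, which would be consistent with an upsert into one showing up in the other.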
11-03-2016
09:37 PM
Yes, the namespace is enabled and the HBase conf dir is in the classpath. The ACME.xxxxx format does not work either; in fact, even testJdbc fails with a TableNotFoundException if I use the ACME.xxxxx format for the table name.
11-03-2016
09:02 PM
I am testing some code that uses the Phoenix Spark plugin to read a Phoenix table with a namespace prefix in the table name (the table was created as a Phoenix table, not an HBase table), but it throws a TableNotFoundException.
The table is obviously there, because I can query it with plain Phoenix SQL through Squirrel. In addition, using Spark SQL to query it works without any problem.
I am running on the HDP 2.5 platform, with Phoenix 4.7.0.2.5.0.0-1245.
The problem does not occur when I run the same code on an HDP 2.4 cluster with Phoenix 4.4. Nor does it occur on HDP 2.5 when I query a table without a namespace prefix in the table name.
The log is in the attached file: tableNoFound.txt
The weird thing is that if I run testSpark alone it throws the above exception, but if I run testJdbc first, followed by testSpark, both of them work.
My testing code is listed below:
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger; // assuming log4j 1.x, which matches Logger.getLogger(Class)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.SQLContext;
public class Application {
    static private Logger log = Logger.getLogger(Application.class);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("NMS Tuning Engine");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //testJdbc(sc);
        testSpark(sc);
    }

    // Reads the table through the Phoenix Spark plugin.
    static public void testSpark(JavaSparkContext sc) {
        //SparkContextBuilder.buildSparkContext("Simple Application", "local");
        // One JVM can only have one Spark Context now
        Map<String, String> options = new HashMap<String, String>();
        SQLContext sqlContext = new SQLContext(sc);
        String tableStr = "\"ACME:ENDPOINT_STATUS\"";
        String dataSrcUrl="jdbc:phoenix:luna-sdp-nms-01.davis.sensus.lab:2181:/hbase-unsecure";
        options.put("zkUrl", dataSrcUrl);
        options.put("table", tableStr);
        log.info("Phoenix DB URL: " + dataSrcUrl + " tableStr: " + tableStr);
        DataFrame df = null;
        try {
            df = sqlContext.read().format("org.apache.phoenix.spark").options(options).load();
            df.explain(true);
        } catch (Exception ex) {
            log.error("sql error: ", ex);
        }
        try {
            log.info ("Count By phoenix spark plugin: "+ df.count());
        } catch (Exception ex) {
            log.error("dataframe error: ", ex);
        }
    }

    // Reads the same table through Spark's generic JDBC data source.
    static public void testJdbc(JavaSparkContext sc) {
        Map<String, String> options = new HashMap<String, String>();
        SQLContext sqlContext = new SQLContext(sc);
        if (sc == null || sqlContext == null || options == null) {
            log.info("NULL sc, sqlContext, or options");
        }
        String qry2 = "(Select ENDPOINT_ID, CITY from \"ACME:ENDPOINT_STATUS\" Where city = 'ACME City')";
        String dataSrcUrl="jdbc:phoenix:luna-sdp-nms-01.davis.sensus.lab:2181:/hbase-unsecure";
        options.put("url", dataSrcUrl);
        options.put("dbtable", qry2);
        log.info("Phoenix DB URL: " + dataSrcUrl + "\nquery: " + qry2);
        DataFrame df = null;
        try {
            DataFrameReader dfRd = sqlContext.read().format("jdbc").options(options);
            if (dfRd == null) {
                log.error("NULL DataFrameReader Object dfRd in getEndPointDataByJdbc");
            }
            df = dfRd.load();
            df.explain(true);
        } catch (Exception ex) {
            log.error("sql error: ", ex);
        }
        try {
            log.info ("Count By Jdbc: "+ df.count());
        } catch (Exception ex) {
            log.error("dataframe error: ", ex);
        }
    }
}
By the way, here is what HBase shows when I list the tables:
hbase(main):031:0* list
TABLE
ACME:ENDPOINT_CONFIG
ACME:ENDPOINT_STATUS
LONG:ENDPOINTS
LONG:RADIOCHANNELS
LONG:REGIONINFORMATION
LONG:TGBSTATISTICS
SENSUS1:ENDPOINTS
SENSUS1:RADIOCHANNELS
SENSUS1:REGIONINFORMATION
SENSUS1:TGBSTATISTICS
SENSUS2:ENDPOINTS
SENSUS2:RADIOCHANNELS
SENSUS2:REGIONINFORMATION
SENSUS2:TGBSTATISTICS
SENSUS:ENDPOINTS
SENSUS:RADIOCHANNELS
SENSUS:REGIONINFORMATION
SENSUS:TGBSTATISTICS
SYSTEM.CATALOG
SYSTEM:CATALOG
SYSTEM:FUNCTION
SYSTEM:SEQUENCE
SYSTEM:STATS
TENANT
24 row(s) in 0.0090 seconds
=> ["ACME:ENDPOINT_CONFIG", "ACME:ENDPOINT_STATUS", "LONG:ENDPOINTS", "LONG:RADIOCHANNELS", "LONG:REGIONINFORMATION", "LONG:TGBSTATISTICS", "SENSUS1:ENDPOINTS", "SENSUS1:RADIOCHANNELS", "SENSUS1:REGIONINFORMATION", "SENSUS1:TGBSTATISTICS", "SENSUS2:ENDPOINTS", "SENSUS2:RADIOCHANNELS", "SENSUS2:REGIONINFORMATION", "SENSUS2:TGBSTATISTICS", "SENSUS:ENDPOINTS", "SENSUS:RADIOCHANNELS", "SENSUS:REGIONINFORMATION", "SENSUS:TGBSTATISTICS", "SYSTEM.CATALOG", "SYSTEM:CATALOG", "SYSTEM:FUNCTION", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "TENANT"]
Labels:
- Apache Phoenix
09-29-2016
03:58 PM
This works. Thank you Xindain
09-23-2016
09:52 PM
1 Kudo
I am submitting a Spark job from a Linux VM (installed with a one-node HDP) to a real remote HDP cluster. The Spark documentation says:
"Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration. If the configuration references Java system properties or environment variables not managed by YARN, they should also be set in the Spark application’s configuration (driver, executors, and the AM when running in client mode)."
My question is: when I submit a job from a machine outside the cluster, I obviously need to supply information such as the address of the machine that YARN is running on. What does it mean that the directory needs to contain "the (client side) configuration files"? Should I set HADOOP_CONF_DIR to a local directory on the VM containing the local version of the yarn-site.xml file (the client-side files)? That does not make sense, since I want to submit the job to the remote cluster. I cannot find any more information about which files, with what content, should be in the directory that HADOOP_CONF_DIR or YARN_CONF_DIR points to.
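One common reading of "client side configuration files" is that they are copies of the remote cluster's own files (typically core-site.xml, hdfs-site.xml, and yarn-site.xml) placed in a local directory on the submitting machine. They are "client side" because of where they live, not because they describe the local one-node install. The sketch below assumes that reading; the class name and the list of expected files are illustrative assumptions, not an official requirement. It only checks that the directory named by HADOOP_CONF_DIR contains those files.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sanity check for a client-side Hadoop configuration directory (sketch only).
public class HadoopConfDirCheck {

    // Assumed minimal set of files copied from the remote cluster.
    static final List<String> EXPECTED =
            Arrays.asList("core-site.xml", "hdfs-site.xml", "yarn-site.xml");

    // Returns the expected file names that are not present in confDir.
    static List<String> missingFiles(Path confDir) {
        List<String> missing = new ArrayList<>();
        for (String name : EXPECTED) {
            if (!Files.isRegularFile(confDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String dir = System.getenv("HADOOP_CONF_DIR");
        if (dir == null) {
            System.out.println("HADOOP_CONF_DIR is not set");
            return;
        }
        System.out.println("Missing from " + dir + ": " + missingFiles(Paths.get(dir)));
    }
}
```

The check says nothing about whether the files actually point at the remote cluster; for that, the ResourceManager and NameNode addresses inside them would have to be the remote cluster's, not the VM's.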
Labels:
- Apache Spark
- Apache YARN