Member since: 10-07-2015

107 Posts
73 Kudos Received
23 Solutions
My Accepted Solutions
| Views | Posted |
|---|---|
| 1146 | 02-23-2017 04:57 PM |
| 1183 | 12-08-2016 09:55 AM |
| 6708 | 11-24-2016 07:24 PM |
| 2734 | 11-24-2016 02:17 PM |
| 6991 | 11-24-2016 09:50 AM |
05-03-2017
02:09 PM
Maybe check whether you can access WebHDFS via Knox to see if your kinit user is accepted by Knox
04-28-2017
06:50 AM
My guess here is that each worker writes its part of the model to file://model-path/model-part on its own machine, so there is probably a part of the model on each of the two worker machines. With HDFS the model-path is the same for all workers, hence the model is saved completely. For a distributed system to store and load data, all workers need to be able to access the same data under the same path; that's why a distributed file system is the usual recommendation. Ignoring performance, replication and so on, you could also mount and use the same path on a network file system (SAN, NAS, NFS, ...), but this is not recommended.
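A minimal sketch of the HDFS variant, assuming a Spark 2.x spark.ml model (the model type and paths are just examples, and `model` stands for the trained model from your job):

from pyspark.ml import PipelineModel

# an HDFS path resolves to the same location on the driver and on every executor,
# so all parts of the model end up in one consistent directory
model.write().overwrite().save("hdfs:///models/my_model")

# later, load it again from the shared path
reloaded = PipelineModel.load("hdfs:///models/my_model")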
04-12-2017
08:40 AM
1 Kudo
You don't need to do the whole string conversion and concat steps. Simply try:

from pyspark.sql.functions import collect_list, col

transactions = df.groupBy("customer_id")\
    .agg(collect_list("product_id").alias("product_ids"))\
    .rdd\
    .map(lambda x: x.product_ids)
transactions.collect()

Then the following will work:

from pyspark.mllib.fpm import FPGrowth

model = FPGrowth.train(transactions, 0.2, 10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)
04-12-2017
08:09 AM
iris = spark.read.csv("/tmp/iris.csv", header=True, inferSchema=True)
iris.printSchema()
Result:
root
|-- sepalLength: double (nullable = true)
|-- sepalWidth: double (nullable = true)
|-- petalLength: double (nullable = true)
|-- petalWidth: double (nullable = true)
|-- species: string (nullable = true)
Write the parquet file:

iris.write.parquet("/tmp/iris.parquet")

... and create the Hive table:

spark.sql("""
create external table iris_p (
  sepalLength double,
  sepalWidth double,
  petalLength double,
  petalWidth double,
  species string
)
STORED AS PARQUET
location "/tmp/iris.parquet"
""")
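As an optional follow-up, a quick check (using the table from the example above) that Hive sees the data:

spark.sql("select count(*) from iris_p").show()
spark.sql("select * from iris_p limit 5").show()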
03-24-2017
08:11 AM
Hard to say from the information you gave. Since you load the data into a DataFrame, the Oracle part should be abstracted away, as long as the schema fits - and I guess you checked the schema of "tran1". You could try to select from tran1 into another DataFrame tran2 to control all columns, check its schema, and then try to write tran2.
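A minimal PySpark sketch of this suggestion; the column names are placeholders and the final write should be whatever write call you already use for tran1:

# select exactly the columns the target expects, casting where necessary
tran2 = tran1.select("col_a", "col_b")
tran2.printSchema()                                      # compare this schema with the target table
tran2.write.mode("append").saveAsTable("tran2_check")    # or your original write call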
03-22-2017
04:33 PM
Hive:

create table tran_spark_part (
  id String,
  amount BigInt
)
partitioned by (date1 string);

This works in Spark (tested on 1.6.2):

> case class Person(id: String, amount: Integer, date1: String)
> val df = Seq(Person("1", 32, "2017")).toDF()
> sqlContext.setConf("hive.exec.dynamic.partition", "true")
> sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
> df.write.mode("overwrite").partitionBy("date1").insertInto("tran_spark_part")
> sqlContext.sql("select * from tran_spark_part").show()
+---+------+-----+
| id|amount|date1|
+---+------+-----+
| 1| 32| 2017|
+---+------+-----+
This doesn't:

> case class Person2(id: String, amount: Integer, t_date: String)
> val df2 = Seq(Person("2", 42, "2017")).toDF()
> df2.write.mode("overwrite").partitionBy("t_date").insertInto("tran_part")
org.apache.spark.sql.AnalysisException: Partition column t_date not found in existing columns (id, amount, date1);
03-21-2017
11:25 AM
If you use Spark Streaming and the event RDD is empty, then Spark will still write a folder but not add any content. See e.g. "Deleting Directory in HDFS using Spark" for how to avoid this. If it is not streaming, verify that the JavaRDD is not empty.
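A minimal PySpark sketch of the streaming case (the original question used a JavaRDD, but the idea is the same; the output path is hypothetical and `stream` stands for the DStream of your job):

import time

def save_if_not_empty(rdd):
    # only write batches that actually contain records,
    # otherwise Spark creates an empty output folder
    if not rdd.isEmpty():
        rdd.saveAsTextFile("/tmp/events/batch-%d" % int(time.time()))

stream.foreachRDD(save_if_not_empty)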
03-21-2017
11:13 AM
partitionBy uses column names. The Hive table has "date1", while in Spark "t_date" is used. Have you tried renaming the DataFrame column to date1 so that it matches the Hive schema?
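For illustration, in PySpark syntax (the Scala DataFrame API has a method with the same name); here df stands for your DataFrame:

df_renamed = df.withColumnRenamed("t_date", "date1")
df_renamed.printSchema()   # the partition column now matches the Hive schema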
02-24-2017
09:30 AM
2 Kudos
Assumption: all files have the same columns and in each file the first line is the header.

This is a solution in PySpark: I load every file via the "com.databricks.spark.csv" class, respecting the header and inferring the schema, and then use Python's reduce to union them all:

from functools import reduce

files = ["/tmp/test_1.csv", "/tmp/test_2.csv", "/tmp/test_3.csv"]
df = reduce(lambda x, y: x.unionAll(y),
            [sqlContext.read.format('com.databricks.spark.csv')
                       .load(f, header="true", inferSchema="true")
             for f in files])
df.show()
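As a side note, if Spark 2.x is available, the built-in CSV reader accepts a list of paths directly, which avoids the reduce/unionAll step:

df = spark.read.csv(files, header=True, inferSchema=True)
df.show()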
02-24-2017
09:13 AM
Try "explode":

import org.apache.spark.sql.functions.{udf, array, explode, col}
import org.joda.time.DateTime
import java.text.SimpleDateFormat

// assumption: 'format' was not shown in the question; a simple date format like this is used here
val format = new SimpleDateFormat("yyyy-MM-dd")

case class Result(date: String, usage: Double)

def splitUsage = udf { (datediff: Integer, startdate: String, usage: Integer) =>
  if (datediff == 32) {
    val date = new DateTime(format.parse(startdate))
    // one entry per day of the period, each carrying an equal share of the usage
    (for (i <- 0 to datediff) yield Result(format.format(date.plusDays(i).toDate()),
      usage.toDouble / datediff.toDouble)).toArray
  } else {
    Array(Result(startdate, usage.toDouble))
  }
}

val df2 = df.withColumn("dayusage", splitUsage($"datediff", $"startdate", $"usage"))
val df3 = df2.select($"*", explode($"dayusage"))
val result = df3.select($"Id", $"startdate", $"enddate", $"datediff", $"did",
  col("col")("date").alias("date"), col("col")("usage").alias("usage"))
02-24-2017
07:33 AM
Fine-grained permissions (row level, column masking, ...) are created in Ranger for any Hive table, whether it was created via HiveQL or SparkSQL. So if you create a new table in Hive via SparkSQL that should be used by others with access control, you need to create the appropriate policies in Ranger afterwards. For less fine-grained permissions (insert, update, delete, ...) you can also use the SQL commands of https://cwiki.apache.org/confluence/display/Hive/SQL+Standard+Based+Hive+Authorization#SQLStandardBasedHiveAuthorization-ManagingObjectPrivileges with SparkSQL.
02-23-2017
04:57 PM
1 Kudo
For Hive one would use Apache Ranger for this. You can allow or deny access to tables, columns and even rows. Now, what to do with Spark: with the normal HiveContext, Spark reads the schema from the Metastore and then reads the files directly from HDFS, so no Hive Ranger plugin kicks in. However, with LLAP it will be possible, see e.g. https://hortonworks.com/blog/sparksql-ranger-llap-via-spark-thrift-server-bi-scenarios-provide-row-column-level-security-masking/ If you additionally disable HDFS access for "others" on the Hive table files, the data is access controlled.
02-23-2017
04:33 PM
You could also do it in the Spark code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]) = {
  // reduce the log level before the SparkContext is created
  Logger.getRootLogger.setLevel(Level.ERROR)
  val conf = new SparkConf().setAppName("KafkaToHdfs")
  val sc = new SparkContext(conf)
  // ...
}
02-22-2017
03:54 PM
And what do the interpreter settings say? Here are mine. Note: a simple "python" in zeppelin.pyspark.python is also OK. By the way, I have the same libs in my Zeppelin Spark libs folder. Have you tried restarting the interpreter?
02-22-2017
03:44 PM
Does Zeppelin send anything to the Spark interpreter at all? What does

%spark
print(sc.version)

print? No hiveContext is necessary.
02-22-2017
02:39 PM
Side note: in HDP Zeppelin, sqlContext defaults to a HiveContext. So something like

%spark
sqlContext.sql("show tables").collect.foreach(println)

should work. Alternatively:

%sql
show tables

As Jay mentioned, the % needs to be in the first line.
02-22-2017
08:16 AM
What we see in a kerberised cluster is that Livy needs to be able to impersonate other users. Then, when Knox forwards the request to Livy with "doAs=<authc-user>", Livy starts the job as the authenticated user. To be on the safe side, the Knox rewrite rule also replaces the proxyUser with the authenticated user.
01-16-2017
12:27 PM
I gave it a quick try and created 50 XML files according to your structure, each 60 MB in size, and tested on 3 workers (7 cores and 26 GB per worker).

1) The tar.gz file was 450 MB and took 14 min with 1 (!) executor. Since it is a tar file, only one executor reads it.
2) Putting all files as individually gzipped .xml.gz files into one folder and starting the job again, 3 executors were involved and the job finished in under 5 min (roughly 14 min / 3, since no shuffle is required).

So I see two issues here: 1) Don't use tar.gz. 2) 50 min compared to my 14 min: how fast is your machine (cores, ...)?
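A small sketch of the second variant (the folder path is hypothetical): because each file is gzipped individually, every .gz becomes its own partition, so several executors can read in parallel, whereas a single tar.gz is read by one executor only.

rdd = sc.textFile("/data/xml-gz/")     # folder containing one .xml.gz per source file
print(rdd.getNumPartitions())          # roughly one partition per gzipped file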
12-08-2016
06:25 PM
2 Kudos
Configure Livy in Ambari
Until https://github.com/jupyter-incubator/sparkmagic/issues/285 is fixed, set

livy.server.csrf_protection.enabled ==> false

in Ambari under Spark Config - Advanced livy-conf.

Install Sparkmagic
For details see https://github.com/jupyter-incubator/sparkmagic
Install Jupyter, if you don't already have it:
$ sudo -H pip install jupyter notebook ipython
Install Sparkmagic:
$ sudo -H pip install sparkmagic
Install Kernels:
$ pip show sparkmagic # check path, e.g /usr/local/lib/python2.7/site-packages
$ cd /usr/local/lib/python2.7/site-packages
$ jupyter-kernelspec install --user sparkmagic/kernels/sparkkernel
$ jupyter-kernelspec install --user sparkmagic/kernels/pysparkkernel
Install Sparkmagic widgets
$ sudo -H jupyter nbextension enable --py --sys-prefix widgetsnbextension
Create local Configuration
The configuration file is a json file stored under
~/.sparkmagic/config.json
To avoid timeouts connecting to HDP 2.5 it is important to add
"livy_server_heartbeat_timeout_seconds": 0
To ensure the Spark job will run on the cluster (the Livy default is local), spark.master needs to be set to yarn-cluster. Therefore a conf object needs to be provided (here you can also add extra jars for the session):
"session_configs": {
"driverMemory": "2G",
"executorCores": 4,
"executorMemory": "8G",
"proxyUser": "bernhard",
"conf": {
"spark.master": "yarn-cluster",
"spark.jars.packages": "com.databricks:spark-csv_2.10:1.5.0"
}
}
The proxyUser is the user the Livy session will run under.

Here is an example config.json. Adapt it and copy it to ~/.sparkmagic.

Start Jupyter Notebooks
1) Start Jupyter:
$ cd <project-dir>
$ jupyter notebook
In Notebook Home select
New -> Spark or New -> PySpark or New -> Python
2) Load Sparkmagic:
Add into your Notebook after the Kernel started
In[ ]: %load_ext sparkmagic.magics
3) Create Endpoint
In[ ]: %manage_spark
This will open a connection widget
Username and password can be ignored in non secured clusters
4) Create a session:
When the endpoint is created successfully, create a session. Note that it uses the created endpoint and, under properties, the configuration from config.json.
When you see that the Spark session has started successfully, you are ready to work with the cluster.

Notes
Livy on HDP 2.5 currently does not return YARN Application ID
Jupyter session name provided under Create Session is notebook internal and not used by Livy Server on the cluster. Livy-Server will create sessions on YARN called livy-session-###, e.g. livy-session-10. The session in Jupyter will have session id ###, e.g. 10.
For multiline Scala code in the Notebook you have to add the dot at the end, as in
val df = sqlContext.read.
format("com.databricks.spark.csv").
option("header", "true").
option("inferSchema", "true").
load("/tmp/iris.csv")
For more details and example notebooks with Sparkmagic, see https://github.com/bernhard-42/Sparkmagic-on-HDP

Credits
Thanks to Alex (@azeltov) for the discussions and debugging session
12-08-2016
06:25 PM
7 Kudos
Update 10.12.2016: Added filter to rewrite proxyUser as authenticated user
Update 25.01.2017: Improved service.xml and rewrite.xml

1 Configure a new service for Livy Server in Knox
1.1 Create a service definition
$ sudo mkdir -p /usr/hdp/current/knox-server/data/services/livy/0.1.0/
$ sudo chown -R knox:knox /usr/hdp/current/knox-server/data/services/livy
Create a file
/usr/hdp/current/knox-server/data/services/livy/0.1.0/service.xml with
<service role="LIVYSERVER" name="livy" version="0.1.0">
<routes>
<route path="/livy/v1/sessions">
<rewrite apply="LIVYSERVER/livy/addusername/inbound" to="request.body"/>
</route>
<route path="/livy/v1/**?**"/>
<route path="/livy/v1"/>
<route path="/livy/v1/"/>
</routes>
</service>
Note that the name="livy" attribute and the path .../services/livy/... need to be the same. The route /livy/v1/sessions is a special treatment for the POST request that creates a Livy session. The request body e.g. looks like:

{"driverMemory":"2G","executorCores":4,"executorMemory":"8G","proxyUser":"bernhard","conf":{"spark.master":"yarn-cluster","spark.jars.packages":"com.databricks:spark-csv_2.10:1.5.0"}}

The Livy server will use proxyUser to run the Spark session. To avoid that a user can provide any user here (e.g. a more privileged one), Knox needs to rewrite the JSON body and replace whatever the value of proxyUser is with the username of the authenticated user, see next section.

1.2 Create a rewrite rule definition
Create a file
/usr/hdp/current/knox-server/data/services/livy/0.1.0/rewrite.xml with
<rules>
<rule name="LIVYSERVER/livy/user-name">
<rewrite template="{$username}"/>
</rule>
<rule dir="IN" name="LIVYSERVER/livy/root/inbound" pattern="*://*:*/**/livy/v1">
<rewrite template="{$serviceUrl[LIVYSERVER]}"/>
</rule>
<rule dir="IN" name="LIVYSERVER/livy/path/inbound" pattern="*://*:*/**/livy/v1/{path=**}?{**}">
<rewrite template="{$serviceUrl[LIVYSERVER]}/{path=**}?{**}"/>
</rule>
<filter name="LIVYSERVER/livy/addusername/inbound">
<content type="*/json">
<apply path="$.proxyUser" rule="LIVYSERVER/livy/user-name"/>
</content>
</filter>
</rules>
Note: The "v1" is only introduced to allow calls to the Livy server without any path (it seems to be a limitation in Knox that at least one path element needs to be present in the mapped URL). The rule LIVYSERVER/livy/user-name and the filter LIVYSERVER/livy/addusername/inbound are responsible for injecting the authenticated user name as described in the last section.

1.3 Publish the new service via Ambari
Go to the Knox configuration in Ambari and add at the end of Advanced Topology:
<topology>
...
<service>
<role>LIVYSERVER</role>
<url>http://<livy-server>:8998</url>
</service>
</topology>
2 Use with Sparkmagic
Sparkmagic can be configured to use Livy Server in HDP 2.5, see Using Jupyter with Sparkmagic and Livy Server on HDP 2.5
To connect via Knox, change the endpoint definition in %manage_spark: just replace the Livy server URL with the Knox gateway URL

https://<knox-gateway>:8443/livy/v1
If Knox does not have a valid certificate for HTTPS requests, reconfigure Sparkmagic's config.json and set

"ignore_ssl_errors": true
Credits

Thanks to Kevin Minder for the article about adding a service to Apache Knox.
12-08-2016
11:02 AM
If it solves your question, please mark it as accepted so that it shows as resolved in the overview. Thanks.
12-08-2016
09:55 AM
1 Kudo
You can try it yourself and let Spark explain the query (i.e. ask Catalyst what it is doing). I would recommend running "explain" on your own query; see the examples below.

1) With a SQL query:

spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explain

Result:

== Physical Plan ==
*Project [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43, dept_no#44, dept_name#45, emp_no#46, dept_no#47, from_date#48, to_date#49]
+- *BroadcastHashJoin [dept_no#47], [dept_no#44], Inner, BuildRight
:- *BroadcastHashJoin [emp_no#38], [emp_no#46], Inner, BuildRight
: :- *Filter isnotnull(emp_no#38)
: : +- HiveTableScan [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43], MetastoreRelation employees, employees, e
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
: +- *Filter (isnotnull(dept_no#47) && isnotnull(emp_no#46))
: +- HiveTableScan [emp_no#46, dept_no#47, from_date#48, to_date#49], MetastoreRelation employees, dept_emp, de
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *Filter isnotnull(dept_no#44)
+- HiveTableScan [dept_no#44, dept_name#45], MetastoreRelation employees, departments, d
2) Read via Hive and then execute a DataFrame join:

val e = spark.sql("select * from employees.employees")
val d = spark.sql("select * from employees.departments")
val de = spark.sql("select * from employees.dept_emp")
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain

Result:

== Physical Plan ==
*Project [emp_no#265, dept_no#254, dept_name#255, from_date#267, to_date#268, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248]
+- *BroadcastHashJoin [emp_no#265], [emp_no#243], Inner, BuildRight
:- *Project [dept_no#254, dept_name#255, emp_no#265, from_date#267, to_date#268]
: +- *BroadcastHashJoin [dept_no#254], [dept_no#266], Inner, BuildRight
: :- *Filter isnotnull(dept_no#254)
: : +- HiveTableScan [dept_no#254, dept_name#255], MetastoreRelation employees, departments
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]))
: +- *Filter (isnotnull(dept_no#266) && isnotnull(emp_no#265))
: +- HiveTableScan [emp_no#265, dept_no#266, from_date#267, to_date#268], MetastoreRelation employees, dept_emp
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *Filter isnotnull(emp_no#243)
+- HiveTableScan [emp_no#243, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248], MetastoreRelation employees, employees

It is the same.

3) Pure DataFrames:

val d = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/departments")
.toDF(Seq("dept_no", "dept_name"):_*)
val e = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/employees")
.toDF(Seq("emp_no","birth_date","first_name","last_name","gender","hire_date"):_*)
val de = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/departments")
.toDF(Seq("emp_no", "dept_no"):_*)
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain

Result:

== Physical Plan ==
*Project [emp_no#137, dept_no#96, dept_name#97, birth_date#115, first_name#116, last_name#117, gender#118, hire_date#119]
+- *BroadcastHashJoin [cast(emp_no#137 as double)], [cast(emp_no#114 as double)], Inner, BuildRight
:- *Project [dept_no#96, dept_name#97, emp_no#137]
: +- *BroadcastHashJoin [dept_no#96], [dept_no#138], Inner, BuildRight
: :- *Project [_col0#91 AS dept_no#96, _col1#92 AS dept_name#97]
: : +- *Filter isnotnull(_col0#91)
: : +- *Scan orc [_col0#91,_col1#92] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
: +- *Project [_col0#132 AS emp_no#137, _col1#133 AS dept_no#138]
: +- *Filter (isnotnull(_col1#133) && isnotnull(_col0#132))
: +- *Scan orc [_col0#132,_col1#133] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col1), IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as double)))
+- *Project [_col0#101 AS emp_no#114, _col1#102 AS birth_date#115, _col2#103 AS first_name#116, _col3#104 AS last_name#117, _col4#105 AS gender#118, _col5#106 AS hire_date#119]
+- *Filter isnotnull(_col0#101)
+- *Scan orc [_col0#101,_col1#102,_col2#103,_col3#104,_col4#105,_col5#106] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/employees, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:string,_col5:string>

Very similar, just the read is done differently. In particular, the same types of joins are used.
11-24-2016
07:24 PM
3 Kudos
You could split it into an ExtractText processor with a dynamic property "json" set to "(.*?<MyXML>)(.*?)(<\/MyXML>.*)", followed by a ReplaceText processor with "${json.1}${json.2:replace('"', '\\"')}${json.3}" (i.e. with 4 backslashes). This will create

[ { "XML": "<MyXML> This is a \"test\" XML </MyXML>" } ]
11-24-2016
02:17 PM
1 Kudo
You might need to restart the Spark interpreter (or restart Zeppelin in Ambari), so that the Python remote interpreters know about the freshly installed pandas and can import it. If you are running on a cluster, then Zeppelin runs in yarn-client mode and the Python remote interpreters are started on other nodes than the Zeppelin node. In this case install pandas on all machines of your cluster and restart Zeppelin.
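A quick way to verify the installation from a notebook paragraph (use whatever interpreter binding you normally use for PySpark):

%pyspark
import pandas as pd
print(pd.__version__)    # prints the pandas version installed on the interpreter's node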
11-24-2016
02:05 PM
Good to hear. By the way, it is good practice to accept the answer so that it is marked as resolved in the overview.
11-24-2016
09:50 AM
3 Kudos
By default, Atlas uses Basic Authentication, so use your Atlas user and password, e.g.:

curl -s -u admin:admin http://atlas-server:21000/api/atlas/types
11-23-2016
07:41 AM
I am not aware of a Hive version 1.5.0 (do you mean Hive View?). Anyhow, it works on Hive 1.2.1 (as of HDP 2.5):

$ beeline -u "jdbc:hive2://192.168.124.145:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -n hive
Connecting to jdbc:hive2://192.168.124.145:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
Connected to: Apache Hive (version 1.2.1000.2.5.0.0-1245)
Driver: Hive JDBC (version 1.2.1000.2.5.0.0-1245)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1000.2.5.0.0-1245 by Apache Hive
0: jdbc:hive2://192.168.124.145:2181/> select dept_name, md5(dept_name) from departments limit 1;
+-------------------+-----------------------------------+--+
| dept_name | _c1 |
+-------------------+-----------------------------------+--+
| Customer Service | d5552e0564007d93ff5937a9cb3bc491 |
+-------------------+-----------------------------------+--+
1 row selected (0.337 seconds)

... and on Hive 2.1 (Technical Preview in HDP 2.5):

$ beeline -u "jdbc:hive2://192.168.124.145:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-hive2" -n hive
Connecting to jdbc:hive2://192.168.124.145:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-hive2
Connected to: Apache Hive (version 2.1.0.2.5.0.0-1245)
Driver: Hive JDBC (version 1.2.1000.2.5.0.0-1245)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1000.2.5.0.0-1245 by Apache Hive
0: jdbc:hive2://192.168.124.145:2181/> select dept_name, md5(dept_name) from departments limit 1;
+-------------------+-----------------------------------+--+
| dept_name | c1 |
+-------------------+-----------------------------------+--+
| Customer Service | d5552e0564007d93ff5937a9cb3bc491 |
+-------------------+-----------------------------------+--+
1 row selected (6.083 seconds)
11-22-2016
05:45 PM
1 Kudo
You should use a HiveContext, see https://spark.apache.org/docs/1.6.1/sql-programming-guide.html#hive-tables

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc1)

and then you can test access to your table. Also try

sqlContext.sql("show databases").show()
sqlContext.sql("show tables").show()

to see what you can access.
11-22-2016
08:59 AM
The driver creates the Spark Context. This can either be the spark-shell or Zeppelin, or a standalone Spark application (see http://spark.apache.org/docs/1.6.2/quick-start.html#self-contained-applications to learn how to create a Spark Context in an application). To distribute the execution you need to choose "yarn client" or "yarn cluster" mode (not local, which is the default), e.g.

spark-shell --master yarn --deploy-mode client --num-executors 3

This creates a driver with a Spark Context that controls 3 executors (which could be on 1, 2 or 3 machines, check "ps ax | grep Coarse"). When you now call sc.textFile(...), Spark creates an RDD by loading partitions of the file on each executor (hence it is already partitioned afterwards). Every further command then runs distributed. So it is not you who brings the Spark Context to the executors; the Spark Context is used by the driver to distribute the load across all started executors. That's why I linked the Spark docs above: you need to first understand the Spark cluster modes. If you only start "spark-shell", it will not be distributed but run in "standalone" mode; only in "yarn client" and "yarn cluster" mode will it be distributed. Having a distributed Spark Context created in the driver, there is no need to care about it any more: just use the context and Spark will distribute your load. A small sketch below the reading list illustrates this.

Readings:
Overview: http://spark.apache.org/docs/latest/cluster-overview.html
Spark Standalone: http://spark.apache.org/docs/latest/spark-standalone.html
Spark on YARN: http://spark.apache.org/docs/latest/running-on-yarn.html
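A minimal PySpark sketch of the partitioning point above (the file path is hypothetical):

rdd = sc.textFile("/data/big_file.txt")          # loaded as partitions spread over the executors
print(rdd.getNumPartitions())                    # number of partitions the file was split into
print(rdd.map(lambda line: len(line)).sum())     # this map/sum runs distributed on the executors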
11-21-2016
02:43 PM
The Spark Context is the cluster coordinator; for details see http://spark.apache.org/docs/latest/cluster-overview.html