Member since: 10-07-2015
Posts: 107
Kudos Received: 73
Solutions: 23
My Accepted Solutions
Title | Views | Posted |
---|---|---|
| | 2700 | 02-23-2017 04:57 PM |
| | 2094 | 12-08-2016 09:55 AM |
| | 9229 | 11-24-2016 07:24 PM |
| | 4112 | 11-24-2016 02:17 PM |
| | 9596 | 11-24-2016 09:50 AM |
02-22-2017
03:54 PM
And what do the Interpreter settings say? Here are mine. Note: a simple "python" in zeppelin.pyspark.python is also OK. By the way, I have the same libs in my Zeppelin Spark libs folder. Have you tried restarting the interpreter?
02-22-2017
03:44 PM
Does Zeppelin send the code to the Spark interpreter at all? What does
%spark
print(sc.version)
print? No hiveContext is necessary.
02-22-2017
02:39 PM
Side note: in HDP, Zeppelin's sqlContext defaults to a hiveContext. So something like
%spark
sqlContext.sql("show tables").collect.foreach(println)
should work. Alternatively:
%sql
show tables
As Jay mentioned, the % needs to be on the first line.
02-22-2017
08:16 AM
What we see in a kerberised cluster is that Livy needs to be able to impersonate other users. When Knox then forwards the request to Livy with "doAs=<authc-user>", Livy starts the job as the authenticated user. To be on the safe side, the Knox rewrite rule also replaces the proxyUser with the authenticated user.
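For reference, the impersonation-related settings typically look roughly like this (a sketch; exact property names and locations depend on your HDP/Livy version and on the user Livy runs as):
livy.impersonation.enabled = true          (Livy configuration)
hadoop.proxyuser.livy.hosts = *            (HDFS core-site, assuming Livy runs as the "livy" user)
hadoop.proxyuser.livy.groups = *           (HDFS core-site)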
01-16-2017
12:27 PM
I gave it a quick try and created 50 XML files according to your structure, each 60 MB. Tested on 3 workers (7 cores and 26 GB per worker).
1) The tar.gz file was 450 MB and took 14 min with 1 (!) executor. Since it is a tar file, only one executor reads it.
2) Putting all files as individual xml.gz files into one folder and starting the job again, 3 executors were involved and the job finished in under 5 min (roughly 14 min / 3, since no shuffle is required).
So I see two issues here:
1) Don't use tar.gz.
2) 50 min compared to 14 min: how fast is your machine (cores, ...)?
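For the folder of xml.gz files, the read can look along these lines (a sketch assuming the spark-xml package is on the classpath; the rowTag value and the path are placeholders):
// Each .gz file is read by a single task (gzip is not splittable),
// so many smaller xml.gz files parallelize the read across executors.
val df = sqlContext.read.
  format("com.databricks.spark.xml").
  option("rowTag", "record").   // placeholder: your actual XML record element
  load("/data/xml-folder")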
12-08-2016
06:25 PM
2 Kudos
Configure Livy in Ambari
Until
https://github.com/jupyter-incubator/sparkmagic/issues/285 is fixed, set
livy.server.csrf_protection.enabled ==> false
in Ambari under Spark Config - Advanced livy-conf.
Install Sparkmagic
For details, see
https://github.com/jupyter-incubator/sparkmagic
Install Jupyter, if you don't already have it:
$ sudo -H pip install jupyter notebook ipython
Install Sparkmagic:
$ sudo -H pip install sparkmagic
Install Kernels:
$ pip show sparkmagic    # check the install path, e.g. /usr/local/lib/python2.7/site-packages
$ cd /usr/local/lib/python2.7/site-packages
$ jupyter-kernelspec install --user sparkmagic/kernels/sparkkernel
$ jupyter-kernelspec install --user sparkmagic/kernels/pysparkkernel
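To verify that both Sparkmagic kernels were registered, you can list the installed kernel specs:
$ jupyter-kernelspec list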
Install Sparkmagic widgets
$ sudo -H jupyter nbextension enable --py --sys-prefix widgetsnbextension
Create local Configuration
The configuration file is a JSON file stored at
~/.sparkmagic/config.json
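If the directory does not exist yet, create it first:
$ mkdir -p ~/.sparkmagic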
To avoid timeouts when connecting to HDP 2.5, it is important to add
"livy_server_heartbeat_timeout_seconds": 0
To ensure the Spark job will run on the cluster (the Livy default is local),
spark.master needs to be set to yarn-cluster. Therefore a conf object needs to be provided in session_configs (here you can also add extra jars for the session):
"session_configs": {
"driverMemory": "2G",
"executorCores": 4,
"executorMemory": "8G",
"proxyUser": "bernhard",
"conf": {
"spark.master": "yarn-cluster",
"spark.jars.packages": "com.databricks:spark-csv_2.10:1.5.0"
}
}
The proxyUser is the user the Livy session will run under.
Here is an example config.json. Adapt it and copy it to ~/.sparkmagic.
Start Jupyter Notebooks
1) Start Jupyter:
$ cd <project-dir>
$ jupyter notebook
In Notebook Home select
New -> Spark or New -> PySpark or New -> Python
2) Load Sparkmagic:
Add this to your notebook after the kernel has started:
In[ ]: %load_ext sparkmagic.magics
3) Create Endpoint
In[ ]: %manage_spark
This will open a connection widget.
Username and password can be ignored on non-secured clusters.
4) Create a session:
When the endpoint creation is successful, create a session.
Note that it uses the created endpoint and, under Properties, the configuration from config.json.
When you see "Spark session is successfully started", the session is ready to use.
Notes
Livy on HDP 2.5 currently does not return the YARN application ID (see the sketch after these notes for one way to find it on YARN).
The Jupyter session name provided under Create Session is notebook-internal and not used by the Livy server on the cluster. The Livy server will create sessions on YARN called livy-session-###, e.g. livy-session-10. The session in Jupyter will have session id ###, e.g. 10.
For multiline Scala code in the notebook you have to add the dot at the end of each continued line, as in
val df = sqlContext.read.
format("com.databricks.spark.csv").
option("header", "true").
option("inferSchema", "true").
load("/tmp/iris.csv")
For more details and example notebooks with Sparkmagic, see https://github.com/bernhard-42/Sparkmagic-on-HDP
Credits
Thanks to Alex (@azeltov) for the discussions and debugging session
12-08-2016
06:25 PM
7 Kudos
Update 10.12.2016: Added filter to rewrite proxyUser as authenticated user
Update 25.01.2017: Improved service.xml and rewrite.xml
1 Configure a new service for Livy Server in Knox
1.1 Create a service definition
$ sudo mkdir -p /usr/hdp/current/knox-server/data/services/livy/0.1.0/
$ sudo chown -R knox:knox /usr/hdp/current/knox-server/data/services/livy
Create a file
/usr/hdp/current/knox-server/data/services/livy/0.1.0/service.xml with
<service role="LIVYSERVER" name="livy" version="0.1.0">
<routes>
<route path="/livy/v1/sessions">
<rewrite apply="LIVYSERVER/livy/addusername/inbound" to="request.body"/>
</route>
<route path="/livy/v1/**?**"/>
<route path="/livy/v1"/>
<route path="/livy/v1/"/>
</routes>
</service>
Note that the name="livy" attribute and the path .../services/livy/... need to be the same. The route /livy/v1/sessions is special treatment for the POST request that creates a Livy session. The request body looks, for example, like:
{"driverMemory":"2G","executorCores":4,"executorMemory":"8G","proxyUser":"bernhard","conf":{"spark.master":"yarn-cluster","spark.jars.packages":"com.databricks:spark-csv_2.10:1.5.0"}}
The Livy server will use proxyUser to run the Spark session. To prevent a user from supplying an arbitrary (e.g. more privileged) user here, Knox needs to rewrite the JSON body and replace whatever value proxyUser has with the username of the authenticated user, see the next section.
1.2 Create a rewrite rule definition
Create a file
/usr/hdp/current/knox-server/data/services/livy/0.1.0/rewrite.xml with
<rules>
<rule name="LIVYSERVER/livy/user-name">
<rewrite template="{$username}"/>
</rule>
<rule dir="IN" name="LIVYSERVER/livy/root/inbound" pattern="*://*:*/**/livy/v1">
<rewrite template="{$serviceUrl[LIVYSERVER]}"/>
</rule>
<rule dir="IN" name="LIVYSERVER/livy/path/inbound" pattern="*://*:*/**/livy/v1/{path=**}?{**}">
<rewrite template="{$serviceUrl[LIVYSERVER]}/{path=**}?{**}"/>
</rule>
<filter name="LIVYSERVER/livy/addusername/inbound">
<content type="*/json">
<apply path="$.proxyUser" rule="LIVYSERVER/livy/user-name"/>
</content>
</filter>
</rules>
Note: The "v1" is only introduced to allow calls to Livy-server without any path. (Seems to be a limitation in Knox that at least one path element nees to be present in mapped URL. The rule LIVYSERVER/livy/user-name and the filter LIVYSERVER/livy/addusername/inbound are responsible to inject the authenticated user name as described in the last section. 1.3 Publish the new service via Ambari
Go to
Knox Configuration and add at the end of Advanced Topology:
<topology>
...
<service>
<role>LIVYSERVER</role>
<url>http://<livy-server>:8998</url>
</service>
</topology>
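A quick way to verify the new routing is to list the Livy sessions through Knox (a sketch; adjust the gateway URL path and credentials to your Knox topology):
$ curl -k -u <user>:<password> 'https://<knox-gateway>:8443/livy/v1/sessions'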
2 Use with Sparkmagic
Sparkmagic can be configured to use Livy Server in HDP 2.5, see Using Jupyter with Sparkmagic and Livy Server on HDP 2.5
To connect via Knox, change the endpoint definition in
%manage_spark
Just replace the Livy server URL with the Knox gateway URL
https://<knox-gateway>:8443/livy/v1
If Knox does not have a valid certificate for HTTPS requests, reconfigure Sparkmagic's config.json and set
"ignore_ssl_errors": true
Credits
Thanks to Kevin Minder for the article about Adding a service to Apache Knox.
12-08-2016
11:02 AM
If it solves your question, please mark it as accepted so that it shows as resolved in the overview. Thanks
12-08-2016
09:55 AM
1 Kudo
You can try it yourself and let Spark explain the query (i.e. ask Catalyst what it is doing). I would recommend doing the "explain" for your own query; see the examples below.
1) With a SQL query
spark.sql("select * from employees e, departments d, dept_emp de where e.emp_no = de.emp_no and d.dept_no = de.dept_no").explain
Result:
== Physical Plan ==
*Project [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43, dept_no#44, dept_name#45, emp_no#46, dept_no#47, from_date#48, to_date#49]
+- *BroadcastHashJoin [dept_no#47], [dept_no#44], Inner, BuildRight
:- *BroadcastHashJoin [emp_no#38], [emp_no#46], Inner, BuildRight
: :- *Filter isnotnull(emp_no#38)
: : +- HiveTableScan [emp_no#38, birth_date#39, first_name#40, last_name#41, gender#42, hire_date#43], MetastoreRelation employees, employees, e
: +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
: +- *Filter (isnotnull(dept_no#47) && isnotnull(emp_no#46))
: +- HiveTableScan [emp_no#46, dept_no#47, from_date#48, to_date#49], MetastoreRelation employees, dept_emp, de
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
+- *Filter isnotnull(dept_no#44)
+- HiveTableScan [dept_no#44, dept_name#45], MetastoreRelation employees, departments, d
2) Read via Hive and then execute a DataFrame join
val e = spark.sql("select * from employees.employees")
val d = spark.sql("select * from employees.departments")
val de = spark.sql("select * from employees.dept_emp")
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain
Result:
== Physical Plan ==
*Project [emp_no#265, dept_no#254, dept_name#255, from_date#267, to_date#268, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248]
+- *BroadcastHashJoin [emp_no#265], [emp_no#243], Inner, BuildRight
:- *Project [dept_no#254, dept_name#255, emp_no#265, from_date#267, to_date#268]
: +- *BroadcastHashJoin [dept_no#254], [dept_no#266], Inner, BuildRight
: :- *Filter isnotnull(dept_no#254)
: : +- HiveTableScan [dept_no#254, dept_name#255], MetastoreRelation employees, departments
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]))
: +- *Filter (isnotnull(dept_no#266) && isnotnull(emp_no#265))
: +- HiveTableScan [emp_no#265, dept_no#266, from_date#267, to_date#268], MetastoreRelation employees, dept_emp
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *Filter isnotnull(emp_no#243)
+- HiveTableScan [emp_no#243, birth_date#244, first_name#245, last_name#246, gender#247, hire_date#248], MetastoreRelation employees, employees
It is the same plan as before.
3) Pure DataFrames:
val d = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/departments")
.toDF(Seq("dept_no", "dept_name"):_*)
val e = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/employees")
.toDF(Seq("emp_no","birth_date","first_name","last_name","gender","hire_date"):_*)
val de = spark.read.format("orc")
.load("/apps/hive/warehouse/employees.db/departments")
.toDF(Seq("emp_no", "dept_no"):_*)
d.join(de, Seq("dept_no")).join(e, Seq("emp_no")).explain
Result:
== Physical Plan ==
*Project [emp_no#137, dept_no#96, dept_name#97, birth_date#115, first_name#116, last_name#117, gender#118, hire_date#119]
+- *BroadcastHashJoin [cast(emp_no#137 as double)], [cast(emp_no#114 as double)], Inner, BuildRight
:- *Project [dept_no#96, dept_name#97, emp_no#137]
: +- *BroadcastHashJoin [dept_no#96], [dept_no#138], Inner, BuildRight
: :- *Project [_col0#91 AS dept_no#96, _col1#92 AS dept_name#97]
: : +- *Filter isnotnull(_col0#91)
: : +- *Scan orc [_col0#91,_col1#92] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
: +- *Project [_col0#132 AS emp_no#137, _col1#133 AS dept_no#138]
: +- *Filter (isnotnull(_col1#133) && isnotnull(_col0#132))
: +- *Scan orc [_col0#132,_col1#133] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/departments, PushedFilters: [IsNotNull(_col1), IsNotNull(_col0)], ReadSchema: struct<_col0:string,_col1:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as double)))
+- *Project [_col0#101 AS emp_no#114, _col1#102 AS birth_date#115, _col2#103 AS first_name#116, _col3#104 AS last_name#117, _col4#105 AS gender#118, _col5#106 AS hire_date#119]
+- *Filter isnotnull(_col0#101)
+- *Scan orc [_col0#101,_col1#102,_col2#103,_col3#104,_col4#105,_col5#106] Format: ORC, InputPaths: hdfs://node1:8020/apps/hive/warehouse/employees.db/employees, PushedFilters: [IsNotNull(_col0)], ReadSchema: struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:string,_col5:string>
Very similar, just the read is done differently. In particular, the same types of joins are used.
11-24-2016
07:24 PM
3 Kudos
You could split it into an ExtractText processor with a dynamic property "json" set to "(.*?<MyXML>)(.*?)(<\/MyXML>.*)" and a ReplaceText processor with the replacement value "${json.1}${json.2:replace('"', '\\\\"')}${json.3}" (i.e. with 4 backslashes). This will create
[ { "XML": "<MyXML> This is a \"test\" XML </MyXML>" } ]