Member since
09-24-2015
98
Posts
76
Kudos Received
18
Solutions
My Accepted Solutions
Title | Views | Posted
---|---|---
 | 2819 | 08-29-2016 04:42 PM
 | 5637 | 08-09-2016 08:43 PM
 | 1716 | 07-19-2016 04:08 PM
 | 2425 | 07-07-2016 04:05 PM
 | 7320 | 06-29-2016 08:25 PM
09-13-2016
03:27 PM
2 Kudos
@Kirk Haslbeck Good question, and thanks for the diagrams. Here are some more details to consider.

It is a good point that each JVM-based executor can have multiple "cores" that run tasks in a multi-threaded environment. There are benefits to running multiple tasks inside a single executor (a single JVM): it takes advantage of the multi-core processing power, and it spreads the fixed JVM overhead across those tasks, since each JVM has to start up and initialize certain data structures before it can begin running tasks.

From the Spark docs, we configure the number of cores using these parameters:

spark.driver.cores = number of cores to use for the driver process
spark.executor.cores = number of cores to use on each executor

You also want to watch out for this parameter, which can be used to limit the total cores used by Spark across the cluster (i.e., not on each worker):

spark.cores.max = the maximum amount of CPU cores to request for the application from across the cluster (not from each machine)

Finally, here is a description from Databricks, aligning the terms "cores" and "slots":

"Terminology: We're using the term “slots” here to indicate threads available to perform parallel work for Spark. Spark documentation often refers to these threads as “cores”, which is a confusing term, as the number of slots available on a particular machine does not necessarily have any relationship to the number of physical CPU cores on that machine."
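To make the "slots" idea concrete, here is a rough sketch in plain Python (not a Spark API) of how the parameters above combine. The function name is made up for illustration, and the model is simplified: it assumes every executor gets the same number of cores and treats spark.cores.max as a simple cap on the total.

```python
from typing import Optional


def total_task_slots(num_executors: int,
                     executor_cores: int,
                     cores_max: Optional[int] = None) -> int:
    """Estimate how many tasks can run at the same time (hypothetical helper).

    num_executors  -- number of executors granted to the application
    executor_cores -- value of spark.executor.cores
    cores_max      -- value of spark.cores.max, or None if it is not set
    """
    if num_executors < 0 or executor_cores < 1:
        raise ValueError("need num_executors >= 0 and executor_cores >= 1")
    slots = num_executors * executor_cores
    if cores_max is not None:
        # Simplification: the cluster-wide limit caps the total slot count.
        slots = min(slots, cores_max)
    return slots


if __name__ == "__main__":
    print(total_task_slots(4, 2))               # 8
    print(total_task_slots(4, 2, cores_max=6))  # 6
```

The point is only that parallelism is driven by configured slots, not by the physical core count of the machines.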
08-30-2016
03:43 PM
Yep, this worked for me as well. Thanks.
08-29-2016
04:42 PM
1 Kudo
Hello @Rendiyono Wahyu Saputro

Yes, you can import Python libraries and use them in Spark, which supports a full Python API via the pyspark shell. For instance, if you wanted to load and use the Python scikit-fuzzy library to run fuzzy logic, then you just:

1) Install the Python library with pip (or download it directly from GitHub) so that it is importable by the Python interpreter on the driver and on every worker node. Python packages are not JARs, so they do not go on the Spark classpath; pure-Python code can instead be shipped with the job as a .zip, .egg, or .py file.
2) Kick off the job with the pyspark shell (Example: $ pyspark --py-files /path/to/skfuzzy.zip )
3) Import the Python library in your code (Example: "import skfuzzy as fuzz")
4) Use the library

More information about the scikit-fuzzy library is here: https://pypi.python.org/pypi/scikit-fuzzy

Hints about dependencies and install: scikit-fuzzy depends on NumPy >= 1.6, SciPy >= 0.9, and NetworkX >= 1.9, and is available on PyPI. The latest stable release can always be obtained and installed simply by running:

$ pip install -U scikit-fuzzy
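Before starting a job, it can help to confirm that the library and its dependencies are importable on a node. Here is a minimal sketch using only the Python standard library; the helper name is made up for illustration, and it only checks top-level module names, not version numbers.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be found (hypothetical helper)."""
    # find_spec returns None when a top-level module is not installed.
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Import names for scikit-fuzzy and the dependencies listed above.
    required = ["numpy", "scipy", "networkx", "skfuzzy"]
    missing = missing_modules(required)
    if missing:
        print("Not importable on this node:", ", ".join(missing))
    else:
        print("All required modules are importable.")
```

Running the same check on each worker node is one way to catch a missing install before the job fails inside an executor.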
08-23-2016
07:53 PM
8 Kudos
First, go to the Apache Spark downloads web page to download Spark 2.0. Link to Spark downloads page: http://spark.apache.org/downloads.html

Set your download options, and click on the link next to "Download Spark" (i.e. "spark-2.0.0-bin-hadoop2.7.tgz"). This will download the gzipped tarball to your computer.

Next, start up the HDP 2.5 Sandbox image within your virtual machine (either using VirtualBox or VMware Fusion). Once the image is booted, start a Terminal session on your laptop and copy the tarball to the VM. Here is an example using the 'scp' (secure copy) command, although you can use any file copy mechanism. Note that scp takes the port with an uppercase -P (lowercase -p means "preserve timestamps"):

scp -P 2222 spark-2.0.0-bin-hadoop2.7.tgz root@127.0.0.1:~

This will copy the file to the 'root' user's home directory on the VM. Next, log in (via ssh) to the VM:

ssh -p 2222 root@127.0.0.1

Once logged in, extract the tarball with this command:

tar -xvzf spark-2.0.0-bin-hadoop2.7.tgz

You can now navigate to the "seed" directory already created for Spark 2.0, and move the contents of the extracted tarball into the current directory:

cd /usr/hdp/current/spark2-client
mv ~/spark-2.0.0-bin-hadoop2.7/* .

Next, change the ownership of the new files to match the local directory:

chown -R root:root *

Now, set up the SPARK_HOME environment variable for this session (or permanently by adding it to ~/.bash_profile):

export SPARK_HOME=/usr/hdp/current/spark2-client

Next, create the config files in the "conf" directory so that you can edit them to configure Spark:

cd conf
cp spark-env.sh.template spark-env.sh
cp spark-defaults.conf.template spark-defaults.conf

Edit the config files with a text editor (like vi or vim), and make sure the following environment variables and parameters are set. Add the following lines to the file 'spark-env.sh' and then save the file:

HADOOP_CONF_DIR=/etc/hadoop/conf
SPARK_EXECUTOR_INSTANCES=2
SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=512M
SPARK_DRIVER_MEMORY=512M

Now, replace the lines in the "spark-defaults.conf" file to match this content, and then save the file:

spark.driver.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
spark.executor.extraLibraryPath /usr/hdp/current/hadoop-client/lib/native
spark.driver.extraJavaOptions -Dhdp.version=2.5.0.0-817
spark.yarn.am.extraJavaOptions -Dhdp.version=2.5.0.0-817
spark.eventLog.dir hdfs:///spark-history
spark.eventLog.enabled true
# Required: setting this parameter to 'false' turns off ATS timeline server for Spark
spark.hadoop.yarn.timeline-service.enabled false
#spark.history.fs.logDirectory hdfs:///spark-history
#spark.history.kerberos.keytab none
#spark.history.kerberos.principal none
#spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
#spark.history.ui.port 18080
spark.yarn.containerLauncherMaxThreads 25
spark.yarn.driver.memoryOverhead 200
spark.yarn.executor.memoryOverhead 200
#spark.yarn.historyServer.address sandbox.hortonworks.com:18080
spark.yarn.max.executor.failures 3
spark.yarn.preserve.staging.files false
spark.yarn.queue default
spark.yarn.scheduler.heartbeat.interval-ms 5000
spark.yarn.submit.file.replication 3
spark.ui.port 4041
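If you want to double-check the file after editing, here is a small sketch in plain Python that reads "key value" lines and skips comments. It is an illustration only: the function name is made up, and it handles just the whitespace-separated form used above, not every syntax Spark itself accepts.

```python
from typing import Dict


def parse_spark_defaults(text: str) -> Dict[str, str]:
    """Parse whitespace-separated 'key value' lines, ignoring blanks and # comments."""
    settings: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # blank line or commented-out setting
        parts = line.split(None, 1)  # split on the first run of whitespace
        key = parts[0]
        value = parts[1].strip() if len(parts) > 1 else ""
        settings[key] = value
    return settings


# A few lines copied from the listing above.
SAMPLE = """\
spark.eventLog.enabled true
# Required: setting this parameter to 'false' turns off ATS timeline server for Spark
spark.hadoop.yarn.timeline-service.enabled false
#spark.history.ui.port 18080
spark.yarn.queue default
"""

if __name__ == "__main__":
    for key, value in parse_spark_defaults(SAMPLE).items():
        print(key, "=", value)
```

Commented-out lines such as the spark.history.* entries are skipped, so only the active settings are reported.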

Now that your config files are set up, change directory back to your $SPARK_HOME:

cd /usr/hdp/current/spark2-client

Before running a Spark application, you need to change two YARN settings so that YARN can allocate enough memory to run the jobs on the Sandbox. To change the YARN settings, log in to the Ambari console (http://127.0.0.1:8080/), and click on the "YARN" service along the left-hand side of the screen. Once the YARN Summary page is drawn, find the "Config" tab along the top and click on it. Scroll down and you will see the "Settings" (not Advanced). Change the settings described below.

Note: Use the Edit/pencil icon to set each parameter to the exact values.

1) Memory Node (Memory allocated for all YARN containers on a node) = 7800MB
2) Container (Maximum Container Size (Memory)) = 2500MB

Alternatively, if you click the "Advanced" tab next to Settings, here are the exact config parameter names you want to edit:

yarn.scheduler.maximum-allocation-mb = 2500MB
yarn.nodemanager.resource.memory-mb = 7800MB

After editing these parameters, click on the green "Save" button above the settings in Ambari. You will now need to restart all affected services (Note: a yellow "Restart" icon should show up once the config settings are saved by Ambari; you can click on that button and select "Restart all affected services"). It may be faster to navigate to the Hosts page via the tab, click on the single host, and look for the "Restart" button there. Make sure that YARN is restarted successfully.

Finally, you are ready to run the packaged SparkPi example using Spark 2.0.
To run SparkPi on YARN (yarn-client mode), run the command below, which switches user to "spark" and uses spark-submit to launch the precompiled SparkPi example program:

su spark --command "bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 1 examples/jars/spark-examples*.jar 10"

You should see many lines of debug/stderr output, followed by a results line similar to this:

Pi is roughly 3.144799144799145

Note: To run the SparkPi example locally, without the use of YARN, you can run this command:

./bin/run-example SparkPi
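For anyone wondering why the maximum container size has to go up to 2500MB: each executor container asks YARN for its heap plus the memory overhead. Here is a back-of-the-envelope sketch in plain Python using the numbers from this article. The function names are made up, and it ignores YARN rounding requests up to its minimum allocation increment.

```python
def container_request_mb(memory_mb: int, overhead_mb: int) -> int:
    """Memory requested from YARN for one container: heap plus overhead."""
    return memory_mb + overhead_mb


def fits_in_yarn(memory_mb: int, overhead_mb: int, max_allocation_mb: int) -> bool:
    """True if the request is within yarn.scheduler.maximum-allocation-mb."""
    return container_request_mb(memory_mb, overhead_mb) <= max_allocation_mb


if __name__ == "__main__":
    # --executor-memory 2g plus spark.yarn.executor.memoryOverhead 200
    print(container_request_mb(2048, 200))  # 2248
    # Fits under the 2500MB maximum container size set in Ambari above.
    print(fits_in_yarn(2048, 200, 2500))    # True
    # Would not fit if the maximum were only 2048MB (illustrative value).
    print(fits_in_yarn(2048, 200, 2048))    # False
```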
08-16-2016
11:00 AM
3 Kudos
Apache Zeppelin (version 0.6.0) includes the ability to securely authenticate users and require logins. It uses the Apache Shiro security framework to accomplish this. Note: prior versions of Zeppelin did not force users to log in.

After launching the HDP 2.5 Tech Preview Sandbox on a virtual machine, make sure the Zeppelin service is up and running via Ambari. Next, open the Zeppelin UI either by clicking on:

Services (tab) -> Zeppelin notebook (left-hand panel) -> Quick Links (tab) -> "Zeppelin UI" (button)

or just by opening a browser at: http://sandbox.hortonworks.com:9995/ (or http://127.0.0.1:9995/)

The Zeppelin welcome page should show in the browser, and you should notice a "Login" button in the upper right-hand corner. Clicking it will bring up a pop-up window with text entries for username and password. Enter one of the username/password pairs below (these are the defaults listed in the "shiro.ini" file located in the "conf" sub-directory of Zeppelin):

Username/Password pairs:
admin/password1
user1/password2
user2/password3
user3/password4

If you want to change these passwords or add more users, edit the [users] section of the "shiro.ini" file and restart Zeppelin. (The "Credentials" tab in the Zeppelin UI stores credentials for data sources, not login accounts.)

After entering the credentials, you will be logged in and the existing notebooks will display on the left-hand side of the Zeppelin screen. If you enter the wrong username or password, you will be directed back to the Welcome page.

FYI: For more information about Zeppelin security, see this link: https://github.com/apache/zeppelin/blob/master/SECURITY-README.md

FYI: For more detailed information about Apache Shiro configuration options, see this link: http://shiro.apache.org/configuration.html#Configuration-INISections
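To see which accounts a given shiro.ini defines, here is a small sketch in plain Python that reads the [users] section. Shiro's format for that section is "username = password, role1, role2, ...". The sample text and the role names below are assumptions for illustration, so check them against your own file.

```python
import configparser
from typing import Dict


def read_shiro_users(ini_text: str) -> Dict[str, dict]:
    """Return {username: {"password": ..., "roles": [...]}} from the [users] section."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.optionxform = str  # keep usernames case-sensitive
    parser.read_string(ini_text)
    users: Dict[str, dict] = {}
    if not parser.has_section("users"):
        return users
    for username, entry in parser.items("users"):
        fields = [field.strip() for field in entry.split(",")]
        users[username] = {"password": fields[0], "roles": fields[1:]}
    return users


# Illustrative sample; the role names here are assumptions.
SAMPLE_INI = """\
[users]
admin = password1, admin
user1 = password2, role1
"""

if __name__ == "__main__":
    for name, info in read_shiro_users(SAMPLE_INI).items():
        print(name, info["roles"])
```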
08-09-2016
09:22 PM
4 Kudos
Just a few months ago, Apache Storm announced release 1.0. The points below summarize the new features available. For more detailed descriptions, you can go to this link to read the full release notes: http://storm.apache.org/2016/04/12/storm100-released.html

Apache Storm 1.0 Release

Apache Storm 1.0 is "up to 16 times faster than previous versions, with latency reduced up to 60%."

Pacemaker (Heartbeat Server): Pacemaker is an optional Storm daemon designed to process heartbeats from workers (overcomes scaling problems of ZooKeeper).
Distributed Cache API: Files in the distributed cache can be updated at any time from the command line, without the need to redeploy a topology.
HA Nimbus: Multiple instances of the Nimbus service run in a cluster and perform leader election when a Nimbus node fails.
Native Streaming Window API: Storm has support for sliding and tumbling windows based on time duration and/or event count.
Automatic Backpressure: Storm now has an automatic backpressure mechanism based on configurable high/low watermarks expressed as a percentage of a task's buffer size.
Resource Aware Scheduler: The new resource aware scheduler (AKA "RAS Scheduler") allows users to specify the memory and CPU requirements for individual topology components.

Storm also makes it easier to debug, with:
Dynamic Log Levels
Tuple Sampling and Debugging
Dynamic Worker Profiling
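For readers new to the windowing terms, here is a plain-Python illustration of the difference between tumbling and sliding windows based on event count. This is not the Storm API: the function names are made up, and as a simplification it only emits windows that are completely full.

```python
from typing import List, Sequence


def sliding_windows(events: Sequence, length: int, slide: int) -> List[list]:
    """Count-based windows of `length` events, advancing by `slide` events."""
    if length < 1 or slide < 1:
        raise ValueError("length and slide must be >= 1")
    last_start = len(events) - length
    return [list(events[i:i + length]) for i in range(0, last_start + 1, slide)]


def tumbling_windows(events: Sequence, length: int) -> List[list]:
    """Tumbling windows are sliding windows that never overlap (slide == length)."""
    return sliding_windows(events, length, slide=length)


if __name__ == "__main__":
    events = [1, 2, 3, 4, 5, 6]
    print(tumbling_windows(events, 3))    # [[1, 2, 3], [4, 5, 6]]
    print(sliding_windows(events, 3, 1))  # [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
```

Each event lands in exactly one tumbling window, while with a sliding window the same event can appear in several overlapping windows.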
08-09-2016
08:43 PM
First, check whether your data is stored in a splittable format (for example bzip2, indexed LZO, or Snappy inside a container format such as SequenceFile, Avro, ORC, or Parquet). If so, instruct Spark to split the data into multiple partitions upon read. In Scala, you can do this (the second argument is the minimum number of partitions):

val file = sc.textFile(path, numPartitions)

You will also need to tune your YARN container sizes to work with your executor allocation. Make sure your maximum YARN memory allocation ('yarn.scheduler.maximum-allocation-mb') is bigger than what you are asking for per executor (this will include the default overhead, which is at least 384 MB). The following parameters are used to allocate Spark executor and driver memory:

spark.executor.instances -- number of Spark executors
spark.executor.memory -- memory per Spark executor (plus 384 MB overhead)
spark.driver.memory -- memory per Spark driver

A 6 MB file is pretty small, much smaller than the HDFS block size, so you are probably getting a single partition until you do something to repartition it. You can also set the number of partitions after the read; I would probably call one of these repartition methods on your DataFrame:

def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame
Returns a new DataFrame partitioned by the given partitioning expressions into numPartitions. The resulting DataFrame is hash partitioned.

OR this:

def repartition(numPartitions: Int): DataFrame
Returns a new DataFrame that has exactly numPartitions partitions.
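To show what "hash partitioned" means for a small input, here is a rough sketch in plain Python (not Spark code) that spreads records over numPartitions buckets by hashing a key. The function name is made up, and it uses CRC32 as a stand-in hash, which is not the hash function Spark uses internally.

```python
import zlib
from typing import Callable, List, Sequence


def hash_partition(records: Sequence[str],
                   num_partitions: int,
                   key: Callable[[str], str] = str) -> List[List[str]]:
    """Assign each record to partition crc32(key) % num_partitions."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1")
    partitions: List[List[str]] = [[] for _ in range(num_partitions)]
    for record in records:
        # CRC32 is a stand-in; it is deterministic across runs and processes.
        index = zlib.crc32(key(record).encode("utf-8")) % num_partitions
        partitions[index].append(record)
    return partitions


if __name__ == "__main__":
    records = ["record-%d" % i for i in range(100000)]
    for i, part in enumerate(hash_partition(records, 4)):
        print("partition", i, "holds", len(part), "records")
```

Once the records are spread over several partitions, each partition can be processed by a separate task, which is what lets the heavy calculations run on more than one executor.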
08-09-2016
08:28 PM
The issue is that the input data files to Spark are very small, about 6 MB (<100000 records). However, the required processing/calculations are heavy and would benefit from running in multiple executors. Currently, all processing is running on a single executor even when specifying multiple executors to spark-submit.
Labels: Apache Spark
07-21-2016
04:55 PM
1 Kudo
@Rene Rene The documentation about dynamic resource allocation says the following:

There are two requirements for using this feature. First, your application must set spark.dynamicAllocation.enabled to true. Second, you must set up an external shuffle service on each worker node in the same cluster and set spark.shuffle.service.enabled to true in your application. The purpose of the external shuffle service is to allow executors to be removed without deleting shuffle files written by them (more detail described below). The way to set up this service varies across cluster managers:

In standalone mode, simply start your workers with spark.shuffle.service.enabled set to true.

In Mesos coarse-grained mode, run $SPARK_HOME/sbin/start-mesos-shuffle-service.sh on all slave nodes with spark.shuffle.service.enabled set to true. For instance, you may do so through Marathon.

In YARN mode, start the shuffle service on each NodeManager as follows:

1) Build Spark with the YARN profile. Skip this step if you are using a pre-packaged distribution.
2) Locate the spark-<version>-yarn-shuffle.jar. This should be under $SPARK_HOME/network/yarn/target/scala-<version> if you are building Spark yourself, and under lib if you are using a distribution.
3) Add this jar to the classpath of all NodeManagers in your cluster.
4) In the yarn-site.xml on each node, add spark_shuffle to yarn.nodemanager.aux-services, then set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService.
5) Restart all NodeManagers in your cluster.

All other relevant configurations are optional and under the spark.dynamicAllocation.* and spark.shuffle.service.* namespaces. For more detail, see the configurations page.

Reference Link: https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
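On the application side, the two required properties can be passed to spark-submit as --conf options. Here is a small sketch in plain Python that builds those arguments. The helper name is made up for illustration; the optional minExecutors and maxExecutors settings come from the spark.dynamicAllocation.* namespace mentioned above.

```python
from typing import List, Optional


def dynamic_allocation_conf(min_executors: Optional[int] = None,
                            max_executors: Optional[int] = None) -> List[str]:
    """Build spark-submit '--conf key=value' arguments for dynamic allocation."""
    settings = {
        # The two settings the documentation lists as required.
        "spark.dynamicAllocation.enabled": "true",
        "spark.shuffle.service.enabled": "true",
    }
    # Optional bounds on how far the executor count may scale.
    if min_executors is not None:
        settings["spark.dynamicAllocation.minExecutors"] = str(min_executors)
    if max_executors is not None:
        settings["spark.dynamicAllocation.maxExecutors"] = str(max_executors)
    args: List[str] = []
    for key, value in settings.items():
        args.extend(["--conf", key + "=" + value])
    return args


if __name__ == "__main__":
    print(" ".join(["spark-submit"] + dynamic_allocation_conf(max_executors=4)))
```

These options only cover the application; the external shuffle service still has to be set up on the cluster as described in the quoted steps.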
07-20-2016
10:43 PM
Since you are using Jupyter with Spark, you might consider looking at Livy. Livy is an open source REST server for Spark. When you execute a code cell in a PySpark notebook, it creates a Livy session to execute your code. Livy allows multiple users to share the same Spark server through "impersonation support", which should allow you to access objects using your logged-in username. From the notebook, you can use the %%info magic to display the current Livy session information. The link below documents the REST commands you can use:

https://github.com/cloudera/livy/tree/6fe1e80cfc72327c28107e0de20c818c1f13e027#post-sessions
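To give an idea of what the POST /sessions call looks like, here is a sketch in plain Python that builds (but does not send) a request to create a PySpark session on behalf of a user. The server URL and the user name are placeholders, and the helper name is made up; check the field names against the Livy documentation for your version.

```python
import json
import urllib.request


def build_create_session_request(livy_url: str,
                                 proxy_user: str,
                                 kind: str = "pyspark") -> urllib.request.Request:
    """Build a POST /sessions request asking Livy to impersonate proxy_user."""
    payload = {"kind": kind, "proxyUser": proxy_user}
    return urllib.request.Request(
        url=livy_url.rstrip("/") + "/sessions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder host and user; 8998 is Livy's default port.
    request = build_create_session_request("http://livy.example.com:8998", "alice")
    print(request.get_method(), request.full_url)
    print(request.data.decode("utf-8"))
    # To actually send it: urllib.request.urlopen(request)
```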