As of Apache Spark 2.1, SparkR supports virtual environments. When you deploy Spark on YARN with virtualenv enabled, each SparkR job gets its own library directory, which is bound to the executor's YARN container. All required third-party libraries are installed into that directory. After the Spark job finishes, the directory is deleted, so it does not interfere with the environment of any other Spark job.

SparkR now supports the following two scenarios:

1) Internet-connected cluster: packages can be installed (for example, from CRAN) directly on the executors, so that R packages can be used in the tasks.

2) Isolated cluster: the driver either downloads the required packages or uses packages that were downloaded ahead of time, and then adds them to the cluster. Spark distributes the packages to the executors, where they can be installed when the executor runs.

Both scenarios install R packages into a separate local directory on the executor for each user, so they do not pollute the executor's native R environment. This directory is deleted after the executor exits.

Virtualenv support in SparkR differs from that in PySpark. SparkR is an interactive analytics tool, so users often install third-party packages during a session rather than in advance. Users who start a spark-shell or a SparkR interpreter in Zeppelin can try different native R packages within the same session. Because users may have cached data on the executors and do not want to lose it by restarting the session, this improvement lets them use any R package on the executors without a restart.

Internet-Connected Cluster:

> df <- createDataFrame(list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), c("a", "b", "c"))
> showDF(df)
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|1.0|  1|
|  2|2.0|  2|
|  3|3.0|  3|
+---+---+---+
> schema <- structType(structField("a", "integer"), structField("b", "double"),
                       structField("c", "string"), structField("d", "double"))
> df1 <- dapply(df, function(x) {
    # Install the package from CRAN on the executor, then load it
    install.packages("mvtnorm", repos="http://cran.us.r-project.org")
    library(mvtnorm)
    # Append column d: column a scaled by one multivariate normal draw
    x <- cbind(x, x$a * rmvnorm(n=1, mean=c(1))) },
    schema)
> showDF(df1)
+---+---+---+------------------+
|  a|  b|  c|                 d|
+---+---+---+------------------+
|  1|1.0|  1|1.4377518716229474|
|  2|2.0|  2|2.8755037432458947|
|  3|3.0|  3| 4.313255614868842|
+---+---+---+------------------+
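Because dapply applies the function to each partition, the example above calls install.packages every time a partition is processed. A minimal sketch of one way to skip the reinstall, assuming a hypothetical helper named ensure_package (it is not part of SparkR) that would be defined inside the function passed to dapply:

```r
# Hypothetical helper (not part of SparkR): install a package only when it
# is not already available to the current R process.
ensure_package <- function(pkg, repos = "http://cran.us.r-project.org") {
  if (!requireNamespace(pkg, quietly = TRUE)) {
    install.packages(pkg, repos = repos)
  }
  # TRUE when the package namespace can now be loaded, FALSE otherwise
  requireNamespace(pkg, quietly = TRUE)
}
```

With this helper, the body of the dapply function would call ensure_package("mvtnorm") in place of the bare install.packages call, and then library(mvtnorm) as before.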

Isolated Cluster:

> df <- createDataFrame(list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), c("a", "b", "c"))
> showDF(df)
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|1.0|  1|
|  2|2.0|  2|
|  3|3.0|  3|
+---+---+---+
> schema <- structType(structField("a", "integer"), structField("b", "double"),
                       structField("c", "string"), structField("d", "double"))
> path <- "/home/yliang/mvtnorm.tar.gz"
> # Ship the source archive from the driver to every executor
> spark.addFile(path)
> df1 <- dapply(df, function(x) {
    # Resolve the executor-local copy of the archive and install from source
    path <- spark.getSparkFiles("mvtnorm.tar.gz")
    install.packages(path, repos=NULL, type="source")
    library(mvtnorm)
    # Append column d: column a scaled by one multivariate normal draw
    x <- cbind(x, x$a * rmvnorm(n=1, mean=c(1))) },
    schema)
> showDF(df1)
+---+---+---+------------------+
|  a|  b|  c|                 d|
+---+---+---+------------------+
|  1|1.0|  1|1.4377518716229474|
|  2|2.0|  2|2.8755037432458947|
|  3|3.0|  3| 4.313255614868842|
+---+---+---+------------------+
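The example above assumes the source archive already exists at a local path on the driver. If the driver can reach a repository, one possible way to fetch the archive first is base R's download.packages. This is only a sketch under that assumption: fetch_source_package is a hypothetical helper name, and the repository URL is the one used in the Internet-connected example.

```r
# Hypothetical helper (not part of SparkR): download the source archive of
# one package to a local directory on the driver and return its file path.
fetch_source_package <- function(pkg,
                                 destdir = tempdir(),
                                 repos = "http://cran.us.r-project.org") {
  # download.packages() returns a two-column matrix: package name, file path
  downloaded <- download.packages(pkg, destdir = destdir,
                                  repos = repos, type = "source")
  if (nrow(downloaded) == 0) {
    stop("could not download package: ", pkg)
  }
  unname(downloaded[1, 2])
}

# Usage on the driver (needs network access and a running SparkR session):
# path <- fetch_source_package("mvtnorm")
# spark.addFile(path)
# basename(path) is the file name to pass to spark.getSparkFiles()
```

Archives downloaded this way carry the package version in their file name, so the executor side would pass basename(path) to spark.getSparkFiles rather than a fixed name. Note also that install.packages with repos=NULL does not resolve dependencies, so any packages the archive depends on would need to be shipped the same way.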
