I'm currently trying to build a graph from existing data in our RDBMS. When attempting to build the vertices for the graph, I receive a Java heap space exception from Spark. I have increased the executor memory to 64G and the Spark driver memory to 512G, to no avail.
Have you looked at the Zeppelin memory configuration?
It may not be Spark causing the error.
In Ambari you can navigate to:
"Zeppelin Notebook" --> "Configs" (Tab) --> "Advanced zeppelin-env"
and then find "zeppelin_env_content". There you will find "export ZEPPELIN_MEM", which you can edit.
That's my first thought based on what you are saying.
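One way to confirm whether the Zeppelin memory settings actually reached the interpreter is to ask the running JVM which memory flags it was started with. The sketch below is only an illustration: the object name JvmMemoryFlags and the helper memoryFlags are made up for this example, and the body of main can be pasted into a Zeppelin paragraph or spark-shell as-is.

```scala
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

// Hypothetical helper (not part of Zeppelin or Spark): lists the memory-related
// flags of the JVM that runs this code.
object JvmMemoryFlags {
  // Keep only the flags that control heap / PermGen / Metaspace sizing.
  def memoryFlags(jvmArgs: Seq[String]): Seq[String] =
    jvmArgs.filter { a =>
      a.startsWith("-Xmx") || a.startsWith("-Xms") ||
      a.startsWith("-XX:MaxPermSize") || a.startsWith("-XX:MaxMetaspaceSize")
    }

  def main(args: Array[String]): Unit = {
    // Arguments the JVM was launched with (for example those set via ZEPPELIN_INTP_MEM).
    val jvmArgs = ManagementFactory.getRuntimeMXBean.getInputArguments.asScala.toSeq
    memoryFlags(jvmArgs).foreach(println)

    // Heap ceiling as seen by the JVM itself; usually a little below -Xmx.
    val maxHeapGiB = Runtime.getRuntime.maxMemory / math.pow(1024, 3)
    println(f"max heap: $maxHeapGiB%.1f GiB")
  }
}
```

If the printed flags do not match what was entered in Ambari, the setting probably did not reach the interpreter process, and a restart of the interpreter may be needed.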
Current values are:
export ZEPPELIN_MEM="-Xms32768m -Xmx32768m -XX:MaxPermSize=64G"
export ZEPPELIN_INTP_MEM="-Xms32768m -Xmx32768m -XX:MaxPermSize=64G"
zeppelin.executor.mem = 64G
zeppelin.executor.instances = 2
I have also increased the memory values for Spark to no avail:
# Options read in YARN client mode
SPARK_EXECUTOR_INSTANCES="9" #Number of workers to start (Default: 2)
SPARK_EXECUTOR_CORES="4" #Number of cores for the workers (Default: 1).
SPARK_EXECUTOR_MEMORY="64G" #Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
SPARK_DRIVER_MEMORY="128G" #Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
Hi @Aaron Dunlap,
Can you give us more information on your cluster architecture:
1. Total YARN memory
2. Have you tried using Spark shell?
3. Size of dataset used to build the graph
4. What kind of graph processing are you doing?
5. Can you post the code?
We're thinking there might be node hotspots.
GraphX is not ready for prime time; it is in technical preview (please see table 1.1 at https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.2/bk_spark-component-guide/content/ch_introdu...) since it is in alpha state in the community.
We should start by looking at how much data is being used to compute the graph. If the graph is bigger than what is allocated to the executors, an OOM is expected. So if you find out how many vertices there are and what kind of graph computation is being done, we can try to dig deeper.
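To put numbers on the size question, something along these lines could be run before building the graph. It is a rough sketch, not a definitive check: the object GraphSizeCheck and its describe method are names invented here, and the toy DataFrame in main only stands in for the real edge list (a DataFrame with an ID column and a parent column).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: reports how much data would go into the graph.
object GraphSizeCheck {
  // Returns (edge rows, distinct vertex IDs across both columns, partition count).
  def describe(df: DataFrame, idCol: String, parentCol: String): (Long, Long, Int) = {
    val rows = df.count()
    // union is positional, so the two single-column selections stack into one column.
    val vertexIds = df.select(idCol).union(df.select(parentCol)).distinct().count()
    (rows, vertexIds, df.rdd.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    // Local session and toy data are assumptions for the sake of a runnable example.
    val spark = SparkSession.builder().master("local[2]").appName("graph-size-check").getOrCreate()
    import spark.implicits._

    val toy = Seq(("1", "10"), ("10", "11"), ("10", "12")).toDF("parent", "circuit_id")
    val (rows, vertexIds, partitions) = describe(toy, "circuit_id", "parent")
    println(s"edge rows=$rows, distinct vertex ids=$vertexIds, partitions=$partitions")

    spark.stop()
  }
}
```

In spark-shell or Zeppelin, where a session already exists, only the describe call would be needed, passing the DataFrame that feeds the vertex and edge RDDs.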
1) Yarn memory (assuming you mean Node memory on the settings page of Yarn): 216GB
2) Using spark-shell, the task to build the vertices hangs on stage 6:
scala> val vertices: RDD[(Long,Long)] = cleanedCircuits.select("circuit_id").rdd.map(row => (row(0).toString.toLong, row(0).toString.toLong))
[Stage 6:> (0 + 48) / 200]
3) The data set is quite large, but this occurs even when I limit the dataset to 10,000 records. The dataset is two columns, one of which contains node IDs and the other contains nodes to which those nodes are connected.
4) Currently just attempting to create the graph. This code was previously working, but we ran out of space on the /var file system, so I attempted to move the YARN cache directory to a mounted drive and deleted the existing cache files. The deletion of the cache files caused quite a few issues with Zeppelin and YARN; however, those issues have been worked out and both are functioning correctly now. The memory issue is the only outstanding issue.
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// col() is used below to turn column names into Column objects
import org.apache.spark.sql.functions.col

// Load the source tables
val circuit = spark.read.parquet("/tmp/ngmss/circuit")
val circuit_position = spark.read.parquet("/tmp/ngmss/circuit_position")
val circuit_position_pending = spark.read.parquet("/tmp/ngmss/circuit_position_pending")
val ns_con_rel = spark.read.parquet("/tmp/ngmss/ns_con_rel")

// Register the tables for use in SQL
circuit.createOrReplaceTempView("circuit")
circuit_position.createOrReplaceTempView("circuit_position")
circuit_position_pending.createOrReplaceTempView("circuit_position_pending")
ns_con_rel.createOrReplaceTempView("ns_con_rel")

// Parent/child pairs from each table
val cp = spark.sql("SELECT circuit_design_id, circuit_design_id_3 FROM circuit_position WHERE circuit_node_status IN ('3','4')")
val cpp = spark.sql("SELECT circuit_design_id,circuit_design_id_3 FROM circuit_position_pending WHERE circuit_node_status IN ('3','4')")
val ncr = spark.sql("SELECT circuit_design_id_parent as circuit_design_id, circuit_Design_id_child as circuit_design_id_3 FROM ns_con_rel WHERE ns_con_rel_status_cd IN ('2','3')")

// List of similar columns between all tables
val col_names = Seq[String]("circuit_design_id","circuit_design_id_3")
// Map all column names to column objects
val cols = col_names.map(name => col(name))
// Union the selection of the desired columns into a single dataframe
val alltbls = cp.select(cols: _*).unionAll(cpp.select(cols: _*)).unionAll(ncr.select(cols: _*))
alltbls.cache()
alltbls.createOrReplaceTempView("alltbls")

// Circuits that never appear as a parent are attached to a synthetic root '1'
val rootnodes = spark.sql("select '1' as parent,circuit_design_id as circuit_id from circuit where circuit_design_id not in (select distinct circuit_design_id from alltbls)")
val children = alltbls.selectExpr("circuit_design_id as parent","circuit_design_id_3 as circuit_id")
val allcircuits = rootnodes.unionAll(children)
allcircuits.createOrReplaceTempView("allcircuits")
val cleanedCircuits = spark.sql("SELECT * FROM allcircuits WHERE parent IS NOT NULL AND circuit_id IS NOT NULL")

// Build the vertex and edge RDDs, then the graph
val vertices: RDD[(Long,Long)] = cleanedCircuits.select("circuit_id").rdd.map(row => (row(0).toString.toLong, row(0).toString.toLong))
val edges: RDD[Edge[Long]] = cleanedCircuits.select("circuit_id","parent").rdd.map(row => Edge(row(0).toString.toLong,row(1).toString.toLong))
val g: Graph[Long,Long] = Graph(vertices,edges,0)
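It may also be worth checking the requested executor memory against the YARN figure quoted above. The sketch below is back-of-the-envelope arithmetic only, under stated assumptions: that 216GB is the memory available to YARN on each node, that Spark 2.x on YARN adds its default per-executor overhead of max(384 MB, 10% of executor memory), and that no overhead or container-size limits have been overridden on the cluster. The object name YarnMemoryBudget is made up for this example.

```scala
// Hypothetical helper: rough container sizing for Spark executors on YARN.
object YarnMemoryBudget {
  // Spark 2.x defaults for the YARN executor memory overhead (assumed not overridden).
  val MinOverheadMb = 384L
  val OverheadFactor = 0.10

  // Memory YARN must grant per executor: heap plus off-heap overhead.
  def containerMb(executorMemoryMb: Long): Long =
    executorMemoryMb + math.max((executorMemoryMb * OverheadFactor).toLong, MinOverheadMb)

  // How many such containers fit on one node.
  def executorsPerNode(nodeMemoryMb: Long, executorMemoryMb: Long): Long =
    nodeMemoryMb / containerMb(executorMemoryMb)

  def main(args: Array[String]): Unit = {
    val executorMb = 64L * 1024   // SPARK_EXECUTOR_MEMORY="64G"
    val nodeMb = 216L * 1024      // YARN node memory reported in the thread
    val requested = 9L            // SPARK_EXECUTOR_INSTANCES="9"

    val perNode = executorsPerNode(nodeMb, executorMb)
    println(s"container size: ${containerMb(executorMb)} MB")
    println(s"executors per node: $perNode")
    println(s"total requested: ${requested * containerMb(executorMb)} MB")
  }
}
```

With these inputs each container comes to roughly 70 GiB, so about three fit on a 216GB node, and nine executors would need at least three such nodes. If the cluster is smaller than that, YARN would grant fewer executors than requested, which is a different problem from a heap overflow inside a single JVM.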
There's a bug in Zeppelin where spark.driver.memory does not take effect for the Zeppelin Spark interpreter.
From the comments above, I still don't know where the OOM happens (driver or executor). It would be better to attach the stack trace and try spark-shell in yarn-client mode first.
To reduce the complexity, would it be possible to run the code in spark-shell with the same executor memory configuration that is allocated via Zeppelin? That way we can rule out any Zeppelin issue and better isolate where the problem lies.
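A quick way to see whether the driver memory setting was honored is to compare the configured value with the heap the driver JVM actually has. The following is a minimal sketch with invented names (DriverMemoryCheck, toBytes, looksApplied); the 15% tolerance is an arbitrary allowance for the JVM reporting slightly less than -Xmx, not a documented threshold.

```scala
// Hypothetical helper: compares a configured memory string with the real heap size.
object DriverMemoryCheck {
  // Accepts strings such as "512m", "64G" or "128gb".
  private val Size = """(?i)\s*(\d+)\s*([kmgt])b?\s*""".r

  def toBytes(s: String): Option[Long] = s match {
    case Size(n, unit) =>
      val shift = unit.toLowerCase match {
        case "k" => 10
        case "m" => 20
        case "g" => 30
        case "t" => 40
      }
      Some(n.toLong << shift)
    case _ => None
  }

  // True when the actual heap is close to (or above) the configured size.
  def looksApplied(configured: String, maxHeapBytes: Long, tolerance: Double = 0.15): Option[Boolean] =
    toBytes(configured).map(want => maxHeapBytes >= (want * (1.0 - tolerance)).toLong)

  def main(args: Array[String]): Unit = {
    // In spark-shell or Zeppelin the configured value could be read with
    // sc.getConf.getOption("spark.driver.memory"); "128G" is the value from the thread.
    val configured = "128G"
    val actual = Runtime.getRuntime.maxMemory
    println(s"configured=$configured actualBytes=$actual applied=${looksApplied(configured, actual)}")
  }
}
```

Running the same check in a Zeppelin paragraph and in spark-shell should show whether the two drivers really start with the same heap.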