Save dataframe loaded from Hive to MongoDB using Spark

Hi everyone, this is my first post on the Hortonworks forum. If anything is incomplete or unclear, please let me know how to improve it.

Here is the problem: I successfully imported a MongoDB collection into Hive using Spark with the new Hive Warehouse Connector (HWC) APIs. Now I am trying the reverse, which is transferring the Hive table to MongoDB using Spark.

I am working on an Ambari-managed cluster. To save the table from Hive to MongoDB using Spark, I take the following steps:

  1. create a Dataset<Row> with HiveWarehouseSession, using either the executeQuery() or the table() method;
  2. create the MongoSpark WriteConfig;
  3. write the Dataset to the MongoDB collection.
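Condensed into a standalone application, the three steps might look like the sketch below. It is a minimal sketch under assumptions, not the exact code from my session: it assumes the HWC assembly jar and mongo-spark-connector_2.11:2.3.1 are on the classpath, that the HWC settings from my spark-shell command are passed with --conf at submit time, and that the write step is MongoSpark.save(df, writeConfig). The object name HiveToMongo, the helper mongoWriteOptions, and the URI mongodb://localhost:27017 are hypothetical placeholders.

```scala
import com.hortonworks.hwc.HiveWarehouseSession
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical application name; HWC settings (LLAP hosts, JDBC URL,
// metastore URI, staging dir) are expected via --conf on spark-submit.
object HiveToMongo {

  // Hypothetical helper: every key WriteConfig needs is given explicitly,
  // so no spark.mongodb.output.* fallback from SparkConf is required.
  def mongoWriteOptions(uri: String, database: String, collection: String): Map[String, String] =
    Map("uri" -> uri, "database" -> database, "collection" -> collection)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("HiveToMongo").getOrCreate()

    // Step 1: read the Hive table through the Hive Warehouse Connector.
    val hive = HiveWarehouseSession.session(spark).userPassword("hive", "password").build()
    hive.setDatabase("default")
    val df: DataFrame = hive.table("simpletab")
    // Alternative for step 1: hive.executeQuery("SELECT * FROM simpletab")

    // Step 2: build the WriteConfig (placeholder URI, replace with the real one).
    val writeConfig = WriteConfig(
      mongoWriteOptions("mongodb://localhost:27017", "test", "simpleImport"))

    // Step 3: write the Dataset<Row> to the MongoDB collection.
    MongoSpark.save(df, writeConfig)

    spark.stop()
  }
}
```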

This is the spark-shell session I use:

spark-shell --driver-memory 512m --executor-memory 512m --executor-cores 1 \
  --conf spark.hadoop.hive.llap.daemon.service.hosts='@llap0' \
  --conf spark.hadoop.hive.zookeeper.quorum='node1:2181,node2:2181,node3:2181' \
  --conf spark.sql.hive.hiveserver2.jdbc.url='jdbc:hive2://node2:2181,node3:2181,node1:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive' \
  --conf'false' \
  --conf spark.datasource.hive.warehouse.metastoreUri='thrift://node3:9083' \
  --conf spark.datasource.hive.warehouse.load.staging.dir='/tmp' \
  --jars /usr/hdp/ \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1

Ivy Default Cache set to: /home/hdfs/.ivy2/cache
The jars for the packages stored in: /home/hdfs/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/!/org/apache/ivy/core/settings/ivysettings.xml

org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3da68c7e-d6e6-4bdb-853b-2af36ccc7bd0;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.11;2.3.1 in central
	found org.mongodb#mongo-java-driver;3.8.2 in central
:: resolution report :: resolve 153ms :: artifacts dl 3ms
	:: modules in use:
	org.mongodb#mongo-java-driver;3.8.2 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.11;2.3.1 from central in [default]
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
:: retrieving :: org.apache.spark#spark-submit-parent-3da68c7e-d6e6-4bdb-853b-2af36ccc7bd0
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/4ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node2:4040
Spark context available as 'sc' (master = yarn, app id = application_1546590067819_0103).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import com.mongodb.spark._
import com.mongodb.spark._

scala> import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession

scala> import com.hortonworks.hwc.HiveWarehouseSession._
import com.hortonworks.hwc.HiveWarehouseSession._

scala> import com.mongodb.spark.config._
import com.mongodb.spark.config._

scala> val hive = HiveWarehouseSession.session(spark).userPassword("hive","password").build()
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@7c523bdd

scala> hive.setDatabase("default")

scala> val df = hive.table("simpletab")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_id: struct<oid: string>, lastname: string ... 1 more field]

19/01/08 10:37:25 WARN TaskSetManager: Stage 0 contains a task of very large size (431 KB). The maximum recommended task size is 100 KB.
|                 _id|lastname|   name|

scala> df.printSchema
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- name: string (nullable = true)

scala> val writeConfig = WriteConfig(Map("uri" -> "mongodb://",  "database" -> "test", "collection" -> "simpleImport"), Some(WriteConfig(sc)))
writeConfig: com.mongodb.spark.config.WriteConfig = WriteConfig(test2,simpleImport,Some(mongodb://,true,512,15,WriteConcernConfig(None,None,None,None),None,false,true)

scala>, writeConfig)
19/01/08 10:37:54 WARN TaskSetManager: Stage 1 contains a task of very large size (432 KB). The maximum recommended task size is 100 KB.
[Stage 1:>                                                          (0 + 1) / 1]

The data is written to MongoDB successfully, but the Spark write stage never completes: it stays in the running state until I kill it.

In my tests, if I create a Dataset<Row> manually and write it to MongoDB, everything works and the job completes. Likewise, if I copy the same table read from the Hive catalog into the Spark catalog, I can then save it from the Spark catalog to MongoDB without problems.
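For comparison, the two cases that do complete can be sketched as follows. This is a reconstruction under assumptions, not my exact code: the sample rows, the collection name manualImport, the staging table simpletab_spark_copy, the object name, and the URI are hypothetical, and the copy into the Spark catalog is assumed to be done with saveAsTable (on HDP 3 with default catalog settings, Spark writes such tables to its own catalog rather than to Hive).

```scala
import com.hortonworks.hwc.HiveWarehouseSession
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkCatalogWorkaround {
  // Hypothetical names: staging table in the Spark catalog and MongoDB URI.
  val StagingTable = "simpletab_spark_copy"
  val MongoUri = "mongodb://localhost:27017"

  // WriteConfig with all required keys set explicitly.
  def writeConfig(collection: String): WriteConfig =
    WriteConfig(Map("uri" -> MongoUri, "database" -> "test", "collection" -> collection))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkCatalogWorkaround").getOrCreate()
    import spark.implicits._

    // Case 1: a Dataset<Row> built by hand (illustrative rows only).
    val manual: DataFrame = Seq(("Doe", "Jane"), ("Roe", "Richard")).toDF("lastname", "name")
    MongoSpark.save(manual, writeConfig("manualImport"))

    // Case 2: read through HWC, copy into the Spark catalog, write from there.
    val hive = HiveWarehouseSession.session(spark).userPassword("hive", "password").build()
    hive.setDatabase("default")
    val fromHive: DataFrame = hive.table("simpletab")

    fromHive.write.mode(SaveMode.Overwrite).saveAsTable(StagingTable)
    val fromSparkCatalog: DataFrame = spark.table(StagingTable)
    MongoSpark.save(fromSparkCatalog, writeConfig("simpleImport"))

    spark.stop()
  }
}
```

In both sketches MongoSpark.save receives a Dataset that is not read directly through HWC, which matches my observation that only the write fed straight from hive.table() never finishes.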