01-08-2019 02:20 PM
Hi everyone, this is my first post on the Hortonworks forum. If anything is incomplete or unclear, please let me know how and where to improve it.
The problem is this: I successfully imported a MongoDB collection into Hive using Spark with the new Hive Warehouse Connector API. Now I am attempting the reverse, transferring the Hive table to MongoDB using Spark.
I am working on an Ambari cluster. The steps I take to save the table from Hive to MongoDB using Spark are as follows:
create a Dataset<Row> using HiveWarehouseSession, in particular with the executeQuery() or table() method;
create the MongoSpark WriteConfig;
write the collection with MongoSpark.save().
The spark-shell session is launched like this:
spark-shell --driver-memory 512m --executor-memory 512m --executor-cores 1 \
  --conf spark.hadoop.hive.llap.daemon.service.hosts='@llap0' \
  --conf spark.hadoop.hive.zookeeper.quorum='node1:2181,node2:2181,node3:2181' \
  --conf spark.sql.hive.hiveserver2.jdbc.url='jdbc:hive2://node2:2181,node3:2181,node1:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive' \
  --conf spark.security.credentials.hiveserver2.enabled='false' \
  --conf spark.datasource.hive.warehouse.metastoreUri='thrift://node3:9083' \
  --conf spark.datasource.hive.warehouse.load.staging.dir='/tmp' \
  --jars /usr/hdp/3.0.1.0-187/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1
Ivy Default Cache set to: /home/hdfs/.ivy2/cache
The jars for the packages stored in: /home/hdfs/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/3.0.1.0-187/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3da68c7e-d6e6-4bdb-853b-2af36ccc7bd0;1.0
confs: [default]
found org.mongodb.spark#mongo-spark-connector_2.11;2.3.1 in central
found org.mongodb#mongo-java-driver;3.8.2 in central
:: resolution report :: resolve 153ms :: artifacts dl 3ms
:: modules in use:
org.mongodb#mongo-java-driver;3.8.2 from central in [default]
org.mongodb.spark#mongo-spark-connector_2.11;2.3.1 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 0 | 0 | 0 || 2 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3da68c7e-d6e6-4bdb-853b-2af36ccc7bd0
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/4ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node2:4040
Spark context available as 'sc' (master = yarn, app id = application_1546590067819_0103).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1.3.0.1.0-187
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.
scala> import com.mongodb.spark._
import com.mongodb.spark._
scala> import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession
scala> import com.hortonworks.hwc.HiveWarehouseSession._
import com.hortonworks.hwc.HiveWarehouseSession._
scala> import com.mongodb.spark.config._
import com.mongodb.spark.config._
scala> val hive = HiveWarehouseSession.session(spark).userPassword("hive","password").build()
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@7c523bdd
scala> hive.setDatabase("default")
scala> val df = hive.table("simpletab")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_id: struct<oid: string>, lastname: string ... 1 more field]
scala> df.show
19/01/08 10:37:25 WARN TaskSetManager: Stage 0 contains a task of very large size (431 KB). The maximum recommended task size is 100 KB.
+--------------------+--------+-----+
|                 _id|lastname| name|
+--------------------+--------+-----+
|[5c335d0543466c1e...|   rossi|mario|
+--------------------+--------+-----+
scala> df.printSchema
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- lastname: string (nullable = true)
|-- name: string (nullable = true)
scala> val writeConfig = WriteConfig(Map("uri" -> "mongodb://192.168.1.11:27017", "database" -> "test", "collection" -> "simpleImport"), Some(WriteConfig(sc)))
writeConfig: com.mongodb.spark.config.WriteConfig = WriteConfig(test2,simpleImport,Some(mongodb://172.24.161.11:27017),true,512,15,WriteConcernConfig(None,None,None,None),None,false,true)
scala> MongoSpark.save(df, writeConfig)
19/01/08 10:37:54 WARN TaskSetManager: Stage 1 contains a task of very large size (432 KB). The maximum recommended task size is 100 KB.
[Stage 1:> (0 + 1) / 1]
The write itself succeeds, but the Spark write stage never completes and stays in the RUNNING state until the job is killed.
In the various tests I performed, if I create a Dataset<Row> manually and write it to MongoDB, everything works correctly and the job completes (see the sketch below).
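Roughly what that check looks like (the sample row is made up; writeConfig is the same one defined above):

import spark.implicits._
// hypothetical hand-built DataFrame, not read through the Hive Warehouse Connector
val manualDF = Seq(("rossi", "mario")).toDF("lastname", "name")
// writing this DataFrame with the same writeConfig finishes normally
MongoSpark.save(manualDF, writeConfig)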
Also, if I first copy the same table from the Hive catalog into the Spark catalog, I can then save it correctly from the Spark catalog to MongoDB (see the sketch below).
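Roughly what that workaround looks like (the table name simpletab_copy is just an example):

// materialise the HWC Dataset as a table in the Spark catalog
df.write.mode("overwrite").saveAsTable("simpletab_copy")
// reading the copy back through the Spark catalog and saving it completes normally
val sparkDF = spark.table("simpletab_copy")
MongoSpark.save(sparkDF, writeConfig)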
01-07-2019 11:38 AM
Hi Hyukjin Kwon, and thank you for your article! I'm trying to write a Dataset<Row> created with HiveWarehouseSession.executeQuery to MongoDB using Spark. MongoSpark.save is able to write the Dataset to the selected collection, but the job never ends. Do you know a better way to do this? I'm using an Ambari cluster. Thank you. https://community.hortonworks.com/questions/231385/save-dataframe-loaded-from-hive-to-mongodb-using-s.html
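Roughly the sequence I am running (table, database and collection names come from my test setup in the linked question; authentication settings omitted):

import com.hortonworks.hwc.HiveWarehouseSession
import com.mongodb.spark._
import com.mongodb.spark.config._

val hive = HiveWarehouseSession.session(spark).build()
val df = hive.executeQuery("SELECT * FROM simpletab")
val writeConfig = WriteConfig(Map("uri" -> "mongodb://192.168.1.11:27017",
  "database" -> "test", "collection" -> "simpleImport"), Some(WriteConfig(sc)))
// the documents are written, but the job never finishes
MongoSpark.save(df, writeConfig)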