Created 01-08-2019 02:20 PM
Hi everyone, this is my first post in the Hortonworks forum. If anything is incomplete or unclear, please let me know how and where to improve it.
The problem is this: I successfully imported a MongoDB collection into Hive using Spark and the new Hive Warehouse Connector API. Now I'm attempting the reverse, i.e. transferring the Hive table back to MongoDB using Spark.
I'm doing this work on an Ambari-managed cluster. The steps I follow to save the table from Hive to MongoDB using Spark are as follows:
spark-shell --driver-memory 512m --executor-memory 512m --executor-cores 1 \
  --conf spark.hadoop.hive.llap.daemon.service.hosts='@llap0' \
  --conf spark.hadoop.hive.zookeeper.quorum='node1:2181,node2:2181,node3:2181' \
  --conf spark.sql.hive.hiveserver2.jdbc.url='jdbc:hive2://node2:2181,node3:2181,node1:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive' \
  --conf spark.security.credentials.hiveserver2.enabled='false' \
  --conf spark.datasource.hive.warehouse.metastoreUri='thrift://node3:9083' \
  --conf spark.datasource.hive.warehouse.load.staging.dir='/tmp' \
  --jars /usr/hdp/3.0.1.0-187/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.0.1.0-187.jar \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.3.1

Ivy Default Cache set to: /home/hdfs/.ivy2/cache
The jars for the packages stored in: /home/hdfs/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/3.0.1.0-187/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.mongodb.spark#mongo-spark-connector_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3da68c7e-d6e6-4bdb-853b-2af36ccc7bd0;1.0
        confs: [default]
        found org.mongodb.spark#mongo-spark-connector_2.11;2.3.1 in central
        found org.mongodb#mongo-java-driver;3.8.2 in central
:: resolution report :: resolve 153ms :: artifacts dl 3ms
        :: modules in use:
        org.mongodb#mongo-java-driver;3.8.2 from central in [default]
        org.mongodb.spark#mongo-spark-connector_2.11;2.3.1 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3da68c7e-d6e6-4bdb-853b-2af36ccc7bd0
        confs: [default]
        0 artifacts copied, 2 already retrieved (0kB/4ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://node2:4040
Spark context available as 'sc' (master = yarn, app id = application_1546590067819_0103).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1.3.0.1.0-187
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import com.mongodb.spark._
import com.mongodb.spark._

scala> import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession

scala> import com.hortonworks.hwc.HiveWarehouseSession._
import com.hortonworks.hwc.HiveWarehouseSession._

scala> import com.mongodb.spark.config._
import com.mongodb.spark.config._

scala> val hive = HiveWarehouseSession.session(spark).userPassword("hive","password").build()
hive: com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl = com.hortonworks.spark.sql.hive.llap.HiveWarehouseSessionImpl@7c523bdd

scala> hive.setDatabase("default")

scala> val df = hive.table("simpletab")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_id: struct<oid: string>, lastname: string ... 1 more field]

scala> df.show
19/01/08 10:37:25 WARN TaskSetManager: Stage 0 contains a task of very large size (431 KB). The maximum recommended task size is 100 KB.
+--------------------+--------+-----+
|                 _id|lastname| name|
+--------------------+--------+-----+
|[5c335d0543466c1e...|   rossi|mario|
+--------------------+--------+-----+

scala> df.printSchema
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- name: string (nullable = true)

scala> val writeConfig = WriteConfig(Map("uri" -> "mongodb://192.168.1.11:27017", "database" -> "test", "collection" -> "simpleImport"), Some(WriteConfig(sc)))
writeConfig: com.mongodb.spark.config.WriteConfig = WriteConfig(test2,simpleImport,Some(mongodb://172.24.161.11:27017),true,512,15,WriteConcernConfig(None,None,None,None),None,false,true)

scala> MongoSpark.save(df, writeConfig)
19/01/08 10:37:54 WARN TaskSetManager: Stage 1 contains a task of very large size (432 KB). The maximum recommended task size is 100 KB.
[Stage 1:>                                                          (0 + 1) / 1]
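As a side note, the same save could also be expressed through the connector's DataFrameWriter API instead of passing a WriteConfig explicitly. This is only a rough sketch for reference, using the same URI, database and collection values assumed above:

// Sketch only: DataFrameWriter-based save via the mongo-spark-connector,
// equivalent to MongoSpark.save(df, writeConfig) with the options below.
MongoSpark.save(
  df.write
    .mode("append")
    .option("uri", "mongodb://192.168.1.11:27017")
    .option("database", "test")
    .option("collection", "simpleImport"))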
The document is actually written to MongoDB, but the Spark write stage never completes and stays in the RUNNING state until the job is killed. In the various tests I performed, if I build a Dataset<Row> manually and write it to MongoDB, everything works and the job completes. Likewise, if I first copy the same table from the Hive catalog into the Spark catalog, I can then save it from the Spark catalog to MongoDB without any problem.
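To make those tests concrete, they looked roughly like the sketch below (names such as mongoWriteConfig and simpletab_copy are just placeholders; the connection values are the same as above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.mongodb.spark._
import com.mongodb.spark.config._

val mongoWriteConfig = WriteConfig(
  Map("uri" -> "mongodb://192.168.1.11:27017",
      "database" -> "test",
      "collection" -> "simpleImport"),
  Some(WriteConfig(sc)))

// Test 1: a Dataset<Row> built by hand is written correctly and the job completes.
val schema = StructType(Seq(
  StructField("lastname", StringType, nullable = true),
  StructField("name", StringType, nullable = true)))
val manualDf = spark.createDataFrame(sc.parallelize(Seq(Row("rossi", "mario"))), schema)
MongoSpark.save(manualDf, mongoWriteConfig)

// Test 2: copying the table from the Hive catalog into the Spark catalog first,
// and then writing the Spark-catalog copy, also completes normally.
hive.table("simpletab").write.mode("overwrite").saveAsTable("simpletab_copy")
MongoSpark.save(spark.table("simpletab_copy"), mongoWriteConfig)

Only the direct MongoSpark.save on the DataFrame returned by hive.table("simpletab") shows the hanging behaviour.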