<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>How to write data from dStream into permanent Hive table - Archives of Support Questions (Read Only)</title>
    <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/42240#M32742</link>
    <description>Archived community thread: writing data from a Spark Streaming DStream into a permanent Hive table via foreachRDD, without creating (and leaking) a new HiveContext on every batch.</description>
    <pubDate>Fri, 16 Sep 2022 10:27:18 GMT</pubDate>
    <dc:creator>Tomas79</dc:creator>
    <dc:date>2022-09-16T10:27:18Z</dc:date>
    <item>
      <title>How to write data from dStream into permanent Hive table</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/42240#M32742</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;&lt;P&gt;I am trying to build a simple Spark Streaming application that reads new data from HDFS every 5 seconds and inserts it into a Hive table. The official Spark website has an example of performing SQL operations on DStream data via the foreachRDD function, but the catch is that the example uses a sqlContext to transform the data from an RDD into a DataFrame. The problem is that such a DataFrame cannot be saved (appended) to an existing permanent Hive table; a HiveContext has to be created.&lt;/P&gt;&lt;P&gt;So I tried the program below. It works, but fails after a while with an out-of-memory error, because it creates a new HiveContext object on every batch.&lt;/P&gt;&lt;P&gt;I tried to create the HiveContext BEFORE the map and broadcast it, but that failed.&lt;/P&gt;&lt;P&gt;I also tried to call getOrCreate, which works fine with sqlContext but not with hiveContext.&lt;/P&gt;&lt;P&gt;Any ideas?&lt;/P&gt;&lt;P&gt;Thanks,&lt;/P&gt;&lt;P&gt;Tomas&lt;/P&gt;&lt;P&gt;&lt;img src="https://community.cloudera.com/t5/image/serverpage/image-id/1846i39DE7D98949F8C55/image-size/large?v=v2&amp;amp;px=999" title="Snímka.PNG" alt="Snímka.PNG" /&gt;&lt;/P&gt;&lt;PRE&gt;val sparkConf = new SparkConf().setAppName("StreamHDFSdata")
sparkConf.set("spark.dynamicAllocation.enabled", "false")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("/user/hdpuser/checkpoint")
val sc = ssc.sparkContext

val smDStream = ssc.textFileStream("/user/hdpuser/data")
val smSplitted = smDStream.map( x =&amp;gt; x.split(";") ).map( x =&amp;gt; Row.fromSeq( x ) )
val smStruct = StructType( (0 to 10).toList.map( x =&amp;gt; "col" + x.toString ).map( y =&amp;gt; StructField( y, StringType, true ) ) )

//val hiveCx = new org.apache.spark.sql.hive.HiveContext(sc)
//val sqlBc = sc.broadcast( hiveCx )

smSplitted.foreachRDD( rdd =&amp;gt; {
    //val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) // sqlContext cannot be used to create a permanent table
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
    //val sqlContext = sqlBc.value // THIS DOES NOT WORK: fails at runtime
    //val sqlContext = HiveContext.getOrCreate(rdd.sparkContext) // THIS DOES NOT WORK EITHER: fails at runtime

    //import hiveCx.implicits._
    val smDF = sqlContext.createDataFrame( rdd, smStruct )
    //val smDF = rdd.toDF
    smDF.registerTempTable("sm")
    val smTrgPart = sqlContext.sql("insert into table onlinetblsm select * from sm")
    smTrgPart.write.mode(SaveMode.Append).saveAsTable("onlinetblsm")
} )&lt;/PRE&gt;</description>
      <pubDate>Fri, 16 Sep 2022 10:27:18 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/42240#M32742</guid>
      <dc:creator>Tomas79</dc:creator>
      <dc:date>2022-09-16T10:27:18Z</dc:date>
    </item>
    <item>
      <title>Re: How to write data from dStream into permanent Hive table</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/42838#M32743</link>
      <description>In the meantime I figured out one possible solution, which seems to be stable and does not run out of memory: the HiveContext has to be created outside the closure, in a singleton object.</description>
      <pubDate>Wed, 13 Jul 2016 15:01:34 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/42838#M32743</guid>
      <dc:creator>Tomas79</dc:creator>
      <dc:date>2016-07-13T15:01:34Z</dc:date>
    </item>
    <item>
      <title>Re: How to write data from dStream into permanent Hive table</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/48992#M32744</link>
      <description>&lt;P&gt;Can you please share your code?&lt;BR /&gt;Thanks.&lt;/P&gt;</description>
      <pubDate>Tue, 03 Jan 2017 15:30:16 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/48992#M32744</guid>
      <dc:creator>riyan</dc:creator>
      <dc:date>2017-01-03T15:30:16Z</dc:date>
    </item>
    <item>
      <title>Re: How to write data from dStream into permanent Hive table</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/49442#M32745</link>
      <description>&lt;P&gt;Did this work for you?&lt;/P&gt;&lt;P&gt;If so, please post the code that worked for you.&lt;/P&gt;</description>
      <pubDate>Sun, 15 Jan 2017 05:33:57 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/49442#M32745</guid>
      <dc:creator>jack0188</dc:creator>
      <dc:date>2017-01-15T05:33:57Z</dc:date>
    </item>
    <item>
      <title>Re: How to write data from dStream into permanent Hive table</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/49458#M32746</link>
      <description>&lt;PRE&gt;import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, TimestampType, LongType, DoubleType, DataType}
import org.apache.spark.sql.Row
import java.io.File
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.hive.HiveContext

// Lazily initialized singleton: one HiveContext per JVM, reused by every batch
object SQLHiveContextSingleton {
    @transient private var instance: HiveContext = _
    def getInstance(sparkContext: SparkContext): HiveContext = {
        synchronized {
            if (instance == null) {
                instance = new HiveContext(sparkContext)
            }
            instance
        }
    }
}

......

val mydataSplitted = mydataDStream.map( .... )

// saving the content of the mydataSplitted DStream of RDDs into the Hive table
mydataSplitted.foreachRDD( rdd =&amp;gt; {
    println("Processing mydata RDD")
    // reuse the singleton HiveContext instead of creating a new one per batch
    val sqlContext = SQLHiveContextSingleton.getInstance( rdd.sparkContext )
    val mydataDF = sqlContext.createDataFrame( rdd, mydataStruct )
    mydataDF.registerTempTable("mydata")
    val mydataTrgPart = sqlContext.sql(mydataSQL)
    sqlContext.sql("SET hive.exec.dynamic.partition = true")
    sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
    mydataTrgPart.write.mode(SaveMode.Append).partitionBy(partCol).saveAsTable(mydataTable)
} )&lt;/PRE&gt;</description>
      <pubDate>Mon, 16 Jan 2017 09:35:42 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/49458#M32746</guid>
      <dc:creator>Tomas79</dc:creator>
      <dc:date>2017-01-16T09:35:42Z</dc:date>
    </item>
    <item>
      <title>Re: How to write data from dStream into permanent Hive table</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/49488#M32747</link>
      <description>&lt;P&gt;Can i know which versions of hive and spark you are using?&lt;/P&gt;</description>
      <pubDate>Tue, 17 Jan 2017 05:02:36 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/49488#M32747</guid>
      <dc:creator>jack0188</dc:creator>
      <dc:date>2017-01-17T05:02:36Z</dc:date>
    </item>
    <item>
      <title>Re: How to write data from dStream into permanent Hive table</title>
      <link>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/54836#M32748</link>
      <description>&lt;P&gt;Thanks for sharing the code of your solution.&lt;BR /&gt;I've also found that simply making the HiveContext variable lazy works:&lt;/P&gt;&lt;PRE&gt;val sparkConf = new SparkConf().setAppName("StreamHDFSdata")
sparkConf.set("spark.dynamicAllocation.enabled","false")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("/user/hdpuser/checkpoint")
val sc = ssc.sparkContext

val smDStream = ssc.textFileStream("/user/hdpuser/data")
val smSplitted = smDStream.map( x =&amp;gt; x.split(";") ).map( x =&amp;gt; Row.fromSeq( x ) )
...

lazy val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

smSplitted.foreachRDD( rdd =&amp;gt; {
// use sqlContext  here
} )&lt;/PRE&gt;</description>
      <pubDate>Thu, 18 May 2017 09:07:43 GMT</pubDate>
      <guid>https://community.cloudera.com/t5/Archives-of-Support-Questions/How-to-write-data-from-dStream-into-permanent-Hive-table/m-p/54836#M32748</guid>
      <dc:creator>allod</dc:creator>
      <dc:date>2017-05-18T09:07:43Z</dc:date>
    </item>
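Editor's note: the fixes in this thread target Spark 1.x, where SQLContext and HiveContext were separate classes. On Spark 2.x and later, the same job is usually written with a single SparkSession created with enableHiveSupport(), which is safe to reuse across batches and makes the singleton unnecessary. A minimal, untested sketch under that assumption, reusing the thread's paths, schema, and the onlinetblsm table name for illustration:

```scala
// Sketch for Spark 2.x and later: SparkSession replaces SQLContext/HiveContext.
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamToHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StreamHDFSdata")
      .enableHiveSupport() // gives saveAsTable access to the Hive metastore
      .getOrCreate()
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
    ssc.checkpoint("/user/hdpuser/checkpoint")

    val lines = ssc.textFileStream("/user/hdpuser/data")
    val rows = lines.map(_.split(";")).map(Row.fromSeq(_))
    val schema = StructType((0 to 10).map(i =&gt; StructField("col" + i, StringType, nullable = true)))

    rows.foreachRDD { rdd =&gt;
      // The SparkSession is a long-lived singleton already; no per-batch context needed.
      val df = spark.createDataFrame(rdd, schema)
      df.write.mode(SaveMode.Append).saveAsTable("onlinetblsm")
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that the DStream API itself is legacy in recent Spark releases; Structured Streaming with a file source and a Hive-table sink would be the current equivalent.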
  </channel>
</rss>

