Member since: 04-06-2018 | Posts: 2 | Kudos Received: 0 | Solutions: 0
03-27-2019 02:50 PM
Hi Team, I am trying the following Spark SQL:

var work__store_level_vend_pack_loc_final_data = sparksession.read.format("csv")
  .option("header", "true")
  .option("delimiter", "|")
  .option("inferSchema", "true")
  .load("C:\\Users\\jabin\\Desktop\\project_files\\work__store_level_vend_pack_loc_final_data.txt")

work__store_level_vend_pack_loc_final_data.registerTempTable("work__store_level_vend_pack_loc_final_data_table")

var r1 = sparksession.sqlContext.sql("SELECT shc_item_id, 'K' AS source_owner_cd, item_purchase_status_cd, vendor_package_id, vendor_package_purchase_status_cd, flow_type_cd AS vendor_package_flow_type_cd, vendor_carton_qty, vendor_stock_nbr, ksn_package_id, ksn_purchase_status_cd, import_ind, sears_divission_nbr, sears_item_nbr, sears_sku_nbr, scan_based_trading_ind, cross_merchandising_cd, retail_carton_vendor_package_id, vendor_package_owner_cd, can_carry_model_id, '' AS days_to_check_begin_day_qty, '' AS days_to_check_end_day_qty, dotcom_allocation_ind, retail_carton_internal_package_qty, allocation_replenishment_cd, shc_item_type_cd, idrp_order_method_cd, source_package_qty AS store_source_package_qty, order_duns_nbr FROM work__store_level_vend_pack_loc_final_data_table WHERE flow_type_cd = 'JIT' OR servicing_dc_nbr > '0'")
// .collect.foreach(println)

Now I want to select only the distinct rows across all columns of r1, using sparksession.sqlContext.sql(""). How can I do that?
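A minimal sketch of two equivalent approaches, assuming the r1 DataFrame and the session names from the post (registerTempTable matches the older API already used above):

// Option 1: register r1 as a temp table, then SELECT DISTINCT * over it
r1.registerTempTable("r1_table")
var r1Distinct = sparksession.sqlContext.sql("SELECT DISTINCT * FROM r1_table")

// Option 2: the DataFrame API call that deduplicates across all columns,
// with no SQL string needed
var r1Distinct2 = r1.distinct()

r1Distinct.show() // inspect a few deduplicated rows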
04-06-2018 02:04 PM
Hi Team, I am doing Twitter streaming with the following code and it is not working properly. Please suggest a solution.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.twitter._

object TwitterStreaming {
  def main(args: Array[String]) {
    var conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("socketStreaming")
      .set("spark.network.timeout", "6s")
      .set("spark.executor.heartbeatInterval", "1000s")
      .set("spark.executor.memory", "2g")
      .set("spark.driver.memory", "8g")
    var sc = new SparkContext(conf)
    sc.addJar("/root/heena/simple_project/spark-streaming-twitter_2.10-1.2.0.jar")
    sc.addJar("/root/heena/simple_project/twitter4j-stream-4.0.2.jar")
    sc.addJar("/root/heena/simple_project/twitter4j-core-4.0.2.jar")
    sc.addJar("/root/heena/simple_project/spark-core_2.11-1.5.2.logging.jar")
    //sc.stop();
    // These imports are placed here because writing them at the top causes the error
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.twitter._
    import twitter4j.auth._
    import twitter4j.conf._
    import org.apache.spark.internal.Logging

    val ssc = new StreamingContext(sc, Seconds(1))
    val socket = ssc.socketTextStream("localhost", 8585)
    val cb = new ConfigurationBuilder
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey("IqmtNPPtRAkdzDvTxdE5HrXYp")
      .setOAuthConsumerSecret("Ul8ttM4Jmafa8z9T4rKmgU6TMx09BL0C7HduhWUzhJw8GLRSdJ")
      .setOAuthAccessToken("2858023206-ku6vr7wjCJlhJZ6AUWYyWXLrBCaI2PZWxtPjvvy")
      .setOAuthAccessTokenSecret("zaAZSuoWtnhWEjH7GtrrtHiLyP2LcZAGlBWUroGJLpbqg")
    val auth = new OAuthAuthorization(cb.build)
    val tweets = TwitterUtils.createStream(ssc, Some(auth))
    val englishTweets = tweets.filter(_.getLang() == "en")
    val status = englishTweets.map(status => status.getText)
    ssc.checkpoint("hdfs://localhost:9000/user/heena")
    ssc.start()
    ssc.awaitTermination()
  }
}

My build.sbt is:

name := "twitter4j-tutorial"
version := "0.1.0"
scalaVersion := "2.10.0"
libraryDependencies += "org.twitter4j" % "twitter4j-stream" % "3.0.3"
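One possible fix, sketched under the assumption that the failure comes from mixed versions: the post adds Scala 2.10 and 2.11 artifacts side by side (spark-streaming-twitter_2.10 vs. spark-core_2.11) and twitter4j 4.0.2 jars, while build.sbt requests twitter4j-stream 3.0.3. A consistent setup would pin one Scala version and let sbt resolve the Spark 1.2.0 dependencies, rather than calling sc.addJar by hand:

name := "twitter4j-tutorial"

version := "0.1.0"

scalaVersion := "2.10.4"

// sbt pulls a matching twitter4j transitively via spark-streaming-twitter,
// so the hand-added twitter4j 4.0.2 jars are not needed
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0",
  "org.apache.spark" %% "spark-streaming" % "1.2.0",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.2.0"
)

With a single Scala version and sbt-managed dependencies, the sc.addJar calls and the mid-method imports should become unnecessary.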