Compare a Structured Streaming DataFrame with two other DataFrames

The streaming data from Kafka arrives like this:


    Batch: 0
    -------------------------------------------
    +------+---------+---------+-----+
    |userid|latitude |longitude|speed|
    +------+---------+---------+-----+
    |230   |17.543981|55.310775|4.43 |
    |230   |17.54388 |55.310707|2.92 |
    |230   |17.543859|55.310691|0.0  |
    |550   |17.450855|55.566459|13.04|
    |230   |17.543859|55.310691|0.0  |
    |550   |17.450727|55.566062|14.02|
    |230   |17.543859|55.310691|0.0  |
    |550   |17.450586|55.565666|14.35|
    |230   |17.543827|55.310676|1.08 |
    |550   |17.45045 |55.565223|14.73|
    |230   |17.543779|55.310653|1.4  |
    |550   |17.450321|55.564804|15.04|
    |550   |17.450174|55.564415|11.36|
    |230   |17.543754|55.31063 |0.83 |
    |230   |17.543729|55.310615|0.0  |
    +------+---------+---------+-----+
    

df1, which is created from a CSV file (columns: latitude, longitude):

    17.538696,55.409913
    17.538689,55.409899
    17.538676,55.409886
    17.538658,55.409887
    17.538638,55.40989
    17.538615,55.40989
    17.5386,55.40989
    17.538586,55.409889
    17.53857,55.409888
    17.538558,55.409887
    17.538546,55.409887
    17.538532,55.409884
    17.538518,55.409885
    17.5385,55.409884

df2, also created from a CSV file (same columns):

    17.117724,56.800142
    17.117726,56.800131
    17.117733,56.800114
    17.117737,56.800102
    17.117742,56.800085
    17.117746,56.800073
    17.11775,56.800057
    17.117755,56.800042
    17.117761,56.800025
    17.117769,56.799999
    17.117774,56.799983
    17.117779,56.799968
    17.117783,56.799956
    17.117788,56.79994
    17.117793,56.799923
    17.117798,56.799906
     
Here I want to compare the latitude and longitude of the streaming data with the latitude and longitude in df1.
If a match is found, the stream should start calculating the distance travelled from the latitude and longitude values.
It should keep accumulating the distance until another match is found, i.e. when the streaming data matches a point in df2. The loop then continues, with df2 as the starting point and df1 as the destination (for the return trip).
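
The trip logic described above can be sketched in plain Scala, outside Spark, to make the requirement concrete. This is only an illustration under assumptions that are not part of the original code: the names `Point`, `Trip`, `TripState`, `haversineKm`, `step` and `run` are hypothetical, distance is taken as the great-circle (haversine) distance with a mean Earth radius of 6371 km, a "match" means exact equality of latitude and longitude, and the points belong to a single userid in arrival order.

```scala
// Illustrative sketch only; every name in this object is hypothetical.
object TripSketch {
  final case class Point(latitude: Double, longitude: Double)
  final case class Trip(from: String, to: String, distanceKm: Double)

  // origin: reference set that started the current trip ("df1" or "df2"), None before the first match.
  // last: previous streamed point; distanceKm: distance accumulated on the current trip.
  final case class TripState(origin: Option[String], last: Option[Point], distanceKm: Double)

  val initial: TripState = TripState(None, None, 0.0)

  // Great-circle distance in kilometres (haversine formula),
  // assuming a mean Earth radius of 6371 km.
  def haversineKm(a: Point, b: Point): Double = {
    val earthRadiusKm = 6371.0
    val dLat = math.toRadians(b.latitude - a.latitude)
    val dLon = math.toRadians(b.longitude - a.longitude)
    val h = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(a.latitude)) * math.cos(math.toRadians(b.latitude)) *
        math.pow(math.sin(dLon / 2), 2)
    2 * earthRadiusKm * math.asin(math.sqrt(h))
  }

  // Processes one streamed point: returns the new state and, if the point
  // reached the opposite reference set, the trip that was just completed.
  def step(df1: Set[Point], df2: Set[Point])(state: TripState, p: Point): (TripState, Option[Trip]) = {
    // Exact-equality match (an assumption; real GPS data may need a tolerance).
    val hit: Option[String] =
      if (df1.contains(p)) Some("df1") else if (df2.contains(p)) Some("df2") else None
    // Distance from the previous point, or 0 if no trip is in progress.
    val leg = state.last.map(haversineKm(_, p)).getOrElse(0.0)

    (state.origin, hit) match {
      // No trip yet and no match: keep waiting.
      case (None, None) => (state, None)
      // First match: start a trip from that reference set.
      case (None, Some(start)) => (TripState(Some(start), Some(p), 0.0), None)
      // Matched the other set: close this trip and start the return trip.
      case (Some(from), Some(to)) if from != to =>
        (TripState(Some(to), Some(p), 0.0), Some(Trip(from, to, state.distanceKm + leg)))
      // Ordinary point (or another match on the origin set): keep accumulating.
      case (Some(_), _) =>
        (state.copy(last = Some(p), distanceKm = state.distanceKm + leg), None)
    }
  }

  // Folds a sequence of points through step and collects the completed trips.
  def run(df1: Set[Point], df2: Set[Point], points: Seq[Point]): List[Trip] = {
    val (_, trips) = points.foldLeft((initial, List.empty[Trip])) {
      case ((state, done), p) =>
        val (next, finished) = step(df1, df2)(state, p)
        (next, done ++ finished.toList)
    }
    trips
  }
}
```

For example, `TripSketch.run(df1Points, df2Points, streamedPoints)` would return one `Trip` per completed leg, alternating between df1 to df2 and df2 to df1. In Structured Streaming the same state would have to be kept per userid across micro-batches, which this sketch does not attempt.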








However, I am not able to do this comparison using Structured Streaming. My code so far:


    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

    object trip {

      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder().master("local[*]")
          .appName("trip_calculation")
          .getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")

        // Both reference files share the same two-column schema.
        val pointSchema = new StructType()
          .add("latitude", DoubleType)
          .add("longitude", DoubleType)

        // Static DataFrames holding the two reference point sets.
        val df1 = spark.read.format("csv").schema(pointSchema).load("df1.csv")
        df1.show()

        val df2 = spark.read.format("csv").schema(pointSchema).load("df2.csv")
        df2.show()

        // Streaming source: the "vehicle" Kafka topic, read from the earliest offset.
        val kafka = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "vehicle")
          .option("startingOffsets", "earliest")
          .load()

        // The Kafka value arrives as binary; cast it to a JSON string.
        val r_data: DataFrame = kafka.selectExpr("CAST(value AS STRING) AS value")
        r_data.printSchema()

        // Schema of the JSON payload: a user id plus a nested location struct.
        val schema = new StructType()
          .add("userid", StringType)
          .add("location", new StructType()
            .add("longitude", DoubleType)
            .add("latitude", DoubleType)
            .add("speed", DoubleType))

        val data = r_data.select(from_json(col("value"), schema).as("data")).select("data.*")

        // Flatten the nested struct into top-level columns.
        val df = data.select(
          col("userid"),
          col("location.latitude").as("latitude"),
          col("location.longitude").as("longitude"),
          col("location.speed").as("speed"))

        // The comparison of df against df1 and df2 would go here.
      }
    }
        


After this, how do I start comparing and matching the streaming DataFrame against df1 and df2, as per the requirement?




     














