
Deserializing a Spark DStream

Hi,

 

I am trying to apply the Drools engine (Drools Fusion) to streaming data using Spark Streaming.

 

But I am unable to deserialize a variable of type JavaDStream<Long> to fire rules over it.

 

It shows a comparison error between JavaDStream<Long> and Integer. Please suggest how to resolve this. The code is below.

 

 

Main Class:

-------------

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Spark Streaming");

// Batch interval of 10000 ms
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));

// Read lines of text from a local socket
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 8089);

// Parse each incoming line as a Long
class splitFunc implements Function<String, Long> {
	public Long call(String x) { return Long.parseLong(x); }
}

JavaDStream<Long> in_amount = lines.map(new splitFunc());

--
--
--

insertEvent(entryPointStoreOne, new Sale(), in_amount);


 

 

Sale.java - FACTS

----------------------

 

import org.apache.spark.streaming.api.java.JavaDStream;

public class Sale {

	private JavaDStream<Long> amount;
	
	public Sale (){ }
	
	public void setAmount(JavaDStream<Long> amount)
	{
		this.amount=amount;
	}
	
	public JavaDStream<Long> getAmount()
	{
		return amount;
	}
	
}

 

 

 

Drools Rules File

------------------

declare Sale
  @role(event)
end


rule "Store One - Has Passed its Sales Record"
    when
        $amount : Sale( amount > 1000 )  // FAILING HERE
        from entry-point StoreOne      
    then
        System.out.println("Store One has passed its Sales Records");
    end
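
For reference, here is a minimal sketch of the shape the fact probably needs, assuming the rule is meant to compare one numeric amount per event rather than the stream object itself. It uses plain Java only: a List<Long> stands in for the values of one micro-batch, and SaleSketch, SaleEvent, toEvents and passesRecord are hypothetical names that do not come from Spark or Drools. In the real job the per-batch values would still have to be pulled out of the DStream (for example inside foreachRDD) before being inserted, which is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: plain Java only, no Spark or Drools classes.
class SaleSketch {

    // Assumed fact shape: one event carries one numeric amount.
    static final class SaleEvent {
        private final Long amount;

        SaleEvent(Long amount) { this.amount = amount; }

        public Long getAmount() { return amount; }
    }

    // Turn one batch of parsed values into one event per value.
    static List<SaleEvent> toEvents(List<Long> batch) {
        List<SaleEvent> events = new ArrayList<>();
        for (Long value : batch) {
            events.add(new SaleEvent(value));
        }
        return events;
    }

    // Same comparison as the rule condition (amount > 1000), on a plain Long.
    static boolean passesRecord(SaleEvent event) {
        return event.getAmount() > 1000;
    }

    public static void main(String[] args) {
        // Stand-in for the values of one micro-batch.
        List<Long> batch = Arrays.asList(500L, 1500L);
        for (SaleEvent event : toEvents(batch)) {
            System.out.println(event.getAmount() + " passes record: " + passesRecord(event));
        }
    }
}
```

With a Long field the comparison against 1000 is an ordinary numeric comparison, whereas a JavaDStream<Long> field gives the rule a stream handle that has no numeric value to compare.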

Thanks,

Arun