Support Questions
Find answers, ask questions, and share your expertise
Announcements
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Write spark streaming result to Druid

Write spark streaming result to Druid

New Contributor

Hi,

I want to write spark streaming result to Druid Database.I used Tranquility's BeamRDD adapter to send data to Druid. I have fetched streaming data from URL using Spark streaming Custom Receiver.Now i want to load streaming result to Druid using Tranquility.

public class JavaCustomReceiver extends Receiver<String> {
	private static final long serialVersionUID = 1L;
	public static void main(String[] args) throws Exception {
		    SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
		    @SuppressWarnings("resource")
			JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
		    JavaReceiverInputDStream<String> lines = ssc.receiverStream(
		    	      new JavaCustomReceiver(args[0]));
		    DStream<String> dstream = lines.dstream();	
		    DruidBeamFactory obj= new DruidBeamFactory();		    
		  lines.foreachRDD(rdd ->  { rdd.foreach(record -> record.propagate(obj.makeBeam())); } );  
		    ssc.start();
		    ssc.awaitTermination();
	 }		 
	   // Code to fetch data from URL
class DruidBeamFactory implements BeamFactory<String> {
	private static final long serialVersionUID = 1L;
	private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:2181";
    private static final int MAX_SLEEP_TIME_MILLIS = 1000 ;
    public Beam<String> makeBeam() {
        try {
        	final CuratorFramework curator = CuratorFrameworkFactory.builder()
            .connectString(ZOOKEEPER_CONNECTION_STRING).retryPolicy(new RetryOneTime(MAX_SLEEP_TIME_MILLIS))
                    .sessionTimeoutMs(30 * 1000).build();
            curator.start();           
            final List<String> dimensions = ImmutableList.of("open","high");
            final String dataSource = "druid_test",
                    indexService = "druid/overlord",
                    fireHosePattern = "druid:firehose:%s",
                    discoveryPath = "/druid/discovery";
            final List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                    new CountAggregatorFactory("count"));
      final Builder<String, String> builder = DruidBeams
                    .builder(new Timestamper<String>() {                               
	  private static final long serialVersionUID = 1L;
	  @Override
          public DateTime timestamp(String theMap) {						
	      return new DateTime(theMap);   } } )	
                    .curator(curator)
                  .discoveryPath(discoveryPath).timestampSpec(new TimestampSpec("timestamp","auto", null))  
                    .location(DruidLocation.create(indexService, fireHosePattern, dataSource))              
                    .rollup(DruidRollup.create(dimensions, aggregators, QueryGranularity.NONE))             
      .tuning(ClusteredBeamTuning.create(Granularity.HOUR, new Period("PT0M"), new Period("PT10M"), 1, 1));
            return builder.buildBeam();
        } catch (Exception e) {
            throw Throwables.propagate(e); } 
}
 }

I am getting issue at below line :

lines.foreachRDD(rdd ->  { rdd.foreach(record -> record.propagate(obj.makeBeam())); } );  
The method propagate(Beam<String>) is undefined for the type String.

Which type is used with propagate() method?

Thanks,