Process on List of Custom Objects of a Spark JavaRDD

Hi, can someone please help me resolve this issue? My Spark processor consumes JSON records from Kafka.
Each JSON record contains a gateway and a list of devices. From each record I build a list of
EventData objects (List<EventData>), as shown below. I want to save each EventData into a Cassandra
table from the resulting eventRDD.


try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000))) {
    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(Arrays.asList("events-topic"), getKafkaParams()));

    stream.foreachRDD(rdd -> {
        JavaRDD<List<EventData>> eventRDD = rdd.map(new EventJsonMapper());
        processEventsData(eventRDD);
    });

    ssc.start();
    ssc.awaitTermination();
}
// I want to save each EventData into a Cassandra table from eventRDDList
public void processEventsData(JavaRDD<List<EventData>> eventRDDList) {
    Map<String, String> columnNameMappings = new HashMap<String, String>();
    columnNameMappings.put("column1", "column1");
    columnNameMappings.put("column2", "column2");
    columnNameMappings.put("column3", "column3");
    columnNameMappings.put("column4", "column4");
    javaFunctions(eventRDDList).writerBuilder("myschema", "gatewayinfo",
        mapToRow(EventData.class, columnNameMappings)).saveToCassandra();
}
			
I am getting an error on the following lines:

javaFunctions(eventRDDList).writerBuilder("myschema", "gatewayinfo",
		mapToRow(EventData.class, columnNameMappings)).saveToCassandra();
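
The exact error message is not shown, but one likely cause is a type mismatch: mapToRow(EventData.class, ...) describes how to write a single EventData, while each element of eventRDDList is a List<EventData>. Assuming that is the problem, flattening the RDD first should give the writer the element type it expects. The sketch below shows the flattening step in plain Java so it runs without a cluster. The EventData class here is a hypothetical stand-in, since the real bean is not shown, and the Spark lines in the comments assume Spark 2.x, where JavaRDD.flatMap takes a function that returns an Iterator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FlattenSketch {

    // Hypothetical stand-in for the poster's EventData bean (the real class is not shown).
    static final class EventData {
        final String column1;

        EventData(String column1) {
            this.column1 = column1;
        }
    }

    // Plain-Java analogue of the Spark step. On the RDD itself, the assumed
    // equivalent (Spark 2.x) would be:
    //
    //   JavaRDD<EventData> events = eventRDDList.flatMap(List::iterator);
    //   javaFunctions(events).writerBuilder("myschema", "gatewayinfo",
    //       mapToRow(EventData.class, columnNameMappings)).saveToCassandra();
    static List<EventData> flatten(List<List<EventData>> perRecord) {
        return perRecord.stream()
                .flatMap(List::stream)          // one stream of EventData instead of lists
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two Kafka records: the first yields two events, the second yields one.
        List<List<EventData>> perRecord = Arrays.asList(
                Arrays.asList(new EventData("a"), new EventData("b")),
                Arrays.asList(new EventData("c")));

        List<EventData> flat = flatten(perRecord);
        System.out.println(flat.size()); // prints 3
    }
}
```

An alternative with the same effect would be to flatten earlier, inside foreachRDD, so that processEventsData accepts a JavaRDD<EventData> directly.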