Process on List of Custom Objects of a Spark JavaRDD


Hi, can someone please help me resolve this issue? My Spark processor consumes JSON records from Kafka;
each JSON record contains a gateway and a list of devices. From each JSON record I build a list of
EventData (List<EventData>) as shown below. I want to save each EventData into a Cassandra table from the resulting RDD.

try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000))) {
	// createDirectStream in the Kafka 0.10 integration takes a location strategy as its second argument
	JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(ssc,
		LocationStrategies.PreferConsistent(),
		ConsumerStrategies.<String, String>Subscribe(Arrays.asList("events-topic"), getKafkaParams()));
	stream.foreachRDD(rdd -> {
		JavaRDD<List<EventData>> eventRDD = EventJsonMapper());
		processEventsData(eventRDD);

// I want to save each EventData into a Cassandra table from eventRDDList
public void processEventsData(JavaRDD<List<EventData>> eventRDDList) {
		// Maps EventData property names to Cassandra column names
		Map<String, String> columnNameMappings = new HashMap<String, String>();
		columnNameMappings.put("column1", "column1");
		columnNameMappings.put("column2", "column2");
		columnNameMappings.put("column3", "column3");
		columnNameMappings.put("column4", "column4");
		javaFunctions(eventRDDList).writerBuilder("myschema", "gatewayinfo",
		mapToRow(EventData.class, columnNameMappings)).saveToCassandra();
}
I am getting an error on the line below:

javaFunctions(eventRDDList).writerBuilder("myschema", "gatewayinfo",
		mapToRow(EventData.class, columnNameMappings)).saveToCassandra();
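
The error text is not included above, so what follows is only a guess at the cause. mapToRow(EventData.class, ...) describes how to turn a single EventData into a row, while every element of a JavaRDD<List<EventData>> is a whole List, so the writer may be rejecting the element type. If that is the problem, flattening the RDD first should help; with the Spark 2.x or later Java API that would be something like eventRDDList.flatMap(list -> list.iterator()), which yields a JavaRDD<EventData>. The sketch below shows the same flattening step on plain Java collections so that it runs without Spark or Cassandra. FlattenSketch, flatten and the deviceId field are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlattenSketch {

    // Hypothetical stand-in for the EventData bean from the question.
    static class EventData {
        final String deviceId;

        EventData(String deviceId) {
            this.deviceId = deviceId;
        }
    }

    // Turns "one list per JSON record" into "one element per EventData".
    // This mirrors what flatMap would do on a JavaRDD<List<EventData>>.
    static List<EventData> flatten(List<List<EventData>> perRecord) {
        return perRecord.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two JSON records: the first produced two events, the second one.
        List<List<EventData>> perRecord = Arrays.asList(
                Arrays.asList(new EventData("d1"), new EventData("d2")),
                Arrays.asList(new EventData("d3")));

        List<EventData> flat = flatten(perRecord);
        for (EventData e : flat) {
            System.out.println(e.deviceId);
        }
    }
}
```

After flattening, each element is a single EventData, which is the shape that a row mapper built from EventData.class expects. Whether this resolves the error depends on the actual exception message.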