Support Questions
Find answers, ask questions, and share your expertise
Alert: Welcome to the Unified Cloudera Community. Former HCC members be sure to read and learn how to activate your account here.

Process List of Custom Objects of a Spark JavaRDD in Cassandra

Process List of Custom Objects of a Spark JavaRDD in Cassandra

New Contributor

Hi Can some one please help me to resolve the issue, My spark processor consumes json records from Kafka, each json contains Gateway and list of devices information. So from each JSON i will prepare List of EventData (List) like below, Here I want to save each EventData data into Cassandra table from EventRDD

try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(3000))) {
   JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(ssc,
    ConsumerStrategies.<String, String>Subscribe( Arrays.asList("events-topic"), getKafkaParams()));

    stream.foreachRDD(rdd -> {
        JavaRDD<List<EventData>> eventRDD = EventJsonMapper());
        processEventsData(eventRDD );

//I want to save each EventData data into cassandra table from eventRDDList
public void processEventsData(JavaRDD<List<EventData>> eventRDDList) {
        Map<String, String> columnNameMappings = new HashMap<String, String>();
        columnNameMappings.put("column1", "column1");
        columnNameMappings.put("column1", "column1");
        columnNameMappings.put("column2", "column2");
        columnNameMappings.put("column3", "column3");
        columnNameMappings.put("column4", "column4");       
        javaFunctions(eventRDD).writerBuilder("myschema", "gatewayinfo",
        mapToRow(EventData.class, columnNameMappings)).saveToCassandra();

I am getting error in below line

javaFunctions(eventRDDList).writerBuilder("myschema", "gatewayinfo",
        mapToRow(EventData.class, columnNameMappings)).saveToCassandra();
Don't have an account?
Coming from Hortonworks? Activate your account here