
Spark with Kudu behaving unexpectedly when bringing down the Kudu Service


Hi all, I am reading data from Kafka and ingesting it into Kudu using Spark Streaming. I am not using KuduContext to perform the upserts; instead I use Kudu's native client API to build a PartialRow and apply the operation for every record from Kafka. The Spark Streaming job runs fine and I can see the data in the Kudu tables. However, after a few batches have been processed, if I bring down the Kudu service, my executor becomes a zombie (execution never reaches my executor class again), and the internal threads that maintain the connection to Kudu (which I do not manage in my code) throw exceptions that I cannot handle, resulting in message loss.

Please find below the exception:

 

18/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

 

 

Also, here is my executor code (one of the map transformations in the lineage calls the class below), which establishes the connection to Kudu once per JVM when the application starts:

 

package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.List;
import java.util.Map;
import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.dwh.streaming.kudu.sparkkudustreaming.config.LoadAppConf;
import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
import org.dwh.streaming.kudu.sparkkudustreaming.models.Store;

public class KuduProcess {
    private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
    private static final KuduProcess instance = new KuduProcess();
    private static KuduClient client;
    private static KuduTable table;
    private static KuduSession session;

    // Opens the Kudu connection once per JVM when the class is first loaded.
    private KuduProcess() {
        try {
            Store store = LoadAppConf.loadAppConf();
            client = new KuduClient.KuduClientBuilder(store.getKuduHost()).build();
            table = client.openTable(store.getKuduTable());
            session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        } catch (KuduException e) {
            logger.error("Kudu Exception: " + e.getMessage());
        }
    }

    // Builds an upsert from the formatted record and applies it to the Kudu table.
    public static String upsertKudu(Map<String, Object> formattedMap) {
        if (formattedMap.size() != 0) {
            try {
                Upsert upsert = table.newUpsert();
                PartialRow row = upsert.getRow();
                // Map each column to the right setter, translating the sentinel
                // "null" constants into real Kudu NULLs.
                for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
                    if (entry.getValue().getClass().equals(String.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
                            row.setNull(entry.getKey());
                        else
                            row.addString(entry.getKey(), (String) entry.getValue());
                    } else if (entry.getValue().getClass().equals(Long.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
                            row.setNull(entry.getKey());
                        else
                            row.addLong(entry.getKey(), (Long) entry.getValue());
                    } else if (entry.getValue().getClass().equals(Integer.class)) {
                        if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
                            row.setNull(entry.getKey());
                        else
                            row.addInt(entry.getKey(), (Integer) entry.getValue());
                    }
                }
                session.apply(upsert);
                List<OperationResponse> responses = session.flush();
                for (OperationResponse r : responses) {
                    if (r.hasRowError()) {
                        RowError e = r.getRowError();
                        // Rows that are already present are ignored; everything else is logged.
                        if (e.getErrorStatus().isAlreadyPresent()) {
                            continue;
                        }
                        logger.error("Error inserting " + e.getOperation().toString()
                                + ": " + e.toString());
                    }
                }
            } catch (Exception e) {
                logger.error("Exception during upsert:", e);
            }
        }
        return "SUCCESS";
    }
}
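For context, the driver side of the job is wired roughly like the sketch below. This is a simplified reconstruction, not my exact code: the topic name, Kafka parameters, batch interval, and the formatRecord helper are placeholders. The relevant point is that one of the map transformations in the lineage calls KuduProcess.upsertKudu, so the upserts run on the executors, one Kudu operation per Kafka record.

import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;

public class KuduStreamingJob {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("kafka-to-kudu");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        // Placeholder Kafka settings; the real job reads these from configuration.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "kafka-broker:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "kudu-ingest");

        JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                    Collections.singletonList("events"), kafkaParams));

        // One of the map transformations calls KuduProcess for every record;
        // an output operation (print) triggers the lineage each batch.
        JavaDStream<String> results =
            stream.map(record -> KuduProcess.upsertKudu(formatRecord(record.value())));
        results.print();

        jssc.start();
        jssc.awaitTermination();
    }

    // Hypothetical helper: parses the Kafka payload into a column -> value map.
    private static Map<String, Object> formatRecord(String value) {
        Map<String, Object> row = new HashMap<>();
        // ... populate row from the message fields ...
        return row;
    }
}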

 

Any suggestion on how to handle this case so that I can avoid data loss would be helpful.