Reply
Explorer
Posts: 16
Registered: ‎04-26-2017

Spark with Kudu behaving unexpectedly when bringing down the Kudu Service

Hi All, I am trying to read data from Kafka and ingest it into Kudu using Spark Streaming. I am not using KuduContext to perform the upsert operation into Kudu. Instead I am using Kudu's native client API to build the PartialRow and apply the operation for every record from Kafka. I am able to run the Spark Streaming job and everything looks good. I am able to see the data in the Kudu tables. But after processing a few batches, when I bring down the Kudu service, my executor program becomes a zombie (execution never reaches my executor class anymore) and the internal threads that establish the connection to Kudu (which I am not handling in my code) throw exceptions that I am not able to handle, resulting in message loss.

Please find below the exception:

 

18/02/23 00:16:30 ERROR client.TabletClient: [Peer 
bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on 
[id: 0x6e13b01f] 
java.net.ConnectException: Connection refused: 
kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050 
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
    at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) 
    at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) 
    at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) 
    at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) 
    at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) 
    at 
org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) 
    at 
org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) 
    at 
org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) 
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

 

 

Also, my executor code (one of the map transformations in the lineage calls the class below), which establishes the connection to Kudu once per JVM when the application starts, is:

 

package org.dwh.streaming.kudu.sparkkudustreaming; 

import java.util.List; 
import java.util.Map; 
import org.apache.kudu.client.*; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.dwh.streaming.kudu.sparkkudustreaming.config.LoadAppConf; 
import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants; 
import org.dwh.streaming.kudu.sparkkudustreaming.models.Store; 

public class KuduProcess { 
    private static Logger logger = LoggerFactory.getLogger(KuduProcess.class) 
    private static final KuduProcess instance = new KuduProcess(); 
    private static KuduClient client; 
    private static KuduTable table; 
    private static KuduSession session; 
    private static OperationResponse response; 
        
    private KuduProcess(){ 
     try { 
        Store store = LoadAppConf.loadAppConf(); 
        client = new KuduClient.KuduClientBuilder(store.getKuduHost()).build();	
        table = client.openTable(store.getKuduTable()); 
        session = client.newSession(); 
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); 
         } catch (KuduException e) { 
            logger.error("Kudu Exception:"+ e.getMessage()); 
         } 
    } 

    public static String upsertKudu(Map<String, Object> formattedMap) { 
       if (formattedMap.size() != 0) { 
           try { 
              Upsert upsert = table.newUpsert(); 
              PartialRow row = upsert.getRow(); 
              for(Map.Entry<String, Object> entry: formattedMap.entrySet()){ 
                if (entry.getValue().getClass().equals(String.class)){ 
                  if(entry.getValue().equals(SpecialNullConstants.specialStringNull)) 
                  row.setNull(entry.getKey()); 
                  else row.addString(entry.getKey(), (String) entry.getValue()); 
             }	
                  else if (entry.getValue().getClass().equals(Long.class)){ 
                    if(entry.getValue().equals(SpecialNullConstants.specialLongNull)) 
                   row.setNull(entry.getKey()); 
                  else row.addLong(entry.getKey(), (Long) entry.getValue()); 
             } 
                  else if (entry.getValue().getClass().equals(Integer.class)){ 
                     if(entry.getValue().equals(SpecialNullConstants.specialIntNull)) 
                   row.setNull(entry.getKey()); 
                  else row.addInt(entry.getKey(), (Integer) entry.getValue()); 
             } 
   } 
              session.apply(upsert); 
              List<OperationResponse> responses = session.flush(); 
              for (OperationResponse r : responses) { 
                 if (r.hasRowError()) { 
                    RowError e = r.getRowError(); 
                    if ("ALREADY_PRESENT".equals(e.getErrorStatus())) { 
                        continue; 
                    } 
                    logger.error("Error inserting " + e.getOperation().toString() 
                                  + ": " + e.toString()); 
                    } 
                 } 
                                
               } catch (Exception e) { 
                                logger.error("Exception during upsert:",e); 
                        } 
                } 
                return "SUCCESS"; 
        } 

} 

 

 Any suggestions on how to handle this case so that I can avoid data loss would be appreciated.