
Unable to execute HBase job due to "Unexpected closed connection" exception

Expert Contributor

Unable to execute HBase job due to the following exception:

2016-05-26 15:49:38,270 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]

Thu May 26 15:49:38 UTC 2016, org.apache.hadoop.hbase.client.RpcRetryingCaller@77e92d1b, java.io.IOException: Unexpected closed connection

	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136)
	at org.apache.hadoop.hbase.client.HTable.get(HTable.java:811)
	at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.get(HTablePool.java:394)
	at com.bizosys.hsearch.hbase.HTableWrapper.get(HTableWrapper.java:100)
	at com.arc.hbase.jobs.CacheBuildJob.saveContainer(CacheBuildJob.java:387)
	at com.arc.hbase.jobs.CacheBuildJob.save(CacheBuildJob.java:323)
	at com.arc.hbase.jobs.CacheBuildJob.exec(CacheBuildJob.java:172)
	at com.arc.hbase.jobs.CacheBuildJob.run(CacheBuildJob.java:77)
	at com.arc.hbase.jobs.CacheBuildJob.main(CacheBuildJob.java:513)

The full error log is attached (hbase-error.txt).

1 ACCEPTED SOLUTION

Super Guru

It seems possible that your application might be prematurely closing the HBase `Connection` while your tasks are still using HTable instances that use that Connection. Have you investigated this at all? Any chance you can share more about your application?

View solution in original post

16 REPLIES

Master Collaborator

Can you check the server log on fsdata1c.corp.arc.com to see if there are more clues?

Which release of HBase are you using?

Expert Contributor

Here is the region server log from fsdata1c.corp.arc.com:

2016-05-26 13:57:47,158 WARN  [RpcServer.handler=55,port=60020] ipc.RpcServer: RpcServer.respondercallId: 4324 service: ClientService methodName: Scan size: 30 connection: 10.1.1.243:52740: output error
2016-05-26 13:57:47,159 WARN  [RpcServer.handler=55,port=60020] ipc.RpcServer: RpcServer.handler=55,port=60020: caught a ClosedChannelException, this means that the server was processing a request but the client went away. The error message was: null
2016-05-26 13:58:47,135 INFO  [regionserver60020.leaseChecker] regionserver.HRegionServer: Scanner 3235538737043012213 lease expired on region CUTOFF4,O11\x09166343\x093\x09162830813,1464012806340.44e206d15b62ed4d452545242bd105cd.
2016-05-26 13:58:52,422 INFO  [RpcServer.reader=8,port=60020] ipc.RpcServer: RpcServer.listener,port=60020: count of bytes read: 0
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at org.apache.hadoop.hbase.ipc.RpcServer.channelRead(RpcServer.java:2224)
	at org.apache.hadoop.hbase.ipc.RpcServer$Connection.readAndProcess(RpcServer.java:1415)
	at org.apache.hadoop.hbase.ipc.RpcServer$Listener.doRead(RpcServer.java:790)
	at org.apache.hadoop.hbase.ipc.RpcServer$Listener$Reader.doRunLoop(RpcServer.java:581)
	at org.apache.hadoop.hbase.ipc.RpcServer$Listener$Reader.run(RpcServer.java:556)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)
2016-05-26 13:58:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12076, accesses=3683357, hits=3525415, hitRatio=95.71%, , cachingAccesses=3547447, cachingHits=3448937, cachingHitsRatio=97.22%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 13:59:52,420 INFO  [regionserver60020.leaseChecker] regionserver.HRegionServer: Scanner 594209239597513333 lease expired on region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894.
2016-05-26 14:03:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3720308, hits=3562365, hitRatio=95.75%, , cachingAccesses=3584398, cachingHits=3485887, cachingHitsRatio=97.25%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:08:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3749373, hits=3591430, hitRatio=95.79%, , cachingAccesses=3613463, cachingHits=3514952, cachingHitsRatio=97.27%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:13:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3769032, hits=3611089, hitRatio=95.81%, , cachingAccesses=3633122, cachingHits=3534611, cachingHitsRatio=97.29%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:18:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3769844, hits=3611901, hitRatio=95.81%, , cachingAccesses=3633934, cachingHits=3535423, cachingHitsRatio=97.29%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:23:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3831804, hits=3673861, hitRatio=95.88%, , cachingAccesses=3695894, cachingHits=3597383, cachingHitsRatio=97.33%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:28:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3832074, hits=3674131, hitRatio=95.88%, , cachingAccesses=3696164, cachingHits=3597653, cachingHitsRatio=97.33%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:33:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3844554, hits=3686611, hitRatio=95.89%, , cachingAccesses=3708644, cachingHits=3610133, cachingHitsRatio=97.34%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:38:11,712 INFO  [regionserver60020.periodicFlusher] regionserver.HRegionServer: regionserver60020.periodicFlusher requesting flush for region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894. after a delay of 15478
2016-05-26 14:38:21,712 INFO  [regionserver60020.periodicFlusher] regionserver.HRegionServer: regionserver60020.periodicFlusher requesting flush for region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894. after a delay of 19133
2016-05-26 14:38:27,190 DEBUG [Thread-20] regionserver.HRegion: Started memstore flush for CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894., current region memstore size 93.2 M
2016-05-26 14:38:27,401 INFO  [Thread-20] regionserver.DefaultStoreFlusher: Flushed, sequenceid=96047, memsize=31.7 M, hasBloomFilter=true, into tmp file hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/.tmp/6d1e9fe6186a448f9a322c73ecf4ad0a
2016-05-26 14:38:27,411 DEBUG [Thread-20] regionserver.HRegionFileSystem: Committing store file hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/.tmp/6d1e9fe6186a448f9a322c73ecf4ad0a as hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/1/6d1e9fe6186a448f9a322c73ecf4ad0a

Super Guru

It seems possible that your application might be prematurely closing the HBase `Connection` while your tasks are still using HTable instances that use that Connection. Have you investigated this at all? Any chance you can share more about your application?
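
For reference, here is a minimal sketch of the intended lifecycle using the HBase 1.x client API (ConnectionFactory/Connection/Table rather than the HTablePool/HTableWrapper wrappers in the posted code). The table name is taken from the region names in the log above; the row key is just a placeholder.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;

public class ConnectionLifecycleSketch {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();

        // The Connection is heavyweight and shared; it must stay open for as
        // long as any Table obtained from it is still in use.
        try (Connection connection = ConnectionFactory.createConnection(conf)) {

            // Tables are lightweight and short-lived; close them after each
            // unit of work. Closing a Table does not close the Connection.
            try (Table table = connection.getTable(TableName.valueOf("CUTOFF4"))) {
                Result result = table.get(new Get("placeholder-row-key".getBytes()));
                System.out.println("isEmpty=" + result.isEmpty());
            }

        } // Only here, after every Table is done, should the Connection be closed.
    }
}

If the shared Connection (or a pooled table backed by it) is closed while another thread is still using an HTable from it, later calls can fail with exactly this kind of "Unexpected closed connection" warning, which is the scenario described above.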

Expert Contributor

Below is the code:

package com.arc.hbase.jobs;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;


import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;


import com.arc.datasink.HBaseTables;
import com.arc.management.MonitorCacheBuildJob;
import com.arc.management.MonitorCollector;
import com.arc.management.MonitorMeasure;
import com.arc.rest.common.BytesMergerContainer;
import com.arc.rest.common.BytesMergerObject;
import com.arc.rest.common.ChangeDecorator;
import com.arc.rest.common.ChangeSetDecorator;
import com.arc.rest.common.ObjectKey;
import com.arc.rest.common.PostChanges_3_0.Changes;
import com.arc.util.ArcConfig;
import com.arc.util.FileSyncLog;
import com.arc.util.LineReaderUtil;
import com.bizosys.hsearch.hbase.HBaseFacade;
import com.bizosys.hsearch.hbase.HTableWrapper;


public class CacheBuildJob 
{
	private static final boolean INFO_ENABLED = FileSyncLog.l.isInfoEnabled();
	private static final char SEPARATOR_OBJID = ',';
	private static CacheBuildJob instance = null;
	
	private static final boolean MONITOR_JOB = ArcConfig.MONITOR_CACHE_BUILD_JOB;
	
	public static CacheBuildJob getInstance() 
	{
		if ( null != instance) return instance;
		synchronized (CacheBuildJob.class) 
		{
			if ( null != instance ) return instance;
			instance = new CacheBuildJob();
		}
		return instance;
	}	
	
	long lastUpdatedTime= new Date().getTime() - ArcConfig.CACHE_BUILD_START_INTERVAL;
	int lastUpdatedDt = new Date(lastUpdatedTime).getDate();
	
	private CacheBuildJob()
	{
	}
	
	boolean isRunning  = false;
	public void run()
	{
		if ( isRunning  ) 
		{
			if ( INFO_ENABLED ) FileSyncLog.l.info(new Date().toLocaleString() + " Cache Build Job SKIPPED");
			return;
		}
		
		isRunning  = true;
		long start = System.currentTimeMillis();
		
		try 
		{
			exec();
			long end = System.currentTimeMillis();
			if ( INFO_ENABLED ) FileSyncLog.l.info(new Date().toLocaleString() + " Cache Build Job SUCESS, Run time in ms: " + (end - start));
		} 
		catch (Exception ex) 
		{
			long end = System.currentTimeMillis();
			ex.printStackTrace();
			FileSyncLog.l.fatal(new Date().toLocaleString() + " Cache Build Job FAILED, Run time in ms: " + (end - start));
		} 
		finally 
		{
			isRunning = false;
		}
	}
	
	public void exec() throws Exception
	{
		long currentTime = System.currentTimeMillis();
		//Take the clock 2 sec to past 
		currentTime = currentTime - ArcConfig.CACHEBUILD_JOB_RUN_INTERVAL;
		
		if ( lastUpdatedTime >= currentTime) {
			FileSyncLog.l.warn("CacheBuildJob : lastUpdatedTime >= currentTime :  " + lastUpdatedTime + ">=" + currentTime);
			return;
		}
		
		Date now = new Date(currentTime);
		long startTime = currentTime;
		int currentUpdatedDt = now.getDate();


		Map<String, String> uniqueCotainerKeyWithObjectKeys = new HashMap<String, String>(1024);
		List<ChangeDecorator.Deser> timeseriesChanges = new ArrayList<ChangeDecorator.Deser>(1024);
		
		if ( INFO_ENABLED) FileSyncLog.l.info("AddToTimeseriesChanges: Start");
		if ( MONITOR_JOB ) MonitorCacheBuildJob.getInstance().onEnter();
		
		//Step1 - Get last left rows from the old table
		if ( lastUpdatedDt != currentUpdatedDt) 
		{
//			String tsTable = HBaseTables.getLastTimeSeriesTable(currentUpdatedDt);
			String tsTable = HBaseTables.getLastTimeSeriesTable(currentTime);
			addToTimeseriesChanges(tsTable, startTime, uniqueCotainerKeyWithObjectKeys, timeseriesChanges);
		}
		
		if ( INFO_ENABLED) FileSyncLog.l.info("AddToTimeseriesChanges, Changes: " + timeseriesChanges.size() + " Projects:" + uniqueCotainerKeyWithObjectKeys.size());
		
		//Step2 - Get from current table
//		String tsTable = HBaseTables.getTimeSeriesTable(currentUpdatedDt);
		String tsTable = HBaseTables.getTimeSeriesTable(currentTime);
		addToTimeseriesChanges(tsTable, startTime, uniqueCotainerKeyWithObjectKeys, timeseriesChanges);
		if ( INFO_ENABLED)
			FileSyncLog.l.info("AddToTimeseriesChanges, Changes: " + timeseriesChanges.size() + "  Projects:" + uniqueCotainerKeyWithObjectKeys.size());
		
		//Step3 -Merge with cutoff table.
		String currentCutoffTableName = HBaseTables.getCutoffTable(currentUpdatedDt);
		String lastCutoffTableName = HBaseTables.getLastCutoffTable(currentUpdatedDt);
		
		HBaseFacade facade = null;
		HTableWrapper currentCutoffTable = null;
		HTableWrapper lastCutoffTable = null;
		
		long cutoffTime = startTime - HBaseTables.CUTOFF_DURATION_SECS * 1000;
		
		/**
		 * We have all the ChangeDecorators. Next Steps
		 * > 
		 */
		
		try {
			facade = HBaseFacade.getInstance();
			if ( INFO_ENABLED) {
				FileSyncLog.l.info("Current Cutoff Table: " + currentCutoffTableName + 
					" (Size)" + timeseriesChanges.size() + " (Cutoff Limit)" + new Date(cutoffTime));
			}
			currentCutoffTable = facade.getTable(currentCutoffTableName);
			lastCutoffTable = facade.getTable(lastCutoffTableName);
//			System.out.println("TimeSeriesTable - "+tsTable+"\tCurrent Cutoff Table - "+ currentCutoffTableName+"\tLast Cutoff Table - "+ lastCutoffTableName + " on " + now.toString() + " time in millis "+ now.getTime() + " currentTime " + currentTime);
			int batchSize = ArcConfig.CACHE_BUILD_BATCH_SIZE;
			Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets = 
				new HashMap<String, ChangeSetDecorator.Deser>(1024);
			List<Put> putL = new ArrayList<Put>(batchSize);
			


			for (ChangeDecorator.Deser deserCh : timeseriesChanges) 
			{
				deserCh.touch(System.currentTimeMillis());
				String objectKey = deserCh.objectKey;
				
				/**
				 * Batch Flush on 4096 objects = 4MB
				 */
//				System.out.println("CacheBuild Time -: "+(System.currentTimeMillis()-deserCh.getTime()));
				if ( objKeyWithChangeSets.size() >= batchSize ) 
				{
					if ( INFO_ENABLED)  FileSyncLog.l.info("Saving: Enter");
					save(currentCutoffTable, lastCutoffTable, objKeyWithChangeSets, uniqueCotainerKeyWithObjectKeys, cutoffTime, putL);
					if ( INFO_ENABLED)  FileSyncLog.l.info("Saving: Exit");
				}
				
				
				/**
				 * Step: 1 # Memory Table Lookup, 
				 * If any object id changes are already there, means already loaded and read.
				 * Just merge to it.
				 */
				if (objKeyWithChangeSets.containsKey(objectKey)) {
					
					if ( INFO_ENABLED)  FileSyncLog.l.info("Memory Table Lookup: " + objectKey);
					ChangeSetDecorator.Deser mergedVal = 
						createObjChangeSets(objKeyWithChangeSets.get(objectKey), deserCh, cutoffTime);
					
					mergedVal.key = objectKey;
					mergedVal.itemId = deserCh.getChanges().getItemId();
					
					objKeyWithChangeSets.put(objectKey, mergedVal);
					continue;
				}
				
				Get getter = new Get(objectKey.getBytes());
				
				/**
				 * Step: 2 # Look in current cutoff Table,
				 */
				Result resultC = currentCutoffTable.get(getter);
				
				{
					if ( null != resultC) {


						byte[] val = resultC.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
						int valSize = ( null == val) ? 0 : val.length;
						if ( valSize == 0 ) val = null;
						if ( null != val ) {
							
							if ( INFO_ENABLED)  FileSyncLog.l.info("Curent cutoff table Lookup: " + objectKey);
							ChangeSetDecorator.Deser cs = new ChangeSetDecorator.Deser(val);
							cs.key = objectKey;
							cs.itemId = deserCh.getChanges().getItemId();


							ChangeSetDecorator.Deser mergedVal = createObjChangeSets(cs, deserCh, cutoffTime);
							objKeyWithChangeSets.put(objectKey, mergedVal);
							continue;
						}
					}
				}


				/**
				 * Step: 3 # Fall back to last cutoff table as does not exist in 
				 * current cut off table.
				 */


				Result resultO = lastCutoffTable.get(getter);
				
				if ( null != resultO) {
					byte[] val = resultO.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
					int valSize = ( null == val) ? 0 : val.length;
					if ( valSize == 0 ) val = null;


					if ( null != val ) {


						if ( INFO_ENABLED)  FileSyncLog.l.info("Previous cutoff table Lookup: " + objectKey);
						ChangeSetDecorator.Deser cs = new ChangeSetDecorator.Deser(val);
						cs.key = objectKey;
						cs.itemId = deserCh.getChanges().getItemId();


						ChangeSetDecorator.Deser mergedVal = createObjChangeSets(cs, deserCh, cutoffTime);
						objKeyWithChangeSets.put(objectKey, mergedVal);
						continue;
					}
				}
				
				/**
				 * We didn't find in current or last cutoff table.
				 * It is a fresh change. Boot strap the changes 
				 */


				if ( INFO_ENABLED)  FileSyncLog.l.info("Bootstrapping: " + objectKey);
				ChangeSetDecorator.Deser none = new ChangeSetDecorator.Deser(null);
				none.key = objectKey;
				none.itemId = deserCh.getChanges().getItemId();
				ChangeSetDecorator.Deser mergedVal = createObjChangeSets(none, deserCh, -1);
				objKeyWithChangeSets.put(objectKey, mergedVal);
			}
			
			if ( objKeyWithChangeSets.size() >= 0 ) {
				save(currentCutoffTable, lastCutoffTable, 
					objKeyWithChangeSets, uniqueCotainerKeyWithObjectKeys, cutoffTime, putL);
			}
			
			/**
			 * Step: 4 # All sucess, move to next time stamp
			 */
			lastUpdatedTime = startTime; //maximum timestamp value, exclusive
			lastUpdatedDt = currentUpdatedDt;
			
		} catch (Exception ex) {
		
			throw ex;


		} finally {
			if ( null != facade && null != currentCutoffTable) facade.putTable(currentCutoffTable);
			if ( null != facade && null != lastCutoffTable) facade.putTable(lastCutoffTable);
		}
		
		long endTime = System.currentTimeMillis();


		if ( MONITOR_JOB ) 
		{
			long timeTaken = (endTime - startTime);
			MonitorCollector collector = new MonitorCollector();
			collector.add(new MonitorMeasure("CacheBuildJob", timeTaken));
			MonitorCacheBuildJob.getInstance().onExit(collector);
		}
	}


	/**
	 * 
	 * @param currentCutoffTable
	 * @param lastCutoffTable
	 * @param objKeyWithChangeSets
	 * @param uniqueProjectIdWithObjIds
	 * @param cutoffTime
	 * @param putL
	 * @throws IOException
	 */
	private void save(HTableWrapper currentCutoffTable, HTableWrapper lastCutoffTable,
			Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets, 
			Map<String, String> uniqueProjectIdWithObjIds,  
			long cutoffTime, List<Put> putL)
			throws IOException {
		
		putL.clear();
		for (String key : objKeyWithChangeSets.keySet()) {
			ChangeSetDecorator.Deser val = objKeyWithChangeSets.get(key);
			
			Put update = new Put(key.getBytes());
			update.add(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME, val.data);
			update.setDurability(Durability.SYNC_WAL);
			putL.add(update);
		}
		currentCutoffTable.put(putL);
		if ( INFO_ENABLED) FileSyncLog.l.info("Cutoff Table Objects Added - " + putL.size());
		putL.clear();




		saveContainer(currentCutoffTable, lastCutoffTable,
			objKeyWithChangeSets, uniqueProjectIdWithObjIds, cutoffTime);


		currentCutoffTable.flushCommits();
	
		objKeyWithChangeSets.clear();
		
	}


	/**
	 * 
	 * @param currentCutoffTable
	 * @param lastCutoffTable
	 * @param objKeyWithChangeSets
	 * @param uniqueProjectIdWithObjIds
	 * @param cutoffTime
	 * @throws IOException
	 */
	private void saveContainer(HTableWrapper currentCutoffTable,
		HTableWrapper lastCutoffTable,
		Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets,
		Map<String, String> uniqueProjectIdWithObjIds, long cutoffTime)
		throws IOException {


		/**
		 * mergeContainerChanges for the current projects
		 */
		List<String> objKeyL = new ArrayList<String>();
		Set<ChangeSetDecorator.Deser> containerObjects = new HashSet<ChangeSetDecorator.Deser>();


		for (String projectId : uniqueProjectIdWithObjIds.keySet()) {


			objKeyL.clear();
			containerObjects.clear();


			/**
			 * Find out all object Ids belonging to this project and in current set
			 */
			String objectKeys = uniqueProjectIdWithObjIds.get(projectId);
			LineReaderUtil.fastSplit(objKeyL, objectKeys, SEPARATOR_OBJID);
			for (String objKey : objKeyL) {
				
				ChangeSetDecorator.Deser val = objKeyWithChangeSets.get(objKey);
				if ( null != val) containerObjects.add( val);
			}
			
			if ( INFO_ENABLED) FileSyncLog.l.info( "projectId:" + projectId + " ,Objects =" + containerObjects.size());
			byte[] projectIdB = projectId.getBytes();
			Get containerId = new Get(projectIdB);
			
			/**
			 * Look the changes in current cutoff table.
			 */
			byte[] containerCell = null;
			Result res = currentCutoffTable.get(containerId);
			if ( null != res) {
				containerCell = res.getValue(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME);
			}
			
			/**
			 * The project changes are not available in current cutoff table.
			 */
			int containerCellSize = ( null == containerCell) ? 0 : containerCell.length;
			if ( containerCellSize == 0 ) {
				res = lastCutoffTable.get(containerId);
				if ( null != res) {
					containerCell = res.getValue(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME);
				}
			}
			containerCellSize = ( null == containerCell) ? 0 : containerCell.length;
			if ( containerCellSize == 0 ) containerCell = null;
			
			/**
			 * Merge the data
			 */
			if ( INFO_ENABLED ) FileSyncLog.l.info("containerCell:" + 
				( (null == containerCell) ? 0 : containerCell.length) ) ;
			
			byte[] containerCellUpdated = BytesMergerContainer.mergeContainerChangesD(
				containerCell, containerObjects, cutoffTime, -1L);


			if ( INFO_ENABLED ) FileSyncLog.l.info("containerCellUpdated:" + 
					( (null == containerCellUpdated) ? 0 : containerCellUpdated.length) ) ;
			
			/**
			 * Save to current cutoff table
			 */
			Put containerUpdate = new Put(projectIdB);
			containerUpdate.add(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME, containerCellUpdated);
			containerUpdate.setDurability(Durability.SYNC_WAL);
			currentCutoffTable.put(containerUpdate);
		}
		if ( INFO_ENABLED) FileSyncLog.l.info("Cutoff Table Containers Added - " + uniqueProjectIdWithObjIds.size());
	}
	
	/**
	 * 
	 * @param existingCutoffBytes
	 * @param currentChanges
	 * @param cutoffTime
	 * @return
	 * @throws IOException
	 */
	public final ChangeSetDecorator.Deser createObjChangeSets(final ChangeSetDecorator.Deser existingCutoffBytes, 
		final ChangeDecorator.Deser currentChanges, final long cutoffTime) throws IOException {
		
		byte[] data = BytesMergerObject.mergeObjectChanges(
			existingCutoffBytes.data, currentChanges, cutoffTime, -1);
		existingCutoffBytes.data = data;
		return existingCutoffBytes;
	}
	
	/**
	 * 
	 * @param tsTableCurrent
	 * @param startTime
	 * @param uniqueProjectIds
	 * @param uniqueObjIds
	 */
	public void addToTimeseriesChanges(String tsTableCurrent, long startTime, 
			Map<String, String> uniqueContainerKeyWithObjectKeys, List<ChangeDecorator.Deser> timeseriesChanges) {
		
		HBaseFacade facade = null;
		HTableWrapper table = null;
		ResultScanner scanner = null;
		
		try {
					
			facade = HBaseFacade.getInstance();
			table = facade.getTable(tsTableCurrent);
			Scan scan = new Scan();
			scan.setCaching(1024);
			scan.setMaxVersions(1);
			scan.setTimeRange(lastUpdatedTime, startTime);
			scan = scan.addColumn(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
			
			scanner = table.getScanner(scan);
			
			StringBuilder keyBuilder = new StringBuilder();
			int counter = 0;


			for (Result r: scanner) {
				if ( null == r) continue;
				if ( r.isEmpty()) continue;


				counter++;
				if ( counter % 1000 == 0 ) FileSyncLog.l.info(tsTableCurrent + " read : " + counter);
				
				byte[] changeB = r.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
				int changeBSize = ( null == changeB) ? 0 : changeB.length;
				if ( changeBSize == 0 ) continue;


				if ( INFO_ENABLED) FileSyncLog.l.info("Inside AddToTimeSeries: changeB: "+changeB.toString());
				ChangeDecorator.Deser currentChangeDeser = new ChangeDecorator.Deser(changeB);
				Changes currentChange = currentChangeDeser.getChanges();
				
				//Add to Unique Projects
				String containerKey = ObjectKey.getContainerKey(keyBuilder,currentChange);
				String objectKey = ObjectKey.getObjectKey(keyBuilder,currentChange);


				currentChangeDeser.objectKey = objectKey;
				
				if (uniqueContainerKeyWithObjectKeys.containsKey(containerKey)) {
					uniqueContainerKeyWithObjectKeys.put(containerKey, 
						uniqueContainerKeyWithObjectKeys.get(containerKey) + SEPARATOR_OBJID + objectKey);
				} else {
					uniqueContainerKeyWithObjectKeys.put(containerKey, objectKey);					
				}
				
				//Merge Actions of a Object.
				timeseriesChanges.add(currentChangeDeser);
			}
		
		} catch (Exception e) {
		
			FileSyncLog.l.fatal("Unable to execute daily update job. " ,e);
		
		} finally {
			if (null != scanner) {
				try {scanner.close();} catch (Exception ex) {ex.printStackTrace();}
			}
			
			if (null != table) {
				try {facade.putTable(table);} catch (Exception ex) {ex.printStackTrace();}
			}
		}
	}
	
	public static void main(String[] args) throws Exception {
		CacheBuildJob.getInstance().run();		
	}
}

Expert Contributor

Getting the below exception:

2016-05-26 16:46:54,288 INFO [main] util.FileSyncLog: containerCell:10412488
2016-05-26 16:46:54,298 INFO [main] util.FileSyncLog: containerCellUpdated:10538784
java.lang.IllegalArgumentException: KeyValue size too large
	at org.apache.hadoop.hbase.client.HTable.validatePut(HTable.java:1353)
	at org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:989)
	at org.apache.hadoop.hbase.client.HTable.put(HTable.java:953)
	at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.put(HTablePool.java:432)
	at com.bizosys.hsearch.hbase.HTableWrapper.put(HTableWrapper.java:117)
	at com.arc.hbase.jobs.CacheBuildJob.saveContainer(CacheBuildJob.java:410)
	at com.arc.hbase.jobs.CacheBuildJob.save(CacheBuildJob.java:320)
	at com.arc.hbase.jobs.CacheBuildJob.exec(CacheBuildJob.java:171)
	at com.arc.hbase.jobs.CacheBuildJob.run(CacheBuildJob.java:75)
	at com.arc.hbase.jobs.CacheBuildJob.main(CacheBuildJob.java:509)

Master Collaborator

"IllegalArgumentException: KeyValue size too large"

That is where the problem is: the log above shows containerCellUpdated at 10,538,784 bytes, which exceeds the 10,485,760-byte default client-side limit.

Please check the size of the KeyValue on the client side.
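
As a rough illustration, here is one way to spot oversized cells on the client before they hit HTable's validation. This is a hypothetical helper, not code from the job above: it assumes the cell value is already a byte[] (like val.data and containerCellUpdated in the posted CacheBuildJob), uses the hbase.client.keyvalue.maxsize property shown in the next reply, and only approximates the real check, which also counts the row key, family, and qualifier. Put.addColumn() here is the HBase 1.x name for the add() call used in the posted code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;

public class KeyValueSizeCheck {

    // Default of hbase.client.keyvalue.maxsize (10 MB).
    private static final int DEFAULT_MAX_KEYVALUE_SIZE = 10 * 1024 * 1024;

    // Returns a Put only if the cell value fits under the configured
    // client-side limit; otherwise logs the offending row and size so the
    // oversized container cell can be investigated.
    static Put buildPutIfWithinLimit(Configuration conf, byte[] rowKey,
                                     byte[] family, byte[] qualifier, byte[] value) {
        int maxSize = conf.getInt("hbase.client.keyvalue.maxsize",
                DEFAULT_MAX_KEYVALUE_SIZE);
        int valueSize = (value == null) ? 0 : value.length;
        if (valueSize > maxSize) {
            System.err.println("Row " + new String(rowKey) + ": cell value is "
                    + valueSize + " bytes, over hbase.client.keyvalue.maxsize="
                    + maxSize + "; the client will reject this Put.");
            return null;
        }
        Put put = new Put(rowKey);
        put.addColumn(family, qualifier, value);
        return put;
    }
}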

Master Collaborator

Here is the default value for the related config:

<property>
  <name>hbase.client.keyvalue.maxsize</name>
  <value>10485760</value>
</property>
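
If the limit has to be raised, a minimal sketch of doing so on the client Configuration (equivalent to overriding the property in hbase-site.xml); the 30 MB value is only illustrative, and very large cells still add RPC and memory pressure on the region servers:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RaiseKeyValueLimit {
    public static void main(String[] args) {
        // Start from the normal client configuration (hbase-site.xml on the classpath).
        Configuration conf = HBaseConfiguration.create();

        // Raise the client-side cell size limit from the 10 MB default to 30 MB.
        // This only changes what the client is willing to send; the payload
        // still travels in a single RPC and is held in server memory.
        conf.setInt("hbase.client.keyvalue.maxsize", 30 * 1024 * 1024);

        System.out.println("hbase.client.keyvalue.maxsize = "
                + conf.getInt("hbase.client.keyvalue.maxsize", -1));
    }
}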

Expert Contributor

I increased the value from 10485760 bytes to 31457280 bytes. Now I am getting the following exception:

2016-05-26 15:39:53,949 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
2016-05-26 15:39:55,266 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
2016-05-26 15:39:56,773 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
2016-05-26 15:39:58,785 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
2016-05-26 15:40:01,809 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]

Master Collaborator

The above seems to be from the client side.

Is there any exception on the server side?

Overall, 31457280 might still be exceeded.

There could be other reasons.