Created 05-26-2016 04:14 PM
Unable to execute HBase job due to the exception below (attachment: hbase-error.txt):
2016-05-26 15:49:38,270 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
Thu May 26 15:49:38 UTC 2016, org.apache.hadoop.hbase.client.RpcRetryingCaller@77e92d1b, java.io.IOException: Unexpected closed connection
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136)
    at org.apache.hadoop.hbase.client.HTable.get(HTable.java:811)
    at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.get(HTablePool.java:394)
    at com.bizosys.hsearch.hbase.HTableWrapper.get(HTableWrapper.java:100)
    at com.arc.hbase.jobs.CacheBuildJob.saveContainer(CacheBuildJob.java:387)
    at com.arc.hbase.jobs.CacheBuildJob.save(CacheBuildJob.java:323)
    at com.arc.hbase.jobs.CacheBuildJob.exec(CacheBuildJob.java:172)
    at com.arc.hbase.jobs.CacheBuildJob.run(CacheBuildJob.java:77)
    at com.arc.hbase.jobs.CacheBuildJob.main(CacheBuildJob.java:513)
The full error log is attached.
Created 05-26-2016 04:31 PM
It seems possible that your application might be prematurely closing the HBase `Connection` while your tasks are still using HTable instances that use that Connection. Have you investigated this at all? Any chance you can share more about your application?
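To make the suspected pattern concrete, here is a minimal sketch of what "closing the connection while a table is still in use" looks like. It deliberately avoids the HBase client API: `SharedConnection` and `TableHandle` are hypothetical stand-in classes, not HBase types, and only mimic a table handle that borrows a shared connection.

```java
import java.io.IOException;

/**
 * Stand-in types only: SharedConnection and TableHandle are hypothetical and
 * are NOT HBase classes. They mimic a table handle that borrows a shared connection.
 */
class ConnectionLifecycleSketch {

    /** Hypothetical shared connection that many table handles depend on. */
    static final class SharedConnection implements AutoCloseable {
        private volatile boolean closed = false;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical table handle; every call goes through the shared connection. */
    static final class TableHandle {
        private final SharedConnection connection;

        TableHandle(SharedConnection connection) {
            this.connection = connection;
        }

        String get(String rowKey) throws IOException {
            if (connection.isClosed()) {
                throw new IOException("connection closed while table still in use");
            }
            return "value-for-" + rowKey;
        }
    }

    /** Returns true if a get() issued after close() fails, i.e. the suspected bug pattern. */
    static boolean getFailsAfterClose() {
        SharedConnection connection = new SharedConnection();
        TableHandle table = new TableHandle(connection);
        connection.close(); // closed too early, the table handle is still live
        try {
            table.get("row-1");
            return false;
        } catch (IOException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        // Intended ordering: finish all table work first, then close the connection.
        try (SharedConnection connection = new SharedConnection()) {
            TableHandle table = new TableHandle(connection);
            System.out.println(table.get("row-1"));
        }
        System.out.println("get after premature close fails: " + getFailsAfterClose());
    }
}
```

If your job has a similar shape, the thing to look for is any code path (a shutdown hook, a pool cleanup, another thread) that can close the shared connection before the last `get` or `put` has returned.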
Created 05-26-2016 04:16 PM
Can you check the region server log on fsdata1c.corp.arc.com to see if there are more clues?
Which release of HBase are you using?
Created 05-26-2016 04:30 PM
Here is the region server log on fsdata1c.corp.arc.com
2016-05-26 13:57:47,158 WARN [RpcServer.handler=55,port=60020] ipc.RpcServer: RpcServer.respondercallId: 4324 service: ClientService methodName: Scan size: 30 connection: 10.1.1.243:52740: output error
2016-05-26 13:57:47,159 WARN [RpcServer.handler=55,port=60020] ipc.RpcServer: RpcServer.handler=55,port=60020: caught a ClosedChannelException, this means that the server was processing a request but the client went away. The error message was: null
2016-05-26 13:58:47,135 INFO [regionserver60020.leaseChecker] regionserver.HRegionServer: Scanner 3235538737043012213 lease expired on region CUTOFF4,O11\x09166343\x093\x09162830813,1464012806340.44e206d15b62ed4d452545242bd105cd.
2016-05-26 13:58:52,422 INFO [RpcServer.reader=8,port=60020] ipc.RpcServer: RpcServer.listener,port=60020: count of bytes read: 0
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.hadoop.hbase.ipc.RpcServer.channelRead(RpcServer.java:2224)
    at org.apache.hadoop.hbase.ipc.RpcServer$Connection.readAndProcess(RpcServer.java:1415)
    at org.apache.hadoop.hbase.ipc.RpcServer$Listener.doRead(RpcServer.java:790)
    at org.apache.hadoop.hbase.ipc.RpcServer$Listener$Reader.doRunLoop(RpcServer.java:581)
    at org.apache.hadoop.hbase.ipc.RpcServer$Listener$Reader.run(RpcServer.java:556)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
2016-05-26 13:58:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12076, accesses=3683357, hits=3525415, hitRatio=95.71%, , cachingAccesses=3547447, cachingHits=3448937, cachingHitsRatio=97.22%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 13:59:52,420 INFO [regionserver60020.leaseChecker] regionserver.HRegionServer: Scanner 594209239597513333 lease expired on region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894.
2016-05-26 14:03:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3720308, hits=3562365, hitRatio=95.75%, , cachingAccesses=3584398, cachingHits=3485887, cachingHitsRatio=97.25%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:08:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3749373, hits=3591430, hitRatio=95.79%, , cachingAccesses=3613463, cachingHits=3514952, cachingHitsRatio=97.27%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:13:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3769032, hits=3611089, hitRatio=95.81%, , cachingAccesses=3633122, cachingHits=3534611, cachingHitsRatio=97.29%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:18:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3769844, hits=3611901, hitRatio=95.81%, , cachingAccesses=3633934, cachingHits=3535423, cachingHitsRatio=97.29%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:23:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3831804, hits=3673861, hitRatio=95.88%, , cachingAccesses=3695894, cachingHits=3597383, cachingHitsRatio=97.33%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:28:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3832074, hits=3674131, hitRatio=95.88%, , cachingAccesses=3696164, cachingHits=3597653, cachingHitsRatio=97.33%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:33:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3844554, hits=3686611, hitRatio=95.89%, , cachingAccesses=3708644, cachingHits=3610133, cachingHitsRatio=97.34%, evictions=0, evicted=79712, evictedPerRun=Infinity
2016-05-26 14:38:11,712 INFO [regionserver60020.periodicFlusher] regionserver.HRegionServer: regionserver60020.periodicFlusher requesting flush for region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894. after a delay of 15478
2016-05-26 14:38:21,712 INFO [regionserver60020.periodicFlusher] regionserver.HRegionServer: regionserver60020.periodicFlusher requesting flush for region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894. after a delay of 19133
2016-05-26 14:38:27,190 DEBUG [Thread-20] regionserver.HRegion: Started memstore flush for CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894., current region memstore size 93.2 M
2016-05-26 14:38:27,401 INFO [Thread-20] regionserver.DefaultStoreFlusher: Flushed, sequenceid=96047, memsize=31.7 M, hasBloomFilter=true, into tmp file hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/.tmp/6d1e9fe6186a448f9a322c73ecf4ad0a
2016-05-26 14:38:27,411 DEBUG [Thread-20] regionserver.HRegionFileSystem: Committing store file hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/.tmp/6d1e9fe6186a448f9a322c73ecf4ad0a as hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/1/6d1e9fe6186a448f9a322c73ecf4ad0a
Created 05-26-2016 04:42 PM
Below is the code:
package com.arc.hbase.jobs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

import com.arc.datasink.HBaseTables;
import com.arc.management.MonitorCacheBuildJob;
import com.arc.management.MonitorCollector;
import com.arc.management.MonitorMeasure;
import com.arc.rest.common.BytesMergerContainer;
import com.arc.rest.common.BytesMergerObject;
import com.arc.rest.common.ChangeDecorator;
import com.arc.rest.common.ChangeSetDecorator;
import com.arc.rest.common.ObjectKey;
import com.arc.rest.common.PostChanges_3_0.Changes;
import com.arc.util.ArcConfig;
import com.arc.util.FileSyncLog;
import com.arc.util.LineReaderUtil;
import com.bizosys.hsearch.hbase.HBaseFacade;
import com.bizosys.hsearch.hbase.HTableWrapper;

public class CacheBuildJob {

    private static final boolean INFO_ENABLED = FileSyncLog.l.isInfoEnabled();
    private static final char SEPARATOR_OBJID = ',';
    private static CacheBuildJob instance = null;
    private static final boolean MONITOR_JOB = ArcConfig.MONITOR_CACHE_BUILD_JOB;

    public static CacheBuildJob getInstance() {
        if ( null != instance) return instance;
        synchronized (CacheBuildJob.class) {
            if ( null != instance ) return instance;
            instance = new CacheBuildJob();
        }
        return instance;
    }

    long lastUpdatedTime= new Date().getTime() - ArcConfig.CACHE_BUILD_START_INTERVAL;
    int lastUpdatedDt = new Date(lastUpdatedTime).getDate();

    private CacheBuildJob() {
    }

    boolean isRunning = false;

    public void run() {
        if ( isRunning ) {
            if ( INFO_ENABLED ) FileSyncLog.l.info(new Date().toLocaleString() + " Cache Build Job SKIPPED");
            return;
        }

        isRunning = true;
        long start = System.currentTimeMillis();

        try {
            exec();
            long end = System.currentTimeMillis();
            if ( INFO_ENABLED ) FileSyncLog.l.info(new Date().toLocaleString() + " Cache Build Job SUCESS, Run time in ms: " + (end - start));
        } catch (Exception ex) {
            long end = System.currentTimeMillis();
            ex.printStackTrace();
            FileSyncLog.l.fatal(new Date().toLocaleString() + " Cache Build Job FAILED, Run time in ms: " + (end - start));
        } finally {
            isRunning = false;
        }
    }

    public void exec() throws Exception {
        long currentTime = System.currentTimeMillis();

        //Take the clock 2 sec to past
        currentTime = currentTime - ArcConfig.CACHEBUILD_JOB_RUN_INTERVAL;

        if ( lastUpdatedTime >= currentTime) {
            FileSyncLog.l.warn("CacheBuildJob : lastUpdatedTime >= currentTime : " + lastUpdatedTime + ">=" + currentTime);
            return;
        }

        Date now = new Date(currentTime);
        long startTime = currentTime;
        int currentUpdatedDt = now.getDate();

        Map<String, String> uniqueCotainerKeyWithObjectKeys = new HashMap<String, String>(1024);
        List<ChangeDecorator.Deser> timeseriesChanges = new ArrayList<ChangeDecorator.Deser>(1024);

        if ( INFO_ENABLED) FileSyncLog.l.info("AddToTimeseriesChanges: Start");
        if ( MONITOR_JOB ) MonitorCacheBuildJob.getInstance().onEnter();

        //Step1 - Get last left rows from the old table
        if ( lastUpdatedDt != currentUpdatedDt) {
            // String tsTable = HBaseTables.getLastTimeSeriesTable(currentUpdatedDt);
            String tsTable = HBaseTables.getLastTimeSeriesTable(currentTime);
            addToTimeseriesChanges(tsTable, startTime, uniqueCotainerKeyWithObjectKeys, timeseriesChanges);
        }

        if ( INFO_ENABLED) FileSyncLog.l.info("AddToTimeseriesChanges, Changes: " + timeseriesChanges.size() + " Projects:" + uniqueCotainerKeyWithObjectKeys.size());

        //Step2 - Get from current table
        // String tsTable = HBaseTables.getTimeSeriesTable(currentUpdatedDt);
        String tsTable = HBaseTables.getTimeSeriesTable(currentTime);
        addToTimeseriesChanges(tsTable, startTime, uniqueCotainerKeyWithObjectKeys, timeseriesChanges);

        if ( INFO_ENABLED) FileSyncLog.l.info("AddToTimeseriesChanges, Changes: " + timeseriesChanges.size() + " Projects:" + uniqueCotainerKeyWithObjectKeys.size());

        //Step3 -Merge with cutoff table.
        String currentCutoffTableName = HBaseTables.getCutoffTable(currentUpdatedDt);
        String lastCutoffTableName = HBaseTables.getLastCutoffTable(currentUpdatedDt);

        HBaseFacade facade = null;
        HTableWrapper currentCutoffTable = null;
        HTableWrapper lastCutoffTable = null;

        long cutoffTime = startTime - HBaseTables.CUTOFF_DURATION_SECS * 1000;

        /**
         * We have all the ChangeDecorators. Next Steps
         * >
         */
        try {
            facade = HBaseFacade.getInstance();
            if ( INFO_ENABLED) {
                FileSyncLog.l.info("Current Cutoff Table: " + currentCutoffTableName + " (Size)" + timeseriesChanges.size() + " (Cutoff Limit)" + new Date(cutoffTime));
            }
            currentCutoffTable = facade.getTable(currentCutoffTableName);
            lastCutoffTable = facade.getTable(lastCutoffTableName);

            // System.out.println("TimeSeriesTable - "+tsTable+"\tCurrent Cutoff Table - "+ currentCutoffTableName+"\tLast Cutoff Table - "+ lastCutoffTableName + " on " + now.toString() + " time in millis "+ now.getTime() + " currentTime " + currentTime);

            int batchSize = ArcConfig.CACHE_BUILD_BATCH_SIZE;
            Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets = new HashMap<String, ChangeSetDecorator.Deser>(1024);
            List<Put> putL = new ArrayList<Put>(batchSize);

            for (ChangeDecorator.Deser deserCh : timeseriesChanges) {
                deserCh.touch(System.currentTimeMillis());
                String objectKey = deserCh.objectKey;

                /**
                 * Batch Flush on 4096 objects = 4MB
                 */
                // System.out.println("CacheBuild Time -: "+(System.currentTimeMillis()-deserCh.getTime()));
                if ( objKeyWithChangeSets.size() >= batchSize ) {
                    if ( INFO_ENABLED) FileSyncLog.l.info("Saving: Enter");
                    save(currentCutoffTable, lastCutoffTable, objKeyWithChangeSets, uniqueCotainerKeyWithObjectKeys, cutoffTime, putL);
                    if ( INFO_ENABLED) FileSyncLog.l.info("Saving: Exit");
                }

                /**
                 * Step: 1 # Memory Table Lookup,
                 * If any object id changes are already there, means already loaded and read.
                 * Just merge to it.
                 */
                if (objKeyWithChangeSets.containsKey(objectKey)) {
                    if ( INFO_ENABLED) FileSyncLog.l.info("Memory Table Lookup: " + objectKey);
                    ChangeSetDecorator.Deser mergedVal = createObjChangeSets(objKeyWithChangeSets.get(objectKey), deserCh, cutoffTime);
                    mergedVal.key = objectKey;
                    mergedVal.itemId = deserCh.getChanges().getItemId();
                    objKeyWithChangeSets.put(objectKey, mergedVal);
                    continue;
                }

                Get getter = new Get(objectKey.getBytes());

                /**
                 * Step: 2 # Look in current cutoff Table,
                 */
                Result resultC = currentCutoffTable.get(getter);
                {
                    if ( null != resultC) {
                        byte[] val = resultC.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
                        int valSize = ( null == val) ? 0 : val.length;
                        if ( valSize == 0 ) val = null;
                        if ( null != val ) {
                            if ( INFO_ENABLED) FileSyncLog.l.info("Curent cutoff table Lookup: " + objectKey);
                            ChangeSetDecorator.Deser cs = new ChangeSetDecorator.Deser(val);
                            cs.key = objectKey;
                            cs.itemId = deserCh.getChanges().getItemId();
                            ChangeSetDecorator.Deser mergedVal = createObjChangeSets(cs, deserCh, cutoffTime);
                            objKeyWithChangeSets.put(objectKey, mergedVal);
                            continue;
                        }
                    }
                }

                /**
                 * Step: 3 # Fall back to last cutoff table as does not exist in
                 * current cut off table.
                 */
                Result resultO = lastCutoffTable.get(getter);
                if ( null != resultO) {
                    byte[] val = resultO.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
                    int valSize = ( null == val) ? 0 : val.length;
                    if ( valSize == 0 ) val = null;
                    if ( null != val ) {
                        if ( INFO_ENABLED) FileSyncLog.l.info("Previous cutoff table Lookup: " + objectKey);
                        ChangeSetDecorator.Deser cs = new ChangeSetDecorator.Deser(val);
                        cs.key = objectKey;
                        cs.itemId = deserCh.getChanges().getItemId();
                        ChangeSetDecorator.Deser mergedVal = createObjChangeSets(cs, deserCh, cutoffTime);
                        objKeyWithChangeSets.put(objectKey, mergedVal);
                        continue;
                    }
                }

                /**
                 * We didn't find in current or last cutoff table.
                 * It is a fresh change. Boot strap the changes
                 */
                if ( INFO_ENABLED) FileSyncLog.l.info("Bootstrapping: " + objectKey);
                ChangeSetDecorator.Deser none = new ChangeSetDecorator.Deser(null);
                none.key = objectKey;
                none.itemId = deserCh.getChanges().getItemId();
                ChangeSetDecorator.Deser mergedVal = createObjChangeSets(none, deserCh, -1);
                objKeyWithChangeSets.put(objectKey, mergedVal);
            }

            if ( objKeyWithChangeSets.size() >= 0 ) {
                save(currentCutoffTable, lastCutoffTable, objKeyWithChangeSets, uniqueCotainerKeyWithObjectKeys, cutoffTime, putL);
            }

            /**
             * Step: 4 # All sucess, move to next time stamp
             */
            lastUpdatedTime = startTime; //maximum timestamp value, exclusive
            lastUpdatedDt = currentUpdatedDt;

        } catch (Exception ex) {
            throw ex;
        } finally {
            if ( null != facade && null != currentCutoffTable) facade.putTable(currentCutoffTable);
            if ( null != facade && null != lastCutoffTable) facade.putTable(lastCutoffTable);
        }

        long endTime = System.currentTimeMillis();

        if ( MONITOR_JOB ) {
            long timeTaken = (endTime - startTime);
            MonitorCollector collector = new MonitorCollector();
            collector.add(new MonitorMeasure("CacheBuildJob", timeTaken));
            MonitorCacheBuildJob.getInstance().onExit(collector);
        }
    }

    /**
     *
     * @param currentCutoffTable
     * @param lastCutoffTable
     * @param objKeyWithChangeSets
     * @param uniqueProjectIdWithObjIds
     * @param cutoffTime
     * @param putL
     * @throws IOException
     */
    private void save(HTableWrapper currentCutoffTable, HTableWrapper lastCutoffTable,
            Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets,
            Map<String, String> uniqueProjectIdWithObjIds,
            long cutoffTime, List<Put> putL) throws IOException {

        putL.clear();
        for (String key : objKeyWithChangeSets.keySet()) {
            ChangeSetDecorator.Deser val = objKeyWithChangeSets.get(key);
            Put update = new Put(key.getBytes());
            update.add(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME, val.data);
            update.setDurability(Durability.SYNC_WAL);
            putL.add(update);
        }
        currentCutoffTable.put(putL);
        if ( INFO_ENABLED) FileSyncLog.l.info("Cutoff Table Objects Added - " + putL.size());
        putL.clear();

        saveContainer(currentCutoffTable, lastCutoffTable, objKeyWithChangeSets, uniqueProjectIdWithObjIds, cutoffTime);
        currentCutoffTable.flushCommits();
        objKeyWithChangeSets.clear();
    }

    /**
     *
     * @param currentCutoffTable
     * @param lastCutoffTable
     * @param objKeyWithChangeSets
     * @param uniqueProjectIdWithObjIds
     * @param cutoffTime
     * @throws IOException
     */
    private void saveContainer(HTableWrapper currentCutoffTable, HTableWrapper lastCutoffTable,
            Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets,
            Map<String, String> uniqueProjectIdWithObjIds, long cutoffTime) throws IOException {

        /**
         * mergeContainerChanges for the current projects
         */
        List<String> objKeyL = new ArrayList<String>();
        Set<ChangeSetDecorator.Deser> containerObjects = new HashSet<ChangeSetDecorator.Deser>();

        for (String projectId : uniqueProjectIdWithObjIds.keySet()) {
            objKeyL.clear();
            containerObjects.clear();

            /**
             * Find out all object Ids belonging to this project and in current set
             */
            String objectKeys = uniqueProjectIdWithObjIds.get(projectId);
            LineReaderUtil.fastSplit(objKeyL, objectKeys, SEPARATOR_OBJID);
            for (String objKey : objKeyL) {
                ChangeSetDecorator.Deser val = objKeyWithChangeSets.get(objKey);
                if ( null != val) containerObjects.add( val);
            }

            if ( INFO_ENABLED) FileSyncLog.l.info( "projectId:" + projectId + " ,Objects =" + containerObjects.size());

            byte[] projectIdB = projectId.getBytes();
            Get containerId = new Get(projectIdB);

            /**
             * Look the changes in current cutoff table.
             */
            byte[] containerCell = null;
            Result res = currentCutoffTable.get(containerId);
            if ( null != res) {
                containerCell = res.getValue(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME);
            }

            /**
             * The project changes are not available in current cutoff table.
             */
            int containerCellSize = ( null == containerCell) ? 0 : containerCell.length;
            if ( containerCellSize == 0 ) {
                res = lastCutoffTable.get(containerId);
                if ( null != res) {
                    containerCell = res.getValue(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME);
                }
            }
            containerCellSize = ( null == containerCell) ? 0 : containerCell.length;
            if ( containerCellSize == 0 ) containerCell = null;

            /**
             * Merge the data
             */
            if ( INFO_ENABLED ) FileSyncLog.l.info("containerCell:" + ( (null == containerCell) ? 0 : containerCell.length) ) ;
            byte[] containerCellUpdated = BytesMergerContainer.mergeContainerChangesD(
                containerCell, containerObjects, cutoffTime, -1L);
            if ( INFO_ENABLED ) FileSyncLog.l.info("containerCellUpdated:" + ( (null == containerCellUpdated) ? 0 : containerCellUpdated.length) ) ;

            /**
             * Save to current cutoff table
             */
            Put containerUpdate = new Put(projectIdB);
            containerUpdate.add(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME, containerCellUpdated);
            containerUpdate.setDurability(Durability.SYNC_WAL);
            currentCutoffTable.put(containerUpdate);
        }

        if ( INFO_ENABLED) FileSyncLog.l.info("Cutoff Table Containers Added - " + uniqueProjectIdWithObjIds.size());
    }

    /**
     *
     * @param existingCutoffBytes
     * @param currentChanges
     * @param cutoffTime
     * @return
     * @throws IOException
     */
    public final ChangeSetDecorator.Deser createObjChangeSets(final ChangeSetDecorator.Deser existingCutoffBytes,
            final ChangeDecorator.Deser currentChanges, final long cutoffTime) throws IOException {

        byte[] data = BytesMergerObject.mergeObjectChanges(
            existingCutoffBytes.data, currentChanges, cutoffTime, -1);
        existingCutoffBytes.data = data;
        return existingCutoffBytes;
    }

    /**
     *
     * @param tsTableCurrent
     * @param startTime
     * @param uniqueProjectIds
     * @param uniqueObjIds
     */
    public void addToTimeseriesChanges(String tsTableCurrent, long startTime,
            Map<String, String> uniqueContainerKeyWithObjectKeys,
            List<ChangeDecorator.Deser> timeseriesChanges) {

        HBaseFacade facade = null;
        HTableWrapper table = null;
        ResultScanner scanner = null;

        try {
            facade = HBaseFacade.getInstance();
            table = facade.getTable(tsTableCurrent);

            Scan scan = new Scan();
            scan.setCaching(1024);
            scan.setMaxVersions(1);
            scan.setTimeRange(lastUpdatedTime, startTime);
            scan = scan.addColumn(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);

            scanner = table.getScanner(scan);

            StringBuilder keyBuilder = new StringBuilder();
            int counter = 0;

            for (Result r: scanner) {
                if ( null == r) continue;
                if ( r.isEmpty()) continue;

                counter++;
                if ( counter % 1000 == 0 ) FileSyncLog.l.info(tsTableCurrent + " read : " + counter);

                byte[] changeB = r.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
                int changeBSize = ( null == changeB) ? 0 : changeB.length;
                if ( changeBSize == 0 ) continue;

                if ( INFO_ENABLED) FileSyncLog.l.info("Inside AddToTimeSeries: changeB: "+changeB.toString());
                ChangeDecorator.Deser currentChangeDeser = new ChangeDecorator.Deser(changeB);
                Changes currentChange = currentChangeDeser.getChanges();

                //Add to Unique Projects
                String containerKey = ObjectKey.getContainerKey(keyBuilder,currentChange);
                String objectKey = ObjectKey.getObjectKey(keyBuilder,currentChange);
                currentChangeDeser.objectKey = objectKey;

                if (uniqueContainerKeyWithObjectKeys.containsKey(containerKey)) {
                    uniqueContainerKeyWithObjectKeys.put(containerKey,
                        uniqueContainerKeyWithObjectKeys.get(containerKey) + SEPARATOR_OBJID + objectKey);
                } else {
                    uniqueContainerKeyWithObjectKeys.put(containerKey, objectKey);
                }

                //Merge Actions of a Object.
                timeseriesChanges.add(currentChangeDeser);
            }

        } catch (Exception e) {
            FileSyncLog.l.fatal("Unable to execute daily update job. " ,e);
        } finally {
            if (null != scanner) {
                try {scanner.close();} catch (Exception ex) {ex.printStackTrace();}
            }
            if (null != table) {
                try {facade.putTable(table);} catch (Exception ex) {ex.printStackTrace();}
            }
        }
    }

    public static void main(String[] args) throws Exception {
        CacheBuildJob.getInstance().run();
    }
}
Created 05-26-2016 04:47 PM
I am getting the exception below:
2016-05-26 16:46:54,288 INFO [main] util.FileSyncLog: containerCell:10412488
2016-05-26 16:46:54,298 INFO [main] util.FileSyncLog: containerCellUpdated:10538784
java.lang.IllegalArgumentException: KeyValue size too large
    at org.apache.hadoop.hbase.client.HTable.validatePut(HTable.java:1353)
    at org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:989)
    at org.apache.hadoop.hbase.client.HTable.put(HTable.java:953)
    at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.put(HTablePool.java:432)
    at com.bizosys.hsearch.hbase.HTableWrapper.put(HTableWrapper.java:117)
    at com.arc.hbase.jobs.CacheBuildJob.saveContainer(CacheBuildJob.java:410)
    at com.arc.hbase.jobs.CacheBuildJob.save(CacheBuildJob.java:320)
    at com.arc.hbase.jobs.CacheBuildJob.exec(CacheBuildJob.java:171)
    at com.arc.hbase.jobs.CacheBuildJob.run(CacheBuildJob.java:75)
    at com.arc.hbase.jobs.CacheBuildJob.main(CacheBuildJob.java:509)
Created 05-26-2016 04:49 PM
"IllegalArgumentException: KeyValue size too large"
That is where the problem is.
Please check the size of the KeyValue on the client side.
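One way to do that check before calling `put` is to estimate the serialized size yourself. The sketch below uses only the JDK and does not call any HBase API. `KeyValueSizeEstimate`, `estimateSize` and `fitsWithinLimit` are hypothetical names, and the 20-byte fixed overhead is my understanding of the KeyValue layout (length prefixes, timestamp, type), so treat it as an assumption rather than an exact figure. The limit used is the default of `hbase.client.keyvalue.maxsize`, 10485760 bytes.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of HBase: rough pre-flight size check for a single cell. */
class KeyValueSizeEstimate {

    // Default of hbase.client.keyvalue.maxsize (10485760 bytes).
    static final long DEFAULT_MAX_KEYVALUE_SIZE = 10L * 1024 * 1024;

    // ASSUMPTION: fixed per-cell overhead of 4 (key length) + 4 (value length)
    // + 2 (row length) + 1 (family length) + 8 (timestamp) + 1 (type) = 20 bytes.
    static final int ASSUMED_FIXED_OVERHEAD = 4 + 4 + 2 + 1 + 8 + 1;

    /** Estimated serialized size of one cell in bytes. */
    static long estimateSize(byte[] row, byte[] family, byte[] qualifier, byte[] value) {
        return (long) ASSUMED_FIXED_OVERHEAD
                + row.length + family.length + qualifier.length + value.length;
    }

    /** True if the estimated size does not exceed the given limit. */
    static boolean fitsWithinLimit(byte[] row, byte[] family, byte[] qualifier,
                                   byte[] value, long maxSize) {
        return estimateSize(row, family, qualifier, value) <= maxSize;
    }

    public static void main(String[] args) {
        byte[] row = "project-1".getBytes(StandardCharsets.UTF_8); // hypothetical row key
        byte[] family = "f".getBytes(StandardCharsets.UTF_8);      // hypothetical column family
        byte[] qualifier = "c".getBytes(StandardCharsets.UTF_8);   // hypothetical qualifier
        byte[] value = new byte[11 * 1024 * 1024];                 // 11 MB payload for the demo

        long size = estimateSize(row, family, qualifier, value);
        System.out.println("estimated size: " + size + " bytes");
        System.out.println("fits default limit: "
                + fitsWithinLimit(row, family, qualifier, value, DEFAULT_MAX_KEYVALUE_SIZE));
    }
}
```

Logging the estimate next to the row key right before each `put` in `saveContainer` should show which container is crossing the limit.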
Created 05-26-2016 04:49 PM
Here is the default value for the related config:
<name>hbase.client.keyvalue.maxsize</name> <value>10485760</value>
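As a quick sanity check, the two sizes logged just before the exception (containerCell:10412488 and containerCellUpdated:10538784) can be compared against this default. The sketch below is plain arithmetic in Java; `LoggedCellSizeCheck` and `headroom` are hypothetical names, and the value length is used as a lower bound on the KeyValue size, since the row key and column names add a little more.

```java
/** Hypothetical helper: compares the logged cell sizes with the configured limit. */
class LoggedCellSizeCheck {

    static final long DEFAULT_LIMIT = 10485760L;       // default hbase.client.keyvalue.maxsize
    static final long CELL_BEFORE_MERGE = 10412488L;   // "containerCell" from the client log
    static final long CELL_AFTER_MERGE = 10538784L;    // "containerCellUpdated" from the client log

    /** Bytes left under the limit; a negative result means the limit is exceeded. */
    static long headroom(long cellSize, long limit) {
        return limit - cellSize;
    }

    public static void main(String[] args) {
        System.out.println("headroom before merge: " + headroom(CELL_BEFORE_MERGE, DEFAULT_LIMIT));
        System.out.println("headroom after merge:  " + headroom(CELL_AFTER_MERGE, DEFAULT_LIMIT));
        System.out.println("growth in this merge:  " + (CELL_AFTER_MERGE - CELL_BEFORE_MERGE));
    }
}
```

Read this way, the container cell was already close to the limit before the merge and crossed it during this run, which suggests the value keeps growing with each merge.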
Created 05-26-2016 05:14 PM
I increased the value from 10485760 bytes to 31457280 bytes. Now I am getting the following exception:
2016-05-26 15:39:53,949 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
2016-05-26 15:39:55,266 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
2016-05-26 15:39:56,773 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
2016-05-26 15:39:58,785 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
2016-05-26 15:40:01,809 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
Created 05-26-2016 05:17 PM
The above seems to be from the client side.
Is there any exception on the server side?
It is possible that the new limit of 31457280 is still being exceeded.
There could be other reasons as well.