Created 05-26-2016 04:14 PM
hbase-error.txtUnable to execute hbase job due exception-
2016-05-26 15:49:38,270 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
Thu May 26 15:49:38 UTC 2016, org.apache.hadoop.hbase.client.RpcRetryingCaller@77e92d1b, java.io.IOException: Unexpected closed connection
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:136) at org.apache.hadoop.hbase.client.HTable.get(HTable.java:811) at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.get(HTablePool.java:394) at com.bizosys.hsearch.hbase.HTableWrapper.get(HTableWrapper.java:100) at com.arc.hbase.jobs.CacheBuildJob.saveContainer(CacheBuildJob.java:387) at com.arc.hbase.jobs.CacheBuildJob.save(CacheBuildJob.java:323) at com.arc.hbase.jobs.CacheBuildJob.exec(CacheBuildJob.java:172) at com.arc.hbase.jobs.CacheBuildJob.run(CacheBuildJob.java:77) at com.arc.hbase.jobs.CacheBuildJob.main(CacheBuildJob.java:513)
Attached is full error log.
Created 05-26-2016 04:31 PM
It seems possible that your application might be prematurely closing the HBase `Connection` while your tasks are still using HTable instances that use that Connection. Have you investigated this at all? Any chance you can share more about your application?
Created 05-26-2016 04:16 PM
Can you check server log on fsdata1c.corp.arc.com to see if there is more clue ?
Which release of hbase are you using ?
Created 05-26-2016 04:30 PM
Here is the region server log on fsdata1c.corp.arc.com
2016-05-26 13:57:47,158 WARN [RpcServer.handler=55,port=60020] ipc.RpcServer: RpcServer.respondercallId: 4324 service: ClientService methodName: Scan size: 30 connection: 10.1.1.243:52740: output error 2016-05-26 13:57:47,159 WARN [RpcServer.handler=55,port=60020] ipc.RpcServer: RpcServer.handler=55,port=60020: caught a ClosedChannelException, this means that the server was processing a request but the client went away. The error message was: null 2016-05-26 13:58:47,135 INFO [regionserver60020.leaseChecker] regionserver.HRegionServer: Scanner 3235538737043012213 lease expired on region CUTOFF4,O11\x09166343\x093\x09162830813,1464012806340.44e206d15b62ed4d452545242bd105cd. 2016-05-26 13:58:52,422 INFO [RpcServer.reader=8,port=60020] ipc.RpcServer: RpcServer.listener,port=60020: count of bytes read: 0 java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) at sun.nio.ch.IOUtil.read(IOUtil.java:197) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) at org.apache.hadoop.hbase.ipc.RpcServer.channelRead(RpcServer.java:2224) at org.apache.hadoop.hbase.ipc.RpcServer$Connection.readAndProcess(RpcServer.java:1415) at org.apache.hadoop.hbase.ipc.RpcServer$Listener.doRead(RpcServer.java:790) at org.apache.hadoop.hbase.ipc.RpcServer$Listener$Reader.doRunLoop(RpcServer.java:581) at org.apache.hadoop.hbase.ipc.RpcServer$Listener$Reader.run(RpcServer.java:556) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 2016-05-26 13:58:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12076, accesses=3683357, hits=3525415, hitRatio=95.71%, , cachingAccesses=3547447, cachingHits=3448937, cachingHitsRatio=97.22%, evictions=0, evicted=79712, evictedPerRun=Infinity 2016-05-26 13:59:52,420 INFO [regionserver60020.leaseChecker] regionserver.HRegionServer: Scanner 594209239597513333 lease expired on region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894. 2016-05-26 14:03:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3720308, hits=3562365, hitRatio=95.75%, , cachingAccesses=3584398, cachingHits=3485887, cachingHitsRatio=97.25%, evictions=0, evicted=79712, evictedPerRun=Infinity 2016-05-26 14:08:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3749373, hits=3591430, hitRatio=95.79%, , cachingAccesses=3613463, cachingHits=3514952, cachingHitsRatio=97.27%, evictions=0, evicted=79712, evictedPerRun=Infinity 2016-05-26 14:13:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3769032, hits=3611089, hitRatio=95.81%, , cachingAccesses=3633122, cachingHits=3534611, cachingHitsRatio=97.29%, evictions=0, evicted=79712, evictedPerRun=Infinity 2016-05-26 14:18:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3769844, hits=3611901, hitRatio=95.81%, , cachingAccesses=3633934, cachingHits=3535423, cachingHitsRatio=97.29%, evictions=0, evicted=79712, evictedPerRun=Infinity 2016-05-26 14:23:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3831804, hits=3673861, hitRatio=95.88%, , cachingAccesses=3695894, cachingHits=3597383, cachingHitsRatio=97.33%, evictions=0, evicted=79712, evictedPerRun=Infinity 2016-05-26 14:28:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3832074, hits=3674131, hitRatio=95.88%, , cachingAccesses=3696164, cachingHits=3597653, cachingHitsRatio=97.33%, evictions=0, evicted=79712, evictedPerRun=Infinity 2016-05-26 14:33:55,249 DEBUG [LruStats #0] hfile.LruBlockCache: Total=1.18 GB, free=414.12 MB, max=1.58 GB, blocks=12077, accesses=3844554, hits=3686611, hitRatio=95.89%, , cachingAccesses=3708644, cachingHits=3610133, cachingHitsRatio=97.34%, evictions=0, evicted=79712, evictedPerRun=Infinity 2016-05-26 14:38:11,712 INFO [regionserver60020.periodicFlusher] regionserver.HRegionServer: regionserver60020.periodicFlusher requesting flush for region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894. after a delay of 15478 2016-05-26 14:38:21,712 INFO [regionserver60020.periodicFlusher] regionserver.HRegionServer: regionserver60020.periodicFlusher requesting flush for region CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894. after a delay of 19133 2016-05-26 14:38:27,190 DEBUG [Thread-20] regionserver.HRegion: Started memstore flush for CUTOFF4,,1464012806340.48ec64624ad37ae9272c5c28ec177894., current region memstore size 93.2 M 2016-05-26 14:38:27,401 INFO [Thread-20] regionserver.DefaultStoreFlusher: Flushed, sequenceid=96047, memsize=31.7 M, hasBloomFilter=true, into tmp file hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/.tmp/6d1e9fe6186a448f9a322c73ecf4ad0a 2016-05-26 14:38:27,411 DEBUG [Thread-20] regionserver.HRegionFileSystem: Committing store file hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/.tmp/6d1e9fe6186a448f9a322c73ecf4ad0a as hdfs://fsmaster1c.corp.arc.com:8020/apps/hbase/data/data/default/CUTOFF4/48ec64624ad37ae9272c5c28ec177894/1/6d1e9fe6186a448f9a322c73ecf4ad0a
Created 05-26-2016 04:31 PM
It seems possible that your application might be prematurely closing the HBase `Connection` while your tasks are still using HTable instances that use that Connection. Have you investigated this at all? Any chance you can share more about your application?
Created 05-26-2016 04:42 PM
Below is code-
package com.arc.hbase.jobs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import com.arc.datasink.HBaseTables;
import com.arc.management.MonitorCacheBuildJob;
import com.arc.management.MonitorCollector;
import com.arc.management.MonitorMeasure;
import com.arc.rest.common.BytesMergerContainer;
import com.arc.rest.common.BytesMergerObject;
import com.arc.rest.common.ChangeDecorator;
import com.arc.rest.common.ChangeSetDecorator;
import com.arc.rest.common.ObjectKey;
import com.arc.rest.common.PostChanges_3_0.Changes;
import com.arc.util.ArcConfig;
import com.arc.util.FileSyncLog;
import com.arc.util.LineReaderUtil;
import com.bizosys.hsearch.hbase.HBaseFacade;
import com.bizosys.hsearch.hbase.HTableWrapper;
public class CacheBuildJob
{
private static final boolean INFO_ENABLED = FileSyncLog.l.isInfoEnabled();
private static final char SEPARATOR_OBJID = ',';
private static CacheBuildJob instance = null;
private static final boolean MONITOR_JOB = ArcConfig.MONITOR_CACHE_BUILD_JOB;
public static CacheBuildJob getInstance()
{
if ( null != instance) return instance;
synchronized (CacheBuildJob.class)
{
if ( null != instance ) return instance;
instance = new CacheBuildJob();
}
return instance;
}
long lastUpdatedTime= new Date().getTime() - ArcConfig.CACHE_BUILD_START_INTERVAL;
int lastUpdatedDt = new Date(lastUpdatedTime).getDate();
private CacheBuildJob()
{
}
boolean isRunning = false;
public void run()
{
if ( isRunning )
{
if ( INFO_ENABLED ) FileSyncLog.l.info(new Date().toLocaleString() + " Cache Build Job SKIPPED");
return;
}
isRunning = true;
long start = System.currentTimeMillis();
try
{
exec();
long end = System.currentTimeMillis();
if ( INFO_ENABLED ) FileSyncLog.l.info(new Date().toLocaleString() + " Cache Build Job SUCESS, Run time in ms: " + (end - start));
}
catch (Exception ex)
{
long end = System.currentTimeMillis();
ex.printStackTrace();
FileSyncLog.l.fatal(new Date().toLocaleString() + " Cache Build Job FAILED, Run time in ms: " + (end - start));
}
finally
{
isRunning = false;
}
}
public void exec() throws Exception
{
long currentTime = System.currentTimeMillis();
//Take the clock 2 sec to past
currentTime = currentTime - ArcConfig.CACHEBUILD_JOB_RUN_INTERVAL;
if ( lastUpdatedTime >= currentTime) {
FileSyncLog.l.warn("CacheBuildJob : lastUpdatedTime >= currentTime : " + lastUpdatedTime + ">=" + currentTime);
return;
}
Date now = new Date(currentTime);
long startTime = currentTime;
int currentUpdatedDt = now.getDate();
Map<String, String> uniqueCotainerKeyWithObjectKeys = new HashMap<String, String>(1024);
List<ChangeDecorator.Deser> timeseriesChanges = new ArrayList<ChangeDecorator.Deser>(1024);
if ( INFO_ENABLED) FileSyncLog.l.info("AddToTimeseriesChanges: Start");
if ( MONITOR_JOB ) MonitorCacheBuildJob.getInstance().onEnter();
//Step1 - Get last left rows from the old table
if ( lastUpdatedDt != currentUpdatedDt)
{
// String tsTable = HBaseTables.getLastTimeSeriesTable(currentUpdatedDt);
String tsTable = HBaseTables.getLastTimeSeriesTable(currentTime);
addToTimeseriesChanges(tsTable, startTime, uniqueCotainerKeyWithObjectKeys, timeseriesChanges);
}
if ( INFO_ENABLED) FileSyncLog.l.info("AddToTimeseriesChanges, Changes: " + timeseriesChanges.size() + " Projects:" + uniqueCotainerKeyWithObjectKeys.size());
//Step2 - Get from current table
// String tsTable = HBaseTables.getTimeSeriesTable(currentUpdatedDt);
String tsTable = HBaseTables.getTimeSeriesTable(currentTime);
addToTimeseriesChanges(tsTable, startTime, uniqueCotainerKeyWithObjectKeys, timeseriesChanges);
if ( INFO_ENABLED)
FileSyncLog.l.info("AddToTimeseriesChanges, Changes: " + timeseriesChanges.size() + " Projects:" + uniqueCotainerKeyWithObjectKeys.size());
//Step3 -Merge with cutoff table.
String currentCutoffTableName = HBaseTables.getCutoffTable(currentUpdatedDt);
String lastCutoffTableName = HBaseTables.getLastCutoffTable(currentUpdatedDt);
HBaseFacade facade = null;
HTableWrapper currentCutoffTable = null;
HTableWrapper lastCutoffTable = null;
long cutoffTime = startTime - HBaseTables.CUTOFF_DURATION_SECS * 1000;
/**
* We have all the ChangeDecorators. Next Steps
* >
*/
try {
facade = HBaseFacade.getInstance();
if ( INFO_ENABLED) {
FileSyncLog.l.info("Current Cutoff Table: " + currentCutoffTableName +
" (Size)" + timeseriesChanges.size() + " (Cutoff Limit)" + new Date(cutoffTime));
}
currentCutoffTable = facade.getTable(currentCutoffTableName);
lastCutoffTable = facade.getTable(lastCutoffTableName);
// System.out.println("TimeSeriesTable - "+tsTable+"\tCurrent Cutoff Table - "+ currentCutoffTableName+"\tLast Cutoff Table - "+ lastCutoffTableName + " on " + now.toString() + " time in millis "+ now.getTime() + " currentTime " + currentTime);
int batchSize = ArcConfig.CACHE_BUILD_BATCH_SIZE;
Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets =
new HashMap<String, ChangeSetDecorator.Deser>(1024);
List<Put> putL = new ArrayList<Put>(batchSize);
for (ChangeDecorator.Deser deserCh : timeseriesChanges)
{
deserCh.touch(System.currentTimeMillis());
String objectKey = deserCh.objectKey;
/**
* Batch Flush on 4096 objects = 4MB
*/
// System.out.println("CacheBuild Time -: "+(System.currentTimeMillis()-deserCh.getTime()));
if ( objKeyWithChangeSets.size() >= batchSize )
{
if ( INFO_ENABLED) FileSyncLog.l.info("Saving: Enter");
save(currentCutoffTable, lastCutoffTable, objKeyWithChangeSets, uniqueCotainerKeyWithObjectKeys, cutoffTime, putL);
if ( INFO_ENABLED) FileSyncLog.l.info("Saving: Exit");
}
/**
* Step: 1 # Memory Table Lookup,
* If any object id changes are already there, means already loaded and read.
* Just merge to it.
*/
if (objKeyWithChangeSets.containsKey(objectKey)) {
if ( INFO_ENABLED) FileSyncLog.l.info("Memory Table Lookup: " + objectKey);
ChangeSetDecorator.Deser mergedVal =
createObjChangeSets(objKeyWithChangeSets.get(objectKey), deserCh, cutoffTime);
mergedVal.key = objectKey;
mergedVal.itemId = deserCh.getChanges().getItemId();
objKeyWithChangeSets.put(objectKey, mergedVal);
continue;
}
Get getter = new Get(objectKey.getBytes());
/**
* Step: 2 # Look in current cutoff Table,
*/
Result resultC = currentCutoffTable.get(getter);
{
if ( null != resultC) {
byte[] val = resultC.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
int valSize = ( null == val) ? 0 : val.length;
if ( valSize == 0 ) val = null;
if ( null != val ) {
if ( INFO_ENABLED) FileSyncLog.l.info("Curent cutoff table Lookup: " + objectKey);
ChangeSetDecorator.Deser cs = new ChangeSetDecorator.Deser(val);
cs.key = objectKey;
cs.itemId = deserCh.getChanges().getItemId();
ChangeSetDecorator.Deser mergedVal = createObjChangeSets(cs, deserCh, cutoffTime);
objKeyWithChangeSets.put(objectKey, mergedVal);
continue;
}
}
}
/**
* Step: 3 # Fall back to last cutoff table as does not exist in
* current cut off table.
*/
Result resultO = lastCutoffTable.get(getter);
if ( null != resultO) {
byte[] val = resultO.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
int valSize = ( null == val) ? 0 : val.length;
if ( valSize == 0 ) val = null;
if ( null != val ) {
if ( INFO_ENABLED) FileSyncLog.l.info("Previous cutoff table Lookup: " + objectKey);
ChangeSetDecorator.Deser cs = new ChangeSetDecorator.Deser(val);
cs.key = objectKey;
cs.itemId = deserCh.getChanges().getItemId();
ChangeSetDecorator.Deser mergedVal = createObjChangeSets(cs, deserCh, cutoffTime);
objKeyWithChangeSets.put(objectKey, mergedVal);
continue;
}
}
/**
* We didn't find in current or last cutoff table.
* It is a fresh change. Boot strap the changes
*/
if ( INFO_ENABLED) FileSyncLog.l.info("Bootstrapping: " + objectKey);
ChangeSetDecorator.Deser none = new ChangeSetDecorator.Deser(null);
none.key = objectKey;
none.itemId = deserCh.getChanges().getItemId();
ChangeSetDecorator.Deser mergedVal = createObjChangeSets(none, deserCh, -1);
objKeyWithChangeSets.put(objectKey, mergedVal);
}
if ( objKeyWithChangeSets.size() >= 0 ) {
save(currentCutoffTable, lastCutoffTable,
objKeyWithChangeSets, uniqueCotainerKeyWithObjectKeys, cutoffTime, putL);
}
/**
* Step: 4 # All sucess, move to next time stamp
*/
lastUpdatedTime = startTime; //maximum timestamp value, exclusive
lastUpdatedDt = currentUpdatedDt;
} catch (Exception ex) {
throw ex;
} finally {
if ( null != facade && null != currentCutoffTable) facade.putTable(currentCutoffTable);
if ( null != facade && null != lastCutoffTable) facade.putTable(lastCutoffTable);
}
long endTime = System.currentTimeMillis();
if ( MONITOR_JOB )
{
long timeTaken = (endTime - startTime);
MonitorCollector collector = new MonitorCollector();
collector.add(new MonitorMeasure("CacheBuildJob", timeTaken));
MonitorCacheBuildJob.getInstance().onExit(collector);
}
}
/**
*
* @param currentCutoffTable
* @param lastCutoffTable
* @param objKeyWithChangeSets
* @param uniqueProjectIdWithObjIds
* @param cutoffTime
* @param putL
* @throws IOException
*/
private void save(HTableWrapper currentCutoffTable, HTableWrapper lastCutoffTable,
Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets,
Map<String, String> uniqueProjectIdWithObjIds,
long cutoffTime, List<Put> putL)
throws IOException {
putL.clear();
for (String key : objKeyWithChangeSets.keySet()) {
ChangeSetDecorator.Deser val = objKeyWithChangeSets.get(key);
Put update = new Put(key.getBytes());
update.add(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME, val.data);
update.setDurability(Durability.SYNC_WAL);
putL.add(update);
}
currentCutoffTable.put(putL);
if ( INFO_ENABLED) FileSyncLog.l.info("Cutoff Table Objects Added - " + putL.size());
putL.clear();
saveContainer(currentCutoffTable, lastCutoffTable,
objKeyWithChangeSets, uniqueProjectIdWithObjIds, cutoffTime);
currentCutoffTable.flushCommits();
objKeyWithChangeSets.clear();
}
/**
*
* @param currentCutoffTable
* @param lastCutoffTable
* @param objKeyWithChangeSets
* @param uniqueProjectIdWithObjIds
* @param cutoffTime
* @throws IOException
*/
private void saveContainer(HTableWrapper currentCutoffTable,
HTableWrapper lastCutoffTable,
Map<String, ChangeSetDecorator.Deser> objKeyWithChangeSets,
Map<String, String> uniqueProjectIdWithObjIds, long cutoffTime)
throws IOException {
/**
* mergeContainerChanges for the current projects
*/
List<String> objKeyL = new ArrayList<String>();
Set<ChangeSetDecorator.Deser> containerObjects = new HashSet<ChangeSetDecorator.Deser>();
for (String projectId : uniqueProjectIdWithObjIds.keySet()) {
objKeyL.clear();
containerObjects.clear();
/**
* Find out all object Ids belonging to this project and in current set
*/
String objectKeys = uniqueProjectIdWithObjIds.get(projectId);
LineReaderUtil.fastSplit(objKeyL, objectKeys, SEPARATOR_OBJID);
for (String objKey : objKeyL) {
ChangeSetDecorator.Deser val = objKeyWithChangeSets.get(objKey);
if ( null != val) containerObjects.add( val);
}
if ( INFO_ENABLED) FileSyncLog.l.info( "projectId:" + projectId + " ,Objects =" + containerObjects.size());
byte[] projectIdB = projectId.getBytes();
Get containerId = new Get(projectIdB);
/**
* Look the changes in current cutoff table.
*/
byte[] containerCell = null;
Result res = currentCutoffTable.get(containerId);
if ( null != res) {
containerCell = res.getValue(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME);
}
/**
* The project changes are not available in current cutoff table.
*/
int containerCellSize = ( null == containerCell) ? 0 : containerCell.length;
if ( containerCellSize == 0 ) {
res = lastCutoffTable.get(containerId);
if ( null != res) {
containerCell = res.getValue(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME);
}
}
containerCellSize = ( null == containerCell) ? 0 : containerCell.length;
if ( containerCellSize == 0 ) containerCell = null;
/**
* Merge the data
*/
if ( INFO_ENABLED ) FileSyncLog.l.info("containerCell:" +
( (null == containerCell) ? 0 : containerCell.length) ) ;
byte[] containerCellUpdated = BytesMergerContainer.mergeContainerChangesD(
containerCell, containerObjects, cutoffTime, -1L);
if ( INFO_ENABLED ) FileSyncLog.l.info("containerCellUpdated:" +
( (null == containerCellUpdated) ? 0 : containerCellUpdated.length) ) ;
/**
* Save to current cutoff table
*/
Put containerUpdate = new Put(projectIdB);
containerUpdate.add(HBaseTables.FAMILY_NAME,HBaseTables.COL_NAME, containerCellUpdated);
containerUpdate.setDurability(Durability.SYNC_WAL);
currentCutoffTable.put(containerUpdate);
}
if ( INFO_ENABLED) FileSyncLog.l.info("Cutoff Table Containers Added - " + uniqueProjectIdWithObjIds.size());
}
/**
*
* @param existingCutoffBytes
* @param currentChanges
* @param cutoffTime
* @return
* @throws IOException
*/
public final ChangeSetDecorator.Deser createObjChangeSets(final ChangeSetDecorator.Deser existingCutoffBytes,
final ChangeDecorator.Deser currentChanges, final long cutoffTime) throws IOException {
byte[] data = BytesMergerObject.mergeObjectChanges(
existingCutoffBytes.data, currentChanges, cutoffTime, -1);
existingCutoffBytes.data = data;
return existingCutoffBytes;
}
/**
*
* @param tsTableCurrent
* @param startTime
* @param uniqueProjectIds
* @param uniqueObjIds
*/
public void addToTimeseriesChanges(String tsTableCurrent, long startTime,
Map<String, String> uniqueContainerKeyWithObjectKeys, List<ChangeDecorator.Deser> timeseriesChanges) {
HBaseFacade facade = null;
HTableWrapper table = null;
ResultScanner scanner = null;
try {
facade = HBaseFacade.getInstance();
table = facade.getTable(tsTableCurrent);
Scan scan = new Scan();
scan.setCaching(1024);
scan.setMaxVersions(1);
scan.setTimeRange(lastUpdatedTime, startTime);
scan = scan.addColumn(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
scanner = table.getScanner(scan);
StringBuilder keyBuilder = new StringBuilder();
int counter = 0;
for (Result r: scanner) {
if ( null == r) continue;
if ( r.isEmpty()) continue;
counter++;
if ( counter % 1000 == 0 ) FileSyncLog.l.info(tsTableCurrent + " read : " + counter);
byte[] changeB = r.getValue(HBaseTables.FAMILY_NAME, HBaseTables.COL_NAME);
int changeBSize = ( null == changeB) ? 0 : changeB.length;
if ( changeBSize == 0 ) continue;
if ( INFO_ENABLED) FileSyncLog.l.info("Inside AddToTimeSeries: changeB: "+changeB.toString());
ChangeDecorator.Deser currentChangeDeser = new ChangeDecorator.Deser(changeB);
Changes currentChange = currentChangeDeser.getChanges();
//Add to Unique Projects
String containerKey = ObjectKey.getContainerKey(keyBuilder,currentChange);
String objectKey = ObjectKey.getObjectKey(keyBuilder,currentChange);
currentChangeDeser.objectKey = objectKey;
if (uniqueContainerKeyWithObjectKeys.containsKey(containerKey)) {
uniqueContainerKeyWithObjectKeys.put(containerKey,
uniqueContainerKeyWithObjectKeys.get(containerKey) + SEPARATOR_OBJID + objectKey);
} else {
uniqueContainerKeyWithObjectKeys.put(containerKey, objectKey);
}
//Merge Actions of a Object.
timeseriesChanges.add(currentChangeDeser);
}
} catch (Exception e) {
FileSyncLog.l.fatal("Unable to execute daily update job. " ,e);
} finally {
if (null != scanner) {
try {scanner.close();} catch (Exception ex) {ex.printStackTrace();}
}
if (null != table) {
try {facade.putTable(table);} catch (Exception ex) {ex.printStackTrace();}
}
}
}
public static void main(String[] args) throws Exception {
CacheBuildJob.getInstance().run();
}
}
Created 05-26-2016 04:47 PM
Getting below exception-
2016-05-26 16:46:54,288 INFO [main] util.FileSyncLog: containerCell:10412488 2016-05-26 16:46:54,298 INFO [main] util.FileSyncLog: containerCellUpdated:10538784 java.lang.IllegalArgumentException: KeyValue size too large at org.apache.hadoop.hbase.client.HTable.validatePut(HTable.java:1353) at org.apache.hadoop.hbase.client.HTable.doPut(HTable.java:989) at org.apache.hadoop.hbase.client.HTable.put(HTable.java:953) at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.put(HTablePool.java:432) at com.bizosys.hsearch.hbase.HTableWrapper.put(HTableWrapper.java:117) at com.arc.hbase.jobs.CacheBuildJob.saveContainer(CacheBuildJob.java:410) at com.arc.hbase.jobs.CacheBuildJob.save(CacheBuildJob.java:320) at com.arc.hbase.jobs.CacheBuildJob.exec(CacheBuildJob.java:171) at com.arc.hbase.jobs.CacheBuildJob.run(CacheBuildJob.java:75) at com.arc.hbase.jobs.CacheBuildJob.main(CacheBuildJob.java:509)
Created 05-26-2016 04:49 PM
bq. IllegalArgumentException: KeyValue size too large at
That was where the problem was.
Please check the size of KeyValue on client side.
Created 05-26-2016 04:49 PM
Here is the default value for the related config:
<name>hbase.client.keyvalue.maxsize</name> <value>10485760</value>
Created 05-26-2016 05:14 PM
I increased value from 10485760B to 31457280B. Now getting following exception-
2016-05-26 15:39:53,949 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,] 2016-05-26 15:39:55,266 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,] 2016-05-26 15:39:56,773 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,] 2016-05-26 15:39:58,785 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,] 2016-05-26 15:40:01,809 WARN [main] ipc.RpcClient: Unexpected closed connection: Thread[IPC Client (1554225521) connection to fsdata1c.corp.arc.com/10.1.1.243:60020 from hdfs,5,]
Created 05-26-2016 05:17 PM
The above seems to be from client side.
Any exception on the server side ?
Overall, 31457280 might still be exceeded.
There could be other reason(s).